package org.apache.pulsar.functions.instance.state;

import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import lombok.Generated;
import org.apache.pulsar.functions.api.StateStore;
import org.apache.pulsar.functions.runtime.shaded.com.google.common.base.Stopwatch;
import org.apache.pulsar.functions.runtime.shaded.io.netty.buffer.ByteBuf;
import org.apache.pulsar.functions.runtime.shaded.org.apache.bookkeeper.api.StorageClient;
import org.apache.pulsar.functions.runtime.shaded.org.apache.bookkeeper.api.kv.Table;
import org.apache.pulsar.functions.runtime.shaded.org.apache.bookkeeper.clients.StorageClientBuilder;
import org.apache.pulsar.functions.runtime.shaded.org.apache.bookkeeper.clients.admin.SimpleStorageAdminClientImpl;
import org.apache.pulsar.functions.runtime.shaded.org.apache.bookkeeper.clients.config.StorageClientSettings;
import org.apache.pulsar.functions.runtime.shaded.org.apache.bookkeeper.clients.exceptions.ClientException;
import org.apache.pulsar.functions.runtime.shaded.org.apache.bookkeeper.clients.exceptions.InternalServerException;
import org.apache.pulsar.functions.runtime.shaded.org.apache.bookkeeper.clients.exceptions.NamespaceNotFoundException;
import org.apache.pulsar.functions.runtime.shaded.org.apache.bookkeeper.clients.exceptions.StreamNotFoundException;
import org.apache.pulsar.functions.runtime.shaded.org.apache.bookkeeper.clients.utils.ClientResources;
import org.apache.pulsar.functions.runtime.shaded.org.apache.bookkeeper.common.concurrent.FutureUtils;
import org.apache.pulsar.functions.runtime.shaded.org.apache.bookkeeper.common.util.Backoff;
import org.apache.pulsar.functions.runtime.shaded.org.apache.bookkeeper.stream.proto.NamespaceConfiguration;
import org.apache.pulsar.functions.runtime.shaded.org.apache.bookkeeper.stream.proto.StorageType;
import org.apache.pulsar.functions.runtime.shaded.org.apache.bookkeeper.stream.proto.StreamConfiguration;
import org.apache.pulsar.functions.runtime.shaded.org.apache.bookkeeper.stream.protocol.ProtocolConstants;
import org.apache.pulsar.functions.utils.FunctionCommon;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/pulsar/functions/instance/state/BKStateStoreProviderImpl.class */
/**
 * {@link StateStoreProvider} backed by the BookKeeper table service.
 *
 * <p>The provider keeps one {@link StorageClient} per state namespace (as computed by
 * {@link FunctionCommon#getStateNamespace(String, String)}) and reuses it for every table
 * opened in that namespace. The client cache is a plain {@link HashMap}; this class does no
 * synchronization of its own.
 */
public class BKStateStoreProviderImpl implements StateStoreProvider {

    @Generated
    private static final Logger log = LoggerFactory.getLogger(BKStateStoreProviderImpl.class);

    /** Service URI of the state storage service, read from the config map in {@link #init(Map)}. */
    private String stateStorageServiceUrl;

    /** Storage clients keyed by state namespace; created in {@link #init(Map)}. */
    private Map<String, StorageClient> clients;

    /**
     * Reads the state storage service URL from the configuration and resets the client cache.
     *
     * @param config provider configuration; the value under
     *               {@link StateStoreProvider#STATE_STORAGE_SERVICE_URL} must be a {@code String}
     */
    @Override
    public void init(Map<String, Object> config) throws Exception {
        this.stateStorageServiceUrl = (String) config.get(StateStoreProvider.STATE_STORAGE_SERVICE_URL);
        this.clients = new HashMap<>();
    }

    /**
     * Returns the cached storage client for the state namespace of {@code tenant}/{@code namespace},
     * building and caching a new one on first use.
     */
    private StorageClient getStorageClient(String tenant, String namespace) {
        String tableNs = FunctionCommon.getStateNamespace(tenant, namespace);
        return this.clients.computeIfAbsent(tableNs, ns -> {
            StorageClientSettings settings = StorageClientSettings.newBuilder()
                    .serviceUri(this.stateStorageServiceUrl)
                    .enableServerSideRouting(true)
                    .clientName("function-" + ns)
                    .backoffPolicy(Backoff.Jitter.of(Backoff.Jitter.Type.EXPONENTIAL, 100L, 2000L, 60L))
                    .build();
            return StorageClientBuilder.newBuilder()
                    .withSettings(settings)
                    .withNamespace(ns)
                    .build();
        });
    }

    /**
     * Ensures that the state table {@code name} exists, creating the namespace and/or the table
     * when the service reports them missing. Attempts are repeated for up to one minute.
     *
     * @param stateStorageServiceUrl service URI used for the admin client
     * @param tenant                 first component of the function name
     * @param namespace              second component of the function name
     * @param name                   table (stream) name
     * @throws IOException if the table could not be verified within the timeout; the cause is
     *                     the last exception observed while retrying, if any
     */
    private void createStateTable(String stateStorageServiceUrl, String tenant, String namespace, String name)
            throws Exception {
        String tableNs = FunctionCommon.getStateNamespace(tenant, namespace);
        StorageClientSettings adminSettings = StorageClientSettings.newBuilder()
                .serviceUri(stateStorageServiceUrl)
                .build();
        try (SimpleStorageAdminClientImpl adminClient =
                     new SimpleStorageAdminClientImpl(adminSettings, ClientResources.create().scheduler())) {
            StreamConfiguration streamConf = StreamConfiguration.newBuilder(ProtocolConstants.DEFAULT_STREAM_CONF)
                    .setInitialNumRanges(4)
                    .setMinNumRanges(4)
                    .setStorageType(StorageType.TABLE)
                    .build();
            Stopwatch elapsed = Stopwatch.createStarted();
            Exception lastException = null;
            while (elapsed.elapsed(TimeUnit.MINUTES) < 1) {
                try {
                    // Success path: the table already exists (or was created by a previous iteration).
                    FutureUtils.result(adminClient.getStream(tableNs, name));
                    return;
                } catch (NamespaceNotFoundException e) {
                    // Neither namespace nor table exists yet: create both, then loop to re-verify.
                    try {
                        NamespaceConfiguration nsConf = NamespaceConfiguration.newBuilder()
                                .setDefaultStreamConf(streamConf)
                                .build();
                        FutureUtils.result(adminClient.createNamespace(tableNs, nsConf));
                    } catch (Exception createNsError) {
                        lastException = createNsError;
                        log.warn("Encountered exception when creating namespace {} for state table",
                                tableNs, createNsError);
                    }
                    try {
                        FutureUtils.result(adminClient.createStream(tableNs, name, streamConf));
                    } catch (Exception createTableError) {
                        lastException = createTableError;
                        log.warn("Encountered exception when creating table {}/{}",
                                tableNs, name, createTableError);
                    }
                } catch (StreamNotFoundException e) {
                    // Namespace exists but the table does not: create it, then loop to re-verify.
                    try {
                        FutureUtils.result(adminClient.createStream(tableNs, name, streamConf));
                    } catch (Exception createTableError) {
                        lastException = createTableError;
                        log.warn("Encountered exception when creating table {}/{}",
                                tableNs, name, createTableError);
                    }
                } catch (ClientException e) {
                    // Remember the failure so the timeout error below carries a meaningful cause.
                    lastException = e;
                    log.warn("Encountered issue {} on fetching state stable metadata, re-attempting in 100 milliseconds", e.getMessage());
                    TimeUnit.MILLISECONDS.sleep(100L);
                }
            }
            throw new IOException(
                    String.format("Failed to setup / verify state table for function %s/%s/%s within timeout",
                            tenant, namespace, name),
                    lastException);
        }
    }

    /**
     * Opens the state table {@code name}, retrying on {@link InternalServerException} for up to
     * one minute.
     *
     * @throws RuntimeException if a single open attempt does not complete within one minute
     * @throws IOException      if the retry window elapses without a successful open
     */
    private Table<ByteBuf, ByteBuf> openStateTable(String tenant, String namespace, String name) throws Exception {
        StorageClient client = getStorageClient(tenant, namespace);
        log.info("Opening state table for function {}/{}/{}", tenant, namespace, name);
        Stopwatch elapsed = Stopwatch.createStarted();
        while (elapsed.elapsed(TimeUnit.MINUTES) < 1) {
            try {
                return FutureUtils.result(client.openTable(name), 1L, TimeUnit.MINUTES);
            } catch (TimeoutException e) {
                throw new RuntimeException("Failed to open state table for function " + tenant + "/" + namespace + "/" + name + " within timeout period", e);
            } catch (InternalServerException e) {
                log.warn("Encountered internal server on opening state table '{}/{}/{}',  re-attempt in 100 milliseconds : {}",
                        tenant, namespace, name, e.getMessage());
                TimeUnit.MILLISECONDS.sleep(100L);
            }
        }
        throw new IOException("Failed to open state table for function " + tenant + "/" + namespace + "/" + name);
    }

    /**
     * Ensures the state table exists, opens it and wraps it in a {@link BKStateStoreImpl}.
     *
     * @return the state store, cast to the caller-requested type; the cast is unchecked, so a
     *         caller asking for an incompatible type fails at its own use site
     */
    @Override
    @SuppressWarnings("unchecked")
    public <T extends StateStore> T getStateStore(String tenant, String namespace, String name) throws Exception {
        createStateTable(this.stateStorageServiceUrl, tenant, namespace, name);
        Table<ByteBuf, ByteBuf> table = openStateTable(tenant, namespace, name);
        return (T) new BKStateStoreImpl(tenant, namespace, name, table);
    }

    /**
     * Requests deletion of the state table {@code name}. The outcome is only logged; a missing
     * namespace or table is treated as success and other failures do not propagate.
     */
    @Override
    public void cleanUp(String tenant, String namespace, String name) throws Exception {
        SimpleStorageAdminClientImpl adminClient = new SimpleStorageAdminClientImpl(
                StorageClientSettings.newBuilder().serviceUri(this.stateStorageServiceUrl).build(),
                ClientResources.create().scheduler());
        try {
            String tableNs = FunctionCommon.getStateNamespace(tenant, namespace);
            adminClient.deleteStream(tableNs, name).whenComplete((deleted, error) -> {
                if ((error == null && deleted.booleanValue())
                        || (error instanceof NamespaceNotFoundException)
                        || (error instanceof StreamNotFoundException)) {
                    log.info("{}/{} table deleted successfully", tableNs, name);
                } else if (error != null) {
                    log.error("{}/{} table deletion failed {}  but moving on", tableNs, name, error);
                } else {
                    log.error("{}/{} table deletion failed but moving on", tableNs, name);
                }
            });
        } finally {
            // NOTE(review): the client is closed right after the asynchronous delete is issued,
            // without waiting for it to complete — confirm this cannot abort the in-flight request.
            adminClient.close();
        }
    }

    /**
     * Asynchronously closes every cached storage client and empties the cache. Close failures
     * are logged and otherwise ignored. Calling this before {@link #init(Map)} is a no-op.
     */
    @Override
    public void close() {
        if (this.clients == null) {
            return;
        }
        for (StorageClient client : this.clients.values()) {
            client.closeAsync().exceptionally(error -> {
                log.warn("Failed to close state storage client", error);
                return null;
            });
        }
        this.clients.clear();
    }
}
