package org.apache.pulsar.functions.runtime.shaded.org.apache.bookkeeper.stream.cluster;

import java.io.File;
import java.io.IOException;
import java.net.BindException;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.stream.Collectors;
import org.apache.pulsar.functions.runtime.shaded.com.google.common.base.Preconditions;
import org.apache.pulsar.functions.runtime.shaded.com.google.common.collect.Lists;
import org.apache.pulsar.functions.runtime.shaded.org.apache.bookkeeper.clients.StorageClientBuilder;
import org.apache.pulsar.functions.runtime.shaded.org.apache.bookkeeper.clients.admin.StorageAdminClient;
import org.apache.pulsar.functions.runtime.shaded.org.apache.bookkeeper.clients.config.StorageClientSettings;
import org.apache.pulsar.functions.runtime.shaded.org.apache.bookkeeper.clients.exceptions.ClientException;
import org.apache.pulsar.functions.runtime.shaded.org.apache.bookkeeper.clients.exceptions.NamespaceNotFoundException;
import org.apache.pulsar.functions.runtime.shaded.org.apache.bookkeeper.clients.utils.NetUtils;
import org.apache.pulsar.functions.runtime.shaded.org.apache.bookkeeper.common.component.AbstractLifecycleComponent;
import org.apache.pulsar.functions.runtime.shaded.org.apache.bookkeeper.common.component.LifecycleComponent;
import org.apache.pulsar.functions.runtime.shaded.org.apache.bookkeeper.common.concurrent.FutureUtils;
import org.apache.pulsar.functions.runtime.shaded.org.apache.bookkeeper.common.net.ServiceURI;
import org.apache.pulsar.functions.runtime.shaded.org.apache.bookkeeper.conf.ServerConfiguration;
import org.apache.pulsar.functions.runtime.shaded.org.apache.bookkeeper.meta.MetadataDrivers;
import org.apache.pulsar.functions.runtime.shaded.org.apache.bookkeeper.shims.zk.ZooKeeperServerShim;
import org.apache.pulsar.functions.runtime.shaded.org.apache.bookkeeper.stats.NullStatsLogger;
import org.apache.pulsar.functions.runtime.shaded.org.apache.bookkeeper.stream.proto.NamespaceConfiguration;
import org.apache.pulsar.functions.runtime.shaded.org.apache.bookkeeper.stream.proto.NamespaceProperties;
import org.apache.pulsar.functions.runtime.shaded.org.apache.bookkeeper.stream.proto.common.Endpoint;
import org.apache.pulsar.functions.runtime.shaded.org.apache.bookkeeper.stream.protocol.ProtocolConstants;
import org.apache.pulsar.functions.runtime.shaded.org.apache.bookkeeper.stream.server.StorageServer;
import org.apache.pulsar.functions.runtime.shaded.org.apache.bookkeeper.stream.storage.conf.StorageConfiguration;
import org.apache.pulsar.functions.runtime.shaded.org.apache.bookkeeper.stream.storage.exceptions.StorageRuntimeException;
import org.apache.pulsar.functions.runtime.shaded.org.apache.bookkeeper.stream.storage.impl.cluster.ZkClusterInitializer;
import org.apache.pulsar.functions.runtime.shaded.org.apache.commons.lang3.StringUtils;
import org.apache.pulsar.functions.runtime.shaded.org.apache.commons.lang3.tuple.Pair;
import org.apache.pulsar.functions.runtime.shaded.org.apache.distributedlog.LocalDLMEmulator;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/pulsar/functions/runtime/shaded/org/apache/bookkeeper/stream/cluster/StreamCluster.class */
/**
 * A lifecycle component that brings up a local stream storage cluster: an optional
 * ZooKeeper server, the cluster metadata, a configurable number of storage servers
 * and the {@code default} namespace.
 *
 * <p>Storage servers are started concurrently; bookie/gRPC port allocation and the
 * list of RPC endpoints are guarded by this instance's monitor.
 */
public class StreamCluster extends AbstractLifecycleComponent<StorageConfiguration> {
    private static final Logger log = LoggerFactory.getLogger(StreamCluster.class);

    /** Maximum number of port-bind failures tolerated while starting a single storage server. */
    private static final int MAX_RETRIES = 20;

    private final StreamClusterSpec spec;
    private final List<Endpoint> rpcEndpoints;
    private ServiceURI metadataServiceUri;
    private int zkPort;
    private ZooKeeperServerShim zks;
    private List<LifecycleComponent> servers;
    private int nextBookiePort;
    private int nextGrpcPort;

    /**
     * Creates a cluster from the given spec. The cluster is not started.
     *
     * @param streamClusterSpec the cluster specification
     * @return a new, unstarted cluster
     */
    public static StreamCluster build(StreamClusterSpec streamClusterSpec) {
        return new StreamCluster(streamClusterSpec);
    }

    /**
     * Builds the base bookie configuration pointing at the given metadata service.
     *
     * @param serviceURI the metadata service uri
     * @return a fresh server configuration
     */
    private static ServerConfiguration newBookieConfiguration(ServiceURI serviceURI) {
        ServerConfiguration conf = new ServerConfiguration();
        conf.setMetadataServiceUri(serviceURI.getUri().toString());
        conf.setAllowLoopback(true);
        conf.setGcWaitTime(300000L);
        // Disk usage thresholds are set just below 1.0 (presumably so a nearly full
        // local disk does not stop the bookies — confirm against bookie docs).
        conf.setDiskUsageWarnThreshold(0.9999f);
        conf.setDiskUsageThreshold(0.999999f);
        return conf;
    }

    private StreamCluster(StreamClusterSpec streamClusterSpec) {
        super("stream-cluster", new StorageConfiguration(streamClusterSpec.baseConf()), NullStatsLogger.INSTANCE);
        this.spec = streamClusterSpec;
        this.servers = Lists.newArrayListWithExpectedSize(streamClusterSpec.numServers());
        this.rpcEndpoints = Lists.newArrayListWithExpectedSize(streamClusterSpec.numServers());
        this.nextBookiePort = streamClusterSpec.initialBookiePort();
        this.nextGrpcPort = streamClusterSpec.initialGrpcPort();
    }

    /**
     * Returns the RPC endpoints of the storage servers started so far.
     *
     * @return the live (not copied) endpoint list
     */
    public List<Endpoint> getRpcEndpoints() {
        return this.rpcEndpoints;
    }

    /**
     * Starts a ZooKeeper server when the spec asks for one; otherwise takes the
     * metadata service uri from the spec.
     *
     * @throws Exception if ZooKeeper fails to start
     */
    private void startZooKeeper() throws Exception {
        if (!this.spec.shouldStartZooKeeper()) {
            this.metadataServiceUri = Preconditions.checkNotNull(
                    this.spec.metadataServiceUri,
                    "No metadata service uri is configured while configuring not to start zookeeper");
            return;
        }
        File zkDir = new File(this.spec.storageRootDir(), "zookeeper");
        Pair<ZooKeeperServerShim, Integer> zkAndPort =
                LocalDLMEmulator.runZookeeperOnAnyPort(this.spec.zkPort(), zkDir);
        this.zks = zkAndPort.getLeft();
        this.zkPort = zkAndPort.getRight();
        log.info("Started zookeeper at port {}.", this.zkPort);
        this.metadataServiceUri = ServiceURI.create("zk://127.0.0.1:" + this.zkPort + "/ledgers");
    }

    /** Stops the ZooKeeper server if this cluster started one. */
    private void stopZooKeeper() {
        if (null != this.zks) {
            this.zks.stop();
        }
    }

    /**
     * Initializes the cluster metadata and the segment storage.
     *
     * @throws Exception if the metadata service is not ZooKeeper based or initialization fails
     */
    private void initializeCluster() throws Exception {
        Preconditions.checkArgument(
                ServiceURI.SERVICE_ZK.equals(this.metadataServiceUri.getServiceName()),
                "Only support zookeeper based metadata service now");
        String zkServers = StringUtils.join((Object[]) this.metadataServiceUri.getServiceHosts(), ',');
        new ZkClusterInitializer(zkServers)
                .initializeCluster(this.metadataServiceUri.getUri(), this.spec.numServers() * 2);
        MetadataDrivers.runFunctionWithMetadataBookieDriver(newBookieConfiguration(this.metadataServiceUri), driver -> {
            try {
                if (driver.getRegistrationManager().initNewCluster()) {
                    log.info("Successfully initialized the segment storage");
                } else {
                    log.info("The segment storage was already initialized");
                }
                return null;
            } catch (Exception e) {
                throw new StorageRuntimeException("Failed to initialize the segment storage", e);
            }
        });
    }

    /**
     * Starts one storage server, allocating the next bookie/gRPC port pair on every attempt.
     *
     * <p>An attempt whose failure cause is a {@link BindException} is retried with the next
     * port pair, up to {@link #MAX_RETRIES} times; any other failure is rethrown immediately.
     *
     * @return the started storage server
     * @throws Exception the startup failure, or the {@link BindException} once retries are exhausted
     */
    private LifecycleComponent startServer() throws Exception {
        int retries = 0;
        while (true) {
            int bookiePort;
            int grpcPort;
            // Called concurrently from startServers(); each attempt needs a unique port pair.
            synchronized (this) {
                bookiePort = this.nextBookiePort++;
                grpcPort = this.nextGrpcPort++;
            }
            LifecycleComponent server = null;
            try {
                ServerConfiguration serverConf = newBookieConfiguration(this.metadataServiceUri);
                serverConf.loadConf(this.spec.baseConf());
                serverConf.setBookiePort(bookiePort);
                File bkDir = new File(this.spec.storageRootDir(), "bookie_" + bookiePort);
                serverConf.setJournalDirName(bkDir.getPath());
                serverConf.setLedgerDirNames(new String[]{bkDir.getPath()});
                File rangesStoreDir = new File(this.spec.storageRootDir(), "ranges_" + grpcPort);
                new StorageConfiguration(serverConf).setRangeStoreDirNames(new String[]{rangesStoreDir.getPath()});
                log.info("Attempting to start storage server at (bookie port = {}, grpc port = {}) : bkDir = {}, rangesStoreDir = {}",
                        bookiePort, grpcPort, bkDir, rangesStoreDir);
                server = StorageServer.buildStorageServer(serverConf, grpcPort);
                server.start();
                log.info("Started storage server at (bookie port = {}, grpc port = {})", bookiePort, grpcPort);
                // The endpoint list is a plain list shared by all starter threads.
                synchronized (this) {
                    this.rpcEndpoints.add(StorageServer.createLocalEndpoint(grpcPort, false));
                }
                return server;
            } catch (Throwable cause) {
                log.error("Failed to start storage server", cause);
                if (null != server) {
                    server.stop();
                }
                if (!(cause.getCause() instanceof BindException)) {
                    throw cause;
                }
                retries++;
                if (retries > MAX_RETRIES) {
                    throw (BindException) cause.getCause();
                }
            }
        }
    }

    /**
     * Starts all storage servers in parallel and waits for every one of them.
     *
     * @throws Exception if any server fails to start or the wait is interrupted
     */
    private void startServers() throws Exception {
        int numServers = this.spec.numServers();
        log.info("Starting {} storage servers.", numServers);
        ExecutorService executor = Executors.newCachedThreadPool();
        try {
            List<Future<LifecycleComponent>> pending = new ArrayList<>(numServers);
            for (int i = 0; i < numServers; i++) {
                pending.add(executor.submit(() -> startServer()));
            }
            for (Future<LifecycleComponent> future : pending) {
                this.servers.add(future.get());
            }
            log.info("Started {} storage servers.", numServers);
        } finally {
            // Release the pool on failure as well, so a failed startup does not leak it.
            executor.shutdown();
        }
    }

    /**
     * Ensures the {@code default} namespace exists, creating it when it is not found.
     *
     * <p>Loops until the namespace can be fetched; after a creation attempt (successful or
     * not) the namespace is fetched again.
     *
     * @throws Exception on any failure other than a missing namespace or a client error during creation
     */
    private void createDefaultNamespaces() throws Exception {
        String serviceUri = String.format("bk://%s/", getRpcEndpoints().stream()
                .map(NetUtils::endpointToString)
                .collect(Collectors.joining(",")));
        StorageClientSettings settings = StorageClientSettings.newBuilder()
                .serviceUri(serviceUri)
                .usePlaintext(true)
                .build();
        log.info("Service uri are : {}", serviceUri);
        try (StorageAdminClient admin = StorageClientBuilder.newBuilder().withSettings(settings).buildAdmin()) {
            boolean found = false;
            while (!found) {
                try {
                    NamespaceProperties props = FutureUtils.result(admin.getNamespace("default"));
                    log.info("Namespace '{}':\n{}", "default", props);
                    found = true;
                } catch (NamespaceNotFoundException notFound) {
                    log.info("Namespace '{}' is not found.", "default");
                    log.info("Creating namespace '{}' ...", "default");
                    try {
                        NamespaceConfiguration nsConf = NamespaceConfiguration.newBuilder()
                                .setDefaultStreamConf(ProtocolConstants.DEFAULT_STREAM_CONF)
                                .build();
                        NamespaceProperties created = FutureUtils.result(admin.createNamespace("default", nsConf));
                        log.info("Successfully created namespace '{}':", "default");
                        log.info("{}", created);
                    } catch (ClientException ce) {
                        // Best effort: fall through and fetch the namespace again, but
                        // leave a trace instead of silently swallowing the failure.
                        log.warn("Failed to create namespace '{}', retrying ...", "default", ce);
                    }
                }
            }
        }
    }

    /** Closes every storage server that was started. */
    private void stopServers() {
        for (LifecycleComponent server : this.servers) {
            server.close();
        }
    }

    /**
     * Starts ZooKeeper (if configured), initializes the cluster, starts the storage
     * servers and creates the default namespace.
     *
     * @throws RuntimeException wrapping any failure of the steps above
     */
    @Override
    protected void doStart() {
        try {
            startZooKeeper();
            initializeCluster();
            startServers();
            createDefaultNamespaces();
        } catch (Exception e) {
            if (e instanceof InterruptedException) {
                // Preserve the interrupt for callers up the stack.
                Thread.currentThread().interrupt();
            }
            throw new RuntimeException(e);
        }
    }

    /** No-op: resources are released in {@link #doClose()}. */
    @Override
    protected void doStop() {
    }

    /** Stops the storage servers first, then ZooKeeper. */
    @Override
    protected void doClose() throws IOException {
        stopServers();
        stopZooKeeper();
    }
}
