package org.apache.pulsar.functions.runtime.shaded.org.apache.distributedlog;

import java.io.File;
import java.io.IOException;
import java.net.BindException;
import java.net.URI;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import org.apache.pulsar.functions.runtime.shaded.com.google.common.base.Optional;
import org.apache.pulsar.functions.runtime.shaded.org.apache.bookkeeper.conf.ServerConfiguration;
import org.apache.pulsar.functions.runtime.shaded.org.apache.bookkeeper.shims.zk.ZooKeeperServerShim;
import org.apache.pulsar.functions.runtime.shaded.org.apache.bookkeeper.util.BookKeeperConstants;
import org.apache.pulsar.functions.runtime.shaded.org.apache.bookkeeper.util.IOUtils;
import org.apache.pulsar.functions.runtime.shaded.org.apache.bookkeeper.util.LocalBookKeeper;
import org.apache.pulsar.functions.runtime.shaded.org.apache.commons.io.FileUtils;
import org.apache.pulsar.functions.runtime.shaded.org.apache.commons.lang3.tuple.Pair;
import org.apache.pulsar.functions.runtime.shaded.org.apache.distributedlog.impl.metadata.BKDLConfig;
import org.apache.pulsar.functions.runtime.shaded.org.apache.distributedlog.metadata.DLMetadata;
import org.apache.pulsar.functions.runtime.shaded.org.apache.zookeeper.KeeperException;
import org.apache.pulsar.functions.runtime.shaded.org.apache.zookeeper.WatchedEvent;
import org.apache.pulsar.functions.runtime.shaded.org.apache.zookeeper.Watcher;
import org.apache.pulsar.functions.runtime.shaded.org.apache.zookeeper.ZooKeeper;
import org.apache.pulsar.functions.runtime.shaded.org.apache.zookeeper.client.ZKClientConfig;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/pulsar/functions/runtime/shaded/org/apache/distributedlog/LocalDLMEmulator.class */
public class LocalDLMEmulator {
    /** Logger for this class; assigned in the static initializer. */
    private static final Logger LOG;
    /** Path appended after the ZooKeeper ensemble when this class builds {@code distributedlog://} URIs. */
    public static final String DLOG_NAMESPACE = "/messaging/distributedlog";
    /** Default initial bookie port (value 0). */
    private static final int DEFAULT_BOOKIE_INITIAL_PORT = 0;
    /** Default ZooKeeper timeout, in seconds (value 10). */
    private static final int DEFAULT_ZK_TIMEOUT_SEC = 10;
    /** Default ZooKeeper port; the Builder starts from this value. */
    private static final int DEFAULT_ZK_PORT = 2181;
    /** Default ZooKeeper host; used by the Builder and by {@code main} when only a port is given. */
    private static final String DEFAULT_ZK_HOST = "127.0.0.1";
    /** Default {@code host:port} ensemble used by the single-argument {@code createDLMURI}. */
    private static final String DEFAULT_ZK_ENSEMBLE = "127.0.0.1:2181";
    /** Default number of bookies (value 3). */
    private static final int DEFAULT_NUM_BOOKIES = 3;
    /** Template configuration cloned by {@code Builder.build()} when no server configuration was supplied. */
    private static final ServerConfiguration DEFAULT_SERVER_CONFIGURATION;
    /** ZooKeeper connect string, built in the constructor as {@code host + ":" + port}. */
    private final String zkEnsemble;
    /** DistributedLog URI built in the constructor from the ensemble and {@link #DLOG_NAMESPACE}. */
    private final URI uri;
    /** Directories handed to {@code forceDeleteOnExit} in {@code teardown()}; nothing in this file adds to the list. */
    private final List<File> tmpDirs;
    /** Timeout in seconds; used for the server-up wait, the bookie poll count and the ZooKeeper connection. */
    private final int zkTimeoutSec;
    /** Thread that runs {@code LocalBookKeeper.startLocalBookies}; created in the constructor, started by {@code start()}. */
    private final Thread bkStartupThread;
    /** ZooKeeper host passed to the bookie startup call and to {@code connectZooKeeper}. */
    private final String zkHost;
    /** ZooKeeper port passed to the bookie startup call and to {@code connectZooKeeper}. */
    private final int zkPort;
    /** Number of bookies to start and to wait for in {@code start()}. */
    private final int numBookies;
    /** Explicit assertion flag set in the static initializer and checked in {@code start()}. */
    static final /* synthetic */ boolean $assertionsDisabled;

    /* loaded from: input_file:org/apache/pulsar/functions/runtime/shaded/org/apache/distributedlog/LocalDLMEmulator$Builder.class */
    /**
     * Fluent builder for {@link LocalDLMEmulator}.
     *
     * <p>Every setter stores its argument and returns this builder so calls can be chained;
     * {@link #build()} assembles the effective server configuration and creates the emulator.
     */
    public static class Builder {
        private Optional<ServerConfiguration> serverConf = Optional.absent();
        private boolean shouldStartZK = true;
        private int initialBookiePort = 0;
        private int zkPort = LocalDLMEmulator.DEFAULT_ZK_PORT;
        private String zkHost = LocalDLMEmulator.DEFAULT_ZK_HOST;
        private int numBookies = 3;
        private int zkTimeoutSec = 10;

        /** Sets the number of bookies handed to the emulator. */
        public Builder numBookies(int i) {
            numBookies = i;
            return this;
        }

        /** Sets the ZooKeeper host handed to the emulator. */
        public Builder zkHost(String str) {
            zkHost = str;
            return this;
        }

        /** Sets the ZooKeeper port handed to the emulator. */
        public Builder zkPort(int i) {
            zkPort = i;
            return this;
        }

        /** Sets the ZooKeeper timeout, in seconds. */
        public Builder zkTimeoutSec(int i) {
            zkTimeoutSec = i;
            return this;
        }

        /** Sets the initial bookie port handed to the emulator. */
        public Builder initialBookiePort(int i) {
            initialBookiePort = i;
            return this;
        }

        /** Sets the flag that is forwarded to the local bookie startup call. */
        public Builder shouldStartZK(boolean z) {
            shouldStartZK = z;
            return this;
        }

        /** Supplies the server configuration to copy from instead of the class default. */
        public Builder serverConf(ServerConfiguration serverConfiguration) {
            serverConf = Optional.of(serverConfiguration);
            return this;
        }

        /**
         * Creates the emulator from the collected settings.
         *
         * <p>When no server configuration was supplied, a clone of the class default is used and
         * its ZooKeeper timeout is set to {@code zkTimeoutSec * 1000}. In both cases the chosen
         * configuration is loaded into a fresh {@link ServerConfiguration} with loopback allowed,
         * so a caller-supplied instance is not modified here.
         *
         * @return a new, not yet started, emulator
         * @throws Exception if cloning/loading the configuration or constructing the emulator fails
         */
        public LocalDLMEmulator build() throws Exception {
            final ServerConfiguration baseConf;
            if (!serverConf.isPresent()) {
                baseConf = (ServerConfiguration) LocalDLMEmulator.DEFAULT_SERVER_CONFIGURATION.clone();
                baseConf.setZkTimeout(zkTimeoutSec * 1000);
            } else {
                baseConf = serverConf.get();
            }
            final ServerConfiguration effectiveConf = new ServerConfiguration();
            effectiveConf.loadConf(baseConf);
            effectiveConf.setAllowLoopback(true);
            return new LocalDLMEmulator(
                    numBookies, shouldStartZK, zkHost, zkPort, initialBookiePort, zkTimeoutSec, effectiveConf);
        }
    }

    /**
     * Entry point for configuring an emulator.
     *
     * @return a fresh {@link Builder} holding the builder's initial settings
     */
    public static Builder newBuilder() {
        final Builder builder = new Builder();
        return builder;
    }

    /**
     * Records the connection settings and prepares, without starting, the thread that launches
     * the local bookies.
     *
     * @param i number of bookies to start
     * @param z flag forwarded to {@code LocalBookKeeper.startLocalBookies}
     * @param str ZooKeeper host
     * @param i2 ZooKeeper port
     * @param i3 initial bookie port forwarded to {@code LocalBookKeeper.startLocalBookies}
     * @param i4 ZooKeeper timeout in seconds
     * @param serverConfiguration configuration forwarded to the bookie startup call
     */
    private LocalDLMEmulator(final int i, final boolean z, final String str, final int i2, final int i3, int i4, final ServerConfiguration serverConfiguration) throws Exception {
        final String ensemble = str + ":" + i2;
        this.numBookies = i;
        this.zkTimeoutSec = i4;
        this.zkHost = str;
        this.zkPort = i2;
        this.zkEnsemble = ensemble;
        this.uri = URI.create("distributedlog://" + ensemble + DLOG_NAMESPACE);
        this.tmpDirs = new ArrayList<>();
        this.bkStartupThread = new Thread() {
            @Override
            public void run() {
                try {
                    LOG.info("Starting {} bookies : allowLoopback = {}", i, serverConfiguration.getAllowLoopback());
                    LocalBookKeeper.startLocalBookies(str, i2, i, z, i3, serverConfiguration);
                    LOG.info("{} bookies are started.", i);
                } catch (InterruptedException interrupted) {
                    // Keep the interrupt status visible to whoever inspects this thread.
                    Thread.currentThread().interrupt();
                } catch (Exception startupFailure) {
                    LOG.error("Error starting local bk", startupFailure);
                }
            }
        };
    }

    /**
     * Starts the bookie startup thread, waits for the server at the ZooKeeper ensemble to come
     * up, polls for the expected number of bookies and finally creates the DistributedLog
     * metadata for {@link #getUri()}.
     *
     * <p>A bookie-count mismatch is logged; it only fails (with {@link AssertionError}) when
     * assertions are enabled for this class.
     *
     * @throws Exception if the server does not come up within the timeout, or any step fails
     */
    public void start() throws Exception {
        bkStartupThread.start();

        final boolean serverUp = LocalBookKeeper.waitForServerUp(zkEnsemble, zkTimeoutSec * 1000);
        if (!serverUp) {
            throw new Exception("Error starting zookeeper/bookkeeper");
        }

        final int bookiesUp = checkBookiesUp(numBookies, zkTimeoutSec);
        if (bookiesUp != numBookies) {
            LOG.info("Only {} bookies are up, expected {} bookies to be there.", bookiesUp, numBookies);
            if (!$assertionsDisabled) {
                throw new AssertionError();
            }
        }

        final BKDLConfig dlConfig = new BKDLConfig(zkEnsemble, "/ledgers");
        DLMetadata.create(dlConfig).create(uri);
    }

    /**
     * Interrupts the bookie startup thread, waits for it to finish and schedules every tracked
     * temporary directory for deletion on JVM exit.
     *
     * @throws Exception if joining the thread is interrupted or scheduling a deletion fails
     */
    public void teardown() throws Exception {
        final Thread startupThread = bkStartupThread;
        if (startupThread != null) {
            startupThread.interrupt();
            startupThread.join();
        }
        for (File tmpDir : tmpDirs) {
            FileUtils.forceDeleteOnExit(tmpDir);
        }
    }

    /**
     * @return the ZooKeeper connect string ({@code host:port}) this emulator was built with
     */
    public String getZkServers() {
        return zkEnsemble;
    }

    /**
     * @return the {@code distributedlog://} URI computed in the constructor
     */
    public URI getUri() {
        return uri;
    }

    /**
     * Polls ZooKeeper until the expected number of bookies is registered or the poll budget is
     * exhausted.
     *
     * <p>Each poll lists the children of {@code /ledgers/available}, drops the
     * {@link BookKeeperConstants#READONLY} entry and counts what remains. A failed poll
     * ({@link KeeperException}) is ignored and retried. One ZooKeeper connection is shared by all
     * polls and is closed exactly once, when the method exits.
     *
     * @param i the number of bookies to wait for
     * @param i2 the maximum number of polls; a one second sleep follows each poll that did not
     *     find exactly {@code i} bookies
     * @return the bookie count seen by the most recent successful poll (0 if none succeeded)
     * @throws Exception if connecting to ZooKeeper fails or the wait is interrupted
     */
    public int checkBookiesUp(int i, int i2) throws Exception {
        final ZooKeeper zk = connectZooKeeper(this.zkHost, this.zkPort, this.zkTimeoutSec);
        try {
            int bookiesFound = 0;
            for (int attempt = 0; attempt < i2; attempt++) {
                try {
                    List<String> children = zk.getChildren("/ledgers/available", false);
                    children.remove(BookKeeperConstants.READONLY);
                    bookiesFound = children.size();
                    // More bookies than expected is always worth reporting; otherwise only when debugging.
                    if (bookiesFound > i || LOG.isDebugEnabled()) {
                        LOG.info("Found {} bookies up, waiting for {}", bookiesFound, i);
                        if (bookiesFound > i || LOG.isTraceEnabled()) {
                            for (String child : children) {
                                LOG.info(" server: {}", child);
                            }
                        }
                    }
                } catch (KeeperException ignored) {
                    // Best effort: the registration path may not exist yet, so retry on the next poll.
                }
                if (bookiesFound == i) {
                    break;
                }
                Thread.sleep(1000L);
            }
            return bookiesFound;
        } finally {
            // Close only after the last poll; closing inside the loop would break every later poll.
            zk.close();
        }
    }

    /**
     * @return the fixed ledgers path, {@code "/ledgers"}
     */
    public static String getBkLedgerPath() {
        final String ledgersPath = "/ledgers";
        return ledgersPath;
    }

    /**
     * Connects to ZooKeeper at {@code str:i} using the default timeout of
     * {@code DEFAULT_ZK_TIMEOUT_SEC} (10) seconds.
     *
     * @param str ZooKeeper host
     * @param i ZooKeeper port
     * @return the connected client
     * @throws IOException if the connection is not established within the timeout
     * @throws KeeperException declared by the original signature and kept for callers
     * @throws InterruptedException if the wait for the connection is interrupted
     */
    public static ZooKeeper connectZooKeeper(String str, int i) throws IOException, KeeperException, InterruptedException {
        final int timeoutSec = DEFAULT_ZK_TIMEOUT_SEC;
        return connectZooKeeper(str, i, timeoutSec);
    }

    /**
     * Connects to ZooKeeper at {@code str:i} and blocks until the session reports
     * {@code SyncConnected} or the timeout elapses.
     *
     * <p>If the connection is not established, because of a timeout or because the wait was
     * interrupted, the client handle is closed before the failure propagates, so no client is
     * left behind.
     *
     * @param str ZooKeeper host
     * @param i ZooKeeper port
     * @param i2 timeout in seconds; used both as the session timeout ({@code i2 * 1000} ms) and as
     *     the maximum time to wait for the connection
     * @return the connected client; the caller is responsible for closing it
     * @throws IOException if the client cannot be created or does not connect within the timeout
     * @throws KeeperException declared by the original signature and kept for callers
     * @throws InterruptedException if waiting for the connection, or closing the unused client,
     *     is interrupted
     */
    public static ZooKeeper connectZooKeeper(String str, int i, int i2) throws IOException, KeeperException, InterruptedException {
        final CountDownLatch connectedLatch = new CountDownLatch(1);
        final ZooKeeper zooKeeper = new ZooKeeper(str + ":" + i, i2 * 1000, new Watcher() {
            @Override
            public void process(WatchedEvent watchedEvent) {
                if (watchedEvent.getState() == Watcher.Event.KeeperState.SyncConnected) {
                    connectedLatch.countDown();
                }
            }
        });
        boolean connected = false;
        try {
            connected = connectedLatch.await(i2, TimeUnit.SECONDS);
        } finally {
            if (!connected) {
                // Timed out or interrupted: release the client instead of leaking it.
                zooKeeper.close();
            }
        }
        if (!connected) {
            throw new IOException("Zookeeper took too long to connect");
        }
        return zooKeeper;
    }

    /**
     * Builds a DistributedLog URI for {@code str} against the default ZooKeeper ensemble
     * ({@code DEFAULT_ZK_ENSEMBLE}).
     *
     * @param str path appended after {@link #DLOG_NAMESPACE}
     * @return the resulting URI
     * @throws Exception declared by the original signature and kept for callers
     */
    public static URI createDLMURI(String str) throws Exception {
        final String ensemble = DEFAULT_ZK_ENSEMBLE;
        return createDLMURI(ensemble, str);
    }

    /**
     * Builds {@code distributedlog://<str><DLOG_NAMESPACE><str2>}.
     *
     * @param str ZooKeeper ensemble ({@code host:port}) placed in the authority part
     * @param str2 path appended after {@link #DLOG_NAMESPACE}
     * @return the resulting URI
     * @throws Exception declared by the original signature and kept for callers
     */
    public static URI createDLMURI(String str, String str2) throws Exception {
        final StringBuilder location = new StringBuilder("distributedlog://");
        location.append(str).append(DLOG_NAMESPACE).append(str2);
        return URI.create(location.toString());
    }

    /**
     * Starts a ZooKeeper server on a free port, beginning the search at a random port computed as
     * {@code (int) (Math.random() * 10000 + 7000)}.
     *
     * @param file directory handed to the ZooKeeper server
     * @return the started server paired with the port it was bound to
     * @throws Exception if the server cannot be started
     */
    public static Pair<ZooKeeperServerShim, Integer> runZookeeperOnAnyPort(File file) throws Exception {
        final int firstPort = (int) ((Math.random() * 10000.0d) + 7000.0d);
        return runZookeeperOnAnyPort(firstPort, file);
    }

    /**
     * Starts a ZooKeeper server, trying {@code i} first and moving to the next port each time a
     * bind fails.
     *
     * <p>After port 65535 the search continues at 1025. The 21st {@link BindException} is
     * rethrown to the caller.
     *
     * @param i first port to try
     * @param file directory handed to the ZooKeeper server
     * @return the started server paired with the port it was bound to
     * @throws Exception if binding keeps failing or the server fails to start for another reason
     */
    public static Pair<ZooKeeperServerShim, Integer> runZookeeperOnAnyPort(int i, File file) throws Exception {
        int candidatePort = i;
        int bindFailures = 0;
        while (true) {
            try {
                LOG.info("zk trying to bind to port " + candidatePort);
                final ZooKeeperServerShim server = LocalBookKeeper.runZookeeper(1000, candidatePort, file);
                return Pair.of(server, Integer.valueOf(candidatePort));
            } catch (BindException bindFailure) {
                bindFailures++;
                if (bindFailures > 20) {
                    throw bindFailure;
                }
                candidatePort++;
                if (candidatePort > 65535) {
                    candidatePort = 1025;
                }
            }
        }
    }

    /**
     * Command-line entry point: {@code LocalDLEmulator [<zk_host>] <zk_port>}.
     *
     * <p>With one argument it is the ZooKeeper port and the host defaults to
     * {@code DEFAULT_ZK_HOST}; with two or more, the first is the host and the second the port.
     * A shutdown hook tears the emulator down and schedules the temporary directory for
     * deletion. Any exception raised while setting up or starting is printed to standard output
     * rather than propagated.
     *
     * @param strArr command-line arguments
     * @throws Exception declared by the original signature and kept for callers
     */
    public static void main(String[] strArr) throws Exception {
        try {
            if (strArr.length < 1) {
                System.out.println("Usage: LocalDLEmulator [<zk_host>] <zk_port>");
                System.exit(-1);
            }
            final String zkHost;
            final int zkPort;
            if (strArr.length == 1) {
                zkHost = DEFAULT_ZK_HOST;
                zkPort = Integer.parseInt(strArr[0]);
            } else {
                zkHost = strArr[0];
                zkPort = Integer.parseInt(strArr[1]);
            }
            final File zkDir = IOUtils.createTempDir("distrlog", ZKClientConfig.ZK_SASL_CLIENT_USERNAME_DEFAULT);
            // Captured in a final local: main is static, so there is no enclosing instance for the hook to use.
            final LocalDLMEmulator emulator = newBuilder().zkHost(zkHost).zkPort(zkPort).build();
            Runtime.getRuntime().addShutdownHook(new Thread() {
                @Override
                public void run() {
                    try {
                        emulator.teardown();
                        FileUtils.forceDeleteOnExit(zkDir);
                        System.out.println("ByeBye!");
                    } catch (Exception ignored) {
                        // Best-effort cleanup while the JVM is shutting down; nothing useful can be done here.
                    }
                }
            });
            emulator.start();
            System.out.println(String.format("DistributedLog Sandbox is running now. You could access distributedlog://%s:%s", zkHost, Integer.valueOf(zkPort)));
        } catch (Exception e) {
            System.out.println("Exception occurred running emulator " + e);
        }
    }

    // Assigns the blank final statics declared at the top of the class.
    static {
        // True when assertions are NOT enabled for this class; start() checks this flag explicitly.
        $assertionsDisabled = !LocalDLMEmulator.class.desiredAssertionStatus();
        LOG = LoggerFactory.getLogger((Class<?>) LocalDLMEmulator.class);
        // Shared template; Builder.build() clones it rather than using it directly.
        DEFAULT_SERVER_CONFIGURATION = new ServerConfiguration();
    }
}
