package kafka.testkit;

import java.io.File;
import java.net.InetSocketAddress;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.nio.file.attribute.FileAttribute;
import java.util.AbstractMap;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Properties;
import java.util.TreeMap;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Collectors;
import kafka.raft.KafkaRaftManager;
import kafka.server.BrokerServer;
import kafka.server.ControllerServer;
import kafka.server.FaultHandlerFactory;
import kafka.server.KafkaConfig;
import kafka.server.KafkaRaftServer;
import kafka.server.SharedServer;
import org.apache.kafka.common.Node;
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.network.ListenerName;
import org.apache.kafka.common.utils.ThreadUtils;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.controller.Controller;
import org.apache.kafka.metadata.bootstrap.BootstrapDirectory;
import org.apache.kafka.metadata.properties.MetaProperties;
import org.apache.kafka.metadata.properties.MetaPropertiesEnsemble;
import org.apache.kafka.raft.QuorumConfig;
import org.apache.kafka.server.common.ApiMessageAndVersion;
import org.apache.kafka.server.fault.FaultHandler;
import org.apache.kafka.server.fault.MockFaultHandler;
import org.apache.kafka.test.TestUtils;
import org.junit.jupiter.api.Assertions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.slf4j.event.Level;

/* loaded from: input_file:kafka/testkit/KafkaClusterTestKit.class */
public class KafkaClusterTestKit implements AutoCloseable {
    private static final Logger log = LoggerFactory.getLogger(KafkaClusterTestKit.class);
    private final ExecutorService executorService;
    private final TestKitNodes nodes;
    private final Map<Integer, ControllerServer> controllers;
    private final Map<Integer, BrokerServer> brokers;
    private final ControllerQuorumVotersFutureManager controllerQuorumVotersFutureManager;
    private final File baseDirectory;
    private final SimpleFaultHandlerFactory faultHandlerFactory;

    /* loaded from: input_file:kafka/testkit/KafkaClusterTestKit$Builder.class */
    public static class Builder {
        private TestKitNodes nodes;
        private final Map<String, Object> configProps = new HashMap();
        private final SimpleFaultHandlerFactory faultHandlerFactory = new SimpleFaultHandlerFactory();

        public Builder(TestKitNodes testKitNodes) {
            this.nodes = testKitNodes;
        }

        public Builder setConfigProp(String str, Object obj) {
            this.configProps.put(str, obj);
            return this;
        }

        private KafkaConfig createNodeConfig(TestKitNode testKitNode) {
            BrokerNode brokerNode = this.nodes.brokerNodes().get(Integer.valueOf(testKitNode.id()));
            ControllerNode controllerNode = this.nodes.controllerNodes().get(Integer.valueOf(testKitNode.id()));
            HashMap hashMap = new HashMap(this.configProps);
            hashMap.put("server.max.startup.time.ms", Long.toString(TimeUnit.MINUTES.toMillis(10L)));
            hashMap.put("process.roles", roles(testKitNode.id()));
            hashMap.put("node.id", Integer.toString(testKitNode.id()));
            if (controllerNode != null) {
                hashMap.put("metadata.log.dir", controllerNode.metadataDirectory());
            } else {
                hashMap.put("metadata.log.dir", testKitNode.metadataDirectory());
            }
            if (brokerNode != null) {
                hashMap.put("log.dirs", String.join(",", brokerNode.logDataDirectories()));
            } else {
                hashMap.put("log.dirs", controllerNode.metadataDirectory());
            }
            hashMap.putIfAbsent("listener.security.protocol.map", "EXTERNAL:PLAINTEXT,CONTROLLER:PLAINTEXT");
            hashMap.putIfAbsent("listeners", listeners(testKitNode.id()));
            hashMap.putIfAbsent("inter.broker.listener.name", this.nodes.interBrokerListenerName().value());
            hashMap.putIfAbsent("controller.listener.names", "CONTROLLER");
            hashMap.put("controller.quorum.voters", (String) this.nodes.controllerNodes().keySet().stream().map(num -> {
                return String.format("%d@0.0.0.0:0", num);
            }).collect(Collectors.joining(",")));
            hashMap.put("log.cleaner.dedupe.buffer.size", "2097152");
            if (brokerNode != null) {
                hashMap.putAll(brokerNode.propertyOverrides());
            }
            if (controllerNode != null) {
                hashMap.putAll(controllerNode.propertyOverrides());
            }
            hashMap.putIfAbsent("unstable.feature.versions.enable", "true");
            hashMap.putIfAbsent("unstable.api.versions.enable", "true");
            return new KafkaConfig(hashMap, false);
        }

        public KafkaClusterTestKit build() throws Exception {
            HashMap hashMap = new HashMap();
            HashMap hashMap2 = new HashMap();
            HashMap hashMap3 = new HashMap();
            int size = (this.nodes.brokerNodes().size() + this.nodes.controllerNodes().size()) * 2;
            ExecutorService executorService = null;
            ControllerQuorumVotersFutureManager controllerQuorumVotersFutureManager = new ControllerQuorumVotersFutureManager(this.nodes.controllerNodes().size());
            File file = null;
            try {
                file = new File(this.nodes.baseDirectory());
                executorService = Executors.newFixedThreadPool(size, ThreadUtils.createThreadFactory("kafka-cluster-test-kit-executor-%d", false));
                for (ControllerNode controllerNode : this.nodes.controllerNodes().values()) {
                    setupNodeDirectories(file, controllerNode.metadataDirectory(), Collections.emptyList());
                    SharedServer sharedServer = new SharedServer(createNodeConfig(controllerNode), controllerNode.initialMetaPropertiesEnsemble(), Time.SYSTEM, new Metrics(), controllerQuorumVotersFutureManager.future, Collections.emptyList(), this.faultHandlerFactory);
                    try {
                        ControllerServer controllerServer = new ControllerServer(sharedServer, KafkaRaftServer.configSchema(), this.nodes.bootstrapMetadata());
                        hashMap.put(Integer.valueOf(controllerNode.id()), controllerServer);
                        controllerServer.socketServerFirstBoundPortFuture().whenComplete((num, th) -> {
                            if (th != null) {
                                controllerQuorumVotersFutureManager.fail(th);
                            } else {
                                controllerQuorumVotersFutureManager.registerPort(controllerNode.id(), num.intValue());
                            }
                        });
                        hashMap3.put(Integer.valueOf(controllerNode.id()), sharedServer);
                    } catch (Throwable th2) {
                        KafkaClusterTestKit.log.error("Error creating controller {}", Integer.valueOf(controllerNode.id()), th2);
                        Logger logger = KafkaClusterTestKit.log;
                        Level level = Level.WARN;
                        sharedServer.getClass();
                        Utils.swallow(logger, level, "sharedServer.stopForController error", sharedServer::stopForController);
                        throw th2;
                    }
                }
                for (BrokerNode brokerNode : this.nodes.brokerNodes().values()) {
                    SharedServer sharedServer2 = (SharedServer) hashMap3.computeIfAbsent(Integer.valueOf(brokerNode.id()), num2 -> {
                        return new SharedServer(createNodeConfig(brokerNode), brokerNode.initialMetaPropertiesEnsemble(), Time.SYSTEM, new Metrics(), controllerQuorumVotersFutureManager.future, Collections.emptyList(), this.faultHandlerFactory);
                    });
                    try {
                        hashMap2.put(Integer.valueOf(brokerNode.id()), new BrokerServer(sharedServer2));
                    } catch (Throwable th3) {
                        KafkaClusterTestKit.log.error("Error creating broker {}", Integer.valueOf(brokerNode.id()), th3);
                        Logger logger2 = KafkaClusterTestKit.log;
                        Level level2 = Level.WARN;
                        sharedServer2.getClass();
                        Utils.swallow(logger2, level2, "sharedServer.stopForBroker error", sharedServer2::stopForBroker);
                        throw th3;
                    }
                }
                return new KafkaClusterTestKit(executorService, this.nodes, hashMap, hashMap2, controllerQuorumVotersFutureManager, file, this.faultHandlerFactory);
            } catch (Exception e) {
                if (executorService != null) {
                    ThreadUtils.shutdownExecutorServiceQuietly(executorService, 5L, TimeUnit.MINUTES);
                }
                Iterator it = hashMap2.values().iterator();
                while (it.hasNext()) {
                    ((BrokerServer) it.next()).shutdown();
                }
                Iterator it2 = hashMap.values().iterator();
                while (it2.hasNext()) {
                    ((ControllerServer) it2.next()).shutdown();
                }
                controllerQuorumVotersFutureManager.close();
                if (file != null) {
                    Utils.delete(file);
                }
                throw e;
            }
        }

        private String listeners(int i) {
            return this.nodes.isCombined(i) ? "EXTERNAL://localhost:0,CONTROLLER://localhost:0" : this.nodes.controllerNodes().containsKey(Integer.valueOf(i)) ? "CONTROLLER://localhost:0" : "EXTERNAL://localhost:0";
        }

        private String roles(int i) {
            return this.nodes.isCombined(i) ? "broker,controller" : this.nodes.controllerNodes().containsKey(Integer.valueOf(i)) ? "controller" : "broker";
        }

        private static void setupNodeDirectories(File file, String str, Collection<String> collection) throws Exception {
            Files.createDirectories(new File(file, "local").toPath(), new FileAttribute[0]);
            Files.createDirectories(Paths.get(str, new String[0]), new FileAttribute[0]);
            Iterator<String> it = collection.iterator();
            while (it.hasNext()) {
                Files.createDirectories(Paths.get(it.next(), new String[0]), new FileAttribute[0]);
            }
        }
    }

    /* loaded from: input_file:kafka/testkit/KafkaClusterTestKit$ClientPropertiesBuilder.class */
    /**
     * Fills in the bootstrap address of this cluster on a client {@link Properties} object,
     * targeting either the brokers (default) or the controllers.
     */
    public class ClientPropertiesBuilder {
        private final Properties properties;
        private boolean usingBootstrapControllers = false;

        /** Starts from an empty set of client properties. */
        public ClientPropertiesBuilder() {
            this(new Properties());
        }

        /**
         * @param properties the properties to populate; this same instance is modified and
         *                   returned by {@link #build()}
         */
        public ClientPropertiesBuilder(Properties properties) {
            this.properties = properties;
        }

        /**
         * @param usingBootstrapControllers true to point clients at the controllers, false to
         *                                  point them at the brokers
         * @return this builder
         */
        public ClientPropertiesBuilder setUsingBootstrapControllers(boolean usingBootstrapControllers) {
            this.usingBootstrapControllers = usingBootstrapControllers;
            return this;
        }

        /**
         * Sets exactly one of {@code bootstrap.controllers} / {@code bootstrap.servers} and
         * removes the other one.
         *
         * @return the populated properties
         */
        public Properties build() {
            if (!usingBootstrapControllers) {
                properties.setProperty("bootstrap.servers", bootstrapServers());
                properties.remove("bootstrap.controllers");
                return properties;
            }
            properties.setProperty("bootstrap.controllers", bootstrapControllers());
            properties.remove("bootstrap.servers");
            return properties;
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:kafka/testkit/KafkaClusterTestKit$ControllerQuorumVotersFutureManager.class */
    public static class ControllerQuorumVotersFutureManager implements AutoCloseable {
        private final int expectedControllers;
        private final CompletableFuture<Map<Integer, InetSocketAddress>> future = new CompletableFuture<>();
        private final Map<Integer, Integer> controllerPorts = new TreeMap();

        ControllerQuorumVotersFutureManager(int i) {
            this.expectedControllers = i;
        }

        /* JADX WARN: Multi-variable type inference failed */
        synchronized void registerPort(int i, int i2) {
            this.controllerPorts.put(Integer.valueOf(i), Integer.valueOf(i2));
            if (this.controllerPorts.size() >= this.expectedControllers) {
                this.future.complete(this.controllerPorts.entrySet().stream().collect(Collectors.toMap((v0) -> {
                    return v0.getKey();
                }, entry -> {
                    return new InetSocketAddress("localhost", ((Integer) entry.getValue()).intValue());
                })));
            }
        }

        void fail(Throwable th) {
            this.future.completeExceptionally(th);
        }

        @Override // java.lang.AutoCloseable
        public void close() {
            this.future.cancel(true);
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:kafka/testkit/KafkaClusterTestKit$SimpleFaultHandlerFactory.class */
    public static class SimpleFaultHandlerFactory implements FaultHandlerFactory {
        private final MockFaultHandler fatalFaultHandler = new MockFaultHandler("fatalFaultHandler");
        private final MockFaultHandler nonFatalFaultHandler = new MockFaultHandler("nonFatalFaultHandler");

        SimpleFaultHandlerFactory() {
        }

        MockFaultHandler fatalFaultHandler() {
            return this.fatalFaultHandler;
        }

        MockFaultHandler nonFatalFaultHandler() {
            return this.nonFatalFaultHandler;
        }

        public FaultHandler build(String str, boolean z, Runnable runnable) {
            return z ? this.fatalFaultHandler : this.nonFatalFaultHandler;
        }
    }

    /**
     * Stores the already constructed cluster components; instances are created through
     * {@link Builder#build()}.
     */
    private KafkaClusterTestKit(
        ExecutorService executorService,
        TestKitNodes nodes,
        Map<Integer, ControllerServer> controllers,
        Map<Integer, BrokerServer> brokers,
        ControllerQuorumVotersFutureManager controllerQuorumVotersFutureManager,
        File baseDirectory,
        SimpleFaultHandlerFactory faultHandlerFactory
    ) {
        this.executorService = executorService;
        this.nodes = nodes;
        this.controllers = controllers;
        this.brokers = brokers;
        this.controllerQuorumVotersFutureManager = controllerQuorumVotersFutureManager;
        this.baseDirectory = baseDirectory;
        this.faultHandlerFactory = faultHandlerFactory;
    }

    /**
     * Formats the directories of every controller and broker in parallel on the test kit's
     * executor and waits for all of them to finish.
     *
     * <p>Controllers always format their metadata log directory. A broker formats it only when
     * it is not a combined node.
     *
     * @throws Exception if any formatting task fails or the wait is interrupted; the remaining
     *                   tasks are cancelled before the exception is rethrown
     */
    public void format() throws Exception {
        List<Future<?>> futures = new ArrayList<>();
        try {
            for (ControllerServer controller : controllers.values()) {
                futures.add(executorService.submit(() -> {
                    formatNode(controller.sharedServer().metaPropsEnsemble(), true);
                }));
            }
            for (Map.Entry<Integer, BrokerServer> entry : brokers.entrySet()) {
                BrokerServer broker = entry.getValue();
                futures.add(executorService.submit(() -> {
                    formatNode(broker.sharedServer().metaPropsEnsemble(),
                        !nodes().brokerNodes().get(entry.getKey()).combined());
                }));
            }
            for (Future<?> future : futures) {
                future.get();
            }
        } catch (Exception e) {
            for (Future<?> future : futures) {
                future.cancel(true);
            }
            throw e;
        }
    }

    /**
     * Writes the meta properties and the bootstrap metadata file for the log directories of one
     * node.
     *
     * @param ensemble the node's meta properties, keyed by log directory
     * @param writeMetadataDirectory when false, the directory equal to the ensemble's metadata
     *                               log directory is skipped
     * @throws RuntimeException wrapping any failure, with the node id in the message
     */
    private void formatNode(MetaPropertiesEnsemble ensemble, boolean writeMetadataDirectory) {
        try {
            MetaPropertiesEnsemble.Copier copier = new MetaPropertiesEnsemble.Copier(MetaPropertiesEnsemble.EMPTY);
            for (Map.Entry<String, MetaProperties> entry : ensemble.logDirProps().entrySet()) {
                String logDir = entry.getKey();
                if (writeMetadataDirectory || !ensemble.metadataLogDir().equals(Optional.of(logDir))) {
                    log.trace("Adding {} to the list of directories to format.", logDir);
                    copier.setLogDirProps(logDir, entry.getValue());
                }
            }
            // Before each directory is written: make sure it exists and drop the bootstrap
            // metadata file into it.
            copier.setPreWriteHandler((directory, isNew, metaProperties) -> {
                log.info("Formatting {}.", directory);
                Files.createDirectories(Paths.get(directory));
                new BootstrapDirectory(directory, Optional.empty()).writeBinaryFile(nodes.bootstrapMetadata());
            });
            copier.writeLogDirChanges();
        } catch (Exception e) {
            throw new RuntimeException("Failed to format node " + ensemble.nodeId(), e);
        }
    }

    /**
     * Starts every controller and broker in parallel on the test kit's executor and waits until
     * all startup calls have returned.
     *
     * @throws ExecutionException if a startup call failed
     * @throws InterruptedException if the wait was interrupted
     */
    public void startup() throws ExecutionException, InterruptedException {
        List<Future<?>> futures = new ArrayList<>();
        try {
            // The Runnable casts pin the submit(Runnable) overload for the method references.
            for (ControllerServer controller : controllers.values()) {
                futures.add(executorService.submit((Runnable) controller::startup));
            }
            for (BrokerServer broker : brokers.values()) {
                futures.add(executorService.submit((Runnable) broker::startup));
            }
            for (Future<?> future : futures) {
                future.get();
            }
        } catch (Exception e) {
            // Stop whatever is still starting before reporting the failure.
            for (Future<?> future : futures) {
                future.cancel(true);
            }
            throw e;
        }
    }

    /**
     * Blocks until the controller reports that all brokers are ready, and then until every
     * broker's metadata cache lists as many alive brokers as this cluster has brokers.
     *
     * @throws ExecutionException if waiting on the controller's future failed
     * @throws InterruptedException if the wait was interrupted
     */
    public void waitForReadyBrokers() throws ExecutionException, InterruptedException {
        // Any controller will do; the first one returned by the map is used.
        ControllerServer anyController = controllers.values().iterator().next();
        anyController.controller().waitForReadyBrokers(brokers.size()).get();

        TestUtils.waitForCondition(() -> {
            for (BrokerServer broker : brokers().values()) {
                if (broker.metadataCache().getAliveBrokers().size() != brokers.size()) {
                    return false;
                }
            }
            return true;
        }, "Failed to wait for publisher to publish the metadata update to each broker.");
    }

    /**
     * Waits for the controller voter addresses and renders them as a comma-separated list of
     * {@code id@host:port} entries.
     *
     * @return the voters string, in the order returned by {@code QuorumConfig.voterConnectionsToNodes}
     * @throws ExecutionException if the voters future completed exceptionally
     * @throws InterruptedException if the wait was interrupted
     */
    public String quorumVotersConfig() throws ExecutionException, InterruptedException {
        Map<Integer, InetSocketAddress> voterAddresses = controllerQuorumVotersFutureManager.future.get();
        return QuorumConfig.voterConnectionsToNodes(voterAddresses).stream()
            .map(voter -> voter.id() + "@" + voter.host() + ":" + voter.port())
            .collect(Collectors.joining(","));
    }

    /**
     * Creates a client properties builder on top of existing properties.
     *
     * @param properties the properties the builder will populate and return
     * @return a new builder bound to this cluster
     */
    public ClientPropertiesBuilder newClientPropertiesBuilder(Properties properties) {
        ClientPropertiesBuilder builder = new ClientPropertiesBuilder(properties);
        return builder;
    }

    /**
     * Creates a client properties builder that starts from empty properties.
     *
     * @return a new builder bound to this cluster
     */
    public ClientPropertiesBuilder newClientPropertiesBuilder() {
        ClientPropertiesBuilder builder = new ClientPropertiesBuilder();
        return builder;
    }

    /**
     * Builds fresh client properties with the default builder settings, i.e. with
     * {@code bootstrap.servers} set to this cluster's brokers.
     *
     * @return new client properties
     */
    public Properties clientProperties() {
        ClientPropertiesBuilder defaultBuilder = new ClientPropertiesBuilder();
        return defaultBuilder.build();
    }

    /**
     * Lists the external listener endpoint of every broker as a comma-separated string of
     * {@code localhost:port} entries.
     *
     * @return the bootstrap servers string; empty when there are no brokers
     * @throws RuntimeException if a broker reports a port that is not positive
     */
    public String bootstrapServers() {
        List<String> endpoints = new ArrayList<>();
        for (Map.Entry<Integer, BrokerServer> entry : brokers.entrySet()) {
            int brokerId = entry.getKey();
            BrokerServer broker = entry.getValue();
            ListenerName listenerName = nodes.externalListenerName();
            int port = broker.boundPort(listenerName);
            if (port <= 0) {
                throw new RuntimeException("Broker " + brokerId + " does not yet have a bound port for " + listenerName + ".  Did you start the cluster yet?");
            }
            endpoints.add("localhost:" + port);
        }
        return String.join(",", endpoints);
    }

    /**
     * Lists the controller listener endpoint of every controller as a comma-separated string of
     * {@code localhost:port} entries.
     *
     * @return the bootstrap controllers string; empty when there are no controllers
     * @throws RuntimeException if a controller reports a port that is not positive
     */
    public String bootstrapControllers() {
        List<String> endpoints = new ArrayList<>();
        for (Map.Entry<Integer, ControllerServer> entry : controllers.entrySet()) {
            int controllerId = entry.getKey();
            ControllerServer controller = entry.getValue();
            ListenerName listenerName = nodes.controllerListenerName();
            int port = controller.socketServer().boundPort(listenerName);
            if (port <= 0) {
                throw new RuntimeException("Controller " + controllerId + " does not yet have a bound port for " + listenerName + ".  Did you start the cluster yet?");
            }
            endpoints.add("localhost:" + port);
        }
        return String.join(",", endpoints);
    }

    /**
     * @return the controller servers keyed by node id; this is the test kit's own map, not a copy
     */
    public Map<Integer, ControllerServer> controllers() {
        return controllers;
    }

    /**
     * Polls the controllers, retrying for up to 60 seconds, until one of them reports itself
     * active.
     *
     * @return the active controller; if several report active in the same pass, the last one
     *         visited
     * @throws InterruptedException if the wait was interrupted
     */
    public Controller waitForActiveController() throws InterruptedException {
        AtomicReference<Controller> active = new AtomicReference<>(null);
        TestUtils.retryOnExceptionWithTimeout(60000L, () -> {
            for (ControllerServer controllerServer : controllers.values()) {
                if (controllerServer.controller().isActive()) {
                    active.set(controllerServer.controller());
                }
            }
            Assertions.assertNotNull(active.get(), "No active controller found");
        });
        return active.get();
    }

    /**
     * @return the broker servers keyed by node id; this is the test kit's own map, not a copy
     */
    public Map<Integer, BrokerServer> brokers() {
        return brokers;
    }

    /**
     * Collects the raft manager of every node into a new map keyed by node id. Broker entries
     * are added first; a controller only contributes an entry when its id is not already present.
     *
     * @return a new map from node id to that node's raft manager
     */
    public Map<Integer, KafkaRaftManager<ApiMessageAndVersion>> raftManagers() {
        Map<Integer, KafkaRaftManager<ApiMessageAndVersion>> results = new HashMap<>();
        for (BrokerServer brokerServer : brokers().values()) {
            results.put(brokerServer.config().brokerId(), brokerServer.sharedServer().raftManager());
        }
        for (ControllerServer controllerServer : controllers().values()) {
            int nodeId = controllerServer.config().nodeId();
            if (!results.containsKey(nodeId)) {
                results.put(nodeId, controllerServer.sharedServer().raftManager());
            }
        }
        return results;
    }

    /**
     * @return the node topology this test kit was built from
     */
    public TestKitNodes nodes() {
        return nodes;
    }

    /**
     * @return the mock handler that the fault handler factory returns for fatal faults
     */
    public MockFaultHandler fatalFaultHandler() {
        MockFaultHandler handler = faultHandlerFactory.fatalFaultHandler();
        return handler;
    }

    /**
     * @return the mock handler that the fault handler factory returns for non-fatal faults
     */
    public MockFaultHandler nonFatalFaultHandler() {
        MockFaultHandler handler = faultHandlerFactory.nonFatalFaultHandler();
        return handler;
    }

    /**
     * Shuts the cluster down: cancels the voters future, shuts down all brokers in parallel,
     * then all controllers in parallel, and deletes the base directory. The executor is shut
     * down exactly once, whether or not the steps above succeeded.
     *
     * <p>After cleanup has finished, the first exception recorded by the fatal fault handler,
     * and then by the non-fatal one, is rethrown if there is any.
     *
     * @throws Exception if a shutdown task or the directory deletion failed, or if a fault
     *                   handler recorded an exception
     */
    @Override
    public void close() throws Exception {
        List<Map.Entry<String, Future<?>>> futureEntries = new ArrayList<>();
        try {
            controllerQuorumVotersFutureManager.close();

            // Brokers go first and are fully stopped before any controller is shut down.
            // The Runnable casts pin the submit(Runnable) overload for the method references.
            for (Map.Entry<Integer, BrokerServer> entry : brokers.entrySet()) {
                int brokerId = entry.getKey();
                BrokerServer broker = entry.getValue();
                futureEntries.add(new AbstractMap.SimpleImmutableEntry<>("broker" + brokerId,
                    executorService.submit((Runnable) broker::shutdown)));
            }
            waitForAllFutures(futureEntries);
            futureEntries.clear();

            for (Map.Entry<Integer, ControllerServer> entry : controllers.entrySet()) {
                int controllerId = entry.getKey();
                ControllerServer controller = entry.getValue();
                futureEntries.add(new AbstractMap.SimpleImmutableEntry<>("controller" + controllerId,
                    executorService.submit((Runnable) controller::shutdown)));
            }
            waitForAllFutures(futureEntries);
            futureEntries.clear();

            Utils.delete(baseDirectory);
        } catch (Exception e) {
            // Cancel the shutdown tasks of the phase that was in progress.
            for (Map.Entry<String, Future<?>> entry : futureEntries) {
                entry.getValue().cancel(true);
            }
            throw e;
        } finally {
            ThreadUtils.shutdownExecutorServiceQuietly(executorService, 5L, TimeUnit.MINUTES);
        }
        // Checked only after cleanup, so a recorded fault cannot re-enter the cleanup path.
        faultHandlerFactory.fatalFaultHandler().maybeRethrowFirstException();
        faultHandlerFactory.nonFatalFaultHandler().maybeRethrowFirstException();
    }

    /**
     * Waits, in list order, for each named future to complete, logging before and after each
     * wait.
     *
     * @param list pairs of a component name and the future of its shutdown task
     * @throws Exception whatever the first failing {@link Future#get()} throws
     */
    private void waitForAllFutures(List<Map.Entry<String, Future<?>>> list) throws Exception {
        for (Map.Entry<String, Future<?>> pending : list) {
            String component = pending.getKey();
            Future<?> shutdownFuture = pending.getValue();
            log.debug("waiting for {} to shut down.", component);
            shutdownFuture.get();
            log.debug("{} successfully shut down.", component);
        }
    }
}
