package org.apache.kafka.streams.integration.utils;

import java.io.File;
import java.io.IOException;
import java.util.Collections;
import java.util.Properties;
import kafka.admin.AdminUtils;
import kafka.admin.RackAwareMode$Enforced$;
import kafka.server.KafkaConfig;
import kafka.server.KafkaConfig$;
import kafka.server.KafkaServer;
import kafka.utils.CoreUtils;
import kafka.utils.MockTime;
import kafka.utils.TestUtils;
import kafka.utils.ZKStringSerializer$;
import kafka.utils.ZkUtils;
import org.I0Itec.zkclient.ZkClient;
import org.I0Itec.zkclient.ZkConnection;
import org.apache.kafka.common.protocol.SecurityProtocol;
import org.junit.rules.TemporaryFolder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import scala.collection.JavaConversions;

/* loaded from: input_file:org/apache/kafka/streams/integration/utils/KafkaEmbedded.class */
public class KafkaEmbedded {
    private static final Logger log = LoggerFactory.getLogger(KafkaEmbedded.class);
    private static final String DEFAULT_ZK_CONNECT = "127.0.0.1:2181";
    private static final int DEFAULT_ZK_SESSION_TIMEOUT_MS = 10000;
    private static final int DEFAULT_ZK_CONNECTION_TIMEOUT_MS = 8000;
    private final Properties effectiveConfig;
    private final File logDir;
    public final TemporaryFolder tmpFolder = new TemporaryFolder();
    private final KafkaServer kafka;

    public KafkaEmbedded(Properties properties, MockTime mockTime) throws IOException {
        this.tmpFolder.create();
        this.logDir = this.tmpFolder.newFolder();
        this.effectiveConfig = effectiveConfigFrom(properties);
        KafkaConfig kafkaConfig = new KafkaConfig(this.effectiveConfig, true);
        log.debug("Starting embedded Kafka broker (with log.dirs={} and ZK ensemble at {}) ...", this.logDir, zookeeperConnect());
        this.kafka = TestUtils.createServer(kafkaConfig, mockTime);
        log.debug("Startup of embedded Kafka broker at {} completed (with ZK ensemble at {}) ...", brokerList(), zookeeperConnect());
    }

    private Properties effectiveConfigFrom(Properties properties) throws IOException {
        Properties properties2 = new Properties();
        properties2.put(KafkaConfig$.MODULE$.BrokerIdProp(), 0);
        properties2.put(KafkaConfig$.MODULE$.HostNameProp(), "127.0.0.1");
        properties2.put(KafkaConfig$.MODULE$.PortProp(), "9092");
        properties2.put(KafkaConfig$.MODULE$.NumPartitionsProp(), 1);
        properties2.put(KafkaConfig$.MODULE$.AutoCreateTopicsEnableProp(), true);
        properties2.put(KafkaConfig$.MODULE$.MessageMaxBytesProp(), 1000000);
        properties2.put(KafkaConfig$.MODULE$.ControlledShutdownEnableProp(), true);
        properties2.putAll(properties);
        properties2.setProperty(KafkaConfig$.MODULE$.LogDirProp(), this.logDir.getAbsolutePath());
        return properties2;
    }

    public String brokerList() {
        return this.kafka.config().hostName() + ":" + this.kafka.boundPort(SecurityProtocol.PLAINTEXT);
    }

    public String zookeeperConnect() {
        return this.effectiveConfig.getProperty("zookeeper.connect", DEFAULT_ZK_CONNECT);
    }

    public void stop() {
        log.debug("Shutting down embedded Kafka broker at {} (with ZK ensemble at {}) ...", brokerList(), zookeeperConnect());
        this.kafka.shutdown();
        this.kafka.awaitShutdown();
        log.debug("Removing logs.dir at {} ...", this.logDir);
        CoreUtils.delete(JavaConversions.asScalaBuffer(Collections.singletonList(this.logDir.getAbsolutePath())).seq());
        this.tmpFolder.delete();
        log.debug("Shutdown of embedded Kafka broker at {} completed (with ZK ensemble at {}) ...", brokerList(), zookeeperConnect());
    }

    public void createTopic(String str) {
        createTopic(str, 1, 1, new Properties());
    }

    public void createTopic(String str, int i, int i2) {
        createTopic(str, i, i2, new Properties());
    }

    public void createTopic(String str, int i, int i2, Properties properties) {
        log.debug("Creating topic { name: {}, partitions: {}, replication: {}, config: {} }", new Object[]{str, Integer.valueOf(i), Integer.valueOf(i2), properties});
        ZkClient zkClient = new ZkClient(zookeeperConnect(), DEFAULT_ZK_SESSION_TIMEOUT_MS, DEFAULT_ZK_CONNECTION_TIMEOUT_MS, ZKStringSerializer$.MODULE$);
        AdminUtils.createTopic(new ZkUtils(zkClient, new ZkConnection(zookeeperConnect()), false), str, i, i2, properties, RackAwareMode$Enforced$.MODULE$);
        zkClient.close();
    }

    public void deleteTopic(String str) {
        log.debug("Deleting topic { name: {} }", str);
        ZkClient zkClient = new ZkClient(zookeeperConnect(), DEFAULT_ZK_SESSION_TIMEOUT_MS, DEFAULT_ZK_CONNECTION_TIMEOUT_MS, ZKStringSerializer$.MODULE$);
        AdminUtils.deleteTopic(new ZkUtils(zkClient, new ZkConnection(zookeeperConnect()), false), str);
        zkClient.close();
    }
}
