package com.github.sakserv.minicluster.impl;

import com.github.sakserv.minicluster.MiniCluster;
import com.github.sakserv.minicluster.util.FileUtils;
import java.io.File;
import java.util.Properties;
import kafka.server.KafkaConfig;
import kafka.server.KafkaServer;
import kafka.utils.Time;

/* loaded from: input_file:com/github/sakserv/minicluster/impl/KafkaLocalBroker.class */
/**
 * {@link MiniCluster} implementation that runs a single embedded Kafka broker.
 *
 * <p>The broker settings (log directory, port, broker id and ZooKeeper connection string) are
 * turned into a {@link KafkaConfig} by {@link #configure()}; {@link #start()} then creates and
 * starts a {@link KafkaServer} from that configuration, and {@link #stop()} shuts it down.
 */
public class KafkaLocalBroker implements MiniCluster {
    /** Topic name used by the no-argument constructor. */
    public static final String DEFAULT_TEST_TOPIC = "test-topic";
    /** Log directory used when none is supplied. */
    public static final String DEFAULT_LOG_DIR = "embedded_kafka";
    /** Broker port used when none is supplied. */
    public static final int DEFAULT_PORT = 9092;
    /** Broker id used when none is supplied. */
    public static final int DEFAULT_BROKER_ID = 1;
    /** ZooKeeper connection string used when none is supplied. */
    public static final String DEFAULT_ZK_CONNECTION_STRING = "localhost:2181";

    /** Broker configuration built by the most recent {@code configure} call. */
    public KafkaConfig conf;
    /** Server instance created by {@link #start()}; {@code null} until then. */
    public KafkaServer server;

    // Stored by the constructor; not read anywhere else in this class.
    private String topic;
    private String logDir;
    private int port;
    private int brokerId;
    private String zkConnString;

    /**
     * {@link Time} implementation backed by the JVM system clock, handed to the
     * {@link KafkaServer} created in {@link KafkaLocalBroker#start()}.
     */
    public class LocalSystemTime implements Time {
        public LocalSystemTime() {
        }

        /** @return the current wall-clock time from {@link System#currentTimeMillis()} */
        @Override // kafka.utils.Time
        public long milliseconds() {
            return System.currentTimeMillis();
        }

        /** @return the current value of {@link System#nanoTime()} */
        @Override // kafka.utils.Time
        public long nanoseconds() {
            return System.nanoTime();
        }

        /**
         * Sleeps the calling thread for {@code j} milliseconds. If the thread is interrupted the
         * sleep ends early and the interrupt status is restored.
         *
         * @param j the time to sleep, in milliseconds
         */
        @Override // kafka.utils.Time
        public void sleep(long j) {
            try {
                Thread.sleep(j);
            } catch (InterruptedException e) {
                // This method cannot throw a checked exception, so re-assert the interrupt flag
                // to let callers further up the stack observe the interruption.
                Thread.currentThread().interrupt();
            }
        }
    }

    /** Creates a broker using every default setting. */
    public KafkaLocalBroker() {
        this(DEFAULT_TEST_TOPIC, DEFAULT_LOG_DIR, DEFAULT_PORT, DEFAULT_BROKER_ID,
                DEFAULT_ZK_CONNECTION_STRING);
    }

    /**
     * Creates a broker for the given topic using the default log directory, port, broker id and
     * ZooKeeper connection string.
     *
     * @param str the topic name
     */
    public KafkaLocalBroker(String str) {
        this(str, DEFAULT_LOG_DIR, DEFAULT_PORT, DEFAULT_BROKER_ID, DEFAULT_ZK_CONNECTION_STRING);
    }

    /**
     * Creates a broker with explicit settings and immediately builds its configuration.
     *
     * @param str  the topic name
     * @param str2 the broker log directory
     * @param i    the broker port
     * @param i2   the broker id
     * @param str3 the ZooKeeper connection string
     */
    public KafkaLocalBroker(String str, String str2, int i, int i2, String str3) {
        this.topic = str;
        this.logDir = str2;
        this.port = i;
        this.brokerId = i2;
        this.zkConnString = str3;
        configure();
    }

    /** Rebuilds {@link #conf} from the settings currently held by this instance. */
    @Override // com.github.sakserv.minicluster.MiniCluster
    public void configure() {
        configure(this.logDir, this.port, this.brokerId, this.zkConnString);
    }

    /**
     * Builds {@link #conf} from the supplied settings and records them on this instance, so that
     * {@link #getPort()}, the start/stop log messages and the directory removed by
     * {@link #stop(boolean)} all agree with the active configuration.
     *
     * @param str  the broker log directory
     * @param i    the broker port
     * @param i2   the broker id
     * @param str2 the ZooKeeper connection string
     */
    public void configure(String str, int i, int i2, String str2) {
        // Keep instance state in step with the configuration being built; otherwise a later
        // stop(true) would delete a different directory than the one the broker was given.
        this.logDir = str;
        this.port = i;
        this.brokerId = i2;
        this.zkConnString = str2;

        Properties properties = new Properties();
        properties.put("port", String.valueOf(i));
        properties.put("broker.id", String.valueOf(i2));
        properties.put("log.dir", str);
        properties.put("enable.zookeeper", "true");
        properties.put("zookeeper.connect", str2);
        properties.put("advertised.host.name", "localhost");
        this.conf = new KafkaConfig(properties);
    }

    /** Creates a new {@link KafkaServer} from {@link #conf} and starts it. */
    @Override // com.github.sakserv.minicluster.MiniCluster
    public void start() {
        this.server = new KafkaServer(this.conf, new LocalSystemTime());
        System.out.println("KAFKA: Starting Kafka on port: " + this.port);
        this.server.startup();
    }

    /** Shuts the broker down without deleting its log directory. */
    @Override // com.github.sakserv.minicluster.MiniCluster
    public void stop() {
        stop(false);
    }

    /**
     * Shuts the broker down and optionally deletes its log directory.
     *
     * @param z {@code true} to delete the configured log directory after shutdown
     */
    public void stop(boolean z) {
        System.out.println("KAFKA: Stopping Kafka on port: " + this.port);
        // start() may never have been called; there is then nothing to shut down.
        if (this.server != null) {
            this.server.shutdown();
        }
        if (z) {
            System.out.println("KAFKA: Deleting Old Topics");
            deleteOldTopics(this.logDir);
        }
    }

    /** Prints the properties of the current {@link #conf} to standard output. */
    @Override // com.github.sakserv.minicluster.MiniCluster
    public void dumpConfig() {
        System.out.println("KAFKA CONFIG: " + this.conf.props().toString());
    }

    /** @return the port this broker is configured with */
    public int getPort() {
        return this.port;
    }

    /**
     * Deletes the given directory via {@link FileUtils#deleteFolder} if it exists.
     *
     * @param str path of the directory to delete
     */
    public void deleteOldTopics(String str) {
        File file = new File(str);
        if (file.exists()) {
            FileUtils.deleteFolder(file.getAbsolutePath());
        }
    }
}
