package com.github.congnt24;

import java.io.IOException;
import java.io.InputStream;
import java.net.InetAddress;
import java.net.URL;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Properties;
import java.util.concurrent.TimeUnit;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.NewTopic;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:com/github/congnt24/KafkaStandalone.class */
/**
 * Starts and stops an embedded ZooKeeper instance and Kafka broker on the local host and
 * offers a few topic-administration helpers backed by a lazily created {@link AdminClient}.
 *
 * <p>A shared instance is available through {@link #getInstance()}. Call
 * {@link #prepare(int, int)} before using any other method; the server URLs returned by
 * {@link #getKafkaServerUrl()} and {@link #getZkUrl()} are {@code null} until then.
 *
 * <p>This class performs no synchronization of its own.
 */
public class KafkaStandalone {
    private static final Logger logger = LoggerFactory.getLogger(KafkaStandalone.class);
    private static final KafkaStandalone instance = new KafkaStandalone();

    /** Pause applied after starting and before stopping the broker. */
    private static final long SETTLE_DELAY_MS = 3000L;
    /** Upper bound for each blocking admin-client call in {@link #createTopics(List, int)}. */
    private static final long ADMIN_TIMEOUT_SECONDS = 10L;

    private KafkaLocal kafkaServer;
    private String kafkaServerUrl;
    private String zkServerUrl;
    private int kafkaLocalPort;
    private int zkLocalPort;
    private AdminClient adminClient;

    /**
     * Returns the shared instance created when this class is initialized.
     *
     * @return the shared {@code KafkaStandalone}
     */
    public static KafkaStandalone getInstance() {
        return instance;
    }

    /**
     * Points the {@code log4j.configurationFile} system property at the {@code log4j2.xml}
     * classpath resource, when that resource exists.
     *
     * <p>A missing resource is logged and otherwise ignored: this constructor runs inside the
     * static initializer of the class, so throwing here would make the class unusable.
     */
    public KafkaStandalone() {
        URL log4jConfig = getClass().getClassLoader().getResource("log4j2.xml");
        if (log4jConfig == null) {
            logger.warn("log4j2.xml not found on the classpath; leaving log4j.configurationFile unset.");
        } else {
            System.setProperty("log4j.configurationFile", log4jConfig.toString());
        }
    }

    /**
     * Loads a {@code .properties} resource from this class's class loader and closes the
     * underlying stream afterwards.
     *
     * @param resourceName classpath-relative resource name
     * @return the loaded properties
     * @throws IOException if the resource is missing or cannot be read
     */
    private Properties loadClasspathProperties(String resourceName) throws IOException {
        Properties loaded = new Properties();
        try (InputStream in = getClass().getClassLoader().getResourceAsStream(resourceName)) {
            if (in == null) {
                throw new IOException("Classpath resource not found: " + resourceName);
            }
            loaded.load(in);
        }
        return loaded;
    }

    /**
     * Starts ZooKeeper and then the Kafka broker, using {@code zookeeper.properties} and
     * {@code kafka-server.properties} from the classpath with the ports overridden by the
     * values given to {@link #prepare(int, int)}.
     *
     * @return {@code true} when both were started without an exception, {@code false}
     *     otherwise (the failure is logged)
     */
    private boolean startEmbeddedKafkaServer() {
        logger.info("Starting kafka server.");
        try {
            Properties zkProperties = loadClasspathProperties("zookeeper.properties");
            zkProperties.setProperty("clientPort", Integer.toString(this.zkLocalPort));
            // NOTE(review): the ZooKeeperLocal reference is not retained, so tearDown() has no
            // handle to stop it; presumably the constructor starts the server - confirm.
            new ZooKeeperLocal(zkProperties);
            logger.info("ZooKeeper instance is successfully started on port {}", this.zkLocalPort);

            Properties kafkaProperties = loadClasspathProperties("kafka-server.properties");
            kafkaProperties.setProperty("zookeeper.connect", getZkUrl());
            kafkaProperties.setProperty("port", Integer.toString(this.kafkaLocalPort));
            this.kafkaServer = new KafkaLocal(kafkaProperties);
            this.kafkaServer.start();
            logger.info("kafka Server is successfully started on port {}", this.kafkaLocalPort);
            return true;
        } catch (Exception e) {
            logger.error("Error starting the kafka Server.", e);
            return false;
        }
    }

    /**
     * Returns the admin client, creating it on first use.
     *
     * @return the cached {@link AdminClient}
     */
    private AdminClient getAdminClient() {
        if (this.adminClient == null) {
            this.adminClient = AdminClient.create(createAdminClientProperties());
        }
        return this.adminClient;
    }

    /**
     * Builds the configuration used to create the admin client.
     *
     * @return properties containing the bootstrap server URL and a group id
     */
    private Properties createAdminClientProperties() {
        Properties properties = new Properties();
        properties.put("bootstrap.servers", getKafkaServerUrl());
        // NOTE(review): "group.id" looks like a consumer setting; kept for compatibility.
        properties.put("group.id", "group_1");
        return properties;
    }

    /**
     * Creates the given topics with replication factor 1 and then verifies that they can be
     * described.
     *
     * <p>Topic creation is asynchronous, so this method waits for it before describing. A
     * creation failure (for example a topic that already exists) is logged rather than thrown;
     * the subsequent describe call decides whether the method succeeds.
     *
     * @param list names of the topics to create
     * @param i number of partitions for every topic
     * @throws RuntimeException if the calling thread is interrupted or the topics cannot be
     *     described within the timeout
     */
    public void createTopics(List<String> list, int i) {
        List<NewTopic> newTopics = new ArrayList<>(list.size());
        for (String topicName : list) {
            newTopics.add(new NewTopic(topicName, i, (short) 1));
        }

        AdminClient client = getAdminClient();
        try {
            client.createTopics(newTopics).all().get(ADMIN_TIMEOUT_SECONDS, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new RuntimeException("Interrupted while creating topics", e);
        } catch (Exception e) {
            logger.warn("Topic creation did not complete cleanly; verifying with describeTopics.", e);
        }

        try {
            client.describeTopics(list).all().get(ADMIN_TIMEOUT_SECONDS, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new RuntimeException("Error getting topic info", e);
        } catch (Exception e) {
            throw new RuntimeException("Error getting topic info", e);
        }
    }

    /**
     * Requests deletion of a single topic. The request is issued asynchronously and its
     * outcome is not awaited or checked.
     *
     * @param str name of the topic to delete
     */
    public void deleteTopic(String str) {
        getAdminClient().deleteTopics(Collections.singletonList(str));
    }

    /**
     * Records the ports, derives the server URLs from the local host name, starts the embedded
     * ZooKeeper and Kafka servers, and then pauses briefly before returning.
     *
     * @param i port for the Kafka broker
     * @param i2 client port for ZooKeeper
     * @throws RuntimeException if the host name cannot be resolved or the servers fail to start
     */
    public void prepare(int i, int i2) {
        this.kafkaLocalPort = i;
        this.zkLocalPort = i2;
        try {
            String hostName = InetAddress.getLocalHost().getHostName();
            this.kafkaServerUrl = hostName + ":" + i;
            this.zkServerUrl = hostName + ":" + i2;
            if (!startEmbeddedKafkaServer()) {
                throw new RuntimeException("Error starting the server!");
            }
            pause();
            logger.info("Completed the prepare phase.");
        } catch (Exception e) {
            logger.error("Unexpected error", e);
            throw new RuntimeException("Unexpected error", e);
        }
    }

    /**
     * Closes the admin client if one was created, pauses briefly, and stops the Kafka broker
     * if one was started. Calling this method again afterwards does not stop the broker twice.
     */
    public void tearDown() {
        if (this.adminClient != null) {
            this.adminClient.close();
            this.adminClient = null;
        }
        pause();
        if (this.kafkaServer != null) {
            logger.info("Shutting down the kafka Server.");
            this.kafkaServer.stop();
            this.kafkaServer = null;
        }
        logger.info("Completed the tearDown phase.");
    }

    /**
     * Sleeps for {@link #SETTLE_DELAY_MS}. If interrupted, the sleep ends early and the
     * thread's interrupt flag is restored so callers can still observe it.
     */
    private static void pause() {
        try {
            Thread.sleep(SETTLE_DELAY_MS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    /**
     * Returns the ZooKeeper address in {@code host:port} form.
     *
     * @return the ZooKeeper URL, or {@code null} before {@link #prepare(int, int)} is called
     */
    public String getZkUrl() {
        return this.zkServerUrl;
    }

    /**
     * Returns the Kafka broker address in {@code host:port} form.
     *
     * @return the Kafka URL, or {@code null} before {@link #prepare(int, int)} is called
     */
    public String getKafkaServerUrl() {
        return this.kafkaServerUrl;
    }
}
