package org.springframework.kafka.test.rule;

import java.lang.reflect.InvocationTargetException;
import java.net.ServerSocket;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import javax.net.ServerSocketFactory;
import kafka.admin.AdminUtils;
import kafka.admin.AdminUtils$;
import kafka.admin.RackAwareMode;
import kafka.server.KafkaConfig;
import kafka.server.KafkaConfig$;
import kafka.server.KafkaServer;
import kafka.server.NotRunning;
import kafka.utils.CoreUtils;
import kafka.utils.TestUtils;
import kafka.utils.ZKStringSerializer$;
import kafka.utils.ZkUtils;
import kafka.zk.EmbeddedZookeeper;
import org.I0Itec.zkclient.ZkClient;
import org.I0Itec.zkclient.ZkConnection;
import org.I0Itec.zkclient.exception.ZkInterruptedException;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.apache.kafka.common.Node;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.requests.MetadataResponse;
import org.apache.kafka.common.utils.AppInfoParser;
import org.apache.kafka.common.utils.Time;
import org.assertj.core.api.Assertions;
import org.junit.rules.ExternalResource;
import org.springframework.beans.factory.DisposableBean;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.kafka.test.core.BrokerAddress;
import org.springframework.retry.RetryCallback;
import org.springframework.retry.RetryContext;
import org.springframework.retry.backoff.ExponentialBackOffPolicy;
import org.springframework.retry.policy.SimpleRetryPolicy;
import org.springframework.retry.support.RetryTemplate;
import scala.Option;
import scala.collection.JavaConversions;

/* loaded from: input_file:org/springframework/kafka/test/rule/KafkaEmbedded.class */
public class KafkaEmbedded extends ExternalResource implements KafkaRule, InitializingBean, DisposableBean {
    public static final String BEAN_NAME = "kafkaEmbedded";
    public static final String SPRING_EMBEDDED_KAFKA_BROKERS = "spring.embedded.kafka.brokers";
    public static final long METADATA_PROPAGATION_TIMEOUT = 10000;
    private final String clientVersion;
    private final int count;
    private final boolean controlledShutdown;
    private final String[] topics;
    private final int partitionsPerTopic;
    private final List<KafkaServer> kafkaServers;
    private EmbeddedZookeeper zookeeper;
    private ZkClient zookeeperClient;
    private String zkConnect;
    private Map<String, String> brokerProperties;
    private int[] kafkaPorts;

    public KafkaEmbedded(int i) {
        this(i, false, new String[0]);
    }

    public KafkaEmbedded(int i, boolean z, String... strArr) {
        this(i, z, 2, strArr);
    }

    public KafkaEmbedded(int i, boolean z, int i2, String... strArr) {
        this.kafkaServers = new ArrayList();
        this.clientVersion = AppInfoParser.getVersion();
        this.count = i;
        this.controlledShutdown = z;
        if (strArr != null) {
            this.topics = strArr;
        } else {
            this.topics = new String[0];
        }
        this.partitionsPerTopic = i2;
    }

    public KafkaEmbedded brokerProperties(Map<String, String> map) {
        this.brokerProperties = map;
        return this;
    }

    public void setKafkaPorts(int... iArr) {
        this.kafkaPorts = iArr;
    }

    public void afterPropertiesSet() throws Exception {
        before();
    }

    public void before() throws Exception {
        startZookeeper();
        this.zkConnect = "127.0.0.1:" + this.zookeeper.port();
        this.zookeeperClient = new ZkClient(this.zkConnect, 6000, 6000, ZKStringSerializer$.MODULE$);
        this.kafkaServers.clear();
        int i = 0;
        while (i < this.count) {
            Integer valueOf = (this.kafkaPorts == null || this.kafkaPorts.length <= i) ? null : Integer.valueOf(this.kafkaPorts[i]);
            if (valueOf == null) {
                ServerSocket createServerSocket = ServerSocketFactory.getDefault().createServerSocket(0);
                valueOf = Integer.valueOf(createServerSocket.getLocalPort());
                createServerSocket.close();
            }
            Properties createProperties = createProperties(i, valueOf);
            createProperties.setProperty(KafkaConfig$.MODULE$.PortProp(), "" + valueOf);
            createProperties.setProperty("replica.socket.timeout.ms", "1000");
            createProperties.setProperty("controller.socket.timeout.ms", "1000");
            createProperties.setProperty("offsets.topic.replication.factor", "1");
            if (this.brokerProperties != null) {
                for (Map.Entry<String, String> entry : this.brokerProperties.entrySet()) {
                    createProperties.setProperty(entry.getKey(), entry.getValue());
                }
            }
            this.kafkaServers.add(TestUtils.createServer(new KafkaConfig(createProperties), Time.SYSTEM));
            i++;
        }
        ZkUtils zkUtils = new ZkUtils(getZkClient(), (ZkConnection) null, false);
        Properties properties = new Properties();
        for (String str : this.topics) {
            AdminUtils.createTopic(zkUtils, str, this.partitionsPerTopic, this.count, properties, (RackAwareMode) null);
        }
        System.setProperty(SPRING_EMBEDDED_KAFKA_BROKERS, getBrokersAsString());
    }

    public Properties createProperties(int i, Integer num) {
        if (this.clientVersion.startsWith("0.11")) {
            return TestUtils.createBrokerConfig(i, this.zkConnect, this.controlledShutdown, true, num.intValue(), Option.apply((Object) null), Option.apply((Object) null), Option.apply((Object) null), true, false, 0, false, 0, false, 0, Option.apply((Object) null));
        }
        try {
            return (Properties) TestUtils.class.getDeclaredMethod("createBrokerConfig", Integer.TYPE, String.class, Boolean.TYPE, Boolean.TYPE, Integer.TYPE, Option.class, Option.class, Option.class, Boolean.TYPE, Boolean.TYPE, Integer.TYPE, Boolean.TYPE, Integer.TYPE, Boolean.TYPE, Integer.TYPE, Option.class, Integer.TYPE).invoke(null, Integer.valueOf(i), this.zkConnect, Boolean.valueOf(this.controlledShutdown), true, num, Option.apply((Object) null), Option.apply((Object) null), Option.apply((Object) null), true, false, 0, false, 0, false, 0, Option.apply((Object) null), 1);
        } catch (IllegalAccessException | IllegalArgumentException | NoSuchMethodException | InvocationTargetException e) {
            throw new RuntimeException(e);
        }
    }

    public void destroy() throws Exception {
        after();
    }

    public void after() {
        System.getProperties().remove(SPRING_EMBEDDED_KAFKA_BROKERS);
        for (KafkaServer kafkaServer : this.kafkaServers) {
            try {
                if (kafkaServer.brokerState().currentState() != NotRunning.state()) {
                    kafkaServer.shutdown();
                    kafkaServer.awaitShutdown();
                }
            } catch (Exception e) {
            }
            try {
                CoreUtils.delete(kafkaServer.config().logDirs());
            } catch (Exception e2) {
            }
        }
        try {
            this.zookeeperClient.close();
        } catch (ZkInterruptedException e3) {
        }
        try {
            this.zookeeper.shutdown();
        } catch (Exception e4) {
        }
    }

    @Override // org.springframework.kafka.test.rule.KafkaRule
    public List<KafkaServer> getKafkaServers() {
        return this.kafkaServers;
    }

    public KafkaServer getKafkaServer(int i) {
        return this.kafkaServers.get(i);
    }

    public EmbeddedZookeeper getZookeeper() {
        return this.zookeeper;
    }

    @Override // org.springframework.kafka.test.rule.KafkaRule
    public ZkClient getZkClient() {
        return this.zookeeperClient;
    }

    @Override // org.springframework.kafka.test.rule.KafkaRule
    public String getZookeeperConnectionString() {
        return this.zkConnect;
    }

    public BrokerAddress getBrokerAddress(int i) {
        return new BrokerAddress("127.0.0.1", this.kafkaServers.get(i).config().port().intValue());
    }

    @Override // org.springframework.kafka.test.rule.KafkaRule
    public BrokerAddress[] getBrokerAddresses() {
        ArrayList arrayList = new ArrayList();
        Iterator<KafkaServer> it = this.kafkaServers.iterator();
        while (it.hasNext()) {
            arrayList.add(new BrokerAddress("127.0.0.1", it.next().config().port().intValue()));
        }
        return (BrokerAddress[]) arrayList.toArray(new BrokerAddress[arrayList.size()]);
    }

    @Override // org.springframework.kafka.test.rule.KafkaRule
    public int getPartitionsPerTopic() {
        return this.partitionsPerTopic;
    }

    public void bounce(BrokerAddress brokerAddress) {
        for (KafkaServer kafkaServer : getKafkaServers()) {
            if (brokerAddress.equals(new BrokerAddress(kafkaServer.config().hostName(), kafkaServer.config().port().intValue()))) {
                kafkaServer.shutdown();
                kafkaServer.awaitShutdown();
            }
        }
    }

    public void startZookeeper() {
        this.zookeeper = new EmbeddedZookeeper();
    }

    public void bounce(int i, boolean z) {
        if (!this.clientVersion.startsWith("0.11")) {
            throw new UnsupportedOperationException("Not supported on clients greater than 0.11");
        }
        this.kafkaServers.get(i).shutdown();
        if (z) {
            long currentTimeMillis = System.currentTimeMillis();
            do {
                try {
                    Thread.sleep(100L);
                    boolean z2 = true;
                    ZkUtils zkUtils = new ZkUtils(getZkClient(), (ZkConnection) null, false);
                    for (MetadataResponse.TopicMetadata topicMetadata : JavaConversions.asJavaCollection(AdminUtils$.MODULE$.fetchTopicMetadataFromZk(AdminUtils$.MODULE$.fetchAllTopicConfigs(zkUtils).keySet(), zkUtils))) {
                        if (Errors.forCode(topicMetadata.error().code()).exception() == null) {
                            Iterator it = topicMetadata.partitionMetadata().iterator();
                            while (it.hasNext()) {
                                Iterator it2 = ((MetadataResponse.PartitionMetadata) it.next()).isr().iterator();
                                while (it2.hasNext()) {
                                    if (((Node) it2.next()).id() == i) {
                                        z2 = false;
                                    }
                                }
                            }
                        }
                    }
                    if (z2) {
                        return;
                    }
                } catch (InterruptedException e) {
                    return;
                }
            } while (System.currentTimeMillis() - currentTimeMillis < METADATA_PROPAGATION_TIMEOUT);
        }
    }

    public void bounce(int i) {
        bounce(i, true);
    }

    public void restart(final int i) throws Exception {
        SimpleRetryPolicy simpleRetryPolicy = new SimpleRetryPolicy(10, Collections.singletonMap(Exception.class, true));
        ExponentialBackOffPolicy exponentialBackOffPolicy = new ExponentialBackOffPolicy();
        exponentialBackOffPolicy.setInitialInterval(100L);
        exponentialBackOffPolicy.setMaxInterval(1000L);
        exponentialBackOffPolicy.setMultiplier(2.0d);
        RetryTemplate retryTemplate = new RetryTemplate();
        retryTemplate.setRetryPolicy(simpleRetryPolicy);
        retryTemplate.setBackOffPolicy(exponentialBackOffPolicy);
        retryTemplate.execute(new RetryCallback<Void, Exception>() { // from class: org.springframework.kafka.test.rule.KafkaEmbedded.1
            /* renamed from: doWithRetry, reason: merged with bridge method [inline-methods] */
            public Void m0doWithRetry(RetryContext retryContext) throws Exception {
                ((KafkaServer) KafkaEmbedded.this.kafkaServers.get(i)).startup();
                return null;
            }
        });
    }

    public void waitUntilSynced(String str, int i) {
        if (!this.clientVersion.startsWith("0.11")) {
            throw new UnsupportedOperationException("Not supported on clients greater than 0.11");
        }
        long currentTimeMillis = System.currentTimeMillis();
        do {
            try {
                Thread.sleep(100L);
                boolean z = true;
                MetadataResponse.TopicMetadata fetchTopicMetadataFromZk = AdminUtils$.MODULE$.fetchTopicMetadataFromZk(str, new ZkUtils(getZkClient(), (ZkConnection) null, false));
                if (Errors.forCode(fetchTopicMetadataFromZk.error().code()).exception() == null) {
                    Iterator it = fetchTopicMetadataFromZk.partitionMetadata().iterator();
                    while (it.hasNext()) {
                        boolean z2 = false;
                        Iterator it2 = ((MetadataResponse.PartitionMetadata) it.next()).isr().iterator();
                        while (it2.hasNext()) {
                            if (((Node) it2.next()).id() == i) {
                                z2 = true;
                            }
                        }
                        if (!z2) {
                            z = false;
                        }
                    }
                }
                if (z) {
                    return;
                }
            } catch (InterruptedException e) {
                return;
            }
        } while (System.currentTimeMillis() - currentTimeMillis < METADATA_PROPAGATION_TIMEOUT);
    }

    @Override // org.springframework.kafka.test.rule.KafkaRule
    public String getBrokersAsString() {
        StringBuilder sb = new StringBuilder();
        for (BrokerAddress brokerAddress : getBrokerAddresses()) {
            sb.append(brokerAddress.toString()).append(',');
        }
        return sb.substring(0, sb.length() - 1);
    }

    @Override // org.springframework.kafka.test.rule.KafkaRule
    public boolean isEmbedded() {
        return true;
    }

    public void consumeFromAllEmbeddedTopics(Consumer<?, ?> consumer) throws Exception {
        consumeFromEmbeddedTopics(consumer, this.topics);
    }

    public void consumeFromAnEmbeddedTopic(Consumer<?, ?> consumer, String str) throws Exception {
        consumeFromEmbeddedTopics(consumer, str);
    }

    public void consumeFromEmbeddedTopics(Consumer<?, ?> consumer, String... strArr) throws Exception {
        for (String str : strArr) {
            Assertions.assertThat(this.topics).as("topic '" + str + "' is not in embedded topic list", new Object[0]).contains(new String[]{str});
        }
        final CountDownLatch countDownLatch = new CountDownLatch(1);
        consumer.subscribe(Arrays.asList(strArr), new ConsumerRebalanceListener() { // from class: org.springframework.kafka.test.rule.KafkaEmbedded.2
            public void onPartitionsRevoked(Collection<TopicPartition> collection) {
            }

            public void onPartitionsAssigned(Collection<TopicPartition> collection) {
                countDownLatch.countDown();
            }
        });
        consumer.poll(0L);
        Assertions.assertThat(countDownLatch.await(30L, TimeUnit.SECONDS)).as("Failed to be assigned partitions from the embedded topics", new Object[0]).isTrue();
    }
}
