package org.apache.kafka.streams.integration;

import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Properties;
import java.util.Set;
import kafka.admin.AdminClient;
import kafka.tools.StreamsResetter;
import kafka.utils.MockTime;
import kafka.utils.ZkUtils;
import org.apache.kafka.common.security.JaasUtils;
import org.apache.kafka.common.serialization.LongDeserializer;
import org.apache.kafka.common.serialization.LongSerializer;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster;
import org.apache.kafka.streams.integration.utils.IntegrationTestUtils;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KStreamBuilder;
import org.apache.kafka.streams.kstream.KeyValueMapper;
import org.apache.kafka.streams.kstream.TimeWindows;
import org.apache.kafka.streams.kstream.Windowed;
import org.apache.kafka.test.TestCondition;
import org.apache.kafka.test.TestUtils;
import org.hamcrest.CoreMatchers;
import org.hamcrest.MatcherAssert;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.Test;
import scala.collection.JavaConversions;

/* loaded from: input_file:org/apache/kafka/streams/integration/ResetIntegrationTest.class */
public class ResetIntegrationTest {
    private static final int NUM_BROKERS = 1;

    @ClassRule
    public static final EmbeddedKafkaCluster CLUSTER = new EmbeddedKafkaCluster(NUM_BROKERS);
    private static final String APP_ID = "cleanup-integration-test";
    private static final String INPUT_TOPIC = "inputTopic";
    private static final String OUTPUT_TOPIC = "outputTopic";
    private static final String OUTPUT_TOPIC_2 = "outputTopic2";
    private static final String OUTPUT_TOPIC_2_RERUN = "outputTopic2_rerun";
    private static final String INTERMEDIATE_USER_TOPIC = "userTopic";
    private static final long STREAMS_CONSUMER_TIMEOUT = 2000;
    private static final long CLEANUP_CONSUMER_TIMEOUT = 2000;
    private final MockTime mockTime = CLUSTER.time;
    private final WaitUntilConsumerGroupGotClosed consumerGroupInactive = new WaitUntilConsumerGroupGotClosed();
    private AdminClient adminClient = null;

    /* loaded from: input_file:org/apache/kafka/streams/integration/ResetIntegrationTest$WaitUntilConsumerGroupGotClosed.class */
    private class WaitUntilConsumerGroupGotClosed implements TestCondition {
        private WaitUntilConsumerGroupGotClosed() {
        }

        public boolean conditionMet() {
            return ResetIntegrationTest.this.adminClient.describeGroup(ResetIntegrationTest.APP_ID).members().isEmpty();
        }
    }

    @BeforeClass
    public static void startKafkaCluster() throws Exception {
        CLUSTER.createTopic(INPUT_TOPIC);
        CLUSTER.createTopic(OUTPUT_TOPIC);
        CLUSTER.createTopic(OUTPUT_TOPIC_2);
        CLUSTER.createTopic(OUTPUT_TOPIC_2_RERUN);
        CLUSTER.createTopic(INTERMEDIATE_USER_TOPIC);
    }

    @Before
    public void prepare() {
        this.adminClient = AdminClient.createSimplePlaintext(CLUSTER.bootstrapServers());
    }

    @After
    public void cleanup() {
        if (this.adminClient != null) {
            this.adminClient.close();
            this.adminClient = null;
        }
    }

    @Test
    public void testReprocessingFromScratchAfterReset() throws Exception {
        Properties prepareTest = prepareTest();
        Properties consumerConfig = TestUtils.consumerConfig(CLUSTER.bootstrapServers(), "cleanup-integration-test-standard-consumer-outputTopic", LongDeserializer.class, LongDeserializer.class);
        prepareInputData();
        KafkaStreams kafkaStreams = new KafkaStreams(setupTopology(OUTPUT_TOPIC_2), prepareTest);
        kafkaStreams.start();
        List waitUntilMinKeyValueRecordsReceived = IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(consumerConfig, OUTPUT_TOPIC, 10, 60000L);
        KeyValue keyValue = (KeyValue) IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(consumerConfig, OUTPUT_TOPIC_2, NUM_BROKERS).get(0);
        kafkaStreams.close();
        TestUtils.waitForCondition(this.consumerGroupInactive, 10000L, "Streams Application consumer group did not time out after 10000 ms.");
        KafkaStreams kafkaStreams2 = new KafkaStreams(setupTopology(OUTPUT_TOPIC_2_RERUN), prepareTest);
        kafkaStreams2.cleanUp();
        cleanGlobal();
        TestUtils.waitForCondition(this.consumerGroupInactive, 10000L, "Reset Tool consumer group did not time out after 10000 ms.");
        assertInternalTopicsGotDeleted();
        kafkaStreams2.start();
        List waitUntilMinKeyValueRecordsReceived2 = IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(consumerConfig, OUTPUT_TOPIC, 10);
        KeyValue keyValue2 = (KeyValue) IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(consumerConfig, OUTPUT_TOPIC_2_RERUN, NUM_BROKERS).get(0);
        kafkaStreams2.close();
        MatcherAssert.assertThat(waitUntilMinKeyValueRecordsReceived2, CoreMatchers.equalTo(waitUntilMinKeyValueRecordsReceived));
        MatcherAssert.assertThat(keyValue2, CoreMatchers.equalTo(keyValue));
    }

    private Properties prepareTest() throws Exception {
        Properties properties = new Properties();
        properties.put("application.id", APP_ID);
        properties.put("bootstrap.servers", CLUSTER.bootstrapServers());
        properties.put("zookeeper.connect", CLUSTER.zKConnectString());
        properties.put("state.dir", TestUtils.tempDirectory().getPath());
        properties.put("key.serde", Serdes.Long().getClass());
        properties.put("value.serde", Serdes.String().getClass());
        properties.put("num.stream.threads", 8);
        properties.put("commit.interval.ms", Integer.valueOf(NUM_BROKERS));
        properties.put("heartbeat.interval.ms", 100);
        properties.put("session.timeout.ms", "2000");
        properties.put("auto.offset.reset", "earliest");
        IntegrationTestUtils.purgeLocalStreamsState(properties);
        return properties;
    }

    private void prepareInputData() throws Exception {
        Properties producerConfig = TestUtils.producerConfig(CLUSTER.bootstrapServers(), LongSerializer.class, StringSerializer.class);
        this.mockTime.sleep(10L);
        IntegrationTestUtils.produceKeyValuesSynchronouslyWithTimestamp(INPUT_TOPIC, Collections.singleton(new KeyValue(0L, "aaa")), producerConfig, Long.valueOf(this.mockTime.milliseconds()));
        this.mockTime.sleep(10L);
        IntegrationTestUtils.produceKeyValuesSynchronouslyWithTimestamp(INPUT_TOPIC, Collections.singleton(new KeyValue(1L, "bbb")), producerConfig, Long.valueOf(this.mockTime.milliseconds()));
        this.mockTime.sleep(10L);
        IntegrationTestUtils.produceKeyValuesSynchronouslyWithTimestamp(INPUT_TOPIC, Collections.singleton(new KeyValue(0L, "ccc")), producerConfig, Long.valueOf(this.mockTime.milliseconds()));
        this.mockTime.sleep(10L);
        IntegrationTestUtils.produceKeyValuesSynchronouslyWithTimestamp(INPUT_TOPIC, Collections.singleton(new KeyValue(1L, "ddd")), producerConfig, Long.valueOf(this.mockTime.milliseconds()));
        this.mockTime.sleep(10L);
        IntegrationTestUtils.produceKeyValuesSynchronouslyWithTimestamp(INPUT_TOPIC, Collections.singleton(new KeyValue(0L, "eee")), producerConfig, Long.valueOf(this.mockTime.milliseconds()));
        this.mockTime.sleep(10L);
        IntegrationTestUtils.produceKeyValuesSynchronouslyWithTimestamp(INPUT_TOPIC, Collections.singleton(new KeyValue(1L, "fff")), producerConfig, Long.valueOf(this.mockTime.milliseconds()));
        this.mockTime.sleep(1L);
        IntegrationTestUtils.produceKeyValuesSynchronouslyWithTimestamp(INPUT_TOPIC, Collections.singleton(new KeyValue(0L, "ggg")), producerConfig, Long.valueOf(this.mockTime.milliseconds()));
        this.mockTime.sleep(1L);
        IntegrationTestUtils.produceKeyValuesSynchronouslyWithTimestamp(INPUT_TOPIC, Collections.singleton(new KeyValue(1L, "hhh")), producerConfig, Long.valueOf(this.mockTime.milliseconds()));
        this.mockTime.sleep(1L);
        IntegrationTestUtils.produceKeyValuesSynchronouslyWithTimestamp(INPUT_TOPIC, Collections.singleton(new KeyValue(0L, "iii")), producerConfig, Long.valueOf(this.mockTime.milliseconds()));
        this.mockTime.sleep(1L);
        IntegrationTestUtils.produceKeyValuesSynchronouslyWithTimestamp(INPUT_TOPIC, Collections.singleton(new KeyValue(1L, "jjj")), producerConfig, Long.valueOf(this.mockTime.milliseconds()));
    }

    private KStreamBuilder setupTopology(String str) {
        KStreamBuilder kStreamBuilder = new KStreamBuilder();
        KStream stream = kStreamBuilder.stream(new String[]{INPUT_TOPIC});
        stream.map(new KeyValueMapper<Long, String, KeyValue<Long, String>>() { // from class: org.apache.kafka.streams.integration.ResetIntegrationTest.1
            public KeyValue<Long, String> apply(Long l, String str2) {
                return new KeyValue<>(l, str2);
            }
        }).groupByKey().count("global-count").to(Serdes.Long(), Serdes.Long(), OUTPUT_TOPIC);
        stream.through(INTERMEDIATE_USER_TOPIC).map(new KeyValueMapper<Long, String, KeyValue<Long, String>>() { // from class: org.apache.kafka.streams.integration.ResetIntegrationTest.3
            private long sleep = 1000;

            public KeyValue<Long, String> apply(Long l, String str2) {
                ResetIntegrationTest.this.mockTime.sleep(this.sleep);
                this.sleep *= 2;
                return new KeyValue<>(l, str2);
            }
        }).groupByKey().count(TimeWindows.of(35L).advanceBy(10L), "count").toStream().map(new KeyValueMapper<Windowed<Long>, Long, KeyValue<Long, Long>>() { // from class: org.apache.kafka.streams.integration.ResetIntegrationTest.2
            public KeyValue<Long, Long> apply(Windowed<Long> windowed, Long l) {
                return new KeyValue<>(Long.valueOf(windowed.window().start() + windowed.window().end()), l);
            }
        }).to(Serdes.Long(), Serdes.Long(), str);
        return kStreamBuilder;
    }

    private void cleanGlobal() {
        Properties properties = new Properties();
        properties.put("heartbeat.interval.ms", 100);
        properties.put("session.timeout.ms", "2000");
        Assert.assertEquals(0L, new StreamsResetter().run(new String[]{"--application-id", APP_ID, "--bootstrap-server", CLUSTER.bootstrapServers(), "--zookeeper", CLUSTER.zKConnectString(), "--input-topics", INPUT_TOPIC, "--intermediate-topics", INTERMEDIATE_USER_TOPIC}, properties));
    }

    private void assertInternalTopicsGotDeleted() {
        HashSet hashSet;
        HashSet hashSet2 = new HashSet();
        hashSet2.add(INPUT_TOPIC);
        hashSet2.add(INTERMEDIATE_USER_TOPIC);
        hashSet2.add(OUTPUT_TOPIC);
        hashSet2.add(OUTPUT_TOPIC_2);
        hashSet2.add(OUTPUT_TOPIC_2_RERUN);
        hashSet2.add("__consumer_offsets");
        ZkUtils zkUtils = null;
        try {
            zkUtils = ZkUtils.apply(CLUSTER.zKConnectString(), 30000, 30000, JaasUtils.isZkSecurityEnabled());
            do {
                Utils.sleep(100L);
                hashSet = new HashSet();
                hashSet.addAll(JavaConversions.seqAsJavaList(zkUtils.getAllTopics()));
            } while (hashSet.size() != hashSet2.size());
            if (zkUtils != null) {
                zkUtils.close();
            }
            MatcherAssert.assertThat(hashSet, CoreMatchers.equalTo(hashSet2));
        } catch (Throwable th) {
            if (zkUtils != null) {
                zkUtils.close();
            }
            throw th;
        }
    }
}
