package org.apache.kafka.streams.integration;

import java.time.Duration;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.ConcurrentHashMap;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster;
import org.apache.kafka.streams.integration.utils.IntegrationTestUtils;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.processor.StateRestoreListener;
import org.apache.kafka.streams.processor.WallclockTimestampExtractor;
import org.apache.kafka.test.IntegrationTest;
import org.apache.kafka.test.TestUtils;
import org.junit.After;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.Rule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.rules.TestName;

@Category({IntegrationTest.class})
/* loaded from: input_file:org/apache/kafka/streams/integration/KTableSourceTopicRestartIntegrationTest.class */
public class KTableSourceTopicRestartIntegrationTest {
    private static final String SOURCE_TOPIC = "source-topic";
    private String sourceTopic;
    private KafkaStreams streams;
    private Map<String, String> expectedInitialResultsMap;
    private Map<String, String> expectedResultsWithDataWrittenDuringRestoreMap;
    private static final Properties PRODUCER_CONFIG = new Properties();
    private static final Properties STREAMS_CONFIG = new Properties();
    private static final int NUM_BROKERS = 3;

    @ClassRule
    public static final EmbeddedKafkaCluster CLUSTER = new EmbeddedKafkaCluster(NUM_BROKERS);
    private final Time time = CLUSTER.time;
    private final StreamsBuilder streamsBuilder = new StreamsBuilder();
    private final Map<String, String> readKeyValues = new ConcurrentHashMap();

    @Rule
    public TestName testName = new TestName();

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/kafka/streams/integration/KTableSourceTopicRestartIntegrationTest$UpdatingSourceTopicOnRestoreStartStateRestoreListener.class */
    public class UpdatingSourceTopicOnRestoreStartStateRestoreListener implements StateRestoreListener {
        private UpdatingSourceTopicOnRestoreStartStateRestoreListener() {
        }

        public void onRestoreStart(TopicPartition topicPartition, String str, long j, long j2) {
            KTableSourceTopicRestartIntegrationTest.this.produceKeyValues("d");
        }

        public void onBatchRestored(TopicPartition topicPartition, String str, long j, long j2) {
        }

        public void onRestoreEnd(TopicPartition topicPartition, String str, long j) {
        }
    }

    @BeforeClass
    public static void setUpBeforeAllTests() {
        STREAMS_CONFIG.put("bootstrap.servers", CLUSTER.bootstrapServers());
        STREAMS_CONFIG.put("default.key.serde", Serdes.String().getClass().getName());
        STREAMS_CONFIG.put("default.value.serde", Serdes.String().getClass().getName());
        STREAMS_CONFIG.put("state.dir", TestUtils.tempDirectory().getPath());
        STREAMS_CONFIG.put("cache.max.bytes.buffering", 0);
        STREAMS_CONFIG.put("commit.interval.ms", 5);
        STREAMS_CONFIG.put("default.timestamp.extractor", WallclockTimestampExtractor.class);
        STREAMS_CONFIG.put("session.timeout.ms", 1000);
        STREAMS_CONFIG.put("heartbeat.interval.ms", 300);
        PRODUCER_CONFIG.put("bootstrap.servers", CLUSTER.bootstrapServers());
        PRODUCER_CONFIG.put("acks", "all");
        PRODUCER_CONFIG.put("key.serializer", StringSerializer.class);
        PRODUCER_CONFIG.put("value.serializer", StringSerializer.class);
    }

    @Before
    public void before() throws Exception {
        this.sourceTopic = "source-topic-" + this.testName.getMethodName();
        CLUSTER.createTopic(this.sourceTopic);
        STREAMS_CONFIG.put("application.id", IntegrationTestUtils.safeUniqueTestName(getClass(), this.testName));
        KStream stream = this.streamsBuilder.table(this.sourceTopic, Materialized.as("store")).toStream();
        Map<String, String> map = this.readKeyValues;
        map.getClass();
        stream.foreach((v1, v2) -> {
            r1.put(v1, v2);
        });
        this.expectedInitialResultsMap = createExpectedResultsMap("a", "b", "c");
        this.expectedResultsWithDataWrittenDuringRestoreMap = createExpectedResultsMap("a", "b", "c", "d", "f", "g", "h");
    }

    @After
    public void after() throws Exception {
        IntegrationTestUtils.purgeLocalStreamsState(STREAMS_CONFIG);
    }

    @Test
    public void shouldRestoreAndProgressWhenTopicWrittenToDuringRestorationWithEosDisabled() throws Exception {
        try {
            this.streams = new KafkaStreams(this.streamsBuilder.build(), STREAMS_CONFIG);
            this.streams.start();
            produceKeyValues("a", "b", "c");
            assertNumberValuesRead(this.readKeyValues, this.expectedInitialResultsMap, "Table did not read all values");
            this.streams.close();
            this.streams = new KafkaStreams(this.streamsBuilder.build(), STREAMS_CONFIG);
            this.streams.setGlobalStateRestoreListener(new UpdatingSourceTopicOnRestoreStartStateRestoreListener());
            this.streams.start();
            produceKeyValues("f", "g", "h");
            assertNumberValuesRead(this.readKeyValues, this.expectedResultsWithDataWrittenDuringRestoreMap, "Table did not get all values after restart");
        } finally {
            this.streams.close(Duration.ofSeconds(5L));
        }
    }

    @Test
    public void shouldRestoreAndProgressWhenTopicWrittenToDuringRestorationWithEosAlphaEnabled() throws Exception {
        STREAMS_CONFIG.put("processing.guarantee", "exactly_once");
        shouldRestoreAndProgressWhenTopicWrittenToDuringRestorationWithEosEnabled();
    }

    @Test
    public void shouldRestoreAndProgressWhenTopicWrittenToDuringRestorationWithEosBetaEnabled() throws Exception {
        STREAMS_CONFIG.put("processing.guarantee", "exactly_once_beta");
        shouldRestoreAndProgressWhenTopicWrittenToDuringRestorationWithEosEnabled();
    }

    private void shouldRestoreAndProgressWhenTopicWrittenToDuringRestorationWithEosEnabled() throws Exception {
        try {
            this.streams = new KafkaStreams(this.streamsBuilder.build(), STREAMS_CONFIG);
            this.streams.start();
            produceKeyValues("a", "b", "c");
            assertNumberValuesRead(this.readKeyValues, this.expectedInitialResultsMap, "Table did not read all values");
            this.streams.close();
            this.streams = new KafkaStreams(this.streamsBuilder.build(), STREAMS_CONFIG);
            this.streams.setGlobalStateRestoreListener(new UpdatingSourceTopicOnRestoreStartStateRestoreListener());
            this.streams.start();
            produceKeyValues("f", "g", "h");
            assertNumberValuesRead(this.readKeyValues, this.expectedResultsWithDataWrittenDuringRestoreMap, "Table did not get all values after restart");
        } finally {
            this.streams.close(Duration.ofSeconds(5L));
        }
    }

    @Test
    public void shouldRestoreAndProgressWhenTopicNotWrittenToDuringRestoration() throws Exception {
        try {
            this.streams = new KafkaStreams(this.streamsBuilder.build(), STREAMS_CONFIG);
            this.streams.start();
            produceKeyValues("a", "b", "c");
            assertNumberValuesRead(this.readKeyValues, this.expectedInitialResultsMap, "Table did not read all values");
            this.streams.close();
            this.streams = new KafkaStreams(this.streamsBuilder.build(), STREAMS_CONFIG);
            this.streams.start();
            produceKeyValues("f", "g", "h");
            assertNumberValuesRead(this.readKeyValues, createExpectedResultsMap("a", "b", "c", "f", "g", "h"), "Table did not get all values after restart");
        } finally {
            this.streams.close(Duration.ofSeconds(5L));
        }
    }

    private void assertNumberValuesRead(Map<String, String> map, Map<String, String> map2, String str) throws InterruptedException {
        TestUtils.waitForCondition(() -> {
            return map.equals(map2);
        }, 30000L, str);
    }

    /* JADX INFO: Access modifiers changed from: private */
    public void produceKeyValues(String... strArr) {
        ArrayList arrayList = new ArrayList();
        for (String str : strArr) {
            arrayList.add(new KeyValue(str, str + "1"));
        }
        IntegrationTestUtils.produceKeyValuesSynchronously(this.sourceTopic, arrayList, PRODUCER_CONFIG, this.time);
    }

    private Map<String, String> createExpectedResultsMap(String... strArr) {
        HashMap hashMap = new HashMap();
        for (String str : strArr) {
            hashMap.put(str, str + "1");
        }
        return hashMap;
    }
}
