package org.apache.kafka.streams.integration;

import java.io.File;
import java.io.IOException;
import java.nio.file.Paths;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Properties;
import java.util.Set;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.IsolationLevel;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;
import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.serialization.IntegerDeserializer;
import org.apache.kafka.common.serialization.IntegerSerializer;
import org.apache.kafka.common.serialization.LongDeserializer;
import org.apache.kafka.common.serialization.LongSerializer;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.StreamsMetadata;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.errors.TaskCorruptedException;
import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster;
import org.apache.kafka.streams.integration.utils.IntegrationTestUtils;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.Transformer;
import org.apache.kafka.streams.kstream.TransformerSupplier;
import org.apache.kafka.streams.processor.StateRestoreListener;
import org.apache.kafka.streams.processor.TaskId;
import org.apache.kafka.streams.processor.api.Processor;
import org.apache.kafka.streams.processor.api.ProcessorContext;
import org.apache.kafka.streams.processor.api.Record;
import org.apache.kafka.streams.processor.internals.StreamThread;
import org.apache.kafka.streams.query.QueryResult;
import org.apache.kafka.streams.query.RangeQuery;
import org.apache.kafka.streams.query.StateQueryRequest;
import org.apache.kafka.streams.state.KeyValueIterator;
import org.apache.kafka.streams.state.KeyValueStore;
import org.apache.kafka.streams.state.StoreBuilder;
import org.apache.kafka.streams.state.Stores;
import org.apache.kafka.streams.state.internals.OffsetCheckpoint;
import org.apache.kafka.test.IntegrationTest;
import org.apache.kafka.test.StreamsTestUtils;
import org.apache.kafka.test.TestUtils;
import org.hamcrest.CoreMatchers;
import org.hamcrest.MatcherAssert;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Rule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.rules.Timeout;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@RunWith(Parameterized.class)
@Category({IntegrationTest.class})
/* loaded from: input_file:org/apache/kafka/streams/integration/EosIntegrationTest.class */
public class EosIntegrationTest {
    private static final int MAX_POLL_INTERVAL_MS = 30000;
    private static final int MAX_WAIT_TIME_MS = 120000;
    private String applicationId;
    private static final int NUM_TOPIC_PARTITIONS = 2;
    private static final String CONSUMER_GROUP_ID = "readCommitted";
    private static final String SINGLE_PARTITION_INPUT_TOPIC = "singlePartitionInputTopic";
    private static final String SINGLE_PARTITION_THROUGH_TOPIC = "singlePartitionThroughTopic";
    private static final String SINGLE_PARTITION_OUTPUT_TOPIC = "singlePartitionOutputTopic";
    private static final String MULTI_PARTITION_INPUT_TOPIC = "multiPartitionInputTopic";
    private static final String MULTI_PARTITION_THROUGH_TOPIC = "multiPartitionThroughTopic";
    private static final String MULTI_PARTITION_OUTPUT_TOPIC = "multiPartitionOutputTopic";
    private AtomicBoolean errorInjected;
    private AtomicBoolean stallInjected;
    private AtomicReference<String> stallingHost;
    private AtomicInteger commitRequested;
    private Throwable uncaughtException;
    private String stateTmpDir;

    @Parameterized.Parameter(0)
    public String eosConfig;

    @Parameterized.Parameter(1)
    public boolean processingThreadsEnabled;
    private static final Logger LOG = LoggerFactory.getLogger(EosIntegrationTest.class);
    private static final int NUM_BROKERS = 3;
    public static final EmbeddedKafkaCluster CLUSTER = new EmbeddedKafkaCluster(NUM_BROKERS, Utils.mkProperties(Collections.singletonMap("auto.create.topics.enable", "true")));
    private static final AtomicInteger TEST_NUMBER = new AtomicInteger(0);

    @Rule
    public Timeout globalTimeout = Timeout.seconds(600);
    private final String storeName = "store";
    private volatile boolean doStall = true;
    private volatile boolean hasUnexpectedError = false;

    @BeforeClass
    public static void startCluster() throws IOException {
        CLUSTER.start();
    }

    @AfterClass
    public static void closeCluster() {
        CLUSTER.stop();
    }

    @Parameterized.Parameters(name = "{0}, processing threads = {1}")
    public static Collection<Object[]> data() {
        return Arrays.asList(new Object[]{"at_least_once", false}, new Object[]{"exactly_once", false}, new Object[]{"exactly_once_v2", false}, new Object[]{"at_least_once", true}, new Object[]{"exactly_once", true}, new Object[]{"exactly_once_v2", true});
    }

    @Before
    public void createTopics() throws Exception {
        this.applicationId = "appId-" + TEST_NUMBER.getAndIncrement();
        CLUSTER.deleteTopicsAndWait(IntegrationTestUtils.DEFAULT_TIMEOUT, SINGLE_PARTITION_INPUT_TOPIC, MULTI_PARTITION_INPUT_TOPIC, SINGLE_PARTITION_THROUGH_TOPIC, MULTI_PARTITION_THROUGH_TOPIC, SINGLE_PARTITION_OUTPUT_TOPIC, MULTI_PARTITION_OUTPUT_TOPIC);
        CLUSTER.createTopics(SINGLE_PARTITION_INPUT_TOPIC, SINGLE_PARTITION_THROUGH_TOPIC, SINGLE_PARTITION_OUTPUT_TOPIC);
        CLUSTER.createTopic(MULTI_PARTITION_INPUT_TOPIC, NUM_TOPIC_PARTITIONS, 1);
        CLUSTER.createTopic(MULTI_PARTITION_THROUGH_TOPIC, NUM_TOPIC_PARTITIONS, 1);
        CLUSTER.createTopic(MULTI_PARTITION_OUTPUT_TOPIC, NUM_TOPIC_PARTITIONS, 1);
    }

    @Test
    public void shouldBeAbleToRunWithEosEnabled() throws Exception {
        runSimpleCopyTest(1, SINGLE_PARTITION_INPUT_TOPIC, null, SINGLE_PARTITION_OUTPUT_TOPIC, false, this.eosConfig);
    }

    @Test
    public void shouldCommitCorrectOffsetIfInputTopicIsTransactional() throws Exception {
        runSimpleCopyTest(1, SINGLE_PARTITION_INPUT_TOPIC, null, SINGLE_PARTITION_OUTPUT_TOPIC, true, this.eosConfig);
        Admin create = Admin.create(Utils.mkMap(new Map.Entry[]{Utils.mkEntry("bootstrap.servers", CLUSTER.bootstrapServers())}));
        Throwable th = null;
        try {
            KafkaConsumer kafkaConsumer = new KafkaConsumer(Utils.mkMap(new Map.Entry[]{Utils.mkEntry("bootstrap.servers", CLUSTER.bootstrapServers()), Utils.mkEntry("group.id", this.applicationId), Utils.mkEntry("key.deserializer", ByteArrayDeserializer.class), Utils.mkEntry("value.deserializer", ByteArrayDeserializer.class)}));
            Throwable th2 = null;
            try {
                try {
                    IntegrationTestUtils.waitForEmptyConsumerGroup(create, this.applicationId, 150000L);
                    TopicPartition topicPartition = new TopicPartition(SINGLE_PARTITION_INPUT_TOPIC, 0);
                    Set singleton = Collections.singleton(topicPartition);
                    long offset = ((OffsetAndMetadata) ((Map) create.listConsumerGroupOffsets(this.applicationId).partitionsToOffsetAndMetadata().get()).get(topicPartition)).offset();
                    kafkaConsumer.assign(singleton);
                    long position = kafkaConsumer.position(topicPartition);
                    long longValue = ((Long) kafkaConsumer.endOffsets(singleton).get(topicPartition)).longValue();
                    MatcherAssert.assertThat(Long.valueOf(offset), CoreMatchers.equalTo(Long.valueOf(position)));
                    MatcherAssert.assertThat(Long.valueOf(offset), CoreMatchers.equalTo(Long.valueOf(longValue)));
                    if (kafkaConsumer != null) {
                        if (0 != 0) {
                            try {
                                kafkaConsumer.close();
                            } catch (Throwable th3) {
                                th2.addSuppressed(th3);
                            }
                        } else {
                            kafkaConsumer.close();
                        }
                    }
                    if (create != null) {
                        if (0 == 0) {
                            create.close();
                            return;
                        }
                        try {
                            create.close();
                        } catch (Throwable th4) {
                            th.addSuppressed(th4);
                        }
                    }
                } catch (Throwable th5) {
                    th2 = th5;
                    throw th5;
                }
            } catch (Throwable th6) {
                if (kafkaConsumer != null) {
                    if (th2 != null) {
                        try {
                            kafkaConsumer.close();
                        } catch (Throwable th7) {
                            th2.addSuppressed(th7);
                        }
                    } else {
                        kafkaConsumer.close();
                    }
                }
                throw th6;
            }
        } catch (Throwable th8) {
            if (create != null) {
                if (0 != 0) {
                    try {
                        create.close();
                    } catch (Throwable th9) {
                        th.addSuppressed(th9);
                    }
                } else {
                    create.close();
                }
            }
            throw th8;
        }
    }

    @Test
    public void shouldBeAbleToRestartAfterClose() throws Exception {
        runSimpleCopyTest(NUM_TOPIC_PARTITIONS, SINGLE_PARTITION_INPUT_TOPIC, null, SINGLE_PARTITION_OUTPUT_TOPIC, false, this.eosConfig);
    }

    @Test
    public void shouldBeAbleToCommitToMultiplePartitions() throws Exception {
        runSimpleCopyTest(1, SINGLE_PARTITION_INPUT_TOPIC, null, MULTI_PARTITION_OUTPUT_TOPIC, false, this.eosConfig);
    }

    @Test
    public void shouldBeAbleToCommitMultiplePartitionOffsets() throws Exception {
        runSimpleCopyTest(1, MULTI_PARTITION_INPUT_TOPIC, null, SINGLE_PARTITION_OUTPUT_TOPIC, false, this.eosConfig);
    }

    @Test
    public void shouldBeAbleToRunWithTwoSubtopologies() throws Exception {
        runSimpleCopyTest(1, SINGLE_PARTITION_INPUT_TOPIC, SINGLE_PARTITION_THROUGH_TOPIC, SINGLE_PARTITION_OUTPUT_TOPIC, false, this.eosConfig);
    }

    @Test
    public void shouldBeAbleToRunWithTwoSubtopologiesAndMultiplePartitions() throws Exception {
        runSimpleCopyTest(1, MULTI_PARTITION_INPUT_TOPIC, MULTI_PARTITION_THROUGH_TOPIC, MULTI_PARTITION_OUTPUT_TOPIC, false, this.eosConfig);
    }

    private void runSimpleCopyTest(int i, String str, String str2, String str3, boolean z, String str4) throws Exception {
        StreamsBuilder streamsBuilder = new StreamsBuilder();
        KStream stream = streamsBuilder.stream(str);
        KStream kStream = stream;
        if (str2 != null) {
            stream.to(str2);
            kStream = streamsBuilder.stream(str2);
        }
        kStream.to(str3);
        Properties properties = new Properties();
        properties.put("processing.guarantee", str4);
        properties.put("statestore.cache.max.bytes", 0);
        properties.put("commit.interval.ms", 100L);
        properties.put(StreamsConfig.consumerPrefix("max.poll.records"), 1);
        properties.put(StreamsConfig.consumerPrefix("metadata.max.age.ms"), "1000");
        properties.put(StreamsConfig.consumerPrefix("auto.offset.reset"), "earliest");
        properties.put(StreamsConfig.consumerPrefix("session.timeout.ms"), 29999);
        properties.put(StreamsConfig.consumerPrefix("max.poll.interval.ms"), Integer.valueOf(MAX_POLL_INTERVAL_MS));
        for (int i2 = 0; i2 < i; i2++) {
            Properties streamsConfig = StreamsTestUtils.getStreamsConfig(this.applicationId, CLUSTER.bootstrapServers(), Serdes.LongSerde.class.getName(), Serdes.LongSerde.class.getName(), properties);
            List<KeyValue<Long, Long>> prepareData = prepareData(i2 * 100, (i2 * 100) + 10, 0L, 1L);
            Properties properties2 = new Properties();
            if (z) {
                properties2.setProperty("transactional.id", this.applicationId + "-input-producer");
            }
            IntegrationTestUtils.produceKeyValuesSynchronously(str, (Collection) prepareData, TestUtils.producerConfig(CLUSTER.bootstrapServers(), LongSerializer.class, LongSerializer.class, properties2), (Time) CLUSTER.time, z);
            KafkaStreams kafkaStreams = new KafkaStreams(streamsBuilder.build(), streamsConfig);
            Throwable th = null;
            try {
                try {
                    IntegrationTestUtils.startApplicationAndWaitUntilRunning(kafkaStreams);
                    checkResultPerKey(readResult(str3, prepareData.size(), CONSUMER_GROUP_ID), prepareData, "The committed records do not match what expected");
                    if (kafkaStreams != null) {
                        if (0 != 0) {
                            try {
                                kafkaStreams.close();
                            } catch (Throwable th2) {
                                th.addSuppressed(th2);
                            }
                        } else {
                            kafkaStreams.close();
                        }
                    }
                } catch (Throwable th3) {
                    if (kafkaStreams != null) {
                        if (th != null) {
                            try {
                                kafkaStreams.close();
                            } catch (Throwable th4) {
                                th.addSuppressed(th4);
                            }
                        } else {
                            kafkaStreams.close();
                        }
                    }
                    throw th3;
                }
            } finally {
            }
        }
    }

    private void checkResultPerKey(List<KeyValue<Long, Long>> list, List<KeyValue<Long, Long>> list2, String str) {
        HashSet hashSet = new HashSet();
        addAllKeys(hashSet, list);
        addAllKeys(hashSet, list2);
        for (Long l : hashSet) {
            MatcherAssert.assertThat(str, getAllRecordPerKey(l, list), CoreMatchers.equalTo(getAllRecordPerKey(l, list2)));
        }
    }

    /* JADX WARN: Multi-variable type inference failed */
    private void addAllKeys(Set<Long> set, List<KeyValue<Long, Long>> list) {
        Iterator<KeyValue<Long, Long>> it = list.iterator();
        while (it.hasNext()) {
            set.add(it.next().key);
        }
    }

    private List<KeyValue<Long, Long>> getAllRecordPerKey(Long l, List<KeyValue<Long, Long>> list) {
        ArrayList arrayList = new ArrayList(list.size());
        for (KeyValue<Long, Long> keyValue : list) {
            if (((Long) keyValue.key).equals(l)) {
                arrayList.add(keyValue);
            }
        }
        return arrayList;
    }

    @Test
    public void shouldBeAbleToPerformMultipleTransactions() throws Exception {
        StreamsBuilder streamsBuilder = new StreamsBuilder();
        streamsBuilder.stream(SINGLE_PARTITION_INPUT_TOPIC).to(SINGLE_PARTITION_OUTPUT_TOPIC);
        Properties properties = new Properties();
        properties.put("processing.guarantee", this.eosConfig);
        properties.put("statestore.cache.max.bytes", 0);
        properties.put("commit.interval.ms", 100L);
        properties.put("metadata.max.age.ms", "1000");
        properties.put("auto.offset.reset", "earliest");
        KafkaStreams kafkaStreams = new KafkaStreams(streamsBuilder.build(), StreamsTestUtils.getStreamsConfig(this.applicationId, CLUSTER.bootstrapServers(), Serdes.LongSerde.class.getName(), Serdes.LongSerde.class.getName(), properties));
        Throwable th = null;
        try {
            try {
                IntegrationTestUtils.startApplicationAndWaitUntilRunning(kafkaStreams);
                List<KeyValue<Long, Long>> prepareData = prepareData(0L, 5L, 0L);
                List<KeyValue<Long, Long>> prepareData2 = prepareData(5L, 8L, 0L);
                IntegrationTestUtils.produceKeyValuesSynchronously(SINGLE_PARTITION_INPUT_TOPIC, prepareData, TestUtils.producerConfig(CLUSTER.bootstrapServers(), LongSerializer.class, LongSerializer.class), CLUSTER.time);
                MatcherAssert.assertThat(readResult(SINGLE_PARTITION_OUTPUT_TOPIC, prepareData.size(), CONSUMER_GROUP_ID), CoreMatchers.equalTo(prepareData));
                IntegrationTestUtils.produceKeyValuesSynchronously(SINGLE_PARTITION_INPUT_TOPIC, prepareData2, TestUtils.producerConfig(CLUSTER.bootstrapServers(), LongSerializer.class, LongSerializer.class), CLUSTER.time);
                MatcherAssert.assertThat(readResult(SINGLE_PARTITION_OUTPUT_TOPIC, prepareData2.size(), CONSUMER_GROUP_ID), CoreMatchers.equalTo(prepareData2));
                if (kafkaStreams != null) {
                    if (0 == 0) {
                        kafkaStreams.close();
                        return;
                    }
                    try {
                        kafkaStreams.close();
                    } catch (Throwable th2) {
                        th.addSuppressed(th2);
                    }
                }
            } catch (Throwable th3) {
                th = th3;
                throw th3;
            }
        } catch (Throwable th4) {
            if (kafkaStreams != null) {
                if (th != null) {
                    try {
                        kafkaStreams.close();
                    } catch (Throwable th5) {
                        th.addSuppressed(th5);
                    }
                } else {
                    kafkaStreams.close();
                }
            }
            throw th4;
        }
    }

    @Test
    public void shouldNotViolateEosIfOneTaskFails() throws Exception {
        if (this.eosConfig.equals("at_least_once")) {
            return;
        }
        KafkaStreams kafkaStreams = getKafkaStreams("dummy", false, "appDir", NUM_TOPIC_PARTITIONS, this.eosConfig);
        Throwable th = null;
        try {
            try {
                IntegrationTestUtils.startApplicationAndWaitUntilRunning(kafkaStreams);
                List<KeyValue<Long, Long>> prepareData = prepareData(0L, 10L, 0L, 1L);
                List<KeyValue<Long, Long>> prepareData2 = prepareData(10L, 15L, 0L, 1L);
                ArrayList arrayList = new ArrayList(prepareData.size() + prepareData2.size());
                arrayList.addAll(prepareData);
                arrayList.addAll(prepareData2);
                List<KeyValue<Long, Long>> prepareData3 = prepareData(15L, 20L, 0L, 1L);
                writeInputData(prepareData);
                TestUtils.waitForCondition(() -> {
                    return this.commitRequested.get() == NUM_TOPIC_PARTITIONS;
                }, 120000L, "StreamsTasks did not request commit.");
                checkResultPerKey(readResult(SINGLE_PARTITION_OUTPUT_TOPIC, prepareData.size(), CONSUMER_GROUP_ID), prepareData, "The committed records before failure do not match what expected");
                writeInputData(prepareData2);
                checkResultPerKey(readResult(SINGLE_PARTITION_OUTPUT_TOPIC, arrayList.size(), null), arrayList, "The uncommitted records before failure do not match what expected");
                this.errorInjected.set(true);
                writeInputData(prepareData3);
                TestUtils.waitForCondition(() -> {
                    return this.uncaughtException != null;
                }, 120000L, "Should receive uncaught exception from one StreamThread.");
                List<KeyValue<Long, Long>> readResult = readResult(SINGLE_PARTITION_OUTPUT_TOPIC, prepareData.size() + prepareData2.size() + prepareData3.size(), "readCommitted_ALL");
                List<KeyValue<Long, Long>> readResult2 = readResult(SINGLE_PARTITION_OUTPUT_TOPIC, prepareData2.size() + prepareData3.size(), CONSUMER_GROUP_ID);
                ArrayList arrayList2 = new ArrayList(prepareData.size() + prepareData2.size() + prepareData3.size());
                arrayList2.addAll(prepareData);
                arrayList2.addAll(prepareData2);
                arrayList2.addAll(prepareData3);
                ArrayList arrayList3 = new ArrayList(prepareData2.size() + prepareData3.size());
                arrayList3.addAll(prepareData2);
                arrayList3.addAll(prepareData3);
                checkResultPerKey(readResult, arrayList2, "The all committed records after recovery do not match what expected");
                checkResultPerKey(readResult2, arrayList3, "The committed records after recovery do not match what expected");
                MatcherAssert.assertThat("Should only get one uncaught exception from Streams.", Boolean.valueOf(this.hasUnexpectedError), CoreMatchers.is(false));
                if (kafkaStreams != null) {
                    if (0 == 0) {
                        kafkaStreams.close();
                        return;
                    }
                    try {
                        kafkaStreams.close();
                    } catch (Throwable th2) {
                        th.addSuppressed(th2);
                    }
                }
            } catch (Throwable th3) {
                th = th3;
                throw th3;
            }
        } catch (Throwable th4) {
            if (kafkaStreams != null) {
                if (th != null) {
                    try {
                        kafkaStreams.close();
                    } catch (Throwable th5) {
                        th.addSuppressed(th5);
                    }
                } else {
                    kafkaStreams.close();
                }
            }
            throw th4;
        }
    }

    @Test
    public void shouldNotViolateEosIfOneTaskFailsWithState() throws Exception {
        if (this.eosConfig.equals("at_least_once")) {
            return;
        }
        KafkaStreams kafkaStreams = getKafkaStreams("dummy", true, "appDir", NUM_TOPIC_PARTITIONS, this.eosConfig);
        Throwable th = null;
        try {
            IntegrationTestUtils.startApplicationAndWaitUntilRunning(kafkaStreams);
            List<KeyValue<Long, Long>> prepareData = prepareData(0L, 10L, 0L, 1L);
            List<KeyValue<Long, Long>> prepareData2 = prepareData(10L, 15L, 0L, 1L, 2L, 3L);
            ArrayList arrayList = new ArrayList(prepareData.size() + prepareData2.size());
            arrayList.addAll(prepareData);
            arrayList.addAll(prepareData2);
            List<KeyValue<Long, Long>> prepareData3 = prepareData(15L, 20L, 0L, 1L);
            writeInputData(prepareData);
            TestUtils.waitForCondition(() -> {
                return this.commitRequested.get() == NUM_TOPIC_PARTITIONS;
            }, 120000L, "StreamsTasks did not request commit.");
            checkResultPerKey(readResult(SINGLE_PARTITION_OUTPUT_TOPIC, prepareData.size(), CONSUMER_GROUP_ID), computeExpectedResult(prepareData), "The committed records before failure do not match what expected");
            writeInputData(prepareData2);
            List<KeyValue<Long, Long>> readResult = readResult(SINGLE_PARTITION_OUTPUT_TOPIC, arrayList.size(), null);
            List<KeyValue<Long, Long>> computeExpectedResult = computeExpectedResult(arrayList);
            checkResultPerKey(readResult, computeExpectedResult, "The uncommitted records before failure do not match what expected");
            verifyStateStore(kafkaStreams, getMaxPerKey(computeExpectedResult), "The state store content before failure do not match what expected");
            this.errorInjected.set(true);
            writeInputData(prepareData3);
            TestUtils.waitForCondition(() -> {
                return this.uncaughtException != null;
            }, 120000L, "Should receive uncaught exception from one StreamThread.");
            List<KeyValue<Long, Long>> readResult2 = readResult(SINGLE_PARTITION_OUTPUT_TOPIC, prepareData.size() + prepareData2.size() + prepareData3.size(), "readCommitted_ALL");
            List<KeyValue<Long, Long>> readResult3 = readResult(SINGLE_PARTITION_OUTPUT_TOPIC, prepareData2.size() + prepareData3.size(), CONSUMER_GROUP_ID);
            ArrayList arrayList2 = new ArrayList(prepareData.size() + prepareData2.size() + prepareData3.size());
            arrayList2.addAll(prepareData);
            arrayList2.addAll(prepareData2);
            arrayList2.addAll(prepareData3);
            List<KeyValue<Long, Long>> computeExpectedResult2 = computeExpectedResult(arrayList2);
            checkResultPerKey(readResult2, computeExpectedResult2, "The all committed records after recovery do not match what expected");
            checkResultPerKey(readResult3, computeExpectedResult2.subList(prepareData.size(), computeExpectedResult2.size()), "The committed records after recovery do not match what expected");
            verifyStateStore(kafkaStreams, getMaxPerKey(computeExpectedResult2), "The state store content after recovery do not match what expected");
            MatcherAssert.assertThat("Should only get one uncaught exception from Streams.", Boolean.valueOf(this.hasUnexpectedError), CoreMatchers.is(false));
            if (kafkaStreams != null) {
                if (0 == 0) {
                    kafkaStreams.close();
                    return;
                }
                try {
                    kafkaStreams.close();
                } catch (Throwable th2) {
                    th.addSuppressed(th2);
                }
            }
        } catch (Throwable th3) {
            if (kafkaStreams != null) {
                if (0 != 0) {
                    try {
                        kafkaStreams.close();
                    } catch (Throwable th4) {
                        th.addSuppressed(th4);
                    }
                } else {
                    kafkaStreams.close();
                }
            }
            throw th3;
        }
    }

    /* JADX WARN: Failed to calculate best type for var: r14v0 ??
    java.lang.NullPointerException: Cannot invoke "jadx.core.dex.instructions.args.InsnArg.getType()" because "changeArg" is null
    	at jadx.core.dex.visitors.typeinference.TypeUpdate.moveListener(TypeUpdate.java:439)
    	at jadx.core.dex.visitors.typeinference.TypeUpdate.runListeners(TypeUpdate.java:232)
    	at jadx.core.dex.visitors.typeinference.TypeUpdate.requestUpdate(TypeUpdate.java:212)
    	at jadx.core.dex.visitors.typeinference.TypeUpdate.updateTypeForSsaVar(TypeUpdate.java:183)
    	at jadx.core.dex.visitors.typeinference.TypeUpdate.updateTypeChecked(TypeUpdate.java:112)
    	at jadx.core.dex.visitors.typeinference.TypeUpdate.apply(TypeUpdate.java:83)
    	at jadx.core.dex.visitors.typeinference.TypeUpdate.apply(TypeUpdate.java:56)
    	at jadx.core.dex.visitors.typeinference.FixTypesVisitor.calculateFromBounds(FixTypesVisitor.java:156)
    	at jadx.core.dex.visitors.typeinference.FixTypesVisitor.setBestType(FixTypesVisitor.java:133)
    	at jadx.core.dex.visitors.typeinference.FixTypesVisitor.deduceType(FixTypesVisitor.java:238)
    	at jadx.core.dex.visitors.typeinference.FixTypesVisitor.tryDeduceTypes(FixTypesVisitor.java:221)
    	at jadx.core.dex.visitors.typeinference.FixTypesVisitor.visit(FixTypesVisitor.java:91)
     */
    /* JADX WARN: Failed to calculate best type for var: r14v0 ??
    java.lang.NullPointerException: Cannot invoke "jadx.core.dex.instructions.args.InsnArg.getType()" because "changeArg" is null
    	at jadx.core.dex.visitors.typeinference.TypeUpdate.moveListener(TypeUpdate.java:439)
    	at jadx.core.dex.visitors.typeinference.TypeUpdate.runListeners(TypeUpdate.java:232)
    	at jadx.core.dex.visitors.typeinference.TypeUpdate.requestUpdate(TypeUpdate.java:212)
    	at jadx.core.dex.visitors.typeinference.TypeUpdate.updateTypeForSsaVar(TypeUpdate.java:183)
    	at jadx.core.dex.visitors.typeinference.TypeUpdate.updateTypeChecked(TypeUpdate.java:112)
    	at jadx.core.dex.visitors.typeinference.TypeUpdate.apply(TypeUpdate.java:83)
    	at jadx.core.dex.visitors.typeinference.TypeUpdate.apply(TypeUpdate.java:56)
    	at jadx.core.dex.visitors.typeinference.TypeInferenceVisitor.calculateFromBounds(TypeInferenceVisitor.java:145)
    	at jadx.core.dex.visitors.typeinference.TypeInferenceVisitor.setBestType(TypeInferenceVisitor.java:123)
    	at jadx.core.dex.visitors.typeinference.TypeInferenceVisitor.lambda$runTypePropagation$2(TypeInferenceVisitor.java:101)
    	at java.base/java.util.ArrayList.forEach(ArrayList.java:1596)
    	at jadx.core.dex.visitors.typeinference.TypeInferenceVisitor.runTypePropagation(TypeInferenceVisitor.java:101)
    	at jadx.core.dex.visitors.typeinference.TypeInferenceVisitor.visit(TypeInferenceVisitor.java:75)
     */
    /* JADX WARN: Failed to calculate best type for var: r15v0 ??
    java.lang.NullPointerException: Cannot invoke "jadx.core.dex.instructions.args.InsnArg.getType()" because "changeArg" is null
    	at jadx.core.dex.visitors.typeinference.TypeUpdate.moveListener(TypeUpdate.java:439)
    	at jadx.core.dex.visitors.typeinference.TypeUpdate.runListeners(TypeUpdate.java:232)
    	at jadx.core.dex.visitors.typeinference.TypeUpdate.requestUpdate(TypeUpdate.java:212)
    	at jadx.core.dex.visitors.typeinference.TypeUpdate.updateTypeForSsaVar(TypeUpdate.java:183)
    	at jadx.core.dex.visitors.typeinference.TypeUpdate.updateTypeChecked(TypeUpdate.java:112)
    	at jadx.core.dex.visitors.typeinference.TypeUpdate.apply(TypeUpdate.java:83)
    	at jadx.core.dex.visitors.typeinference.TypeUpdate.apply(TypeUpdate.java:56)
    	at jadx.core.dex.visitors.typeinference.FixTypesVisitor.calculateFromBounds(FixTypesVisitor.java:156)
    	at jadx.core.dex.visitors.typeinference.FixTypesVisitor.setBestType(FixTypesVisitor.java:133)
    	at jadx.core.dex.visitors.typeinference.FixTypesVisitor.deduceType(FixTypesVisitor.java:238)
    	at jadx.core.dex.visitors.typeinference.FixTypesVisitor.tryDeduceTypes(FixTypesVisitor.java:221)
    	at jadx.core.dex.visitors.typeinference.FixTypesVisitor.visit(FixTypesVisitor.java:91)
     */
    /* JADX WARN: Failed to calculate best type for var: r15v0 ??
    java.lang.NullPointerException: Cannot invoke "jadx.core.dex.instructions.args.InsnArg.getType()" because "changeArg" is null
    	at jadx.core.dex.visitors.typeinference.TypeUpdate.moveListener(TypeUpdate.java:439)
    	at jadx.core.dex.visitors.typeinference.TypeUpdate.runListeners(TypeUpdate.java:232)
    	at jadx.core.dex.visitors.typeinference.TypeUpdate.requestUpdate(TypeUpdate.java:212)
    	at jadx.core.dex.visitors.typeinference.TypeUpdate.updateTypeForSsaVar(TypeUpdate.java:183)
    	at jadx.core.dex.visitors.typeinference.TypeUpdate.updateTypeChecked(TypeUpdate.java:112)
    	at jadx.core.dex.visitors.typeinference.TypeUpdate.apply(TypeUpdate.java:83)
    	at jadx.core.dex.visitors.typeinference.TypeUpdate.apply(TypeUpdate.java:56)
    	at jadx.core.dex.visitors.typeinference.TypeInferenceVisitor.calculateFromBounds(TypeInferenceVisitor.java:145)
    	at jadx.core.dex.visitors.typeinference.TypeInferenceVisitor.setBestType(TypeInferenceVisitor.java:123)
    	at jadx.core.dex.visitors.typeinference.TypeInferenceVisitor.lambda$runTypePropagation$2(TypeInferenceVisitor.java:101)
    	at java.base/java.util.ArrayList.forEach(ArrayList.java:1596)
    	at jadx.core.dex.visitors.typeinference.TypeInferenceVisitor.runTypePropagation(TypeInferenceVisitor.java:101)
    	at jadx.core.dex.visitors.typeinference.TypeInferenceVisitor.visit(TypeInferenceVisitor.java:75)
     */
    /* JADX WARN: Multi-variable type inference failed. Error: java.lang.NullPointerException: Cannot invoke "jadx.core.dex.instructions.args.RegisterArg.getSVar()" because the return value of "jadx.core.dex.nodes.InsnNode.getResult()" is null
    	at jadx.core.dex.visitors.typeinference.AbstractTypeConstraint.collectRelatedVars(AbstractTypeConstraint.java:31)
    	at jadx.core.dex.visitors.typeinference.AbstractTypeConstraint.<init>(AbstractTypeConstraint.java:19)
    	at jadx.core.dex.visitors.typeinference.TypeSearch$1.<init>(TypeSearch.java:376)
    	at jadx.core.dex.visitors.typeinference.TypeSearch.makeMoveConstraint(TypeSearch.java:376)
    	at jadx.core.dex.visitors.typeinference.TypeSearch.makeConstraint(TypeSearch.java:361)
    	at jadx.core.dex.visitors.typeinference.TypeSearch.collectConstraints(TypeSearch.java:341)
    	at java.base/java.util.ArrayList.forEach(ArrayList.java:1596)
    	at jadx.core.dex.visitors.typeinference.TypeSearch.run(TypeSearch.java:60)
    	at jadx.core.dex.visitors.typeinference.FixTypesVisitor.runMultiVariableSearch(FixTypesVisitor.java:116)
    	at jadx.core.dex.visitors.typeinference.FixTypesVisitor.visit(FixTypesVisitor.java:91)
     */
    /* JADX WARN: Not initialized variable reg: 14, insn: 0x02ea: MOVE (r0 I:??[int, float, boolean, short, byte, char, OBJECT, ARRAY]) = (r14 I:??[int, float, boolean, short, byte, char, OBJECT, ARRAY]) A[TRY_LEAVE], block:B:61:0x02ea */
    /* JADX WARN: Not initialized variable reg: 15, insn: 0x02ee: MOVE (r0 I:??[int, float, boolean, short, byte, char, OBJECT, ARRAY]) = (r15 I:??[int, float, boolean, short, byte, char, OBJECT, ARRAY]), block:B:63:0x02ee */
    /* JADX WARN: Type inference failed for: r14v0, types: [org.apache.kafka.streams.KafkaStreams] */
    /* JADX WARN: Type inference failed for: r15v0, types: [java.lang.Throwable] */
    @Test
    public void shouldNotViolateEosIfOneTaskGetsFencedUsingIsolatedAppInstances() throws Exception {
        ?? r14;
        ?? r15;
        KafkaStreams kafkaStreams;
        KafkaStreams kafkaStreams2;
        if (this.eosConfig.equals("at_least_once")) {
            return;
        }
        KafkaStreams kafkaStreams3 = getKafkaStreams("streams1", false, "appDir1", 1, this.eosConfig);
        Throwable th = null;
        try {
            try {
                KafkaStreams kafkaStreams4 = getKafkaStreams("streams2", false, "appDir2", 1, this.eosConfig);
                Throwable th2 = null;
                IntegrationTestUtils.startApplicationAndWaitUntilRunning(kafkaStreams3);
                IntegrationTestUtils.startApplicationAndWaitUntilRunning(kafkaStreams4);
                List<KeyValue<Long, Long>> prepareData = prepareData(0L, 10L, 0L, 1L);
                List<KeyValue<Long, Long>> prepareData2 = prepareData(10L, 15L, 0L, 1L);
                ArrayList arrayList = new ArrayList(prepareData.size() + prepareData2.size());
                arrayList.addAll(prepareData);
                arrayList.addAll(prepareData2);
                List<KeyValue<Long, Long>> prepareData3 = prepareData(15L, 20L, 0L, 1L);
                List<KeyValue<Long, Long>> prepareData4 = prepareData(20L, 30L, 0L, 1L);
                writeInputData(prepareData);
                TestUtils.waitForCondition(() -> {
                    return this.commitRequested.get() == NUM_TOPIC_PARTITIONS;
                }, 120000L, "StreamsTasks did not request commit.");
                checkResultPerKey(readResult(SINGLE_PARTITION_OUTPUT_TOPIC, prepareData.size(), CONSUMER_GROUP_ID), prepareData, "The committed records before stall do not match what expected");
                writeInputData(prepareData2);
                checkResultPerKey(readResult(SINGLE_PARTITION_OUTPUT_TOPIC, arrayList.size(), null), arrayList, "The uncommitted records before stall do not match what expected");
                LOG.info("Injecting Stall");
                this.stallInjected.set(true);
                writeInputData(prepareData3);
                LOG.info("Input Data Written");
                TestUtils.waitForCondition(() -> {
                    return this.stallingHost.get() != null;
                }, 120000L, "Expected a host to start stalling");
                String str = this.stallingHost.get();
                if ("streams1".equals(str)) {
                    kafkaStreams = kafkaStreams3;
                    kafkaStreams2 = kafkaStreams4;
                } else {
                    if (!"streams2".equals(str)) {
                        throw new IllegalArgumentException("unexpected host name: " + str);
                    }
                    kafkaStreams = kafkaStreams4;
                    kafkaStreams2 = kafkaStreams3;
                }
                KafkaStreams kafkaStreams5 = kafkaStreams;
                KafkaStreams kafkaStreams6 = kafkaStreams2;
                TestUtils.waitForCondition(() -> {
                    return kafkaStreams5.metadataForAllStreamsClients().size() == NUM_TOPIC_PARTITIONS && kafkaStreams6.metadataForAllStreamsClients().size() == 1 && ((StreamsMetadata) kafkaStreams6.metadataForAllStreamsClients().iterator().next()).topicPartitions().size() == NUM_TOPIC_PARTITIONS;
                }, 120000L, () -> {
                    return "Should have rebalanced.\nStreams1[" + kafkaStreams3.metadataForAllStreamsClients() + "]\nStreams2[" + kafkaStreams4.metadataForAllStreamsClients() + "]";
                });
                List<KeyValue<Long, Long>> readResult = readResult(SINGLE_PARTITION_OUTPUT_TOPIC, prepareData2.size() + prepareData3.size(), CONSUMER_GROUP_ID);
                ArrayList arrayList2 = new ArrayList(prepareData2.size() + prepareData3.size());
                arrayList2.addAll(prepareData2);
                arrayList2.addAll(prepareData3);
                checkResultPerKey(readResult, arrayList2, "The all committed records after rebalance do not match what expected");
                LOG.info("Releasing Stall");
                this.doStall = false;
                TestUtils.waitForCondition(() -> {
                    return kafkaStreams3.metadataForAllStreamsClients().size() == NUM_TOPIC_PARTITIONS && kafkaStreams4.metadataForAllStreamsClients().size() == NUM_TOPIC_PARTITIONS && kafkaStreams3.metadataForAllStreamsClients().stream().mapToLong(streamsMetadata -> {
                        return streamsMetadata.topicPartitions().size();
                    }).sum() == 2 && kafkaStreams4.metadataForAllStreamsClients().stream().mapToLong(streamsMetadata2 -> {
                        return streamsMetadata2.topicPartitions().size();
                    }).sum() == 2;
                }, 120000L, () -> {
                    return "Should have rebalanced.\nStreams1[" + kafkaStreams3.metadataForAllStreamsClients() + "]\nStreams2[" + kafkaStreams4.metadataForAllStreamsClients() + "]";
                });
                writeInputData(prepareData4);
                List<KeyValue<Long, Long>> readResult2 = readResult(SINGLE_PARTITION_OUTPUT_TOPIC, prepareData.size() + prepareData2.size() + prepareData3.size() + prepareData4.size(), "readCommitted_ALL");
                ArrayList arrayList3 = new ArrayList(prepareData.size() + prepareData2.size() + prepareData3.size() + prepareData4.size());
                arrayList3.addAll(prepareData);
                arrayList3.addAll(prepareData2);
                arrayList3.addAll(prepareData3);
                arrayList3.addAll(prepareData4);
                checkResultPerKey(readResult2, arrayList3, "The all committed records after recovery do not match what expected");
                if (kafkaStreams4 != null) {
                    if (0 != 0) {
                        try {
                            kafkaStreams4.close();
                        } catch (Throwable th3) {
                            th2.addSuppressed(th3);
                        }
                    } else {
                        kafkaStreams4.close();
                    }
                }
                if (kafkaStreams3 != null) {
                    if (0 == 0) {
                        kafkaStreams3.close();
                        return;
                    }
                    try {
                        kafkaStreams3.close();
                    } catch (Throwable th4) {
                        th.addSuppressed(th4);
                    }
                }
            } catch (Throwable th5) {
                if (kafkaStreams3 != null) {
                    if (0 != 0) {
                        try {
                            kafkaStreams3.close();
                        } catch (Throwable th6) {
                            th.addSuppressed(th6);
                        }
                    } else {
                        kafkaStreams3.close();
                    }
                }
                throw th5;
            }
        } catch (Throwable th7) {
            if (r14 != 0) {
                if (r15 != 0) {
                    try {
                        r14.close();
                    } catch (Throwable th8) {
                        r15.addSuppressed(th8);
                    }
                } else {
                    r14.close();
                }
            }
            throw th7;
        }
    }

    @Test
    public void shouldWriteLatestOffsetsToCheckpointOnShutdown() throws Exception {
        List<KeyValue<Long, Long>> prepareData = prepareData(0L, 10L, 0L, 1L);
        List<KeyValue<Long, Long>> computeExpectedResult = computeExpectedResult(prepareData);
        KafkaStreams kafkaStreams = getKafkaStreams("streams", true, "appDir", 1, this.eosConfig);
        Throwable th = null;
        try {
            try {
                writeInputData(prepareData);
                IntegrationTestUtils.startApplicationAndWaitUntilRunning(kafkaStreams);
                TestUtils.waitForCondition(() -> {
                    return this.commitRequested.get() == NUM_TOPIC_PARTITIONS;
                }, 120000L, "StreamsTasks did not request commit.");
                List<KeyValue<Long, Long>> readResult = readResult(SINGLE_PARTITION_OUTPUT_TOPIC, prepareData.size(), CONSUMER_GROUP_ID);
                if (!this.eosConfig.equals("at_least_once")) {
                    checkResultPerKey(readResult, computeExpectedResult, "The committed records do not match what expected");
                    verifyStateStore(kafkaStreams, getMaxPerKey(computeExpectedResult), "The state store content do not match what expected");
                }
                if (kafkaStreams != null) {
                    if (0 != 0) {
                        try {
                            kafkaStreams.close();
                        } catch (Throwable th2) {
                            th.addSuppressed(th2);
                        }
                    } else {
                        kafkaStreams.close();
                    }
                }
                verifyOffsetsAreInCheckpoint(0);
                verifyOffsetsAreInCheckpoint(1);
            } finally {
            }
        } catch (Throwable th3) {
            if (kafkaStreams != null) {
                if (th != null) {
                    try {
                        kafkaStreams.close();
                    } catch (Throwable th4) {
                        th.addSuppressed(th4);
                    }
                } else {
                    kafkaStreams.close();
                }
            }
            throw th3;
        }
    }

    @Test
    public void shouldCheckpointRestoredOffsetsWhenClosingCleanDuringRestoringStateUpdaterEnabled() throws Exception {
        shouldCheckpointRestoredOffsetsWhenClosingCleanDuringRestoring(true);
    }

    @Test
    public void shouldCheckpointRestoredOffsetsWhenClosingCleanDuringRestoringStateUpdaterDisabled() throws Exception {
        if (this.processingThreadsEnabled) {
            return;
        }
        shouldCheckpointRestoredOffsetsWhenClosingCleanDuringRestoring(false);
    }

    private void shouldCheckpointRestoredOffsetsWhenClosingCleanDuringRestoring(boolean z) throws Exception {
        if (this.eosConfig.equals("exactly_once") || this.eosConfig.equals("exactly_once_v2")) {
            Properties properties = new Properties();
            properties.put("application.id", this.applicationId);
            properties.put("bootstrap.servers", CLUSTER.bootstrapServers());
            properties.put("default.key.serde", Serdes.Integer().getClass());
            properties.put("default.value.serde", Serdes.String().getClass());
            properties.put("processing.guarantee", this.eosConfig);
            properties.put("num.stream.threads", 1);
            properties.put("auto.offset.reset", "earliest");
            properties.put("state.dir", TestUtils.tempDirectory(this.applicationId).getPath());
            properties.put("__processing.threads.enabled__", Boolean.valueOf(this.processingThreadsEnabled));
            properties.put("__state.updater.enabled__", Boolean.valueOf(z));
            properties.put(StreamsConfig.restoreConsumerPrefix("max.poll.records"), 100);
            IntegrationTestUtils.purgeLocalStreamsState(properties);
            IntegrationTestUtils.produceKeyValuesSynchronously(MULTI_PARTITION_INPUT_TOPIC, (List) IntStream.range(1, 29001).mapToObj(i -> {
                return KeyValue.pair(Integer.valueOf(i), 0);
            }).collect(Collectors.toList()), TestUtils.producerConfig(CLUSTER.bootstrapServers(), IntegerSerializer.class, IntegerSerializer.class), CLUSTER.time);
            StoreBuilder withCachingEnabled = Stores.keyValueStoreBuilder(Stores.persistentKeyValueStore("stateStore"), Serdes.Integer(), Serdes.String()).withCachingEnabled();
            final CountDownLatch countDownLatch = new CountDownLatch(1);
            AtomicBoolean atomicBoolean = new AtomicBoolean(false);
            TaskId taskId = new TaskId(0, 0);
            final AtomicLong atomicLong = new AtomicLong(0L);
            Topology topology = new Topology();
            topology.addSource("source", new String[]{MULTI_PARTITION_INPUT_TOPIC}).addProcessor("processor", () -> {
                return new Processor<Integer, String, Integer, String>() { // from class: org.apache.kafka.streams.integration.EosIntegrationTest.1
                    KeyValueStore stateStore;
                    ProcessorContext context;

                    public void init(ProcessorContext<Integer, String> processorContext) {
                        super.init(processorContext);
                        this.context = processorContext;
                        this.stateStore = processorContext.getStateStore("stateStore");
                    }

                    public void process(Record<Integer, String> record) {
                        Optional recordMetadata = this.context.recordMetadata();
                        AtomicBoolean atomicBoolean2 = atomicBoolean;
                        TaskId taskId2 = taskId;
                        recordMetadata.ifPresent(recordMetadata2 -> {
                            if (recordMetadata2.partition() != 0) {
                                this.stateStore.put(record.key(), record.value());
                            } else {
                                if (atomicBoolean2.compareAndSet(true, false)) {
                                    throw new TaskCorruptedException(Collections.singleton(taskId2));
                                }
                                this.stateStore.put(record.key(), record.value());
                            }
                        });
                    }

                    public void close() {
                        super.close();
                    }
                };
            }, new String[]{"source"}).addStateStore(withCachingEnabled, new String[]{"processor"});
            KafkaStreams kafkaStreams = new KafkaStreams(topology, properties);
            kafkaStreams.setGlobalStateRestoreListener(new StateRestoreListener() { // from class: org.apache.kafka.streams.integration.EosIntegrationTest.2
                public void onRestoreStart(TopicPartition topicPartition, String str, long j, long j2) {
                }

                public void onBatchRestored(TopicPartition topicPartition, String str, long j, long j2) {
                    if (topicPartition.partition() == 0) {
                        atomicLong.set(j);
                        if (j > 100) {
                            countDownLatch.countDown();
                        }
                    }
                }

                public void onRestoreEnd(TopicPartition topicPartition, String str, long j) {
                }
            });
            IntegrationTestUtils.startApplicationAndWaitUntilRunning(Collections.singletonList(kafkaStreams), Duration.ofSeconds(60L));
            ensureCommittedRecordsInTopicPartition(this.applicationId + "-stateStore-changelog", 0, 2000, IntegerDeserializer.class, IntegerDeserializer.class);
            atomicBoolean.set(true);
            IntegrationTestUtils.produceKeyValuesSynchronously(MULTI_PARTITION_INPUT_TOPIC, (List) IntStream.range(29001, 30001).mapToObj(i2 -> {
                return KeyValue.pair(Integer.valueOf(i2), 0);
            }).collect(Collectors.toList()), TestUtils.producerConfig(CLUSTER.bootstrapServers(), IntegerSerializer.class, IntegerSerializer.class), CLUSTER.time);
            countDownLatch.await();
            kafkaStreams.close();
            IntegrationTestUtils.waitForApplicationState(Collections.singletonList(kafkaStreams), KafkaStreams.State.NOT_RUNNING, Duration.ofSeconds(60L));
            File file = Paths.get(properties.getProperty("state.dir"), properties.getProperty("application.id"), taskId.toString(), ".checkpoint").toFile();
            Assert.assertTrue(file.exists());
            Assert.assertEquals(Long.valueOf(atomicLong.get()), new ArrayList(new OffsetCheckpoint(file).read().values()).get(0));
        }
    }

    private void verifyOffsetsAreInCheckpoint(int i) throws IOException {
        new OffsetCheckpoint(new File((this.stateTmpDir + File.separator + "appDir" + File.separator + this.applicationId + File.separator + "0_" + i + File.separator) + ".checkpoint")).read().forEach((v1, v2) -> {
            verifyChangelogMaxRecordOffsetMatchesCheckpointedOffset(v1, v2);
        });
    }

    private void verifyChangelogMaxRecordOffsetMatchesCheckpointedOffset(TopicPartition topicPartition, long j) {
        KafkaConsumer kafkaConsumer = new KafkaConsumer(TestUtils.consumerConfig(CLUSTER.bootstrapServers(), Serdes.ByteArray().deserializer().getClass(), Serdes.ByteArray().deserializer().getClass()));
        List singletonList = Collections.singletonList(topicPartition);
        kafkaConsumer.assign(singletonList);
        kafkaConsumer.seekToEnd(singletonList);
        long position = kafkaConsumer.position(topicPartition);
        Assert.assertTrue("changelog topic end " + position + " is less than checkpointed offset " + j, position >= j);
        kafkaConsumer.seekToBeginning(singletonList);
        Long l = null;
        while (kafkaConsumer.position(topicPartition) != position) {
            List records = kafkaConsumer.poll(Duration.ofMillis(0L)).records(topicPartition);
            if (!records.isEmpty()) {
                l = Long.valueOf(((ConsumerRecord) records.get(records.size() - 1)).offset());
            }
        }
        Assert.assertEquals("Checkpointed offset does not match end of changelog", l, Long.valueOf(j));
    }

    private List<KeyValue<Long, Long>> prepareData(long j, long j2, Long... lArr) {
        ArrayList arrayList = new ArrayList((int) (lArr.length * (j2 - j)));
        for (Long l : lArr) {
            long j3 = j;
            while (true) {
                long j4 = j3;
                if (j4 < j2) {
                    arrayList.add(new KeyValue(l, Long.valueOf(j4)));
                    j3 = j4 + 1;
                }
            }
        }
        return arrayList;
    }

    private KafkaStreams getKafkaStreams(final String str, final boolean z, String str2, int i, String str3) {
        this.commitRequested = new AtomicInteger(0);
        this.errorInjected = new AtomicBoolean(false);
        this.stallInjected = new AtomicBoolean(false);
        this.stallingHost = new AtomicReference<>();
        StreamsBuilder streamsBuilder = new StreamsBuilder();
        String[] strArr = new String[0];
        if (z) {
            strArr = new String[]{"store"};
            streamsBuilder.addStateStore(Stores.keyValueStoreBuilder(Stores.persistentKeyValueStore("store"), Serdes.Long(), Serdes.Long()).withCachingEnabled());
        }
        streamsBuilder.stream(MULTI_PARTITION_INPUT_TOPIC).transform(new TransformerSupplier<Long, Long, KeyValue<Long, Long>>() { // from class: org.apache.kafka.streams.integration.EosIntegrationTest.3
            /* renamed from: get, reason: merged with bridge method [inline-methods] */
            public Transformer<Long, Long, KeyValue<Long, Long>> m15get() {
                return new Transformer<Long, Long, KeyValue<Long, Long>>() { // from class: org.apache.kafka.streams.integration.EosIntegrationTest.3.1
                    org.apache.kafka.streams.processor.ProcessorContext context;
                    KeyValueStore<Long, Long> state = null;

                    public void init(org.apache.kafka.streams.processor.ProcessorContext processorContext) {
                        this.context = processorContext;
                        if (z) {
                            this.state = processorContext.getStateStore("store");
                        }
                    }

                    public KeyValue<Long, Long> transform(Long l, Long l2) {
                        if (EosIntegrationTest.this.stallInjected.compareAndSet(true, false)) {
                            EosIntegrationTest.LOG.info(str + " is executing the injected stall");
                            EosIntegrationTest.this.stallingHost.set(str);
                            while (EosIntegrationTest.this.doStall) {
                                StreamThread currentThread = Thread.currentThread();
                                if (currentThread.isInterrupted()) {
                                    throw new RuntimeException("Detected we've been interrupted.");
                                }
                                if (!EosIntegrationTest.this.processingThreadsEnabled && !currentThread.isRunning()) {
                                    throw new RuntimeException("Detected we've been interrupted.");
                                }
                                try {
                                    Thread.sleep(100L);
                                } catch (InterruptedException e) {
                                    throw new RuntimeException(e);
                                }
                            }
                        }
                        if ((l2.longValue() + 1) % 10 == 0) {
                            this.context.commit();
                            EosIntegrationTest.this.commitRequested.incrementAndGet();
                        }
                        if (this.state != null) {
                            Long l3 = (Long) this.state.get(l);
                            this.state.put(l, l3 == null ? l2 : Long.valueOf(l3.longValue() + l2.longValue()));
                            this.state.flush();
                        }
                        if (EosIntegrationTest.this.errorInjected.compareAndSet(true, false)) {
                            throw new RuntimeException("Injected test exception.");
                        }
                        return this.state != null ? new KeyValue<>(l, this.state.get(l)) : new KeyValue<>(l, l2);
                    }

                    public void close() {
                    }
                };
            }
        }, strArr).to(SINGLE_PARTITION_OUTPUT_TOPIC);
        this.stateTmpDir = TestUtils.tempDirectory().getPath() + File.separator;
        Properties properties = new Properties();
        properties.put("processing.guarantee", str3);
        properties.put("num.stream.threads", Integer.valueOf(i));
        properties.put("commit.interval.ms", 20000L);
        properties.put(StreamsConfig.producerPrefix("transaction.timeout.ms"), 20000);
        properties.put(StreamsConfig.consumerPrefix("metadata.max.age.ms"), "1000");
        properties.put(StreamsConfig.consumerPrefix("auto.offset.reset"), "earliest");
        properties.put(StreamsConfig.consumerPrefix("session.timeout.ms"), 29999);
        properties.put(StreamsConfig.consumerPrefix("max.poll.interval.ms"), Integer.valueOf(MAX_POLL_INTERVAL_MS));
        properties.put("statestore.cache.max.bytes", 0);
        properties.put("state.dir", this.stateTmpDir + str2);
        properties.put("application.server", str + ":2142");
        properties.put("__state.updater.enabled__", Boolean.valueOf(this.processingThreadsEnabled));
        properties.put("__processing.threads.enabled__", Boolean.valueOf(this.processingThreadsEnabled));
        KafkaStreams kafkaStreams = new KafkaStreams(streamsBuilder.build(), StreamsTestUtils.getStreamsConfig(this.applicationId, CLUSTER.bootstrapServers(), Serdes.LongSerde.class.getName(), Serdes.LongSerde.class.getName(), properties));
        kafkaStreams.setUncaughtExceptionHandler((thread, th) -> {
            if (this.uncaughtException != null || !th.getMessage().contains("Injected test exception")) {
                th.printStackTrace(System.err);
                this.hasUnexpectedError = true;
            }
            this.uncaughtException = th;
        });
        return kafkaStreams;
    }

    private void writeInputData(List<KeyValue<Long, Long>> list) {
        IntegrationTestUtils.produceKeyValuesSynchronously(MULTI_PARTITION_INPUT_TOPIC, list, TestUtils.producerConfig(CLUSTER.bootstrapServers(), LongSerializer.class, LongSerializer.class), CLUSTER.time);
    }

    private List<KeyValue<Long, Long>> readResult(String str, int i, String str2) throws Exception {
        return readResult(str, i, LongDeserializer.class, LongDeserializer.class, str2);
    }

    private <K, V> List<KeyValue<K, V>> readResult(String str, int i, Class<? extends Deserializer<K>> cls, Class<? extends Deserializer<V>> cls2, String str2) throws Exception {
        return str2 != null ? IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(TestUtils.consumerConfig(CLUSTER.bootstrapServers(), str2, cls, cls2, Utils.mkProperties(Collections.singletonMap("isolation.level", IsolationLevel.READ_COMMITTED.toString()))), str, i) : IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(TestUtils.consumerConfig(CLUSTER.bootstrapServers(), cls, cls2), str, i);
    }

    private <K, V> void ensureCommittedRecordsInTopicPartition(String str, int i, int i2, Class<? extends Deserializer<K>> cls, Class<? extends Deserializer<V>> cls2) throws Exception {
        long currentTimeMillis = System.currentTimeMillis() + 120000;
        int i3 = 0;
        do {
            i3++;
            if (IntegrationTestUtils.waitUntilMinRecordsReceived(TestUtils.consumerConfig(CLUSTER.bootstrapServers(), CONSUMER_GROUP_ID, cls, cls2, Utils.mkProperties(Collections.singletonMap("isolation.level", IsolationLevel.READ_COMMITTED.toString()))), str, i2, 120000L).stream().anyMatch(consumerRecord -> {
                return consumerRecord.partition() == i;
            })) {
                return;
            }
            if (i3 >= 10) {
                throw new AssertionError("No committed records in topic " + str + ", partition " + i + " after 10 retries.");
            }
        } while (System.currentTimeMillis() <= currentTimeMillis);
        throw new AssertionError("No committed records in topic " + str + ", partition " + i + " after 120000 ms.");
    }

    private List<KeyValue<Long, Long>> computeExpectedResult(List<KeyValue<Long, Long>> list) {
        ArrayList arrayList = new ArrayList(list.size());
        HashMap hashMap = new HashMap();
        for (KeyValue<Long, Long> keyValue : list) {
            Long l = (Long) hashMap.get(keyValue.key);
            Long valueOf = l == null ? (Long) keyValue.value : Long.valueOf(l.longValue() + ((Long) keyValue.value).longValue());
            hashMap.put(keyValue.key, valueOf);
            arrayList.add(new KeyValue(keyValue.key, valueOf));
        }
        return arrayList;
    }

    private Set<KeyValue<Long, Long>> getMaxPerKey(List<KeyValue<Long, Long>> list) {
        HashSet hashSet = new HashSet(list.size());
        HashMap hashMap = new HashMap();
        for (KeyValue<Long, Long> keyValue : list) {
            Long l = (Long) hashMap.get(keyValue.key);
            if (l == null || ((Long) keyValue.value).longValue() > l.longValue()) {
                hashMap.put(keyValue.key, keyValue.value);
            }
        }
        for (Map.Entry entry : hashMap.entrySet()) {
            hashSet.add(new KeyValue(entry.getKey(), entry.getValue()));
        }
        return hashSet;
    }

    private void verifyStateStore(KafkaStreams kafkaStreams, Set<KeyValue<Long, Long>> set, String str) {
        Iterator it = IntegrationTestUtils.iqv2WaitForResult(kafkaStreams, StateQueryRequest.inStore("store").withQuery(RangeQuery.withNoBounds())).getPartitionResults().values().iterator();
        while (it.hasNext()) {
            KeyValueIterator keyValueIterator = (KeyValueIterator) ((QueryResult) it.next()).getResult();
            Throwable th = null;
            while (keyValueIterator.hasNext()) {
                try {
                    try {
                        Assert.assertTrue(str, set.remove(keyValueIterator.next()));
                    } finally {
                    }
                } catch (Throwable th2) {
                    if (keyValueIterator != null) {
                        if (th != null) {
                            try {
                                keyValueIterator.close();
                            } catch (Throwable th3) {
                                th.addSuppressed(th3);
                            }
                        } else {
                            keyValueIterator.close();
                        }
                    }
                    throw th2;
                }
            }
            if (keyValueIterator != null) {
                if (0 != 0) {
                    try {
                        keyValueIterator.close();
                    } catch (Throwable th4) {
                        th.addSuppressed(th4);
                    }
                } else {
                    keyValueIterator.close();
                }
            }
        }
        Assert.assertTrue(str, set.isEmpty());
    }
}
