package org.apache.kafka.streams.integration;

import java.io.IOException;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Iterator;
import java.util.List;
import java.util.Properties;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import org.apache.kafka.common.serialization.IntegerSerializer;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.common.utils.MockTime;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.errors.InvalidStateStoreException;
import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster;
import org.apache.kafka.streams.integration.utils.IntegrationTestUtils;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.state.KeyValueStore;
import org.apache.kafka.streams.state.QueryableStoreTypes;
import org.apache.kafka.streams.state.ReadOnlyKeyValueStore;
import org.apache.kafka.test.NoRetryException;
import org.apache.kafka.test.TestUtils;
import org.hamcrest.MatcherAssert;
import org.hamcrest.Matchers;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Tag;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.TestInfo;
import org.junit.jupiter.api.Timeout;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Integration test for a materialized source table built with
 * {@code topology.optimization=all} and one standby replica configured.
 *
 * <p>Two {@link KafkaStreams} instances sharing one application id are started against a
 * single-broker {@link EmbeddedKafkaCluster}. The test closes the instance reported as the
 * active host for a key and then verifies that the remaining instance's store serves both
 * the value written before the shutdown and values written afterwards.
 */
@Tag("integration")
@Timeout(600)
public class OptimizedKTableIntegrationTest {
    private static final Logger LOG = LoggerFactory.getLogger(OptimizedKTableIntegrationTest.class);
    private static final int NUM_BROKERS = 1;
    private static final String INPUT_TOPIC_NAME = "input-topic";
    private static final String TABLE_NAME = "source-table";

    public static final EmbeddedKafkaCluster CLUSTER = new EmbeddedKafkaCluster(NUM_BROKERS);

    /**
     * Counter used to hand out a distinct {@code application.server} port to every
     * configuration created by {@link #streamsConfiguration(String)}.
     */
    private static int port = 0;

    /** Every instance created through {@link #createKafkaStreams}; closed in {@link #after()}. */
    private final List<KafkaStreams> streamsToCleanup = new ArrayList<>();
    private final MockTime mockTime = CLUSTER.time;

    /** Starts the shared embedded cluster once for the whole class. */
    @BeforeAll
    public static void startCluster() throws IOException {
        CLUSTER.start();
    }

    /** Stops the shared embedded cluster after all tests have run. */
    @AfterAll
    public static void closeCluster() {
        CLUSTER.stop();
    }

    /** Creates the input topic before each test. */
    @BeforeEach
    public void before() throws InterruptedException {
        CLUSTER.createTopic(INPUT_TOPIC_NAME, 2, NUM_BROKERS);
    }

    /** Closes every {@link KafkaStreams} instance that the test created. */
    @AfterEach
    public void after() {
        for (final KafkaStreams kafkaStreams : streamsToCleanup) {
            kafkaStreams.close();
        }
    }

    /**
     * Produces a first batch for a single key, closes the instance that is the active host
     * for that key, and asserts that the other instance's store returns the last value of
     * the first batch and, after a second batch, the last value of the second batch.
     *
     * @param testInfo JUnit test metadata, used to derive a unique application id
     * @throws Exception if startup, producing, or any assertion fails
     */
    @Test
    public void shouldApplyUpdatesToStandbyStore(final TestInfo testInfo) throws Exception {
        final int batch1NumMessages = 100;
        final int batch2NumMessages = 100;
        final int totalNumMessages = batch1NumMessages + batch2NumMessages;
        final int key = 1;
        // One permit is released per record that reaches peek(), so acquiring a whole
        // batch worth of permits means the whole batch has flowed through the topology.
        final Semaphore semaphore = new Semaphore(0);

        final StreamsBuilder builder = new StreamsBuilder();
        builder
            .table(
                INPUT_TOPIC_NAME,
                Consumed.with(Serdes.Integer(), Serdes.Integer()),
                // Explicit type witness: without it the chained call cannot infer the
                // key/value/store types that table() requires.
                Materialized.<Integer, Integer, KeyValueStore<Bytes, byte[]>>as(TABLE_NAME)
                    .withCachingDisabled())
            .toStream()
            .peek((k, v) -> semaphore.release());

        final String safeTestName = IntegrationTestUtils.safeUniqueTestName(testInfo);
        final KafkaStreams kafkaStreams1 = createKafkaStreams(builder, streamsConfiguration(safeTestName));
        final KafkaStreams kafkaStreams2 = createKafkaStreams(builder, streamsConfiguration(safeTestName));
        final List<KafkaStreams> kafkaStreamsList = Arrays.asList(kafkaStreams1, kafkaStreams2);

        try {
            IntegrationTestUtils.startApplicationAndWaitUntilRunning(kafkaStreamsList, Duration.ofSeconds(60L));

            produceValueRange(key, 0, batch1NumMessages);

            // All records of the first batch must be processed in a timely manner.
            MatcherAssert.assertThat(
                semaphore.tryAcquire(batch1NumMessages, 60L, TimeUnit.SECONDS),
                Matchers.is(Matchers.equalTo(true)));

            // Holds the store of whichever instance is left running after the shutdown below.
            final AtomicReference<ReadOnlyKeyValueStore<Integer, Integer>> newActiveStore =
                new AtomicReference<>(null);

            TestUtils.retryOnExceptionWithTimeout(() -> {
                final ReadOnlyKeyValueStore<Integer, Integer> store1 =
                    IntegrationTestUtils.getStore(TABLE_NAME, kafkaStreams1, QueryableStoreTypes.keyValueStore());
                final ReadOnlyKeyValueStore<Integer, Integer> store2 =
                    IntegrationTestUtils.getStore(TABLE_NAME, kafkaStreams2, QueryableStoreTypes.keyValueStore());

                try {
                    final int activePort = kafkaStreams1
                        .queryMetadataForKey(TABLE_NAME, key, (topic, someKey, value, numPartitions) -> 0)
                        .activeHost()
                        .port();

                    // streamsConfiguration() hands out ports 1, 2, ... in creation order, so
                    // an odd port identifies the first instance and an even port the second.
                    if (activePort % 2 == 1) {
                        MatcherAssert.assertThat(
                            store1.get(key), Matchers.is(Matchers.equalTo(batch1NumMessages - 1)));
                        kafkaStreams1.close();
                        newActiveStore.set(store2);
                    } else {
                        MatcherAssert.assertThat(
                            store2.get(key), Matchers.is(Matchers.equalTo(batch1NumMessages - 1)));
                        kafkaStreams2.close();
                        newActiveStore.set(store1);
                    }
                } catch (final InvalidStateStoreException e) {
                    // Rethrown unchanged so the surrounding retry helper can try again.
                    LOG.warn("Detected an unexpected rebalance during test. Retrying if possible.", e);
                    throw e;
                } catch (final Throwable t) {
                    // Anything else (including assertion failures) is wrapped to stop retrying.
                    LOG.error("Caught non-retriable exception in test. Exiting.", t);
                    throw new NoRetryException(t);
                }
            });

            // After the shutdown, the remaining store must still hold the last value written.
            TestUtils.retryOnExceptionWithTimeout(IntegrationTestUtils.DEFAULT_TIMEOUT, 100L, () ->
                MatcherAssert.assertThat(
                    newActiveStore.get().get(key), Matchers.is(Matchers.equalTo(batch1NumMessages - 1))));

            produceValueRange(key, batch1NumMessages, totalNumMessages);

            // All records of the second batch must be processed in a timely manner.
            MatcherAssert.assertThat(
                semaphore.tryAcquire(batch2NumMessages, 60L, TimeUnit.SECONDS),
                Matchers.is(Matchers.equalTo(true)));

            TestUtils.retryOnExceptionWithTimeout(IntegrationTestUtils.DEFAULT_TIMEOUT, 100L, () ->
                MatcherAssert.assertThat(
                    newActiveStore.get().get(key), Matchers.is(Matchers.equalTo(totalNumMessages - 1))));
        } finally {
            // Close both instances on success and on failure alike.
            kafkaStreams1.close();
            kafkaStreams2.close();
        }
    }

    /**
     * Synchronously produces one record per integer in {@code [start, endExclusive)} to the
     * input topic, all with the same key and with the integer as the value.
     *
     * @param key          key used for every produced record
     * @param start        first value to produce (inclusive)
     * @param endExclusive upper bound of the values to produce (exclusive)
     */
    private void produceValueRange(final int key, final int start, final int endExclusive) {
        final Properties producerProps = new Properties();
        producerProps.put("bootstrap.servers", CLUSTER.bootstrapServers());
        producerProps.put("key.serializer", IntegerSerializer.class);
        producerProps.put("value.serializer", IntegerSerializer.class);

        final List<KeyValue<Integer, Integer>> records = IntStream.range(start, endExclusive)
            .mapToObj(value -> KeyValue.pair(key, value))
            .collect(Collectors.toList());

        IntegrationTestUtils.produceKeyValuesSynchronously(INPUT_TOPIC_NAME, records, producerProps, mockTime);
    }

    /**
     * Builds a {@link KafkaStreams} instance from the given builder and configuration and
     * registers it for cleanup in {@link #after()}. The instance is not started here.
     *
     * @param builder topology builder; built with {@code config} so the configured
     *                optimization setting is passed to the build
     * @param config  streams configuration for the new instance
     * @return the newly created instance
     */
    private KafkaStreams createKafkaStreams(final StreamsBuilder builder, final Properties config) {
        final KafkaStreams streams = new KafkaStreams(builder.build(config), config);
        streamsToCleanup.add(streams);
        return streams;
    }

    /**
     * Creates the streams configuration for one instance of the test application.
     *
     * <p>Each call increments the shared {@code port} counter, so every configuration gets a
     * different {@code application.server} value and a fresh temporary state directory.
     *
     * @param safeTestName unique test name appended to the {@code app-} application id prefix
     * @return a new configuration object
     */
    private Properties streamsConfiguration(final String safeTestName) {
        final Properties config = new Properties();
        config.put("topology.optimization", "all");
        config.put("application.id", "app-" + safeTestName);
        config.put("application.server", "localhost:" + (++port));
        config.put("bootstrap.servers", CLUSTER.bootstrapServers());
        config.put("state.dir", TestUtils.tempDirectory().getPath());
        config.put("default.key.serde", Serdes.Integer().getClass());
        config.put("default.value.serde", Serdes.Integer().getClass());
        config.put("num.standby.replicas", 1);
        config.put("commit.interval.ms", 100L);
        config.put("statestore.cache.max.bytes", 0);
        config.put("max.poll.records", 100);
        config.put("heartbeat.interval.ms", 200);
        config.put("session.timeout.ms", 1000);
        return config;
    }
}
