package org.apache.kafka.streams.integration;

import java.io.IOException;
import java.time.Duration;
import java.util.Arrays;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Properties;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.kafka.common.serialization.LongDeserializer;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.KeyValueTimestamp;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster;
import org.apache.kafka.streams.integration.utils.IntegrationTestUtils;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.Joined;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.state.Stores;
import org.apache.kafka.test.IntegrationTest;
import org.apache.kafka.test.MockValueJoiner;
import org.apache.kafka.test.TestUtils;
import org.hamcrest.CoreMatchers;
import org.hamcrest.MatcherAssert;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Rule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.rules.TestName;
import org.junit.rules.Timeout;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;

@RunWith(Parameterized.class)
@Category({IntegrationTest.class})
/* loaded from: input_file:org/apache/kafka/streams/integration/JoinGracePeriodDurabilityIntegrationTest.class */
public class JoinGracePeriodDurabilityIntegrationTest {

    @Rule
    public Timeout globalTimeout = Timeout.seconds(600);

    @Rule
    public TestName testName = new TestName();
    private static final long COMMIT_INTERVAL = 100;

    @Parameterized.Parameter
    public String processingGuaranteee;
    public static final EmbeddedKafkaCluster CLUSTER = new EmbeddedKafkaCluster(3, Utils.mkProperties(Utils.mkMap(new Map.Entry[0])), 0);
    private static final StringDeserializer STRING_DESERIALIZER = new StringDeserializer();
    private static final StringSerializer STRING_SERIALIZER = new StringSerializer();
    private static final Serde<String> STRING_SERDE = Serdes.String();
    private static final LongDeserializer LONG_DESERIALIZER = new LongDeserializer();

    @BeforeClass
    public static void startCluster() throws IOException {
        CLUSTER.start();
    }

    @AfterClass
    public static void closeCluster() {
        CLUSTER.stop();
    }

    /* JADX WARN: Multi-variable type inference failed */
    @Parameterized.Parameters(name = "{0}")
    public static Collection<String[]> data() {
        return Arrays.asList(new String[]{"at_least_once"}, new String[]{"exactly_once"}, new String[]{"exactly_once_v2"});
    }

    @Test
    public void shouldRecoverBufferAfterShutdown() {
        String safeUniqueTestName = IntegrationTestUtils.safeUniqueTestName(this.testName);
        String str = "appId_" + safeUniqueTestName;
        String str2 = "Streaminput" + safeUniqueTestName;
        String str3 = "Tableinput" + safeUniqueTestName;
        String str4 = "output" + safeUniqueTestName;
        IntegrationTestUtils.cleanStateBeforeTest(CLUSTER, 2, str2, str3, str4);
        StreamsBuilder streamsBuilder = new StreamsBuilder();
        KStream join = streamsBuilder.stream(str2, Consumed.with(STRING_SERDE, STRING_SERDE)).join(streamsBuilder.table(str3, Consumed.with(STRING_SERDE, STRING_SERDE), Materialized.as(Stores.persistentVersionedKeyValueStore("grace", Duration.ofMillis(1000L)))), MockValueJoiner.TOSTRING_JOINER, Joined.with(Serdes.String(), Serdes.String(), Serdes.String(), "Grace", Duration.ofMillis(5L)));
        AtomicInteger atomicInteger = new AtomicInteger(0);
        join.foreach((str5, str6) -> {
            atomicInteger.incrementAndGet();
        });
        join.to(str4);
        Properties mkObjectProperties = Utils.mkObjectProperties(Utils.mkMap(new Map.Entry[]{Utils.mkEntry("application.id", str), Utils.mkEntry("bootstrap.servers", CLUSTER.bootstrapServers()), Utils.mkEntry("poll.ms", Long.toString(COMMIT_INTERVAL)), Utils.mkEntry("processing.guarantee", this.processingGuaranteee), Utils.mkEntry("state.dir", TestUtils.tempDirectory().getPath()), Utils.mkEntry("default.key.serde", Serdes.StringSerde.class), Utils.mkEntry("default.value.serde", Serdes.StringSerde.class)}));
        mkObjectProperties.put("commit.interval.ms", Long.valueOf(COMMIT_INTERVAL));
        KafkaStreams startedStreams = IntegrationTestUtils.getStartedStreams(mkObjectProperties, streamsBuilder, true);
        try {
            produceSynchronouslyToPartitionZero(str3, Arrays.asList(new KeyValueTimestamp("k1", "v1", scaledTime(0L)), new KeyValueTimestamp("k2", "v2", scaledTime(0L)), new KeyValueTimestamp("k3", "v3", scaledTime(0L)), new KeyValueTimestamp("k4", "v4", scaledTime(0L)), new KeyValueTimestamp("k5", "v5", scaledTime(0L)), new KeyValueTimestamp("k6", "v6", scaledTime(0L))));
            produceSynchronouslyToPartitionZero(str2, Arrays.asList(new KeyValueTimestamp("k1", "v1", scaledTime(1L)), new KeyValueTimestamp("k2", "v2", scaledTime(2L)), new KeyValueTimestamp("k3", "v3", scaledTime(7L))));
            verifyOutput(str4, Arrays.asList(new KeyValueTimestamp("k1", "v1+v1", scaledTime(1L)), new KeyValueTimestamp("k2", "v2+v2", scaledTime(2L))));
            MatcherAssert.assertThat(Integer.valueOf(atomicInteger.get()), CoreMatchers.is(2));
            produceSynchronouslyToPartitionZero(str2, Arrays.asList(new KeyValueTimestamp("k4", "v4", scaledTime(4L)), new KeyValueTimestamp("k5", "v5", scaledTime(5L))));
            startedStreams.close();
            MatcherAssert.assertThat(startedStreams.state(), CoreMatchers.is(KafkaStreams.State.NOT_RUNNING));
            startedStreams = IntegrationTestUtils.getStartedStreams(mkObjectProperties, streamsBuilder, false);
            produceSynchronouslyToPartitionZero(str2, Arrays.asList(new KeyValueTimestamp("k6", "v6", scaledTime(20L))));
            verifyOutput(str4, Arrays.asList(new KeyValueTimestamp("k4", "v4+v4", scaledTime(4L)), new KeyValueTimestamp("k5", "v5+v5", scaledTime(5L)), new KeyValueTimestamp("k3", "v3+v3", scaledTime(7L))));
            MatcherAssert.assertThat("There should only be 5 output events.", Integer.valueOf(atomicInteger.get()), CoreMatchers.is(5));
            startedStreams.close();
            IntegrationTestUtils.quietlyCleanStateAfterTest(CLUSTER, startedStreams);
        } catch (Throwable th) {
            startedStreams.close();
            IntegrationTestUtils.quietlyCleanStateAfterTest(CLUSTER, startedStreams);
            throw th;
        }
    }

    private void verifyOutput(String str, List<KeyValueTimestamp<String, String>> list) {
        IntegrationTestUtils.verifyKeyValueTimestamps(Utils.mkProperties(Utils.mkMap(new Map.Entry[]{Utils.mkEntry("group.id", "test-group"), Utils.mkEntry("bootstrap.servers", CLUSTER.bootstrapServers()), Utils.mkEntry("key.deserializer", STRING_DESERIALIZER.getClass().getName()), Utils.mkEntry("value.deserializer", STRING_DESERIALIZER.getClass().getName())})), str, list);
    }

    private long scaledTime(long j) {
        return 200 * j;
    }

    private static void produceSynchronouslyToPartitionZero(String str, List<KeyValueTimestamp<String, String>> list) {
        IntegrationTestUtils.produceSynchronously(Utils.mkProperties(Utils.mkMap(new Map.Entry[]{Utils.mkEntry("client.id", "anything"), Utils.mkEntry("key.serializer", STRING_SERIALIZER.getClass().getName()), Utils.mkEntry("value.serializer", STRING_SERIALIZER.getClass().getName()), Utils.mkEntry("bootstrap.servers", CLUSTER.bootstrapServers())})), false, str, Optional.of(0), list);
    }
}
