package org.apache.kafka.streams.integration;

import java.util.Arrays;
import java.util.List;
import java.util.Properties;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.integration.utils.EmbeddedSingleNodeKafkaCluster;
import org.apache.kafka.streams.integration.utils.IntegrationTestUtils;
import org.apache.kafka.streams.kstream.KStreamBuilder;
import org.hamcrest.CoreMatchers;
import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.Test;

/* loaded from: input_file:org/apache/kafka/streams/integration/PassThroughIntegrationTest.class */
/**
 * End-to-end test of a pass-through topology: every record read from
 * {@code inputTopic} is written unchanged to {@code outputTopic}.
 *
 * <p>The test runs against the {@link EmbeddedSingleNodeKafkaCluster} class rule declared below.
 */
public class PassThroughIntegrationTest {

    @ClassRule
    public static final EmbeddedSingleNodeKafkaCluster CLUSTER = new EmbeddedSingleNodeKafkaCluster();
    private static final String DEFAULT_INPUT_TOPIC = "inputTopic";
    private static final String DEFAULT_OUTPUT_TOPIC = "outputTopic";

    /**
     * Creates the input and output topics on {@link #CLUSTER} before any test runs.
     *
     * @throws Exception if topic creation fails
     */
    @BeforeClass
    public static void startKafkaCluster() throws Exception {
        CLUSTER.createTopic(DEFAULT_INPUT_TOPIC);
        CLUSTER.createTopic(DEFAULT_OUTPUT_TOPIC);
    }

    /**
     * Starts the pass-through topology, produces a fixed set of values to the input topic,
     * stops the topology, then reads the output topic and asserts that the values match the
     * input in content and order.
     *
     * @throws Exception if producing, consuming, or waiting fails
     */
    @Test
    public void shouldWriteTheInputDataAsIsToTheOutputTopic() throws Exception {
        final List<String> inputValues = Arrays.asList(
            "hello world",
            "the world is not enough",
            "the world of the stock market is coming to an end");

        // Step 1: build and start the topology that copies input to output.
        final KStreamBuilder builder = new KStreamBuilder();
        builder.stream(DEFAULT_INPUT_TOPIC).to(DEFAULT_OUTPUT_TOPIC);
        final KafkaStreams streams = new KafkaStreams(builder, streamsConfig());
        streams.start();
        try {
            // Fixed pause after start(); no readiness signal is checked here.
            // NOTE(review): presumably this gives the topology time to begin consuming - confirm.
            Thread.sleep(5000L);

            // Step 2: produce the input values.
            IntegrationTestUtils.produceValuesSynchronously(DEFAULT_INPUT_TOPIC, inputValues, producerConfig());

            // Fixed pause before shutdown so the produced records can be forwarded.
            Thread.sleep(10000L);
        } finally {
            // Always stop the streams instance, even when producing or sleeping fails,
            // so a failed run does not leave it running.
            streams.close();
        }

        // Step 3: read the output topic from the beginning and verify.
        final List<String> actualValues =
            IntegrationTestUtils.readValues(DEFAULT_OUTPUT_TOPIC, consumerConfig(), inputValues.size());
        Assert.assertThat(actualValues, CoreMatchers.equalTo(inputValues));
    }

    /** Builds the Kafka Streams configuration: application id, cluster endpoints and String serdes. */
    private static Properties streamsConfig() {
        final Properties config = new Properties();
        config.put("application.id", "pass-through-integration-test");
        config.put("bootstrap.servers", CLUSTER.bootstrapServers());
        config.put("zookeeper.connect", CLUSTER.zKConnectString());
        config.put("key.serde", Serdes.String().getClass().getName());
        config.put("value.serde", Serdes.String().getClass().getName());
        return config;
    }

    /** Builds the producer configuration: acks=all, no retries, String serializers. */
    private static Properties producerConfig() {
        final Properties config = new Properties();
        config.put("bootstrap.servers", CLUSTER.bootstrapServers());
        config.put("acks", "all");
        config.put("retries", 0);
        config.put("key.serializer", StringSerializer.class);
        config.put("value.serializer", StringSerializer.class);
        return config;
    }

    /** Builds the consumer configuration: dedicated group id, earliest offset reset, String deserializers. */
    private static Properties consumerConfig() {
        final Properties config = new Properties();
        config.put("bootstrap.servers", CLUSTER.bootstrapServers());
        config.put("group.id", "pass-through-integration-test-standard-consumer");
        config.put("auto.offset.reset", "earliest");
        config.put("key.deserializer", StringDeserializer.class);
        config.put("value.deserializer", StringDeserializer.class);
        return config;
    }
}
