package org.apache.kafka.streams.integration;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Locale;
import java.util.Properties;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;
import org.apache.kafka.common.serialization.ByteArraySerializer;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.integration.utils.EmbeddedSingleNodeKafkaCluster;
import org.apache.kafka.streams.integration.utils.IntegrationTestUtils;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KStreamBuilder;
import org.apache.kafka.streams.kstream.ValueMapper;
import org.hamcrest.CoreMatchers;
import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.Test;

/* loaded from: input_file:org/apache/kafka/streams/integration/FanoutIntegrationTest.class */
public class FanoutIntegrationTest {

    @ClassRule
    public static final EmbeddedSingleNodeKafkaCluster CLUSTER = new EmbeddedSingleNodeKafkaCluster();
    private static final String INPUT_TOPIC_A = "A";
    private static final String OUTPUT_TOPIC_B = "B";
    private static final String OUTPUT_TOPIC_C = "C";

    @BeforeClass
    public static void startKafkaCluster() throws Exception {
        CLUSTER.createTopic(INPUT_TOPIC_A);
        CLUSTER.createTopic(OUTPUT_TOPIC_B);
        CLUSTER.createTopic(OUTPUT_TOPIC_C);
    }

    @Test
    public void shouldFanoutTheInput() throws Exception {
        List<String> asList = Arrays.asList("Hello", "World");
        ArrayList arrayList = new ArrayList();
        ArrayList arrayList2 = new ArrayList();
        for (String str : asList) {
            arrayList.add(str.toUpperCase(Locale.getDefault()));
            arrayList2.add(str.toLowerCase(Locale.getDefault()));
        }
        KStreamBuilder kStreamBuilder = new KStreamBuilder();
        Properties properties = new Properties();
        properties.put("application.id", "fanout-integration-test");
        properties.put("bootstrap.servers", CLUSTER.bootstrapServers());
        properties.put("zookeeper.connect", CLUSTER.zKConnectString());
        properties.put("value.serde", Serdes.String().getClass().getName());
        KStream stream = kStreamBuilder.stream(new String[]{INPUT_TOPIC_A});
        KStream mapValues = stream.mapValues(new ValueMapper<String, String>() { // from class: org.apache.kafka.streams.integration.FanoutIntegrationTest.1
            public String apply(String str2) {
                return str2.toUpperCase(Locale.getDefault());
            }
        });
        KStream mapValues2 = stream.mapValues(new ValueMapper<String, String>() { // from class: org.apache.kafka.streams.integration.FanoutIntegrationTest.2
            public String apply(String str2) {
                return str2.toLowerCase(Locale.getDefault());
            }
        });
        mapValues.to(OUTPUT_TOPIC_B);
        mapValues2.to(OUTPUT_TOPIC_C);
        KafkaStreams kafkaStreams = new KafkaStreams(kStreamBuilder, properties);
        kafkaStreams.start();
        Thread.sleep(5000L);
        Properties properties2 = new Properties();
        properties2.put("bootstrap.servers", CLUSTER.bootstrapServers());
        properties2.put("acks", "all");
        properties2.put("retries", 0);
        properties2.put("key.serializer", ByteArraySerializer.class);
        properties2.put("value.serializer", StringSerializer.class);
        IntegrationTestUtils.produceValuesSynchronously(INPUT_TOPIC_A, asList, properties2);
        Thread.sleep(10000L);
        kafkaStreams.close();
        Properties properties3 = new Properties();
        properties3.put("bootstrap.servers", CLUSTER.bootstrapServers());
        properties3.put("group.id", "fanout-integration-test-standard-consumer-topicB");
        properties3.put("auto.offset.reset", "earliest");
        properties3.put("key.deserializer", ByteArrayDeserializer.class);
        properties3.put("value.deserializer", StringDeserializer.class);
        Assert.assertThat(IntegrationTestUtils.readValues(OUTPUT_TOPIC_B, properties3, asList.size()), CoreMatchers.equalTo(arrayList));
        Properties properties4 = new Properties();
        properties4.put("bootstrap.servers", CLUSTER.bootstrapServers());
        properties4.put("group.id", "fanout-integration-test-standard-consumer-topicC");
        properties4.put("auto.offset.reset", "earliest");
        properties4.put("key.deserializer", ByteArrayDeserializer.class);
        properties4.put("value.deserializer", StringDeserializer.class);
        Assert.assertThat(IntegrationTestUtils.readValues(OUTPUT_TOPIC_C, properties4, asList.size()), CoreMatchers.equalTo(arrayList2));
    }
}
