package org.apache.kafka.streams.integration;

import java.time.Duration;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Properties;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import kafka.utils.MockTime;
import org.apache.kafka.common.serialization.LongDeserializer;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster;
import org.apache.kafka.streams.integration.utils.IntegrationTestUtils;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.Produced;
import org.apache.kafka.test.IntegrationTest;
import org.apache.kafka.test.StreamsTestUtils;
import org.apache.kafka.test.TestUtils;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.experimental.categories.Category;

@Category({IntegrationTest.class})
/* loaded from: input_file:org/apache/kafka/streams/integration/RepartitionWithMergeOptimizingIntegrationTest.class */
public class RepartitionWithMergeOptimizingIntegrationTest {
    private static final int NUM_BROKERS = 1;
    private static final String INPUT_A_TOPIC = "inputA";
    private static final String INPUT_B_TOPIC = "inputB";
    private static final String COUNT_TOPIC = "outputTopic_0";
    private static final String COUNT_STRING_TOPIC = "outputTopic_1";
    private static final int ONE_REPARTITION_TOPIC = 1;
    private static final int TWO_REPARTITION_TOPICS = 2;
    private Properties streamsConfiguration;

    @ClassRule
    public static final EmbeddedKafkaCluster CLUSTER = new EmbeddedKafkaCluster(1);
    private static final String EXPECTED_OPTIMIZED_TOPOLOGY = "Topologies:\n   Sub-topology: 0\n    Source: KSTREAM-SOURCE-0000000000 (topics: [inputA])\n      --> KSTREAM-MAP-0000000002\n    Source: KSTREAM-SOURCE-0000000001 (topics: [inputB])\n      --> KSTREAM-MAP-0000000003\n    Processor: KSTREAM-MAP-0000000002 (stores: [])\n      --> KSTREAM-MERGE-0000000004\n      <-- KSTREAM-SOURCE-0000000000\n    Processor: KSTREAM-MAP-0000000003 (stores: [])\n      --> KSTREAM-MERGE-0000000004\n      <-- KSTREAM-SOURCE-0000000001\n    Processor: KSTREAM-MERGE-0000000004 (stores: [])\n      --> KSTREAM-FILTER-0000000021\n      <-- KSTREAM-MAP-0000000002, KSTREAM-MAP-0000000003\n    Processor: KSTREAM-FILTER-0000000021 (stores: [])\n      --> KSTREAM-SINK-0000000020\n      <-- KSTREAM-MERGE-0000000004\n    Sink: KSTREAM-SINK-0000000020 (topic: KSTREAM-AGGREGATE-STATE-STORE-0000000005-repartition)\n      <-- KSTREAM-FILTER-0000000021\n\n  Sub-topology: 1\n    Source: KSTREAM-SOURCE-0000000022 (topics: [KSTREAM-AGGREGATE-STATE-STORE-0000000005-repartition])\n      --> KSTREAM-AGGREGATE-0000000006, KSTREAM-AGGREGATE-0000000013\n    Processor: KSTREAM-AGGREGATE-0000000013 (stores: [KSTREAM-AGGREGATE-STATE-STORE-0000000012])\n      --> KTABLE-TOSTREAM-0000000017\n      <-- KSTREAM-SOURCE-0000000022\n    Processor: KSTREAM-AGGREGATE-0000000006 (stores: [KSTREAM-AGGREGATE-STATE-STORE-0000000005])\n      --> KTABLE-TOSTREAM-0000000010\n      <-- KSTREAM-SOURCE-0000000022\n    Processor: KTABLE-TOSTREAM-0000000017 (stores: [])\n      --> KSTREAM-MAPVALUES-0000000018\n      <-- KSTREAM-AGGREGATE-0000000013\n    Processor: KSTREAM-MAPVALUES-0000000018 (stores: [])\n      --> KSTREAM-SINK-0000000019\n      <-- KTABLE-TOSTREAM-0000000017\n    Processor: KTABLE-TOSTREAM-0000000010 (stores: [])\n      --> KSTREAM-SINK-0000000011\n      <-- KSTREAM-AGGREGATE-0000000006\n    Sink: KSTREAM-SINK-0000000011 (topic: outputTopic_0)\n      <-- KTABLE-TOSTREAM-0000000010\n    Sink: KSTREAM-SINK-0000000019 (topic: outputTopic_1)\n      <-- KSTREAM-MAPVALUES-0000000018\n\n";
    private static final String EXPECTED_UNOPTIMIZED_TOPOLOGY = "Topologies:\n   Sub-topology: 0\n    Source: KSTREAM-SOURCE-0000000000 (topics: [inputA])\n      --> KSTREAM-MAP-0000000002\n    Source: KSTREAM-SOURCE-0000000001 (topics: [inputB])\n      --> KSTREAM-MAP-0000000003\n    Processor: KSTREAM-MAP-0000000002 (stores: [])\n      --> KSTREAM-MERGE-0000000004\n      <-- KSTREAM-SOURCE-0000000000\n    Processor: KSTREAM-MAP-0000000003 (stores: [])\n      --> KSTREAM-MERGE-0000000004\n      <-- KSTREAM-SOURCE-0000000001\n    Processor: KSTREAM-MERGE-0000000004 (stores: [])\n      --> KSTREAM-FILTER-0000000008, KSTREAM-FILTER-0000000015\n      <-- KSTREAM-MAP-0000000002, KSTREAM-MAP-0000000003\n    Processor: KSTREAM-FILTER-0000000008 (stores: [])\n      --> KSTREAM-SINK-0000000007\n      <-- KSTREAM-MERGE-0000000004\n    Processor: KSTREAM-FILTER-0000000015 (stores: [])\n      --> KSTREAM-SINK-0000000014\n      <-- KSTREAM-MERGE-0000000004\n    Sink: KSTREAM-SINK-0000000007 (topic: KSTREAM-AGGREGATE-STATE-STORE-0000000005-repartition)\n      <-- KSTREAM-FILTER-0000000008\n    Sink: KSTREAM-SINK-0000000014 (topic: KSTREAM-AGGREGATE-STATE-STORE-0000000012-repartition)\n      <-- KSTREAM-FILTER-0000000015\n\n  Sub-topology: 1\n    Source: KSTREAM-SOURCE-0000000009 (topics: [KSTREAM-AGGREGATE-STATE-STORE-0000000005-repartition])\n      --> KSTREAM-AGGREGATE-0000000006\n    Processor: KSTREAM-AGGREGATE-0000000006 (stores: [KSTREAM-AGGREGATE-STATE-STORE-0000000005])\n      --> KTABLE-TOSTREAM-0000000010\n      <-- KSTREAM-SOURCE-0000000009\n    Processor: KTABLE-TOSTREAM-0000000010 (stores: [])\n      --> KSTREAM-SINK-0000000011\n      <-- KSTREAM-AGGREGATE-0000000006\n    Sink: KSTREAM-SINK-0000000011 (topic: outputTopic_0)\n      <-- KTABLE-TOSTREAM-0000000010\n\n  Sub-topology: 2\n    Source: KSTREAM-SOURCE-0000000016 (topics: [KSTREAM-AGGREGATE-STATE-STORE-0000000012-repartition])\n      --> KSTREAM-AGGREGATE-0000000013\n    Processor: KSTREAM-AGGREGATE-0000000013 (stores: [KSTREAM-AGGREGATE-STATE-STORE-0000000012])\n      --> KTABLE-TOSTREAM-0000000017\n      <-- KSTREAM-SOURCE-0000000016\n    Processor: KTABLE-TOSTREAM-0000000017 (stores: [])\n      --> KSTREAM-MAPVALUES-0000000018\n      <-- KSTREAM-AGGREGATE-0000000013\n    Processor: KSTREAM-MAPVALUES-0000000018 (stores: [])\n      --> KSTREAM-SINK-0000000019\n      <-- KTABLE-TOSTREAM-0000000017\n    Sink: KSTREAM-SINK-0000000019 (topic: outputTopic_1)\n      <-- KSTREAM-MAPVALUES-0000000018\n\n";
    private final Pattern repartitionTopicPattern = Pattern.compile("Sink: .*-repartition");
    private final MockTime mockTime = CLUSTER.time;

    @Before
    public void setUp() throws Exception {
        Properties properties = new Properties();
        properties.put("cache.max.bytes.buffering", 10240);
        properties.put("commit.interval.ms", 5000);
        this.streamsConfiguration = StreamsTestUtils.getStreamsConfig("maybe-optimized-with-merge-test-app", CLUSTER.bootstrapServers(), Serdes.String().getClass().getName(), Serdes.String().getClass().getName(), properties);
        CLUSTER.createTopics(COUNT_TOPIC, COUNT_STRING_TOPIC, INPUT_A_TOPIC, INPUT_B_TOPIC);
        IntegrationTestUtils.purgeLocalStreamsState(this.streamsConfiguration);
    }

    @After
    public void tearDown() throws Exception {
        CLUSTER.deleteAllTopicsAndWait(30000L);
    }

    @Test
    public void shouldSendCorrectRecords_OPTIMIZED() throws Exception {
        runIntegrationTest("all", 1);
    }

    @Test
    public void shouldSendCorrectResults_NO_OPTIMIZATION() throws Exception {
        runIntegrationTest("none", TWO_REPARTITION_TOPICS);
    }

    private void runIntegrationTest(String str, int i) throws Exception {
        StreamsBuilder streamsBuilder = new StreamsBuilder();
        KStream merge = streamsBuilder.stream(INPUT_A_TOPIC, Consumed.with(Serdes.String(), Serdes.String())).map((str2, str3) -> {
            return KeyValue.pair(str3.split(":")[0], str3);
        }).merge(streamsBuilder.stream(INPUT_B_TOPIC, Consumed.with(Serdes.String(), Serdes.String())).map((str4, str5) -> {
            return KeyValue.pair(str5.split(":")[0], str5);
        }));
        merge.groupByKey().count().toStream().to(COUNT_TOPIC, Produced.with(Serdes.String(), Serdes.Long()));
        merge.groupByKey().count().toStream().mapValues(l -> {
            return l.toString();
        }).to(COUNT_STRING_TOPIC, Produced.with(Serdes.String(), Serdes.String()));
        this.streamsConfiguration.setProperty("topology.optimization", str);
        Properties producerConfig = TestUtils.producerConfig(CLUSTER.bootstrapServers(), StringSerializer.class, StringSerializer.class);
        IntegrationTestUtils.produceKeyValuesSynchronously(INPUT_A_TOPIC, getKeyValues(), producerConfig, this.mockTime);
        IntegrationTestUtils.produceKeyValuesSynchronously(INPUT_B_TOPIC, getKeyValues(), producerConfig, this.mockTime);
        Properties consumerConfig = TestUtils.consumerConfig(CLUSTER.bootstrapServers(), StringDeserializer.class, LongDeserializer.class);
        Properties consumerConfig2 = TestUtils.consumerConfig(CLUSTER.bootstrapServers(), StringDeserializer.class, StringDeserializer.class);
        Topology build = streamsBuilder.build(this.streamsConfiguration);
        String obj = build.describe().toString();
        System.out.println(obj);
        if (str.equals("all")) {
            Assert.assertEquals(EXPECTED_OPTIMIZED_TOPOLOGY, obj);
        } else {
            Assert.assertEquals(EXPECTED_UNOPTIMIZED_TOPOLOGY, obj);
        }
        Assert.assertEquals(i, getCountOfRepartitionTopicsFound(obj));
        KafkaStreams kafkaStreams = new KafkaStreams(build, this.streamsConfiguration);
        kafkaStreams.start();
        IntegrationTestUtils.waitUntilFinalKeyValueRecordsReceived(consumerConfig, COUNT_TOPIC, Arrays.asList(KeyValue.pair("A", 6L), KeyValue.pair("B", 6L), KeyValue.pair("C", 6L)));
        IntegrationTestUtils.waitUntilFinalKeyValueRecordsReceived(consumerConfig2, COUNT_STRING_TOPIC, Arrays.asList(KeyValue.pair("A", "6"), KeyValue.pair("B", "6"), KeyValue.pair("C", "6")));
        kafkaStreams.close(Duration.ofSeconds(5L));
    }

    private int getCountOfRepartitionTopicsFound(String str) {
        Matcher matcher = this.repartitionTopicPattern.matcher(str);
        ArrayList arrayList = new ArrayList();
        while (matcher.find()) {
            arrayList.add(matcher.group());
        }
        return arrayList.size();
    }

    private List<KeyValue<String, String>> getKeyValues() {
        ArrayList arrayList = new ArrayList();
        String[] strArr = {"A:foo", "B:foo", "C:foo"};
        for (String str : new String[]{"X", "Y", "Z"}) {
            for (String str2 : strArr) {
                arrayList.add(KeyValue.pair(str, str2));
            }
        }
        return arrayList;
    }
}
