package org.apache.kafka.streams.tests;

import java.time.Duration;
import java.util.Objects;
import java.util.Properties;
import java.util.function.Function;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.common.utils.Exit;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.Aggregator;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.Grouped;
import org.apache.kafka.streams.kstream.Initializer;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.kstream.Produced;
import org.apache.kafka.streams.state.KeyValueStore;

/* loaded from: input_file:org/apache/kafka/streams/tests/StreamsNamedRepartitionTest.class */
/**
 * Command-line driver that runs a small Kafka Streams application whose
 * repartition step is explicitly named ({@code "grouped-stream"}).
 *
 * <p>The application reads string records from an input topic, re-keys each
 * record by {@code value % 9}, optionally inserts an extra filter/mapValues
 * stage, sums the integer values per key into the {@code "count-store"} state
 * store, and writes the running totals to an aggregation topic. Progress
 * markers are printed to standard output.
 */
public class StreamsNamedRepartitionTest {

    /**
     * Entry point.
     *
     * @param args {@code args[0]} is the path of a properties file. It must contain
     *             {@code input.topic}, {@code aggregation.topic} and {@code add.operations};
     *             every remaining entry is passed through as Streams configuration.
     * @throws Exception if the properties file cannot be loaded
     * @throws NullPointerException if one of the three required properties is missing
     */
    public static void main(String[] args) throws Exception {
        if (args.length < 1) {
            System.err.println("StreamsNamedRepartitionTest requires one argument (properties-file) but none provided: ");
            // Stop here: falling through would fail on args[0] with an
            // ArrayIndexOutOfBoundsException instead of a clean non-zero exit.
            Exit.exit(1);
            return;
        }

        final Properties fileProps = Utils.loadProps(args[0]);
        System.out.println("StreamsTest instance started NAMED_REPARTITION_TEST");
        System.out.println("props=" + fileProps);

        // The three test-specific keys are removed so that only genuine Streams
        // settings remain in fileProps when it is merged into the config below.
        final String inputTopic = (String) Objects.requireNonNull(
            fileProps.remove("input.topic"), "input.topic must be set in the properties file");
        final String aggregationTopic = (String) Objects.requireNonNull(
            fileProps.remove("aggregation.topic"), "aggregation.topic must be set in the properties file");
        final boolean addOperations = Boolean.parseBoolean((String) Objects.requireNonNull(
            fileProps.remove("add.operations"), "add.operations must be set in the properties file"));

        final StreamsBuilder builder = new StreamsBuilder();
        buildTopology(builder, inputTopic, aggregationTopic, addOperations);

        final Properties config = streamsConfig(fileProps);
        final KafkaStreams streams = new KafkaStreams(builder.build(config), config);

        // Print a marker each time the instance goes from REBALANCING to RUNNING.
        streams.setStateListener((newState, oldState) -> {
            if (oldState == KafkaStreams.State.REBALANCING && newState == KafkaStreams.State.RUNNING) {
                if (addOperations) {
                    System.out.println("UPDATED Topology");
                } else {
                    System.out.println("REBALANCING -> RUNNING");
                }
                System.out.flush();
            }
        });

        streams.start();

        Exit.addShutdownHook("streams-shutdown-hook", () -> {
            System.out.println("closing Kafka Streams instance");
            System.out.flush();
            streams.close(Duration.ofMillis(5000L));
            System.out.println("NAMED_REPARTITION_TEST Streams Stopped");
            System.out.flush();
        });
    }

    /**
     * Adds the test topology to {@code builder}.
     *
     * @param builder          builder that receives the topology
     * @param inputTopic       topic to read string key/value records from
     * @param aggregationTopic topic that receives the per-key integer totals
     * @param addOperations    when {@code true}, a pass-through filter and a
     *                         {@code value + 1} mapValues stage are inserted
     *                         between re-keying and grouping
     */
    private static void buildTopology(StreamsBuilder builder,
                                      String inputTopic,
                                      String aggregationTopic,
                                      boolean addOperations) {
        final Initializer<Integer> initializer = () -> 0;
        final Aggregator<String, String, Integer> aggregator =
            (key, value, total) -> total + Integer.parseInt(value);
        final Function<String, String> keyFunction =
            value -> Integer.toString(Integer.parseInt(value) % 9);

        final KStream<String, String> sourceStream =
            builder.stream(inputTopic, Consumed.with(Serdes.String(), Serdes.String()));
        sourceStream.peek((key, value) -> System.out.printf("input data key=%s, value=%s%n", key, value));

        final KStream<String, String> mappedStream =
            sourceStream.selectKey((key, value) -> keyFunction.apply(value));

        final KStream<String, String> maybeUpdatedStream;
        if (addOperations) {
            maybeUpdatedStream = mappedStream
                .filter((key, value) -> true)
                .mapValues(value -> Integer.toString(Integer.parseInt(value) + 1));
        } else {
            maybeUpdatedStream = mappedStream;
        }

        maybeUpdatedStream
            .groupByKey(Grouped.with("grouped-stream", Serdes.String(), Serdes.String()))
            .aggregate(
                initializer,
                aggregator,
                // The explicit type witness is required because the chained
                // with*Serde calls prevent inference from the aggregate() target type.
                Materialized.<String, Integer, KeyValueStore<Bytes, byte[]>>as("count-store")
                    .withKeySerde(Serdes.String())
                    .withValueSerde(Serdes.Integer()))
            .toStream()
            .peek((key, value) -> System.out.printf("AGGREGATED key=%s value=%s%n", key, value))
            .to(aggregationTopic, Produced.with(Serdes.String(), Serdes.Integer()));
    }

    /**
     * Builds the Streams configuration: the test's defaults first, then every
     * entry of {@code overrides}, so values from the properties file win.
     *
     * @param overrides settings loaded from the properties file
     * @return a new {@link Properties} holding the merged configuration
     */
    private static Properties streamsConfig(Properties overrides) {
        final Properties config = new Properties();
        config.setProperty("application.id", "StreamsNamedRepartitionTest");
        config.setProperty("statestore.cache.max.bytes", "0");
        config.setProperty("default.key.serde", Serdes.String().getClass().getName());
        config.setProperty("default.value.serde", Serdes.String().getClass().getName());
        config.putAll(overrides);
        return config;
    }
}
