package org.apache.kafka.streams.kstream.internals;

import java.time.Duration;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.errors.TopologyException;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.Grouped;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.kstream.Produced;
import org.apache.kafka.streams.kstream.SessionWindows;
import org.apache.kafka.streams.kstream.Suppressed;
import org.apache.kafka.streams.kstream.Windowed;
import org.apache.kafka.streams.state.SessionStore;
import org.apache.kafka.streams.state.Stores;
import org.hamcrest.MatcherAssert;
import org.hamcrest.core.Is;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;

/**
 * Checks how a {@code suppress(...)} step shows up in the textual topology description
 * produced by {@code StreamsBuilder.build().describe()}, both with and without an explicit
 * name, and checks that suppressing a table backed by a versioned store is rejected with a
 * {@link TopologyException}.
 *
 * <p>Each topology test builds a small pipeline and compares the full description string
 * against one of the expected constants below, so any change to the constants must be
 * character-exact.
 */
public class SuppressTopologyTest {
    private static final Serde<String> STRING_SERDE = Serdes.String();

    /**
     * Expected description for the window-close suppression named {@code "myname"}: the
     * suppress processor and its store carry the given name instead of generated ones.
     */
    private static final String NAMED_FINAL_TOPOLOGY =
        "Topologies:\n"
            + "   Sub-topology: 0\n"
            + "    Source: KSTREAM-SOURCE-0000000000 (topics: [input])\n"
            + "      --> KSTREAM-KEY-SELECT-0000000001\n"
            + "    Processor: KSTREAM-KEY-SELECT-0000000001 (stores: [])\n"
            + "      --> counts-repartition-filter\n"
            + "      <-- KSTREAM-SOURCE-0000000000\n"
            + "    Processor: counts-repartition-filter (stores: [])\n"
            + "      --> counts-repartition-sink\n"
            + "      <-- KSTREAM-KEY-SELECT-0000000001\n"
            + "    Sink: counts-repartition-sink (topic: counts-repartition)\n"
            + "      <-- counts-repartition-filter\n"
            + "\n"
            + "  Sub-topology: 1\n"
            + "    Source: counts-repartition-source (topics: [counts-repartition])\n"
            + "      --> KSTREAM-AGGREGATE-0000000002\n"
            + "    Processor: KSTREAM-AGGREGATE-0000000002 (stores: [counts])\n"
            + "      --> myname\n"
            + "      <-- counts-repartition-source\n"
            + "    Processor: myname (stores: [myname-store])\n"
            + "      --> KTABLE-TOSTREAM-0000000006\n"
            + "      <-- KSTREAM-AGGREGATE-0000000002\n"
            + "    Processor: KTABLE-TOSTREAM-0000000006 (stores: [])\n"
            + "      --> KSTREAM-MAP-0000000007\n"
            + "      <-- myname\n"
            + "    Processor: KSTREAM-MAP-0000000007 (stores: [])\n"
            + "      --> KSTREAM-SINK-0000000008\n"
            + "      <-- KTABLE-TOSTREAM-0000000006\n"
            + "    Sink: KSTREAM-SINK-0000000008 (topic: output-suppressed)\n"
            + "      <-- KSTREAM-MAP-0000000007\n"
            + "\n";

    /**
     * Expected description for the unnamed window-close suppression: the suppress processor
     * and its store get generated, numbered names, which shifts the numbers of the nodes
     * that follow compared with {@link #NAMED_FINAL_TOPOLOGY}.
     */
    private static final String ANONYMOUS_FINAL_TOPOLOGY =
        "Topologies:\n"
            + "   Sub-topology: 0\n"
            + "    Source: KSTREAM-SOURCE-0000000000 (topics: [input])\n"
            + "      --> KSTREAM-KEY-SELECT-0000000001\n"
            + "    Processor: KSTREAM-KEY-SELECT-0000000001 (stores: [])\n"
            + "      --> counts-repartition-filter\n"
            + "      <-- KSTREAM-SOURCE-0000000000\n"
            + "    Processor: counts-repartition-filter (stores: [])\n"
            + "      --> counts-repartition-sink\n"
            + "      <-- KSTREAM-KEY-SELECT-0000000001\n"
            + "    Sink: counts-repartition-sink (topic: counts-repartition)\n"
            + "      <-- counts-repartition-filter\n"
            + "\n"
            + "  Sub-topology: 1\n"
            + "    Source: counts-repartition-source (topics: [counts-repartition])\n"
            + "      --> KSTREAM-AGGREGATE-0000000002\n"
            + "    Processor: KSTREAM-AGGREGATE-0000000002 (stores: [counts])\n"
            + "      --> KTABLE-SUPPRESS-0000000006\n"
            + "      <-- counts-repartition-source\n"
            + "    Processor: KTABLE-SUPPRESS-0000000006 (stores: [KTABLE-SUPPRESS-STATE-STORE-0000000007])\n"
            + "      --> KTABLE-TOSTREAM-0000000008\n"
            + "      <-- KSTREAM-AGGREGATE-0000000002\n"
            + "    Processor: KTABLE-TOSTREAM-0000000008 (stores: [])\n"
            + "      --> KSTREAM-MAP-0000000009\n"
            + "      <-- KTABLE-SUPPRESS-0000000006\n"
            + "    Processor: KSTREAM-MAP-0000000009 (stores: [])\n"
            + "      --> KSTREAM-SINK-0000000010\n"
            + "      <-- KTABLE-TOSTREAM-0000000008\n"
            + "    Sink: KSTREAM-SINK-0000000010 (topic: output-suppressed)\n"
            + "      <-- KSTREAM-MAP-0000000009\n"
            + "\n";

    /** Expected description for the time-limit suppression named {@code "asdf"}. */
    private static final String NAMED_INTERMEDIATE_TOPOLOGY =
        "Topologies:\n"
            + "   Sub-topology: 0\n"
            + "    Source: KSTREAM-SOURCE-0000000000 (topics: [input])\n"
            + "      --> KSTREAM-AGGREGATE-0000000002\n"
            + "    Processor: KSTREAM-AGGREGATE-0000000002 (stores: [KSTREAM-AGGREGATE-STATE-STORE-0000000001])\n"
            + "      --> asdf\n"
            + "      <-- KSTREAM-SOURCE-0000000000\n"
            + "    Processor: asdf (stores: [asdf-store])\n"
            + "      --> KTABLE-TOSTREAM-0000000003\n"
            + "      <-- KSTREAM-AGGREGATE-0000000002\n"
            + "    Processor: KTABLE-TOSTREAM-0000000003 (stores: [])\n"
            + "      --> KSTREAM-SINK-0000000004\n"
            + "      <-- asdf\n"
            + "    Sink: KSTREAM-SINK-0000000004 (topic: output)\n"
            + "      <-- KTABLE-TOSTREAM-0000000003\n"
            + "\n";

    /** Expected description for the unnamed time-limit suppression (generated, numbered names). */
    private static final String ANONYMOUS_INTERMEDIATE_TOPOLOGY =
        "Topologies:\n"
            + "   Sub-topology: 0\n"
            + "    Source: KSTREAM-SOURCE-0000000000 (topics: [input])\n"
            + "      --> KSTREAM-AGGREGATE-0000000002\n"
            + "    Processor: KSTREAM-AGGREGATE-0000000002 (stores: [KSTREAM-AGGREGATE-STATE-STORE-0000000001])\n"
            + "      --> KTABLE-SUPPRESS-0000000003\n"
            + "      <-- KSTREAM-SOURCE-0000000000\n"
            + "    Processor: KTABLE-SUPPRESS-0000000003 (stores: [KTABLE-SUPPRESS-STATE-STORE-0000000004])\n"
            + "      --> KTABLE-TOSTREAM-0000000005\n"
            + "      <-- KSTREAM-AGGREGATE-0000000002\n"
            + "    Processor: KTABLE-TOSTREAM-0000000005 (stores: [])\n"
            + "      --> KSTREAM-SINK-0000000006\n"
            + "      <-- KTABLE-SUPPRESS-0000000003\n"
            + "    Sink: KSTREAM-SINK-0000000006 (topic: output)\n"
            + "      <-- KTABLE-TOSTREAM-0000000005\n"
            + "\n";

    /**
     * Builds a session-windowed count followed by an unnamed window-close suppression and
     * expects the description to match {@link #ANONYMOUS_FINAL_TOPOLOGY}.
     */
    @Test
    public void shouldUseNumberingForAnonymousFinalSuppressionNode() {
        final StreamsBuilder anonymousNodeBuilder = new StreamsBuilder();
        anonymousNodeBuilder
            .stream("input", Consumed.with(STRING_SERDE, STRING_SERDE))
            .groupBy((key, value) -> key, Grouped.with(STRING_SERDE, STRING_SERDE))
            .windowedBy(SessionWindows.with(Duration.ofMillis(5L)).grace(Duration.ofMillis(5L)))
            // The explicit type witness is required: without it the chained
            // withCachingDisabled() call fixes the type arguments before count() can supply them.
            .count(Materialized.<String, Long, SessionStore<Bytes, byte[]>>as("counts").withCachingDisabled())
            .suppress(Suppressed.untilWindowCloses(Suppressed.BufferConfig.unbounded()))
            .toStream()
            .map((final Windowed<String> windowedKey, final Long count) ->
                new KeyValue<>(windowedKey.toString(), count))
            .to("output-suppressed", Produced.with(STRING_SERDE, Serdes.Long()));
        final String anonymousNodeTopology = anonymousNodeBuilder.build().describe().toString();

        MatcherAssert.assertThat(anonymousNodeTopology, Is.is(ANONYMOUS_FINAL_TOPOLOGY));
    }

    /**
     * Builds the same pipeline as the anonymous final-suppression test but names the
     * suppression {@code "myname"}, and expects the description to match
     * {@link #NAMED_FINAL_TOPOLOGY}.
     */
    @Test
    public void shouldApplyNameToFinalSuppressionNode() {
        final StreamsBuilder namedNodeBuilder = new StreamsBuilder();
        namedNodeBuilder
            .stream("input", Consumed.with(STRING_SERDE, STRING_SERDE))
            .groupBy((key, value) -> key, Grouped.with(STRING_SERDE, STRING_SERDE))
            .windowedBy(SessionWindows.with(Duration.ofMillis(5L)).grace(Duration.ofMillis(5L)))
            // The explicit type witness is required: without it the chained
            // withCachingDisabled() call fixes the type arguments before count() can supply them.
            .count(Materialized.<String, Long, SessionStore<Bytes, byte[]>>as("counts").withCachingDisabled())
            .suppress(Suppressed.untilWindowCloses(Suppressed.BufferConfig.unbounded()).withName("myname"))
            .toStream()
            .map((final Windowed<String> windowedKey, final Long count) ->
                new KeyValue<>(windowedKey.toString(), count))
            .to("output-suppressed", Produced.with(STRING_SERDE, Serdes.Long()));
        final String namedNodeTopology = namedNodeBuilder.build().describe().toString();

        MatcherAssert.assertThat(namedNodeTopology, Is.is(NAMED_FINAL_TOPOLOGY));
    }

    /**
     * Builds a non-windowed count followed by an unnamed one-second time-limit suppression
     * and expects the description to match {@link #ANONYMOUS_INTERMEDIATE_TOPOLOGY}.
     */
    @Test
    public void shouldUseNumberingForAnonymousSuppressionNode() {
        final StreamsBuilder anonymousNodeBuilder = new StreamsBuilder();
        anonymousNodeBuilder
            .stream("input", Consumed.with(STRING_SERDE, STRING_SERDE))
            .groupByKey()
            .count()
            .suppress(Suppressed.untilTimeLimit(Duration.ofSeconds(1L), Suppressed.BufferConfig.unbounded()))
            .toStream()
            .to("output", Produced.with(STRING_SERDE, Serdes.Long()));
        final String anonymousNodeTopology = anonymousNodeBuilder.build().describe().toString();

        MatcherAssert.assertThat(anonymousNodeTopology, Is.is(ANONYMOUS_INTERMEDIATE_TOPOLOGY));
    }

    /**
     * Builds the same pipeline as the anonymous time-limit test but names the suppression
     * {@code "asdf"}, and expects the description to match {@link #NAMED_INTERMEDIATE_TOPOLOGY}.
     */
    @Test
    public void shouldApplyNameToSuppressionNode() {
        final StreamsBuilder namedNodeBuilder = new StreamsBuilder();
        namedNodeBuilder
            .stream("input", Consumed.with(STRING_SERDE, STRING_SERDE))
            .groupByKey()
            .count()
            .suppress(
                Suppressed.untilTimeLimit(Duration.ofSeconds(1L), Suppressed.BufferConfig.unbounded())
                    .withName("asdf"))
            .toStream()
            .to("output", Produced.with(STRING_SERDE, Serdes.Long()));
        final String namedNodeTopology = namedNodeBuilder.build().describe().toString();

        MatcherAssert.assertThat(namedNodeTopology, Is.is(NAMED_INTERMEDIATE_TOPOLOGY));
    }

    /**
     * Expects {@code suppress(...)} itself to throw a {@link TopologyException} when it is
     * called directly on a table materialized with a persistent versioned store.
     */
    @Test
    public void shouldThrowOnSuppressForMaterializedVersionedTable() {
        final StreamsBuilder builder = new StreamsBuilder();
        final KTable<Object, Object> versionedTable = builder.table(
            "input",
            Materialized.as(Stores.persistentVersionedKeyValueStore("store", Duration.ZERO)));

        Assertions.assertThrows(
            TopologyException.class,
            () -> versionedTable.suppress(
                Suppressed.untilTimeLimit(Duration.ZERO, Suppressed.BufferConfig.unbounded())));
    }

    /**
     * Applies {@code suppress(...)} to the result of a {@code filter} on a table materialized
     * with a persistent versioned store. The {@code suppress} call sits outside the assertion,
     * so it must succeed; the {@link TopologyException} is expected from
     * {@code StreamsBuilder.build()}.
     */
    @Test
    public void shouldThrowOnSuppressForNonMaterializedVersionedTable() {
        final StreamsBuilder builder = new StreamsBuilder();
        builder
            .table("input", Materialized.as(Stores.persistentVersionedKeyValueStore("store", Duration.ZERO)))
            .filter((key, value) -> true)
            .suppress(Suppressed.untilTimeLimit(Duration.ZERO, Suppressed.BufferConfig.unbounded()));

        Assertions.assertThrows(TopologyException.class, builder::build);
    }
}
