package org.apache.kafka.streams.kstream.internals;

import java.time.Duration;
import java.time.Instant;
import java.util.Properties;
import org.apache.kafka.common.serialization.IntegerSerializer;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.KeyValueTimestamp;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.TestInputTopic;
import org.apache.kafka.streams.TopologyTestDriver;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.Transformer;
import org.apache.kafka.streams.kstream.TransformerSupplier;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.processor.PunctuationType;
import org.apache.kafka.streams.processor.To;
import org.apache.kafka.test.MockProcessorSupplier;
import org.apache.kafka.test.StreamsTestUtils;
import org.junit.Assert;
import org.junit.Test;

/* loaded from: input_file:org/apache/kafka/streams/kstream/internals/KStreamTransformTest.class */
/**
 * Tests for {@code KStream#transform} driven through {@link TopologyTestDriver}.
 *
 * <p>Both tests build the same topology: an Integer/Integer source topic, a stateful
 * {@link Transformer} that doubles the key and emits a running total of the values, and a
 * {@link MockProcessorSupplier} that captures everything forwarded downstream. The transformer
 * also registers a 1 ms wall-clock punctuator that forwards {@code (-1, timestamp)} records.
 */
public class KStreamTransformTest {
    private static final String TOPIC_NAME = "topic";

    /** Input keys piped into the source topic; each record's value is {@code key * 10}. */
    private static final int[] INPUT_KEYS = {1, 10, 100, 1000};

    private final Properties props = StreamsTestUtils.getStreamsConfig(Serdes.Integer(), Serdes.Integer());

    /**
     * Pipes records with per-record timestamps ({@code key / 2}) through a driver configured
     * with empty {@link Properties} and verifies both the transformed records and the
     * punctuated records, including their timestamps.
     */
    @Test
    public void testTransform() {
        final StreamsBuilder builder = new StreamsBuilder();
        final MockProcessorSupplier<Integer, Integer> capture = new MockProcessorSupplier<>();
        builder.stream(TOPIC_NAME, Consumed.with(Serdes.Integer(), Serdes.Integer()))
            .transform(runningTotalTransformerSupplier())
            .process(capture);

        // try-with-resources guarantees the driver is closed exactly once, whether or not
        // an assertion fails inside the block.
        try (TopologyTestDriver driver =
                 new TopologyTestDriver(builder.build(), new Properties(), Instant.ofEpochMilli(0L))) {
            final TestInputTopic<Integer, Integer> inputTopic =
                driver.createInputTopic(TOPIC_NAME, new IntegerSerializer(), new IntegerSerializer());
            for (final int key : INPUT_KEYS) {
                inputTopic.pipeInput(key, key * 10, key / 2L);
            }

            // Wall clock starts at 0; this tick is expected to yield the (-1, 2) record.
            driver.advanceWallClockTime(Duration.ofMillis(2L));
            // This tick moves the wall clock to 3 and is expected to yield the (-1, 3) record.
            driver.advanceWallClockTime(Duration.ofMillis(1L));

            final KeyValueTimestamp<?, ?>[] expected = {
                new KeyValueTimestamp<>(2, 10, 0L),
                new KeyValueTimestamp<>(20, 110, 5L),
                new KeyValueTimestamp<>(200, 1110, 50L),
                new KeyValueTimestamp<>(2000, 11110, 500L),
                new KeyValueTimestamp<>(-1, 2, 2L),
                new KeyValueTimestamp<>(-1, 3, 3L)
            };
            assertProcessed(capture, expected);
        }
    }

    /**
     * Same scenario as {@link #testTransform()} but with the driver configured from
     * {@code props}, all input records stamped at 0, and the assertions run only after the
     * driver has been closed.
     */
    @Test
    public void testTransformWithNewDriverAndPunctuator() {
        final StreamsBuilder builder = new StreamsBuilder();
        final MockProcessorSupplier<Integer, Integer> capture = new MockProcessorSupplier<>();
        builder.stream(TOPIC_NAME, Consumed.with(Serdes.Integer(), Serdes.Integer()))
            .transform(runningTotalTransformerSupplier())
            .process(capture);

        try (TopologyTestDriver driver =
                 new TopologyTestDriver(builder.build(), this.props, Instant.ofEpochMilli(0L))) {
            final TestInputTopic<Integer, Integer> inputTopic =
                driver.createInputTopic(TOPIC_NAME, new IntegerSerializer(), new IntegerSerializer());
            for (final int key : INPUT_KEYS) {
                inputTopic.pipeInput(key, key * 10, 0L);
            }

            // Wall clock starts at 0; this tick is expected to yield the (-1, 2) record.
            driver.advanceWallClockTime(Duration.ofMillis(2L));
            // This tick moves the wall clock to 3 and is expected to yield the (-1, 3) record.
            driver.advanceWallClockTime(Duration.ofMillis(1L));
        }

        // Deliberately asserted after the driver is closed: the captured records live in the
        // mock processor, and a failing assertion here can no longer trigger a second close().
        final KeyValueTimestamp<?, ?>[] expected = {
            new KeyValueTimestamp<>(2, 10, 0L),
            new KeyValueTimestamp<>(20, 110, 0L),
            new KeyValueTimestamp<>(200, 1110, 0L),
            new KeyValueTimestamp<>(2000, 11110, 0L),
            new KeyValueTimestamp<>(-1, 2, 2L),
            new KeyValueTimestamp<>(-1, 3, 3L)
        };
        assertProcessed(capture, expected);
    }

    /**
     * Creates the supplier shared by both tests. Each supplied transformer keeps its own
     * running total of the values it has seen and maps {@code (k, v)} to
     * {@code (k * 2, total)}; on init it schedules a 1 ms wall-clock punctuator that forwards
     * {@code (-1, (int) timestamp)} stamped with the punctuation timestamp.
     */
    private static TransformerSupplier<Number, Number, KeyValue<Integer, Integer>> runningTotalTransformerSupplier() {
        return () -> new Transformer<Number, Number, KeyValue<Integer, Integer>>() {
            private int total = 0;

            @Override
            public void init(final ProcessorContext context) {
                context.schedule(
                    Duration.ofMillis(1L),
                    PunctuationType.WALL_CLOCK_TIME,
                    timestamp -> context.forward(-1, (int) timestamp, To.all().withTimestamp(timestamp)));
            }

            @Override
            public KeyValue<Integer, Integer> transform(final Number key, final Number value) {
                total += value.intValue();
                return KeyValue.pair(key.intValue() * 2, total);
            }

            @Override
            public void close() {
                // nothing to release
            }
        };
    }

    /**
     * Asserts that the captured processor saw exactly {@code expected}, in order.
     * The size is checked first so a count mismatch is reported before any element mismatch.
     */
    private static void assertProcessed(final MockProcessorSupplier<Integer, Integer> capture,
                                        final KeyValueTimestamp<?, ?>[] expected) {
        Assert.assertEquals(expected.length, capture.theCapturedProcessor().processed().size());
        for (int i = 0; i < expected.length; i++) {
            Assert.assertEquals(expected[i], capture.theCapturedProcessor().processed().get(i));
        }
    }
}
