package org.apache.kafka.streams.kstream.internals;

import java.util.ArrayList;
import java.util.List;
import java.util.Properties;
import org.apache.kafka.common.serialization.IntegerSerializer;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.TopologyTestDriver;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.ForeachAction;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.test.ConsumerRecordFactory;
import org.apache.kafka.test.StreamsTestUtils;
import org.junit.Assert;
import org.junit.Test;

/* loaded from: input_file:org/apache/kafka/streams/kstream/internals/KStreamPeekTest.class */
public class KStreamPeekTest {
    private final String topicName = "topic";
    private final ConsumerRecordFactory<Integer, String> recordFactory = new ConsumerRecordFactory<>(new IntegerSerializer(), new StringSerializer());
    private final Properties props = StreamsTestUtils.getStreamsConfig(Serdes.Integer(), Serdes.String());

    @Test
    public void shouldObserveStreamElements() {
        StreamsBuilder streamsBuilder = new StreamsBuilder();
        KStream stream = streamsBuilder.stream("topic", Consumed.with(Serdes.Integer(), Serdes.String()));
        ArrayList arrayList = new ArrayList();
        ArrayList arrayList2 = new ArrayList();
        stream.peek(collect(arrayList)).foreach(collect(arrayList2));
        TopologyTestDriver topologyTestDriver = new TopologyTestDriver(streamsBuilder.build(), this.props);
        Throwable th = null;
        try {
            try {
                ArrayList arrayList3 = new ArrayList();
                for (int i = 0; i < 32; i++) {
                    String str = "V" + i;
                    topologyTestDriver.pipeInput(this.recordFactory.create("topic", Integer.valueOf(i), str));
                    arrayList3.add(new KeyValue(Integer.valueOf(i), str));
                }
                Assert.assertEquals(arrayList3, arrayList);
                Assert.assertEquals(arrayList3, arrayList2);
                if (topologyTestDriver != null) {
                    if (0 == 0) {
                        topologyTestDriver.close();
                        return;
                    }
                    try {
                        topologyTestDriver.close();
                    } catch (Throwable th2) {
                        th.addSuppressed(th2);
                    }
                }
            } catch (Throwable th3) {
                th = th3;
                throw th3;
            }
        } catch (Throwable th4) {
            if (topologyTestDriver != null) {
                if (th != null) {
                    try {
                        topologyTestDriver.close();
                    } catch (Throwable th5) {
                        th.addSuppressed(th5);
                    }
                } else {
                    topologyTestDriver.close();
                }
            }
            throw th4;
        }
    }

    @Test
    public void shouldNotAllowNullAction() {
        try {
            new StreamsBuilder().stream("topic", Consumed.with(Serdes.Integer(), Serdes.String())).peek((ForeachAction) null);
            Assert.fail("expected null action to throw NPE");
        } catch (NullPointerException e) {
        }
    }

    private static <K, V> ForeachAction<K, V> collect(List<KeyValue<K, V>> list) {
        return (obj, obj2) -> {
            list.add(new KeyValue(obj, obj2));
        };
    }
}
