package com.github.carlosmenezes.mockafka;

import com.github.carlosmenezes.mockafka.exceptions.EmptyInputException;
import com.github.carlosmenezes.mockafka.exceptions.EmptyOutputSizeException;
import com.github.carlosmenezes.mockafka.exceptions.NoTopologyException;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Properties;
import java.util.UUID;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serializer;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.KStreamBuilder;
import org.apache.kafka.streams.state.KeyValueIterator;
import org.apache.kafka.streams.state.WindowStore;
import org.apache.kafka.streams.state.WindowStoreIterator;
import org.apache.kafka.test.ProcessorTopologyTestDriver;

/* loaded from: input_file:com/github/carlosmenezes/mockafka/MockafkaBuilder.class */
/**
 * Fluent builder that wires a Kafka Streams topology to a
 * {@link ProcessorTopologyTestDriver}, replays registered input records
 * through it and exposes the produced records or state-store contents.
 *
 * <p>Every query method ({@link #output}, {@link #outputTable},
 * {@link #stateTable}, {@link #windowStateTable}) builds a fresh driver,
 * replays <em>all</em> registered inputs, evaluates the query and closes the
 * driver again.
 */
public class MockafkaBuilder {
    /** Streams configuration; extended by {@link #config(Properties)} and when a driver is built. */
    private final Properties properties;
    /** Store names added via {@link #stores(List)}; only collected here, never read by this class. */
    private final List<String> stateStores;
    /** Serialized input records, keyed by the topic they are replayed into. */
    private final Map<String, MockafkaInput> inputs;
    /** Callback that defines the topology; empty until {@link #topology(Consumer)} is called. */
    private Optional<Consumer<KStreamBuilder>> topology = Optional.empty();

    /**
     * Holder for the already-serialized key/value records of one topic.
     *
     * <p>Kept as a (non-static) inner class so that existing code which
     * instantiates it through a builder instance keeps compiling.
     */
    public class MockafkaInput {
        private final Map<byte[], byte[]> input;

        /**
         * @param map serialized records; iterated in the map's own order when
         *            the records are replayed
         */
        public MockafkaInput(Map<byte[], byte[]> map) {
            this.input = map;
        }
    }

    /**
     * Creates a builder on top of the given containers. The arguments are
     * stored by reference (not copied) and are mutated by the fluent methods,
     * so they must be modifiable.
     *
     * @param properties initial Streams configuration
     * @param list       initial state-store names
     * @param map        initial inputs keyed by topic
     */
    public MockafkaBuilder(Properties properties, List<String> list, Map<String, MockafkaInput> map) {
        this.properties = properties;
        this.stateStores = list;
        this.inputs = map;
    }

    /**
     * Merges the given entries into the Streams configuration.
     *
     * @param properties entries to add; existing keys are overwritten
     * @return this builder
     */
    public MockafkaBuilder config(Properties properties) {
        this.properties.putAll(properties);
        return this;
    }

    /**
     * Sets the callback that populates the {@link KStreamBuilder}.
     *
     * @param consumer topology definition; must not be {@code null}
     * @return this builder
     * @throws NullPointerException if {@code consumer} is {@code null}
     */
    public MockafkaBuilder topology(Consumer<KStreamBuilder> consumer) {
        this.topology = Optional.of(consumer);
        return this;
    }

    /**
     * Registers additional state-store names.
     *
     * @param list names to append
     * @return this builder
     */
    public MockafkaBuilder stores(List<String> list) {
        this.stateStores.addAll(list);
        return this;
    }

    /**
     * Serializes the given records and registers them as the input of a
     * topic, replacing any input previously registered for that topic.
     *
     * <p>The serialized records are kept in the iteration order of
     * {@code map}. (A hash map keyed by {@code byte[]} would order entries by
     * identity hash, i.e. arbitrarily, so an insertion-ordered map is used.)
     * Records whose value serializes to {@code null} are kept as well.
     *
     * @param str    topic the records are replayed into
     * @param serde  serde supplying the key serializer
     * @param serde2 serde supplying the value serializer
     * @param map    records to feed; pass an ordered map if order matters
     * @return this builder
     */
    public <K, V> MockafkaBuilder input(String str, Serde<K> serde, Serde<V> serde2, Map<K, V> map) {
        Serializer<K> keySerializer = serde.serializer();
        Serializer<V> valueSerializer = serde2.serializer();
        Map<byte[], byte[]> serialized = new LinkedHashMap<>();
        map.forEach((key, value) ->
                serialized.put(keySerializer.serialize(str, key), valueSerializer.serialize(str, value)));
        this.inputs.put(str, new MockafkaInput(serialized));
        return this;
    }

    /**
     * Replays all inputs and reads {@code i} records from an output topic.
     *
     * <p>The returned list always has exactly {@code i} elements. Each element
     * is the record object returned by the driver's {@code readOutput}, or an
     * {@link Optional#empty()} placeholder for every read that returned
     * {@code null}; callers must check the element type before casting.
     *
     * @param str    output topic to read from
     * @param serde  serde supplying the key deserializer
     * @param serde2 serde supplying the value deserializer
     * @param i      number of reads to perform; must be at least 1
     * @return list of {@code i} records/placeholders, in read order
     * @throws EmptyOutputSizeException if {@code i < 1}
     * @throws EmptyInputException      if no input has been registered
     * @throws NoTopologyException      if no topology has been set
     */
    @SuppressWarnings("rawtypes")
    public <K, V> List output(String str, Serde<K> serde, Serde<V> serde2, int i) throws EmptyOutputSizeException, EmptyInputException, NoTopologyException {
        if (i < 1) {
            throw new EmptyOutputSizeException();
        }
        List<Object> records = withProcessedDriver(driver -> IntStream.range(0, i)
                .<Object>mapToObj(n -> {
                    Object produced = driver.readOutput(str, serde.deserializer(), serde2.deserializer());
                    // Keep the list at a fixed size: absent records become Optional.empty().
                    return produced != null ? produced : Optional.empty();
                })
                .collect(Collectors.toList()));
        return records;
    }

    /**
     * Replays all inputs, performs up to {@code i} reads on an output topic
     * and folds the records into a key/value map.
     *
     * <p>Reads that yield no record are skipped. When a key occurs more than
     * once, the value read last wins. {@code null} values are stored as-is.
     * Records are read straight from the driver rather than through
     * {@link #output}.
     *
     * @param str    output topic to read from
     * @param serde  serde supplying the key deserializer
     * @param serde2 serde supplying the value deserializer
     * @param i      number of reads to perform; must be at least 1
     * @return map of the last value read per key
     * @throws EmptyOutputSizeException if {@code i < 1}
     * @throws EmptyInputException      if no input has been registered
     * @throws NoTopologyException      if no topology has been set
     */
    public <K, V> Map<K, V> outputTable(String str, Serde<K> serde, Serde<V> serde2, int i) throws EmptyOutputSizeException, NoTopologyException, EmptyInputException {
        if (i < 1) {
            throw new EmptyOutputSizeException();
        }
        return withProcessedDriver(driver -> {
            Map<K, V> table = new HashMap<>();
            for (int n = 0; n < i; n++) {
                Optional.ofNullable(driver.readOutput(str, serde.deserializer(), serde2.deserializer()))
                        .ifPresent(produced -> table.put(produced.key(), produced.value()));
            }
            return table;
        });
    }

    /**
     * Replays all inputs and snapshots the full content of a key-value store.
     *
     * @param str name of the key-value state store
     * @return every key/value pair the store's {@code all()} iterator yields
     * @throws EmptyInputException if no input has been registered
     * @throws NoTopologyException if no topology has been set
     */
    public <K, V> Map<K, V> stateTable(String str) throws EmptyInputException, NoTopologyException {
        return withProcessedDriver(driver -> {
            Map<K, V> table = new HashMap<>();
            // try-with-resources: the iterator is released even if iteration fails.
            try (KeyValueIterator<K, V> entries = driver.<K, V>getKeyValueStore(str).all()) {
                entries.forEachRemaining(entry -> table.put(entry.key, entry.value));
            }
            return table;
        });
    }

    /**
     * Replays all inputs and fetches the windows stored for one key.
     *
     * <p>NOTE(review): the keys of the returned map are the {@code key} fields
     * of the fetched {@code KeyValue}s, which for a
     * {@link WindowStoreIterator} are {@code Long} timestamps rather than
     * instances of the record key type. The declared {@code Map<K, V>} is only
     * accurate when {@code K} is {@code Long}; read keys as {@code Object}
     * otherwise.
     *
     * @param str name of the window state store
     * @param k   record key whose windows are fetched
     * @param j   lower time bound passed to {@code fetch}
     * @param j2  upper time bound passed to {@code fetch}
     * @return fetched values keyed by the iterator's key
     * @throws EmptyInputException if no input has been registered
     * @throws NoTopologyException if no topology has been set
     * @throws ClassCastException  if the named store is not a {@link WindowStore}
     */
    @SuppressWarnings("unchecked")
    public <K, V> Map<K, V> windowStateTable(String str, K k, long j, long j2) throws EmptyInputException, NoTopologyException {
        return withProcessedDriver(driver -> {
            // getStateStore returns the generic store type; fetch lives on WindowStore.
            WindowStore<K, V> store = (WindowStore<K, V>) driver.getStateStore(str);
            Map<Long, V> byIteratorKey = new HashMap<>();
            try (WindowStoreIterator<V> windows = store.fetch(k, j, j2)) {
                windows.forEachRemaining(window -> byIteratorKey.put(window.key, window.value));
            }
            // The signature predates this fix and must stay Map<K, V>; see the note above.
            return (Map<K, V>) (Map<?, ?>) byIteratorKey;
        });
    }

    /**
     * Builds a test driver for the configured topology. Missing
     * {@code application.id} and {@code bootstrap.servers} settings are filled
     * in with a random {@code mocked-<uuid>} id and {@code localhost:9092}.
     *
     * @return a new driver; the caller is responsible for closing it
     * @throws NoTopologyException if no topology has been set
     */
    private ProcessorTopologyTestDriver stream() throws NoTopologyException {
        this.properties.putIfAbsent(StreamsConfig.APPLICATION_ID_CONFIG, String.format("mocked-%s", UUID.randomUUID()));
        this.properties.putIfAbsent(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        KStreamBuilder kStreamBuilder = new KStreamBuilder();
        this.topology.orElseThrow(NoTopologyException::new).accept(kStreamBuilder);
        return new ProcessorTopologyTestDriver(new StreamsConfig(this.properties), kStreamBuilder);
    }

    /**
     * Feeds every registered record into the driver, topic by topic.
     *
     * @param processorTopologyTestDriver driver that processes the records
     */
    private void produce(ProcessorTopologyTestDriver processorTopologyTestDriver) {
        for (Map.Entry<String, MockafkaInput> topicInput : this.inputs.entrySet()) {
            String topic = topicInput.getKey();
            topicInput.getValue().input.forEach((key, value) ->
                    processorTopologyTestDriver.process(topic, key, value));
        }
    }

    /**
     * Runs {@code function} against a driver that has already processed all
     * registered inputs, and always closes the driver afterwards.
     *
     * @param function query to evaluate on the driver
     * @return whatever {@code function} returns
     * @throws EmptyInputException if no input has been registered
     * @throws NoTopologyException if no topology has been set
     */
    private <T> T withProcessedDriver(Function<ProcessorTopologyTestDriver, T> function) throws EmptyInputException, NoTopologyException {
        if (this.inputs.isEmpty()) {
            throw new EmptyInputException();
        }
        ProcessorTopologyTestDriver driver = stream();
        try {
            produce(driver);
            return function.apply(driver);
        } finally {
            // Close even when replaying or querying throws, so the driver is not leaked.
            driver.close();
        }
    }
}
