package io.floodplain.kotlindsl;

import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.node.ObjectNode;
import io.floodplain.kotlindsl.message.IMessage;
import io.floodplain.replication.api.ReplicationMessage;
import io.floodplain.replication.api.ReplicationMessageParser;
import io.floodplain.replication.factory.ReplicationFactory;
import io.floodplain.streams.api.Topic;
import io.floodplain.streams.api.TopologyContext;
import io.floodplain.streams.remotejoin.TopologyConstructor;
import io.floodplain.streams.serializer.ReplicationMessageSerde;
import java.time.Duration;
import java.time.Instant;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.function.Function;
import kotlin.Metadata;
import kotlin.Pair;
import kotlin.Triple;
import kotlin.TuplesKt;
import kotlin.TypeCastException;
import kotlin.Unit;
import kotlin.collections.CollectionsKt;
import kotlin.collections.MapsKt;
import kotlin.coroutines.Continuation;
import kotlin.coroutines.CoroutineContext;
import kotlin.coroutines.intrinsics.IntrinsicsKt;
import kotlin.coroutines.jvm.internal.Boxing;
import kotlin.jvm.functions.Function0;
import kotlin.jvm.internal.Intrinsics;
import kotlinx.coroutines.BuildersKt;
import kotlinx.coroutines.CoroutineScope;
import kotlinx.coroutines.CoroutineStart;
import kotlinx.coroutines.GlobalScope;
import kotlinx.coroutines.Job;
import kotlinx.coroutines.ThreadPoolDispatcherKt;
import kotlinx.coroutines.flow.Flow;
import kotlinx.coroutines.flow.FlowCollector;
import kotlinx.coroutines.flow.FlowKt;
import mu.KLogger;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.connect.json.JsonDeserializer;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.TestInputTopic;
import org.apache.kafka.streams.TestOutputTopic;
import org.apache.kafka.streams.TopologyTestDriver;
import org.apache.kafka.streams.state.KeyValueStore;
import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;

/* compiled from: LocalRuntime.kt */
@Metadata(mv = {1, 1, 16}, bv = {1, 0, 3}, k = 1, d1 = {"��À\u0001\n\u0002\u0018\u0002\n\u0002\u0018\u0002\n��\n\u0002\u0018\u0002\n��\n\u0002\u0018\u0002\n��\n\u0002\u0018\u0002\n��\n\u0002\u0010 \n\u0002\u0018\u0002\n\u0002\b\u0002\n\u0002\u0010\b\n\u0002\b\u0003\n\u0002\u0010!\n\u0002\u0018\u0002\n\u0002\b\u0003\n\u0002\u0010%\n\u0002\u0010\u000e\n\u0002\u0018\u0002\n\u0002\u0018\u0002\n��\n\u0002\u0018\u0002\n��\n\u0002\u0018\u0002\n\u0002\b\u0002\n\u0002\u0010\u0002\n��\n\u0002\u0018\u0002\n\u0002\b\n\n\u0002\u0010\"\n��\n\u0002\u0010\u0012\n��\n\u0002\u0018\u0002\n\u0002\b\u0002\n\u0002\u0010\u000b\n��\n\u0002\u0018\u0002\n��\n\u0002\u0018\u0002\n\u0002\u0018\u0002\n\u0002\u0018\u0002\n��\n\u0002\u0010$\n\u0002\u0010��\n��\n\u0002\u0018\u0002\n��\n\u0002\u0010\t\n\u0002\b\u0003\n\u0002\u0018\u0002\n\u0002\b\u0004\n\u0002\u0018\u0002\n\u0002\b\u0005\u0018��2\u00020\u0001BC\u0012\u0006\u0010\u0002\u001a\u00020\u0003\u0012\u0006\u0010\u0004\u001a\u00020\u0005\u0012\u0006\u0010\u0006\u001a\u00020\u0007\u0012\f\u0010\b\u001a\b\u0012\u0004\u0012\u00020\n0\t\u0012\f\u0010\u000b\u001a\b\u0012\u0004\u0012\u00020\n0\t\u0012\b\u0010\f\u001a\u0004\u0018\u00010\r¢\u0006\u0002\u0010\u000eJ\u0010\u0010\u001f\u001a\u00020 2\u0006\u0010!\u001a\u00020\"H\u0016J\u0006\u0010#\u001a\u00020 J\u000e\u0010\u0010\u001a\b\u0012\u0004\u0012\u00020\u00120\tH\u0016J\u0011\u0010$\u001a\u00020 H\u0096@ø\u0001��¢\u0006\u0002\u0010%J\u000e\u0010&\u001a\b\u0012\u0004\u0012\u00020\u00120\tH\u0016J\u0018\u0010'\u001a\u00020 2\u0006\u0010(\u001a\u00020\u00172\u0006\u0010)\u001a\u00020\u0017H\u0016J\u0010\u0010*\u001a\u00020\u00172\u0006\u0010(\u001a\u00020\u0017H\u0016J\b\u0010+\u001a\u00020 H\u0016J\u000e\u0010,\u001a\b\u0012\u0004\u0012\u00020\u00170-H\u0016J \u0010.\u001a\u00020 2\u0006\u0010(\u001a\u00020\u00172\u0006\u0010)\u001a\u00020/2\u0006\u00100\u001a\u00020/H\u0016J \u0010.\u001a\u00020 2\u0006\u0010(\u001a\u00020\u00172\u0006\u0010)\u001a\u00020\u00172\u0006\u00100\u001a\u000201H\u0016J\u000e\u00102\u001a\b\u0012\u0004\u0012\u00020\u00170-H\u0016J\u0010\u00103\u001a\u0002042\u0006\u0010(\u001a\u00020\u0017H\u0016J\u001c\u00105\u001a\u000e\u0012\u0004\u0012\u00020\u0017\u0012\u0004\u0012\u000201062\u0006\u0010(\u001a\u00020\u0017H\u0016J\"\u00107\u001a\u001c\u0012\u0018\u0012\u0016\u0012\u0004\u0012\u00020:\u0012\u0004\u0012\u00020\u0017\u0012\u0006\u0012\u0004\u0018\u00010/0908H\u0002J<\u0010;\u001a.\u0012\u0004\u0012\u00020:\u0012$\u0012\"\u0012\u001e\u0012\u001c\u0012\u0004\u0012\u00020\u0017\u0012\u0012\u0012\u0010\u0012\u0004\u0012\u00020\u0017\u0012\u0004\u0012\u00020=\u0018\u00010<06080<2\u0006\u0010>\u001a\u00020?H\u0002J\u0010\u0010@\u001a\u00020A2\u0006\u0010(\u001a\u00020\u0017H\u0016J\u000e\u0010B\u001a\b\u0012\u0004\u0012\u00020\u00170-H\u0016J\u000e\u0010C\u001a\b\u0012\u0004\u0012\u00020\n0\tH\u0016J\u001a\u0010D\u001a\u0014\u0012\u0004\u0012\u00020:\u0012\n\u0012\b\u0012\u0004\u0012\u00020E0\t0<H\u0016J\u0018\u0010F\u001a\u00020 2\u0006\u0010(\u001a\u00020\u00172\u0006\u0010G\u001a\u00020\rH\u0016J\u000e\u0010H\u001a\b\u0012\u0004\u0012\u00020\n0\tH\u0016J\u001c\u0010I\u001a\u000e\u0012\u0004\u0012\u00020\u0017\u0012\u0004\u0012\u00020\u00190J2\u0006\u0010K\u001a\u00020\u0017H\u0016J\u000e\u0010L\u001a\b\u0012\u0004\u0012\u00020:0-H\u0002J\b\u0010\u0006\u001a\u00020\u0007H\u0016J\b\u0010\u0004\u001a\u00020\u0005H\u0016J\u001e\u0010M\u001a\b\u0012\u0004\u0012\u0002HN08\"\u0004\b��\u0010N*\b\u0012\u0004\u0012\u0002HN08H\u0002R\u0012\u0010\f\u001a\u0004\u0018\u00010\rX\u0082\u0004¢\u0006\u0004\n\u0002\u0010\u000fR\u0017\u0010\u0010\u001a\b\u0012\u0004\u0012\u00020\u00120\u0011¢\u0006\b\n��\u001a\u0004\b\u0013\u0010\u0014R\u000e\u0010\u0002\u001a\u00020\u0003X\u0082\u0004¢\u0006\u0002\n��R&\u0010\u0015\u001a\u001a\u0012\u0004\u0012\u00020\u0017\u0012\u0010\u0012\u000e\u0012\u0004\u0012\u00020\u0017\u0012\u0004\u0012\u00020\u00190\u00180\u0016X\u0082\u0004¢\u0006\u0002\n��R&\u0010\u001a\u001a\u001a\u0012\u0004\u0012\u00020\u0017\u0012\u0010\u0012\u000e\u0012\u0004\u0012\u00020\u0017\u0012\u0004\u0012\u00020\u00190\u001b0\u0016X\u0082\u0004¢\u0006\u0002\n��R\u0016\u0010\u001c\u001a\n \u001e*\u0004\u0018\u00010\u001d0\u001dX\u0082\u0004¢\u0006\u0002\n��R\u0014\u0010\u000b\u001a\b\u0012\u0004\u0012\u00020\n0\tX\u0082\u0004¢\u0006\u0002\n��R\u0014\u0010\b\u001a\b\u0012\u0004\u0012\u00020\n0\tX\u0082\u0004¢\u0006\u0002\n��R\u000e\u0010\u0006\u001a\u00020\u0007X\u0082\u0004¢\u0006\u0002\n��R\u000e\u0010\u0004\u001a\u00020\u0005X\u0082\u0004¢\u0006\u0002\n��\u0082\u0002\u0004\n\u0002\b\u0019¨\u0006O"}, d2 = {"Lio/floodplain/kotlindsl/LocalDriverContext;", "Lio/floodplain/kotlindsl/LocalContext;", "driver", "Lorg/apache/kafka/streams/TopologyTestDriver;", "topologyContext", "Lio/floodplain/streams/api/TopologyContext;", "topologyConstructor", "Lio/floodplain/streams/remotejoin/TopologyConstructor;", "sourceConfigs", "", "Lio/floodplain/kotlindsl/Config;", "sinkConfigs", "bufferTime", "", "(Lorg/apache/kafka/streams/TopologyTestDriver;Lio/floodplain/streams/api/TopologyContext;Lio/floodplain/streams/remotejoin/TopologyConstructor;Ljava/util/List;Ljava/util/List;Ljava/lang/Integer;)V", "Ljava/lang/Integer;", "connectJobs", "", "Lkotlinx/coroutines/Job;", "getConnectJobs", "()Ljava/util/List;", "inputTopics", "", "", "Lorg/apache/kafka/streams/TestInputTopic;", "Lio/floodplain/replication/api/ReplicationMessage;", "outputTopics", "Lorg/apache/kafka/streams/TestOutputTopic;", "replicationMessageParser", "Lio/floodplain/replication/api/ReplicationMessageParser;", "kotlin.jvm.PlatformType", "advanceWallClockTime", "", "duration", "Ljava/time/Duration;", "closeSinks", "connectSource", "(Lkotlin/coroutines/Continuation;)Ljava/lang/Object;", "connectSourceAndSink", "delete", "topic", "key", "deleted", "flushSinks", "getStateStoreNames", "", "input", "", "msg", "Lio/floodplain/kotlindsl/message/IMessage;", "inputs", "isEmpty", "", "output", "Lkotlin/Pair;", "outputFlowSingle", "Lkotlinx/coroutines/flow/Flow;", "Lkotlin/Triple;", "Lio/floodplain/streams/api/Topic;", "outputFlows", "", "", "context", "Lkotlinx/coroutines/CoroutineScope;", "outputSize", "", "outputs", "sinkConfigurations", "sinksByTopic", "Lio/floodplain/kotlindsl/FloodplainSink;", "skip", "number", "sourceConfigurations", "stateStore", "Lorg/apache/kafka/streams/state/KeyValueStore;", "name", "topics", "handleErrors", "T", "floodplain-dsl"})
/* loaded from: input_file:io/floodplain/kotlindsl/LocalDriverContext.class */
public final class LocalDriverContext implements LocalContext {

    @NotNull
    private final List<Job> connectJobs;
    private final Map<String, TestInputTopic<String, ReplicationMessage>> inputTopics;
    private final Map<String, TestOutputTopic<String, ReplicationMessage>> outputTopics;
    private final ReplicationMessageParser replicationMessageParser;
    private final TopologyTestDriver driver;
    private final TopologyContext topologyContext;
    private final TopologyConstructor topologyConstructor;
    private final List<Config> sourceConfigs;
    private final List<Config> sinkConfigs;
    private final Integer bufferTime;

    @NotNull
    public final List<Job> getConnectJobs() {
        return this.connectJobs;
    }

    @Override // io.floodplain.kotlindsl.LocalContext
    @NotNull
    public List<Config> sourceConfigurations() {
        return this.sourceConfigs;
    }

    @Override // io.floodplain.kotlindsl.LocalContext
    @NotNull
    public List<Config> sinkConfigurations() {
        return this.sinkConfigs;
    }

    @Override // io.floodplain.kotlindsl.LocalContext
    @NotNull
    public List<Job> connectJobs() {
        return this.connectJobs;
    }

    @Override // io.floodplain.kotlindsl.InputReceiver
    @NotNull
    public Set<String> inputs() {
        return this.inputTopics.keySet();
    }

    @Override // io.floodplain.kotlindsl.LocalContext
    @NotNull
    public Set<String> outputs() {
        return this.outputTopics.keySet();
    }

    /* JADX WARN: Removed duplicated region for block: B:12:0x0079  */
    /* JADX WARN: Removed duplicated region for block: B:22:0x00c9  */
    /* JADX WARN: Removed duplicated region for block: B:23:0x0107  */
    /* JADX WARN: Removed duplicated region for block: B:8:0x0058  */
    @Override // io.floodplain.kotlindsl.LocalContext
    @org.jetbrains.annotations.Nullable
    /*
        Code decompiled incorrectly, please refer to instructions dump.
        To view partially-correct add '--show-bad-code' argument
    */
    public java.lang.Object connectSource(@org.jetbrains.annotations.NotNull kotlin.coroutines.Continuation<? super kotlin.Unit> r7) {
        /*
            Method dump skipped, instructions count: 273
            To view this dump add '--comments-level debug' option
        */
        throw new UnsupportedOperationException("Method not decompiled: io.floodplain.kotlindsl.LocalDriverContext.connectSource(kotlin.coroutines.Continuation):java.lang.Object");
    }

    private final <T> Flow<T> handleErrors(@NotNull Flow<? extends T> flow) {
        return FlowKt.catch(flow, new LocalDriverContext$handleErrors$1(null));
    }

    @Override // io.floodplain.kotlindsl.LocalContext
    @NotNull
    public List<Job> connectSourceAndSink() {
        KLogger kLogger;
        KLogger kLogger2;
        Job launch = BuildersKt.launch(GlobalScope.INSTANCE, ThreadPoolDispatcherKt.newSingleThreadContext("TopologySource"), CoroutineStart.UNDISPATCHED, new LocalDriverContext$connectSourceAndSink$outputJob$1(this, null));
        launch.start();
        kLogger = LocalRuntimeKt.logger;
        kLogger.info("output job connected");
        Job launch$default = BuildersKt.launch$default(GlobalScope.INSTANCE, (CoroutineContext) null, (CoroutineStart) null, new LocalDriverContext$connectSourceAndSink$inputJob$1(this, null), 3, (Object) null);
        kLogger2 = LocalRuntimeKt.logger;
        kLogger2.info("input job connected");
        return CollectionsKt.listOf(new Job[]{launch$default, launch});
    }

    private final Set<Topic> topics() {
        List<Config> sinkConfigurations = sinkConfigurations();
        ArrayList arrayList = new ArrayList();
        Iterator<T> it = sinkConfigurations.iterator();
        while (it.hasNext()) {
            CollectionsKt.addAll(arrayList, ((Config) it.next()).materializeConnectorConfig(this.topologyContext));
        }
        ArrayList arrayList2 = arrayList;
        ArrayList arrayList3 = new ArrayList();
        Iterator it2 = arrayList2.iterator();
        while (it2.hasNext()) {
            CollectionsKt.addAll(arrayList3, ((MaterializedSink) it2.next()).getTopics());
        }
        return CollectionsKt.toSet(arrayList3);
    }

    @Override // io.floodplain.kotlindsl.LocalContext
    public void flushSinks() {
        List<Config> sinkConfigurations = sinkConfigurations();
        ArrayList arrayList = new ArrayList();
        Iterator<T> it = sinkConfigurations.iterator();
        while (it.hasNext()) {
            CollectionsKt.addAll(arrayList, ((Config) it.next()).sinkElements().values());
        }
        ArrayList arrayList2 = arrayList;
        ArrayList arrayList3 = new ArrayList();
        Iterator it2 = arrayList2.iterator();
        while (it2.hasNext()) {
            CollectionsKt.addAll(arrayList3, (List) it2.next());
        }
        Iterator it3 = arrayList3.iterator();
        while (it3.hasNext()) {
            ((FloodplainSink) it3.next()).flush();
        }
    }

    public final void closeSinks() {
        List<Config> sinkConfigurations = sinkConfigurations();
        ArrayList arrayList = new ArrayList();
        Iterator<T> it = sinkConfigurations.iterator();
        while (it.hasNext()) {
            CollectionsKt.addAll(arrayList, ((Config) it.next()).sinkElements().values());
        }
        ArrayList arrayList2 = arrayList;
        ArrayList arrayList3 = new ArrayList();
        Iterator it2 = arrayList2.iterator();
        while (it2.hasNext()) {
            CollectionsKt.addAll(arrayList3, (List) it2.next());
        }
        Iterator it3 = arrayList3.iterator();
        while (it3.hasNext()) {
            ((FloodplainSink) it3.next()).close();
        }
    }

    @Override // io.floodplain.kotlindsl.LocalContext
    @NotNull
    public Map<Topic, List<FloodplainSink>> sinksByTopic() {
        LinkedHashMap linkedHashMap = new LinkedHashMap();
        List<Config> sinkConfigurations = sinkConfigurations();
        ArrayList<Map.Entry> arrayList = new ArrayList();
        for (Config config : sinkConfigurations) {
            config.instantiateSinkElements(this.topologyContext);
            CollectionsKt.addAll(arrayList, config.sinkElements().entrySet());
        }
        for (Map.Entry entry : arrayList) {
            Object computeIfAbsent = linkedHashMap.computeIfAbsent(entry.getKey(), new Function<Topic, List<FloodplainSink>>() { // from class: io.floodplain.kotlindsl.LocalDriverContext$sinksByTopic$2$list$1
                @Override // java.util.function.Function
                @NotNull
                public final List<FloodplainSink> apply(@NotNull Topic topic) {
                    Intrinsics.checkParameterIsNotNull(topic, "it");
                    return new ArrayList();
                }
            });
            Intrinsics.checkExpressionValueIsNotNull(computeIfAbsent, "result.computeIfAbsent(e….key) { mutableListOf() }");
            ((List) computeIfAbsent).addAll((Collection) entry.getValue());
        }
        return linkedHashMap;
    }

    /* JADX INFO: Access modifiers changed from: private */
    public final Map<Topic, Flow<Pair<String, Map<String, Object>>>> outputFlows(CoroutineScope coroutineScope) {
        Set<Topic> set = topics();
        final JsonDeserializer jsonDeserializer = new JsonDeserializer();
        final ObjectMapper objectMapper = new ObjectMapper();
        final Flow<Triple<Topic, String, byte[]>> outputFlowSingle = outputFlowSingle();
        final Flow handleErrors = handleErrors(FlowKt.asFlow(FlowKt.broadcastIn$default(new Flow<Triple<? extends Topic, ? extends String, ? extends Map<String, ? extends Object>>>() { // from class: io.floodplain.kotlindsl.LocalDriverContext$outputFlows$$inlined$map$1
            @Nullable
            public Object collect(@NotNull final FlowCollector flowCollector, @NotNull Continuation continuation) {
                Object collect = outputFlowSingle.collect(new FlowCollector<Triple<? extends Topic, ? extends String, ? extends byte[]>>() { // from class: io.floodplain.kotlindsl.LocalDriverContext$outputFlows$$inlined$map$1.2
                    @Nullable
                    public Object emit(Object obj, @NotNull Continuation continuation2) {
                        TopologyContext topologyContext;
                        ObjectNode objectNode;
                        FlowCollector flowCollector2 = flowCollector;
                        Triple triple = (Triple) obj;
                        Topic topic = (Topic) triple.component1();
                        String str = (String) triple.component2();
                        byte[] bArr = (byte[]) triple.component3();
                        if (bArr == null) {
                            objectNode = null;
                        } else {
                            JsonDeserializer jsonDeserializer2 = jsonDeserializer;
                            topologyContext = this.topologyContext;
                            ObjectNode deserialize = jsonDeserializer2.deserialize(topic.qualifiedString(topologyContext), bArr);
                            if (deserialize == null) {
                                throw new TypeCastException("null cannot be cast to non-null type com.fasterxml.jackson.databind.node.ObjectNode");
                            }
                            objectNode = deserialize;
                        }
                        Object emit = flowCollector2.emit(new Triple(topic, str, bArr == null ? null : (Map) objectMapper.convertValue(objectNode, new TypeReference<Map<String, ? extends Object>>() { // from class: io.floodplain.kotlindsl.LocalDriverContext$outputFlows$$inlined$map$1$2$lambda$1
                        })), continuation2);
                        return emit == IntrinsicsKt.getCOROUTINE_SUSPENDED() ? emit : Unit.INSTANCE;
                    }
                }, continuation);
                return collect == IntrinsicsKt.getCOROUTINE_SUSPENDED() ? collect : Unit.INSTANCE;
            }
        }, coroutineScope, (CoroutineStart) null, 2, (Object) null)));
        Set<Topic> set2 = set;
        ArrayList arrayList = new ArrayList(CollectionsKt.collectionSizeOrDefault(set2, 10));
        for (final Topic topic : set2) {
            final Flow<Triple<? extends Topic, ? extends String, ? extends Map<String, ? extends Object>>> flow = new Flow<Triple<? extends Topic, ? extends String, ? extends Map<String, ? extends Object>>>() { // from class: io.floodplain.kotlindsl.LocalDriverContext$$special$$inlined$filter$1
                @Nullable
                public Object collect(@NotNull final FlowCollector flowCollector, @NotNull Continuation continuation) {
                    Object collect = handleErrors.collect(new FlowCollector<Triple<? extends Topic, ? extends String, ? extends Map<String, ? extends Object>>>() { // from class: io.floodplain.kotlindsl.LocalDriverContext$$special$$inlined$filter$1.2
                        @Nullable
                        public Object emit(Object obj, @NotNull Continuation continuation2) {
                            FlowCollector flowCollector2 = flowCollector;
                            if (!Boxing.boxBoolean(Intrinsics.areEqual((Topic) ((Triple) obj).component1(), topic)).booleanValue()) {
                                return Unit.INSTANCE;
                            }
                            Object emit = flowCollector2.emit(obj, continuation2);
                            return emit == IntrinsicsKt.getCOROUTINE_SUSPENDED() ? emit : Unit.INSTANCE;
                        }
                    }, continuation);
                    return collect == IntrinsicsKt.getCOROUTINE_SUSPENDED() ? collect : Unit.INSTANCE;
                }
            };
            arrayList.add(TuplesKt.to(topic, new Flow<Pair<? extends String, ? extends Map<String, ? extends Object>>>() { // from class: io.floodplain.kotlindsl.LocalDriverContext$$special$$inlined$map$1
                @Nullable
                public Object collect(@NotNull final FlowCollector flowCollector, @NotNull Continuation continuation) {
                    Object collect = flow.collect(new FlowCollector<Triple<? extends Topic, ? extends String, ? extends Map<String, ? extends Object>>>() { // from class: io.floodplain.kotlindsl.LocalDriverContext$$special$$inlined$map$1.2
                        @Nullable
                        public Object emit(Object obj, @NotNull Continuation continuation2) {
                            Triple triple = (Triple) obj;
                            Object emit = flowCollector.emit(TuplesKt.to((String) triple.component2(), (Map) triple.component3()), continuation2);
                            return emit == IntrinsicsKt.getCOROUTINE_SUSPENDED() ? emit : Unit.INSTANCE;
                        }
                    }, continuation);
                    return collect == IntrinsicsKt.getCOROUTINE_SUSPENDED() ? collect : Unit.INSTANCE;
                }
            }));
        }
        return MapsKt.toMap(arrayList);
    }

    private final Flow<Triple<Topic, String, byte[]>> outputFlowSingle() {
        return FlowKt.callbackFlow(new LocalDriverContext$outputFlowSingle$1(this, null));
    }

    @Override // io.floodplain.kotlindsl.InputReceiver
    public void input(@NotNull String str, @NotNull String str2, @NotNull IMessage iMessage) {
        KLogger kLogger;
        Intrinsics.checkParameterIsNotNull(str, "topic");
        Intrinsics.checkParameterIsNotNull(str2, "key");
        Intrinsics.checkParameterIsNotNull(iMessage, "msg");
        final String str3 = this.topologyContext.topicName(str);
        if (!inputs().contains(str3)) {
            kLogger = LocalRuntimeKt.logger;
            kLogger.debug("Missing topic: " + str + " available topics: " + inputs());
        }
        TestInputTopic<String, ReplicationMessage> computeIfAbsent = this.inputTopics.computeIfAbsent(str3, new Function<String, TestInputTopic<String, ReplicationMessage>>() { // from class: io.floodplain.kotlindsl.LocalDriverContext$input$inputTopic$1
            @Override // java.util.function.Function
            public final TestInputTopic<String, ReplicationMessage> apply(@NotNull String str4) {
                TopologyTestDriver topologyTestDriver;
                Intrinsics.checkParameterIsNotNull(str4, "it");
                topologyTestDriver = LocalDriverContext.this.driver;
                return topologyTestDriver.createInputTopic(str3, Serdes.String().serializer(), new ReplicationMessageSerde().serializer());
            }
        });
        Intrinsics.checkExpressionValueIsNotNull(computeIfAbsent, "inputTopics.computeIfAbs…)\n            )\n        }");
        computeIfAbsent.pipeInput(str2, ReplicationFactory.standardMessage(iMessage.toImmutable()));
    }

    @Override // io.floodplain.kotlindsl.InputReceiver
    public void input(@NotNull String str, @NotNull byte[] bArr, @NotNull byte[] bArr2) {
        KLogger kLogger;
        Intrinsics.checkParameterIsNotNull(str, "topic");
        Intrinsics.checkParameterIsNotNull(bArr, "key");
        Intrinsics.checkParameterIsNotNull(bArr2, "msg");
        String str2 = this.topologyContext.topicName(str);
        if (!inputs().contains(str2)) {
            kLogger = LocalRuntimeKt.logger;
            kLogger.debug("Missing topic: " + str + " available topics: " + inputs());
        }
        try {
            this.driver.pipeRawRecord(str2, Instant.now().toEpochMilli(), bArr, bArr2);
        } catch (Throwable th) {
        }
    }

    @Override // io.floodplain.kotlindsl.InputReceiver
    public void delete(@NotNull String str, @NotNull String str2) {
        Intrinsics.checkParameterIsNotNull(str, "topic");
        Intrinsics.checkParameterIsNotNull(str2, "key");
        final String str3 = this.topologyContext.topicName(str);
        TestInputTopic<String, ReplicationMessage> computeIfAbsent = this.inputTopics.computeIfAbsent(str3, new Function<String, TestInputTopic<String, ReplicationMessage>>() { // from class: io.floodplain.kotlindsl.LocalDriverContext$delete$inputTopic$1
            @Override // java.util.function.Function
            public final TestInputTopic<String, ReplicationMessage> apply(@NotNull String str4) {
                TopologyTestDriver topologyTestDriver;
                Intrinsics.checkParameterIsNotNull(str4, "it");
                topologyTestDriver = LocalDriverContext.this.driver;
                return topologyTestDriver.createInputTopic(str3, Serdes.String().serializer(), new ReplicationMessageSerde().serializer());
            }
        });
        Intrinsics.checkExpressionValueIsNotNull(computeIfAbsent, "inputTopics.computeIfAbs…)\n            )\n        }");
        computeIfAbsent.pipeInput(str2, (Object) null);
    }

    /* JADX WARN: Code restructure failed: missing block: B:4:0x0050, code lost:
    
        if (r0 != null) goto L8;
     */
    @Override // io.floodplain.kotlindsl.LocalContext
    @org.jetbrains.annotations.NotNull
    /*
        Code decompiled incorrectly, please refer to instructions dump.
        To view partially-correct add '--show-bad-code' argument
    */
    public kotlin.Pair<java.lang.String, io.floodplain.kotlindsl.message.IMessage> output(@org.jetbrains.annotations.NotNull java.lang.String r8) {
        /*
            r7 = this;
            r0 = r8
            java.lang.String r1 = "topic"
            kotlin.jvm.internal.Intrinsics.checkParameterIsNotNull(r0, r1)
            r0 = r7
            io.floodplain.streams.api.TopologyContext r0 = r0.topologyContext
            r1 = r8
            java.lang.String r0 = r0.topicName(r1)
            r9 = r0
            r0 = r7
            java.util.Map<java.lang.String, org.apache.kafka.streams.TestOutputTopic<java.lang.String, io.floodplain.replication.api.ReplicationMessage>> r0 = r0.outputTopics
            r1 = r9
            io.floodplain.kotlindsl.LocalDriverContext$output$outputTopic$1 r2 = new io.floodplain.kotlindsl.LocalDriverContext$output$outputTopic$1
            r3 = r2
            r4 = r7
            r5 = r9
            r3.<init>()
            java.util.function.Function r2 = (java.util.function.Function) r2
            java.lang.Object r0 = r0.computeIfAbsent(r1, r2)
            r1 = r0
            java.lang.String r2 = "outputTopics.computeIfAb…)\n            )\n        }"
            kotlin.jvm.internal.Intrinsics.checkExpressionValueIsNotNull(r1, r2)
            org.apache.kafka.streams.TestOutputTopic r0 = (org.apache.kafka.streams.TestOutputTopic) r0
            r10 = r0
            r0 = r10
            org.apache.kafka.streams.KeyValue r0 = r0.readKeyValue()
            r1 = r0
            java.lang.String r2 = "outputTopic.readKeyValue()"
            kotlin.jvm.internal.Intrinsics.checkExpressionValueIsNotNull(r1, r2)
            r11 = r0
            r0 = r11
            java.lang.Object r0 = r0.value
            io.floodplain.replication.api.ReplicationMessage r0 = (io.floodplain.replication.api.ReplicationMessage) r0
            r1 = r0
            if (r1 == 0) goto L56
            io.floodplain.replication.api.ReplicationMessage$Operation r0 = r0.operation()
            r1 = r0
            if (r1 == 0) goto L56
            goto L5a
        L56:
            io.floodplain.replication.api.ReplicationMessage$Operation r0 = io.floodplain.replication.api.ReplicationMessage.Operation.DELETE
        L5a:
            r12 = r0
            r0 = r12
            io.floodplain.replication.api.ReplicationMessage$Operation r1 = io.floodplain.replication.api.ReplicationMessage.Operation.DELETE
            if (r0 != r1) goto L9a
            mu.KLogger r0 = io.floodplain.kotlindsl.LocalRuntimeKt.access$getLogger$p()
            java.lang.StringBuilder r1 = new java.lang.StringBuilder
            r2 = r1
            r2.<init>()
            java.lang.String r2 = "delete detected! isnull? "
            java.lang.StringBuilder r1 = r1.append(r2)
            r2 = r11
            java.lang.Object r2 = r2.value
            io.floodplain.replication.api.ReplicationMessage r2 = (io.floodplain.replication.api.ReplicationMessage) r2
            java.lang.StringBuilder r1 = r1.append(r2)
            java.lang.String r1 = r1.toString()
            r0.info(r1)
            mu.KLogger r0 = io.floodplain.kotlindsl.LocalRuntimeKt.access$getLogger$p()
            java.lang.String r1 = "retrying..."
            r0.info(r1)
            r0 = r7
            r1 = r8
            kotlin.Pair r0 = r0.output(r1)
            goto Lc4
        L9a:
            kotlin.Pair r0 = new kotlin.Pair
            r1 = r0
            r2 = r11
            java.lang.Object r2 = r2.key
            r3 = r11
            java.lang.Object r3 = r3.value
            r4 = r3
            if (r4 != 0) goto Laf
            kotlin.jvm.internal.Intrinsics.throwNpe()
        Laf:
            io.floodplain.replication.api.ReplicationMessage r3 = (io.floodplain.replication.api.ReplicationMessage) r3
            io.floodplain.immutable.api.ImmutableMessage r3 = r3.message()
            r4 = r3
            java.lang.String r5 = "keyVal.value!!.message()"
            kotlin.jvm.internal.Intrinsics.checkExpressionValueIsNotNull(r4, r5)
            io.floodplain.kotlindsl.message.IMessage r3 = io.floodplain.kotlindsl.message.IMessageKt.fromImmutable(r3)
            r1.<init>(r2, r3)
        Lc4:
            return r0
        */
        throw new UnsupportedOperationException("Method not decompiled: io.floodplain.kotlindsl.LocalDriverContext.output(java.lang.String):kotlin.Pair");
    }

    @Override // io.floodplain.kotlindsl.LocalContext
    public void skip(@NotNull String str, int i) {
        Intrinsics.checkParameterIsNotNull(str, "topic");
        for (int i2 = 0; i2 < i; i2++) {
            output(str);
        }
    }

    @Override // io.floodplain.kotlindsl.LocalContext
    public long outputSize(@NotNull String str) {
        Intrinsics.checkParameterIsNotNull(str, "topic");
        final String str2 = this.topologyContext.topicName(str);
        TestOutputTopic<String, ReplicationMessage> computeIfAbsent = this.outputTopics.computeIfAbsent(str2, new Function<String, TestOutputTopic<String, ReplicationMessage>>() { // from class: io.floodplain.kotlindsl.LocalDriverContext$outputSize$outputTopic$1
            @Override // java.util.function.Function
            public final TestOutputTopic<String, ReplicationMessage> apply(@NotNull String str3) {
                TopologyTestDriver topologyTestDriver;
                Intrinsics.checkParameterIsNotNull(str3, "it");
                topologyTestDriver = LocalDriverContext.this.driver;
                return topologyTestDriver.createOutputTopic(str2, Serdes.String().deserializer(), new ReplicationMessageSerde().deserializer());
            }
        });
        Intrinsics.checkExpressionValueIsNotNull(computeIfAbsent, "outputTopics.computeIfAb…)\n            )\n        }");
        return computeIfAbsent.getQueueSize();
    }

    @Override // io.floodplain.kotlindsl.LocalContext
    @NotNull
    public String deleted(@NotNull String str) {
        KLogger kLogger;
        KLogger kLogger2;
        KLogger kLogger3;
        Intrinsics.checkParameterIsNotNull(str, "topic");
        final String str2 = this.topologyContext.topicName(str);
        TestOutputTopic<String, ReplicationMessage> computeIfAbsent = this.outputTopics.computeIfAbsent(str2, new Function<String, TestOutputTopic<String, ReplicationMessage>>() { // from class: io.floodplain.kotlindsl.LocalDriverContext$deleted$outputTopic$1
            @Override // java.util.function.Function
            public final TestOutputTopic<String, ReplicationMessage> apply(@NotNull String str3) {
                TopologyTestDriver topologyTestDriver;
                Intrinsics.checkParameterIsNotNull(str3, "it");
                topologyTestDriver = LocalDriverContext.this.driver;
                return topologyTestDriver.createOutputTopic(str2, Serdes.String().deserializer(), new ReplicationMessageSerde().deserializer());
            }
        });
        Intrinsics.checkExpressionValueIsNotNull(computeIfAbsent, "outputTopics.computeIfAb…)\n            )\n        }");
        final TestOutputTopic<String, ReplicationMessage> testOutputTopic = computeIfAbsent;
        kLogger = LocalRuntimeKt.logger;
        kLogger.info("Looking for a tombstone message for topic " + str2);
        final KeyValue readKeyValue = testOutputTopic.readKeyValue();
        kLogger2 = LocalRuntimeKt.logger;
        kLogger2.info(new Function0<String>() { // from class: io.floodplain.kotlindsl.LocalDriverContext$deleted$1
            @NotNull
            public final String invoke() {
                StringBuilder append = new StringBuilder().append("Found key ").append((String) readKeyValue.key).append(" operation: ");
                ReplicationMessage replicationMessage = (ReplicationMessage) readKeyValue.value;
                return append.append(replicationMessage != null ? replicationMessage.operation() : null).toString();
            }

            /* JADX INFO: Access modifiers changed from: package-private */
            /* JADX WARN: 'super' call moved to the top of the method (can break code semantics) */
            {
                super(0);
            }
        });
        if (readKeyValue.value == null) {
            Object obj = readKeyValue.key;
            Intrinsics.checkExpressionValueIsNotNull(obj, "keyVal.key");
            return (String) obj;
        }
        if (((ReplicationMessage) readKeyValue.value).operation() == ReplicationMessage.Operation.DELETE) {
            return deleted(str);
        }
        kLogger3 = LocalRuntimeKt.logger;
        kLogger3.error(new Function0<String>() { // from class: io.floodplain.kotlindsl.LocalDriverContext$deleted$2
            @NotNull
            public final String invoke() {
                ReplicationMessageParser replicationMessageParser;
                StringBuilder append = new StringBuilder().append("Unexpected content: ");
                replicationMessageParser = LocalDriverContext.this.replicationMessageParser;
                return append.append(replicationMessageParser.describe((ReplicationMessage) readKeyValue.value)).append(" remaining queue: ").append(testOutputTopic.getQueueSize()).toString();
            }

            /* JADX INFO: Access modifiers changed from: package-private */
            /* JADX WARN: 'super' call moved to the top of the method (can break code semantics) */
            {
                super(0);
            }
        });
        throw new RuntimeException("Expected delete message for key: " + ((String) readKeyValue.key) + ", but got a value: " + ((ReplicationMessage) readKeyValue.value));
    }

    @Override // io.floodplain.kotlindsl.LocalContext
    public boolean isEmpty(@NotNull String str) {
        Intrinsics.checkParameterIsNotNull(str, "topic");
        final String str2 = this.topologyContext.topicName(str);
        TestOutputTopic<String, ReplicationMessage> computeIfAbsent = this.outputTopics.computeIfAbsent(str2, new Function<String, TestOutputTopic<String, ReplicationMessage>>() { // from class: io.floodplain.kotlindsl.LocalDriverContext$isEmpty$outputTopic$1
            @Override // java.util.function.Function
            public final TestOutputTopic<String, ReplicationMessage> apply(@NotNull String str3) {
                TopologyTestDriver topologyTestDriver;
                Intrinsics.checkParameterIsNotNull(str3, "it");
                topologyTestDriver = LocalDriverContext.this.driver;
                return topologyTestDriver.createOutputTopic(str2, Serdes.String().deserializer(), new ReplicationMessageSerde().deserializer());
            }
        });
        Intrinsics.checkExpressionValueIsNotNull(computeIfAbsent, "outputTopics.computeIfAb…)\n            )\n        }");
        return computeIfAbsent.isEmpty();
    }

    @Override // io.floodplain.kotlindsl.LocalContext
    @NotNull
    public KeyValueStore<String, ReplicationMessage> stateStore(@NotNull String str) {
        KLogger kLogger;
        Intrinsics.checkParameterIsNotNull(str, "name");
        KeyValueStore<String, ReplicationMessage> keyValueStore = this.driver.getKeyValueStore(str);
        if (keyValueStore != null) {
            return keyValueStore;
        }
        Map allStateStores = this.driver.getAllStateStores();
        Intrinsics.checkExpressionValueIsNotNull(allStateStores, "driver.allStateStores");
        ArrayList arrayList = new ArrayList(allStateStores.size());
        Iterator it = allStateStores.entrySet().iterator();
        while (it.hasNext()) {
            String str2 = (String) ((Map.Entry) it.next()).getKey();
            if (str2 == null) {
                throw new TypeCastException("null cannot be cast to non-null type kotlin.String");
            }
            arrayList.add(str2);
        }
        List list = CollectionsKt.toList(arrayList);
        kLogger = LocalRuntimeKt.logger;
        kLogger.error("Can't find state store. Available stores: " + list);
        throw new IllegalStateException("Missing state store: " + str);
    }

    @Override // io.floodplain.kotlindsl.LocalContext
    @NotNull
    public Set<String> getStateStoreNames() {
        return this.driver.getAllStateStores().keySet();
    }

    @Override // io.floodplain.kotlindsl.LocalContext
    @NotNull
    public TopologyContext topologyContext() {
        return this.topologyContext;
    }

    @Override // io.floodplain.kotlindsl.LocalContext
    @NotNull
    public TopologyConstructor topologyConstructor() {
        return this.topologyConstructor;
    }

    @Override // io.floodplain.kotlindsl.LocalContext
    public void advanceWallClockTime(@NotNull Duration duration) {
        Intrinsics.checkParameterIsNotNull(duration, "duration");
        this.driver.advanceWallClockTime(duration);
    }

    /* JADX WARN: Multi-variable type inference failed */
    public LocalDriverContext(@NotNull TopologyTestDriver topologyTestDriver, @NotNull TopologyContext topologyContext, @NotNull TopologyConstructor topologyConstructor, @NotNull List<? extends Config> list, @NotNull List<? extends Config> list2, @Nullable Integer num) {
        Intrinsics.checkParameterIsNotNull(topologyTestDriver, "driver");
        Intrinsics.checkParameterIsNotNull(topologyContext, "topologyContext");
        Intrinsics.checkParameterIsNotNull(topologyConstructor, "topologyConstructor");
        Intrinsics.checkParameterIsNotNull(list, "sourceConfigs");
        Intrinsics.checkParameterIsNotNull(list2, "sinkConfigs");
        this.driver = topologyTestDriver;
        this.topologyContext = topologyContext;
        this.topologyConstructor = topologyConstructor;
        this.sourceConfigs = list;
        this.sinkConfigs = list2;
        this.bufferTime = num;
        this.connectJobs = new ArrayList();
        this.inputTopics = new LinkedHashMap();
        this.outputTopics = new LinkedHashMap();
        this.replicationMessageParser = ReplicationFactory.getInstance();
    }
}
