package io.floodplain.kotlindsl;

import io.floodplain.reactive.topology.ReactivePipe;
import io.floodplain.reactive.topology.ReactivePipeParser;
import io.floodplain.streams.api.CoreOperators;
import io.floodplain.streams.api.Topic;
import io.floodplain.streams.api.TopologyContext;
import io.floodplain.streams.base.RocksDBConfigurationSetter;
import io.floodplain.streams.base.StreamOperators;
import io.floodplain.streams.remotejoin.ReplicationTopologyParser;
import io.floodplain.streams.remotejoin.TopologyConstructor;
import java.io.File;
import java.io.InputStream;
import java.lang.Thread;
import java.net.URI;
import java.net.URL;
import java.nio.charset.Charset;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.Set;
import java.util.Stack;
import java.util.UUID;
import kotlin.Metadata;
import kotlin.Pair;
import kotlin.Triple;
import kotlin.TuplesKt;
import kotlin.Unit;
import kotlin.collections.CollectionsKt;
import kotlin.collections.MapsKt;
import kotlin.coroutines.Continuation;
import kotlin.coroutines.CoroutineContext;
import kotlin.io.FilesKt;
import kotlin.jvm.functions.Function2;
import kotlin.jvm.functions.Function3;
import kotlin.jvm.internal.Intrinsics;
import kotlin.text.StringsKt;
import kotlinx.coroutines.BuildersKt;
import mu.KLogger;
import org.apache.kafka.common.config.AbstractConfig;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.connect.connector.policy.ConnectorClientConfigOverridePolicy;
import org.apache.kafka.connect.runtime.Connect;
import org.apache.kafka.connect.runtime.Herder;
import org.apache.kafka.connect.runtime.Worker;
import org.apache.kafka.connect.runtime.WorkerConfig;
import org.apache.kafka.connect.runtime.distributed.DistributedConfig;
import org.apache.kafka.connect.runtime.distributed.DistributedHerder;
import org.apache.kafka.connect.runtime.isolation.Plugins;
import org.apache.kafka.connect.runtime.rest.RestServer;
import org.apache.kafka.connect.runtime.rest.entities.ConnectorInfo;
import org.apache.kafka.connect.storage.KafkaConfigBackingStore;
import org.apache.kafka.connect.storage.KafkaOffsetBackingStore;
import org.apache.kafka.connect.storage.KafkaStatusBackingStore;
import org.apache.kafka.connect.storage.OffsetBackingStore;
import org.apache.kafka.connect.storage.StatusBackingStore;
import org.apache.kafka.connect.util.Callback;
import org.apache.kafka.connect.util.ConnectUtils;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.TopologyDescription;
import org.apache.kafka.streams.processor.WallclockTimestampExtractor;
import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;

/* compiled from: Stream.kt */
@Metadata(mv = {1, 4, 2}, bv = {1, 0, 3}, k = 1, d1 = {"��Ò\u0001\n\u0002\u0018\u0002\n\u0002\u0018\u0002\n��\n\u0002\u0018\u0002\n��\n\u0002\u0018\u0002\n\u0002\b\u0002\n\u0002\u0010\u000e\n\u0002\b\u0005\n\u0002\u0010!\n\u0002\u0018\u0002\n\u0002\b\u0004\n\u0002\u0018\u0002\n��\n\u0002\u0018\u0002\n��\n\u0002\u0018\u0002\n\u0002\b\u0007\n\u0002\u0010\u0002\n\u0002\b\u0006\n\u0002\u0018\u0002\n��\n\u0002\u0018\u0002\n��\n\u0002\u0018\u0002\n��\n\u0002\u0010$\n\u0002\u0010��\n\u0002\b\u0002\n\u0002\u0010%\n\u0002\b\u0005\n\u0002\u0018\u0002\n\u0002\u0018\u0002\n\u0002\u0010 \n\u0002\u0018\u0002\n\u0002\b\u0003\n\u0002\u0010\b\n��\n\u0002\u0018\u0002\n\u0002\u0018\u0002\n\u0002\u0018\u0002\n\u0002\u0018\u0002\n\u0002\b\u0002\n\u0002\u0018\u0002\n��\n\u0002\u0018\u0002\n\u0002\u0018\u0002\n��\n\u0002\u0010\u000b\n\u0002\b\u0003\n\u0002\u0018\u0002\n\u0002\b\n\n\u0002\u0010\u0011\n��\n\u0002\u0018\u0002\n\u0002\b\u0002\n\u0002\u0018\u0002\n\u0002\b\u0003\u0018��2\u00020\u0001B\u0015\u0012\u0006\u0010\u0002\u001a\u00020\u0003\u0012\u0006\u0010\u0004\u001a\u00020\u0005¢\u0006\u0002\u0010\u0006J\u000e\u0010\u001f\u001a\u00020 2\u0006\u0010!\u001a\u00020\u000fJ\u000e\u0010\"\u001a\u00020\u00142\u0006\u0010!\u001a\u00020\u0014J\u0010\u0010#\u001a\u00020 2\u0006\u0010$\u001a\u00020\u0018H\u0016J\u000e\u0010%\u001a\u00020 2\u0006\u0010!\u001a\u00020\u0016J\u0010\u0010&\u001a\u00020'2\u0006\u0010(\u001a\u00020)H\u0002J\u001c\u0010*\u001a\u00020+2\u0012\u0010,\u001a\u000e\u0012\u0004\u0012\u00020\b\u0012\u0004\u0012\u00020.0-H\u0002J\u001c\u0010/\u001a\u00020 2\u0012\u00100\u001a\u000e\u0012\u0004\u0012\u00020\b\u0012\u0004\u0012\u00020\b01H\u0002J\u000e\u00102\u001a\u00020\b2\u0006\u00103\u001a\u00020\bJ\u001c\u00104\u001a\u00020 
2\u0012\u00105\u001a\u000e\u0012\u0004\u0012\u00020\b\u0012\u0004\u0012\u00020\b0-H\u0002J>\u00106\u001a8\u0012\u0004\u0012\u000208\u0012\u0016\u0012\u0014\u0012\u0010\u0012\u000e\u0012\u0004\u0012\u00020\b\u0012\u0004\u0012\u00020\b0:09\u0012\u0016\u0012\u0014\u0012\u0010\u0012\u000e\u0012\u0004\u0012\u00020\b\u0012\u0004\u0012\u00020\b0:0907H\u0002JO\u0010;\u001a\u00020 2\n\b\u0002\u0010<\u001a\u0004\u0018\u00010\b2\n\b\u0002\u0010=\u001a\u0004\u0018\u00010>2'\u0010?\u001a#\b\u0001\u0012\u0004\u0012\u00020A\u0012\n\u0012\b\u0012\u0004\u0012\u00020 0B\u0012\u0006\u0012\u0004\u0018\u00010.0@¢\u0006\u0002\bCø\u0001��¢\u0006\u0002\u0010DJ\"\u0010E\u001a\u00020F2\b\u0010G\u001a\u0004\u0018\u00010H2\u0006\u00105\u001a\u00020I2\b\b\u0002\u0010J\u001a\u00020KJu\u0010E\u001a\u00020F2\b\u0010G\u001a\u0004\u0018\u00010H2\u0006\u0010L\u001a\u00020\b2\b\b\u0002\u0010J\u001a\u00020K2\u0016\b\u0002\u0010M\u001a\u0010\u0012\u0004\u0012\u00020\b\u0012\u0004\u0012\u00020\b\u0018\u00010-21\b\u0002\u0010N\u001a+\b\u0001\u0012\u0004\u0012\u00020��\u0012\u0004\u0012\u00020F\u0012\n\u0012\b\u0012\u0004\u0012\u00020 0B\u0012\u0006\u0012\u0004\u0018\u00010.\u0018\u00010O¢\u0006\u0002\bCø\u0001��¢\u0006\u0002\u0010PJ:\u0010E\u001a\u00020F2\b\u0010G\u001a\u0004\u0018\u00010H2\u0006\u0010L\u001a\u00020\b2\u0006\u0010Q\u001a\u00020\b2\u0006\u0010R\u001a\u00020\b2\u0006\u0010S\u001a\u00020>2\b\b\u0002\u0010J\u001a\u00020KJ\b\u0010T\u001a\u000208H\u0002J<\u0010U\u001a\u00020F2\u0006\u0010V\u001a\u0002082\u0006\u0010<\u001a\u00020\b2\u0006\u0010L\u001a\u00020\b2\u0006\u0010W\u001a\u00020\b2\u0012\u0010,\u001a\u000e\u0012\u0004\u0012\u00020\b\u0012\u0004\u0012\u00020.01H\u0002JU\u0010X\u001a\u00020 2\u0012\b\u0002\u0010Y\u001a\f\u0012\b\b\u0001\u0012\u0004\u0018\u00010\b0Z21\u0010[\u001a-\b\u0001\u0012\u0013\u0012\u00110\u0003¢\u0006\f\b\\\u0012\b\b3\u0012\u0004\b\b(\u0002\u0012\n\u0012\b\u0012\u0004\u0012\u00020 
0B\u0012\u0006\u0012\u0004\u0018\u00010.0@ø\u0001��¢\u0006\u0002\u0010]J\u000e\u0010\u0013\u001a\b\u0012\u0004\u0012\u00020\u001409H\u0002J\u000e\u0010\u0015\u001a\b\u0012\u0004\u0012\u00020\u001609H\u0002J\u001c\u0010^\u001a\u00020_2\u0012\u0010`\u001a\u000e\u0012\u0004\u0012\u00020\b\u0012\u0004\u0012\u00020\b0-H\u0002J\u000e\u0010a\u001a\u00020\b2\u0006\u00103\u001a\u00020\bR\u0013\u0010\u0007\u001a\u0004\u0018\u00010\b¢\u0006\b\n��\u001a\u0004\b\t\u0010\nR\u0011\u0010\u000b\u001a\u00020\b¢\u0006\b\n��\u001a\u0004\b\f\u0010\nR\u0014\u0010\r\u001a\b\u0012\u0004\u0012\u00020\u000f0\u000eX\u0082\u0004¢\u0006\u0002\n��R\u0014\u0010\u0010\u001a\u00020��8VX\u0096\u0004¢\u0006\u0006\u001a\u0004\b\u0011\u0010\u0012R\u0014\u0010\u0013\u001a\b\u0012\u0004\u0012\u00020\u00140\u000eX\u0082\u0004¢\u0006\u0002\n��R\u0014\u0010\u0015\u001a\b\u0012\u0004\u0012\u00020\u00160\u000eX\u0082\u0004¢\u0006\u0002\n��R\u0014\u0010\u0017\u001a\b\u0012\u0004\u0012\u00020\u00180\u000eX\u0082\u0004¢\u0006\u0002\n��R\u0013\u0010\u0019\u001a\u0004\u0018\u00010\b¢\u0006\b\n��\u001a\u0004\b\u001a\u0010\nR\u0011\u0010\u0004\u001a\u00020\u0005¢\u0006\b\n��\u001a\u0004\b\u001b\u0010\u001cR\u0014\u0010\u0002\u001a\u00020\u0003X\u0096\u0004¢\u0006\b\n��\u001a\u0004\b\u001d\u0010\u001e\u0082\u0002\u0004\n\u0002\b\u0019¨\u0006b"}, d2 = {"Lio/floodplain/kotlindsl/Stream;", "Lio/floodplain/kotlindsl/FloodplainSourceContainer;", "topologyContext", "Lio/floodplain/streams/api/TopologyContext;", "topologyConstructor", "Lio/floodplain/streams/remotejoin/TopologyConstructor;", "(Lio/floodplain/streams/api/TopologyContext;Lio/floodplain/streams/remotejoin/TopologyConstructor;)V", "deployment", "", "getDeployment", "()Ljava/lang/String;", "generation", "getGeneration", "localSinkConfigurations", "", "Lio/floodplain/kotlindsl/AbstractSinkConfig;", "rootTopology", "getRootTopology", "()Lio/floodplain/kotlindsl/Stream;", "sinkConfigurations", "Lio/floodplain/kotlindsl/SinkConfig;", "sourceConfigurations", 
"Lio/floodplain/kotlindsl/SourceConfig;", "sources", "Lio/floodplain/kotlindsl/Source;", "tenant", "getTenant", "getTopologyConstructor", "()Lio/floodplain/streams/remotejoin/TopologyConstructor;", "getTopologyContext", "()Lio/floodplain/streams/api/TopologyContext;", "addLocalSinkConfiguration", "", "c", "addSinkConfiguration", "addSource", "source", "addSourceConfiguration", "createKafkaOffsetBackingStore", "Lorg/apache/kafka/connect/storage/KafkaOffsetBackingStore;", "workerConfig", "Lorg/apache/kafka/connect/runtime/WorkerConfig;", "createProperties", "Ljava/util/Properties;", "extra", "", "", "extendWorkerProperties", "workerProps", "", "generationalTopic", "name", "instantiateLocalSinks", "settings", "render", "Lkotlin/Triple;", "Lorg/apache/kafka/streams/Topology;", "", "Lkotlin/Pair;", "renderAndExecute", "applicationId", "bufferTime", "", "localCmds", "Lkotlin/Function2;", "Lio/floodplain/kotlindsl/LocalContext;", "Lkotlin/coroutines/Continuation;", "Lkotlin/ExtensionFunctionType;", "(Ljava/lang/String;Ljava/lang/Integer;Lkotlin/jvm/functions/Function2;)V", "renderAndSchedule", "Lorg/apache/kafka/streams/KafkaStreams;", "connectorURL", "Ljava/net/URL;", "Ljava/io/InputStream;", "force", "", "kafkaHosts", "initialSettings", "monitor", "Lkotlin/Function3;", "(Ljava/net/URL;Ljava/lang/String;ZLjava/util/Map;Lkotlin/jvm/functions/Function3;)Lorg/apache/kafka/streams/KafkaStreams;", "kafkaUsername", "kafkaPassword", "replicationFactor", "renderTopology", "runTopology", "topology", "storagePath", "runWithArguments", "args", "", "after", "Lkotlin/ParameterName;", "([Ljava/lang/String;Lkotlin/jvm/functions/Function2;)V", "startLocalConnect", "Lorg/apache/kafka/connect/runtime/Herder;", "initialWorkerProps", "topic", "floodplain-dsl"})
/* loaded from: input_file:io/floodplain/kotlindsl/Stream.class */
public final class Stream implements FloodplainSourceContainer {
    // Sources registered through addSource(); converted to reactive pipes when the topology is rendered.
    private final List<Source> sources;
    // Sink configurations registered through addSinkConfiguration().
    private final List<SinkConfig> sinkConfigurations;
    // Source configurations registered through addSourceConfiguration().
    private final List<SourceConfig> sourceConfigurations;
    // Sink configurations registered through addLocalSinkConfiguration(); used by instantiateLocalSinks().
    private final List<AbstractSinkConfig> localSinkConfigurations;

    // Tenant read from the topology context in the constructor; null when the context has none.
    @Nullable
    private final String tenant;

    // Deployment read from the topology context in the constructor; null when the context has none.
    @Nullable
    private final String deployment;

    // Generation read from the topology context in the constructor; used as a prefix by generationalTopic().
    @NotNull
    private final String generation;

    // Context supplied to the constructor; exposed through getTopologyContext().
    @NotNull
    private final TopologyContext topologyContext;

    // Constructor state supplied to the constructor; exposed through getTopologyConstructor().
    @NotNull
    private final TopologyConstructor topologyConstructor;

    /**
     * Returns the tenant captured from the topology context at construction time.
     *
     * @return the tenant, or {@code null} when the context did not provide one
     */
    @Nullable
    public final String getTenant() {
        return tenant;
    }

    /**
     * Returns the deployment captured from the topology context at construction time.
     *
     * @return the deployment, or {@code null} when the context did not provide one
     */
    @Nullable
    public final String getDeployment() {
        return deployment;
    }

    /**
     * Returns the generation captured from the topology context at construction time.
     *
     * @return the generation, never {@code null}
     */
    @NotNull
    public final String getGeneration() {
        return generation;
    }

    /**
     * Qualifies a topic name with the tenant and deployment, each followed by
     * {@code '-'} and each only when it is set: {@code [tenant-][deployment-]name}.
     *
     * @param str the unqualified topic name
     * @return the qualified topic name
     */
    @NotNull
    public final String topic(@NotNull String str) {
        Intrinsics.checkNotNullParameter(str, "name");
        StringBuilder qualified = new StringBuilder();
        if (this.tenant != null) {
            qualified.append(this.tenant).append('-');
        }
        if (this.deployment != null) {
            qualified.append(this.deployment).append('-');
        }
        String result = qualified.append(str).toString();
        Intrinsics.checkNotNullExpressionValue(result, "buffer.toString()");
        return result;
    }

    /**
     * Qualifies a topic name like {@code topic(String)} does, and additionally
     * inserts the generation: {@code [tenant-][deployment-]generation-name}.
     *
     * @param str the unqualified topic name
     * @return the qualified, generation-specific topic name
     */
    @NotNull
    public final String generationalTopic(@NotNull String str) {
        Intrinsics.checkNotNullParameter(str, "name");
        StringBuilder qualified = new StringBuilder();
        if (this.tenant != null) {
            qualified.append(this.tenant).append('-');
        }
        if (this.deployment != null) {
            qualified.append(this.deployment).append('-');
        }
        qualified.append(this.generation).append('-');
        String result = qualified.append(str).toString();
        Intrinsics.checkNotNullExpressionValue(result, "buffer.toString()");
        return result;
    }

    /**
     * Registers a source; it becomes part of the topology built by the render methods.
     *
     * @param source the source to register
     */
    @Override // io.floodplain.kotlindsl.FloodplainSourceContainer
    public void addSource(@NotNull Source source) {
        Intrinsics.checkNotNullParameter(source, "source");
        sources.add(source);
    }

    /**
     * Registers a sink configuration and hands the same instance back to the caller.
     *
     * @param sinkConfig the sink configuration to register
     * @return {@code sinkConfig} itself
     */
    @NotNull
    public final SinkConfig addSinkConfiguration(@NotNull SinkConfig sinkConfig) {
        Intrinsics.checkNotNullParameter(sinkConfig, "c");
        sinkConfigurations.add(sinkConfig);
        return sinkConfig;
    }

    /**
     * Registers a sink configuration that is instantiated through the locally
     * started Connect herder (see {@code instantiateLocalSinks}).
     *
     * @param abstractSinkConfig the local sink configuration to register
     */
    public final void addLocalSinkConfiguration(@NotNull AbstractSinkConfig abstractSinkConfig) {
        Intrinsics.checkNotNullParameter(abstractSinkConfig, "c");
        localSinkConfigurations.add(abstractSinkConfig);
    }

    /**
     * Registers a source configuration.
     *
     * @param sourceConfig the source configuration to register
     */
    public final void addSourceConfiguration(@NotNull SourceConfig sourceConfig) {
        Intrinsics.checkNotNullParameter(sourceConfig, "c");
        sourceConfigurations.add(sourceConfig);
    }

    /** Returns a copy of the registered sink configurations, made with {@code CollectionsKt.toList}. */
    private final List<SinkConfig> sinkConfigurations() {
        List<SinkConfig> snapshot = CollectionsKt.toList(sinkConfigurations);
        return snapshot;
    }

    /** Returns a copy of the registered source configurations, made with {@code CollectionsKt.toList}. */
    private final List<SourceConfig> sourceConfigurations() {
        List<SourceConfig> snapshot = CollectionsKt.toList(sourceConfigurations);
        return snapshot;
    }

    /**
     * Builds the Kafka Streams {@link Topology} for every registered source.
     *
     * <p>All sources are converted to reactive pipes first; only then is each pipe
     * processed, so the conversions happen before any pipe is added to the topology.
     * State stores are materialized once, after all pipes have been processed.
     *
     * @return the populated topology
     */
    private final Topology renderTopology() {
        Topology topology = new Topology();
        // Phase 1: convert every source before any pipe is processed.
        List<ReactivePipe> pipes = new ArrayList<>(this.sources.size());
        for (Source source : this.sources) {
            pipes.add(source.toReactivePipe());
        }
        // One stack instance is shared by all processPipe calls; kept raw because the
        // element type expected by ReactivePipeParser is not visible from this file.
        Stack stack = new Stack();
        // Phase 2: feed each pipe to the parser under a freshly generated stream id.
        for (ReactivePipe pipe : pipes) {
            ReactivePipeParser.processPipe(getTopologyContext(), this.topologyConstructor, topology, this.topologyConstructor.generateNewStreamId(), stack, pipe, false);
        }
        ReplicationTopologyParser.materializeStateStores(this.topologyConstructor, topology);
        return topology;
    }

    /**
     * Renders the topology and connector definitions, logs them, and runs the
     * result through {@code LocalRuntimeKt.runLocalTopology}.
     *
     * @param str       application id; a random UUID string is used when {@code null}
     * @param num       buffer time, passed through unchanged (may be {@code null})
     * @param function2 the commands to execute against the local context
     */
    public final void renderAndExecute(@Nullable String str, @Nullable Integer num, @NotNull Function2<? super LocalContext, ? super Continuation<? super Unit>, ? extends Object> function2) {
        Intrinsics.checkNotNullParameter(function2, "localCmds");
        Triple<Topology, List<Pair<String, String>>, List<Pair<String, String>>> rendered = render();
        Topology topology = rendered.component1();
        List<Pair<String, String>> sourceConnectors = rendered.component2();
        List<Pair<String, String>> sinkConnectors = rendered.component3();
        List<SourceConfig> sourceConfigs = this.sourceConfigurations;
        List<SinkConfig> sinkConfigs = this.sinkConfigurations;

        StreamKt.logger.info("Testing topology:\n" + topology.describe());
        StreamKt.logger.info("Testing sources:\n" + sourceConnectors);
        StreamKt.logger.info("Testing sinks:\n" + sinkConnectors);

        // Log the qualified names of all topics the topology constructor wants to exist.
        Set topics = this.topologyConstructor.desiredTopicNames();
        Intrinsics.checkNotNullExpressionValue(topics, "topologyConstructor.desiredTopicNames()");
        List<String> topicNames = new ArrayList<>(topics.size());
        for (Object topic : topics) {
            topicNames.add(((Topic) topic).qualifiedString());
        }
        StreamKt.logger.info("Sourcetopics: \n" + topicNames);

        String applicationId = str;
        if (applicationId == null) {
            applicationId = UUID.randomUUID().toString();
            Intrinsics.checkNotNullExpressionValue(applicationId, "UUID.randomUUID().toString()");
        }
        LocalRuntimeKt.runLocalTopology(applicationId, num, topology, function2, this, this.topologyConstructor, getTopologyContext(), sourceConfigs, sinkConfigs, sinkConnectors);
    }

    /**
     * Compiler-generated bridge for the Kotlin default arguments of {@code renderAndExecute}.
     * Bit 0 of {@code i} replaces {@code str} with {@code null}; bit 1 replaces {@code num} with {@code null}.
     */
    public static /* synthetic */ void renderAndExecute$default(Stream stream, String str, Integer num, Function2 function2, int i, Object obj) {
        String applicationId = (i & 1) != 0 ? null : str;
        Integer bufferTime = (i & 2) != 0 ? null : num;
        stream.renderAndExecute(applicationId, bufferTime, function2);
    }

    /**
     * Loads Kafka settings from a properties stream and schedules the topology with them.
     *
     * <p>The stream is read with {@link Properties#load(InputStream)} and is not closed here.
     * The {@code bootstrap.servers} entry is mandatory and is used as the Kafka host list;
     * all loaded properties are forwarded as the initial settings.
     *
     * @param url         connector URL, passed through unchanged (may be {@code null})
     * @param inputStream properties-format settings
     * @param z           the {@code force} flag, passed through unchanged
     * @return the started {@link KafkaStreams} instance
     * @throws NullPointerException if the settings contain no {@code bootstrap.servers}
     */
    @NotNull
    public final KafkaStreams renderAndSchedule(@Nullable URL url, @NotNull InputStream inputStream, boolean z) {
        Intrinsics.checkNotNullParameter(inputStream, "settings");
        Properties loaded = new Properties();
        loaded.load(inputStream);
        Object bootstrapServers = loaded.get("bootstrap.servers");
        if (bootstrapServers == null) {
            throw new NullPointerException("null cannot be cast to non-null type kotlin.String");
        }
        // Cast before converting the properties so a failing cast still surfaces first.
        String kafkaHosts = (String) bootstrapServers;
        Map<String, String> initialSettings = Utils.propsToStringMap(loaded);
        // Mask 16: only the trailing monitor argument takes its default (null).
        return renderAndSchedule$default(this, url, kafkaHosts, z, initialSettings, null, 16, null);
    }

    /**
     * Compiler-generated bridge for the default {@code force} argument of
     * {@code renderAndSchedule(URL, InputStream, boolean)}: bit 2 of {@code i} selects {@code false}.
     */
    public static /* synthetic */ KafkaStreams renderAndSchedule$default(Stream stream, URL url, InputStream inputStream, boolean z, int i, Object obj) {
        boolean force = (i & 4) == 0 && z;
        return stream.renderAndSchedule(url, inputStream, force);
    }

    /**
     * Schedules the topology against a SASL_SSL / PLAIN secured Kafka cluster.
     *
     * <p>Builds the client settings (bootstrap servers, security protocol, JAAS
     * configuration, {@code acks=all}, replication factor) and delegates to the
     * map-based overload with no monitor.
     *
     * @param url  connector URL, passed through unchanged (may be {@code null})
     * @param str  Kafka bootstrap hosts
     * @param str2 Kafka username
     * @param str3 Kafka password
     * @param i    replication factor
     * @param z    the {@code force} flag, passed through unchanged
     * @return the started {@link KafkaStreams} instance
     */
    @NotNull
    public final KafkaStreams renderAndSchedule(@Nullable URL url, @NotNull String str, @NotNull String str2, @NotNull String str3, int i, boolean z) {
        Intrinsics.checkNotNullParameter(str, "kafkaHosts");
        Intrinsics.checkNotNullParameter(str2, "kafkaUsername");
        Intrinsics.checkNotNullParameter(str3, "kafkaPassword");
        // NOTE(review): the credentials are placed between single quotes without escaping;
        // a quote inside either value presumably breaks the JAAS line - confirm.
        String jaasConfig = "org.apache.kafka.common.security.plain.PlainLoginModule   required username='" + str2 + "'   password='" + str3 + "';";
        Map settings = MapsKt.mapOf(new Pair[]{
            TuplesKt.to("bootstrap.servers", str),
            TuplesKt.to("security.protocol", "SASL_SSL"),
            TuplesKt.to("sasl.jaas.config", jaasConfig),
            TuplesKt.to("sasl.mechanism", "PLAIN"),
            TuplesKt.to("acks", "all"),
            TuplesKt.to("replication.factor", String.valueOf(i))
        });
        // Mask 16: only the trailing monitor argument takes its default (null).
        return renderAndSchedule$default(this, url, str, z, settings, null, 16, null);
    }

    /**
     * Compiler-generated bridge for the default {@code force} argument of the
     * credential-based {@code renderAndSchedule} overload: bit 5 of {@code i2} selects {@code false}.
     */
    public static /* synthetic */ KafkaStreams renderAndSchedule$default(Stream stream, URL url, String str, String str2, String str3, int i, boolean z, int i2, Object obj) {
        boolean force = (i2 & 32) == 0 && z;
        return stream.renderAndSchedule(url, str, str2, str3, i, force);
    }

    /*
     * Main renderAndSchedule overload: the other overloads in this class funnel into it
     * through the renderAndSchedule$default bridge.
     *
     * NOTE: the decompiler could not reconstruct this method. The body below is a
     * placeholder that always throws UnsupportedOperationException; it does not reflect
     * the behavior of the compiled class. Consult the original Stream.kt or a bytecode
     * dump for the real logic.
     */
    /* JADX WARN: Code restructure failed: missing block: B:4:0x0035, code lost:
    
        if (r0 != null) goto L8;
     */
    /* JADX WARN: Code restructure failed: missing block: B:8:0x005d, code lost:
    
        if (r0 != null) goto L14;
     */
    @org.jetbrains.annotations.NotNull
    /*
        Code decompiled incorrectly, please refer to instructions dump.
        To view partially-correct add '--show-bad-code' argument
    */
    public final org.apache.kafka.streams.KafkaStreams renderAndSchedule(@org.jetbrains.annotations.Nullable java.net.URL r9, @org.jetbrains.annotations.NotNull java.lang.String r10, boolean r11, @org.jetbrains.annotations.Nullable java.util.Map<java.lang.String, java.lang.String> r12, @org.jetbrains.annotations.Nullable kotlin.jvm.functions.Function3<? super io.floodplain.kotlindsl.Stream, ? super org.apache.kafka.streams.KafkaStreams, ? super kotlin.coroutines.Continuation<? super kotlin.Unit>, ? extends java.lang.Object> r13) {
        /*
            Method dump skipped, instructions count: 622
            To view this dump add '--comments-level debug' option
        */
        // Decompiler placeholder: unconditionally fails when called.
        throw new UnsupportedOperationException("Method not decompiled: io.floodplain.kotlindsl.Stream.renderAndSchedule(java.net.URL, java.lang.String, boolean, java.util.Map, kotlin.jvm.functions.Function3):org.apache.kafka.streams.KafkaStreams");
    }

    /**
     * Compiler-generated bridge for the default arguments of the map-based
     * {@code renderAndSchedule} overload. Bit 2 of {@code i} selects {@code force = false},
     * bit 3 selects {@code null} initial settings, bit 4 selects a {@code null} monitor.
     */
    public static /* synthetic */ KafkaStreams renderAndSchedule$default(Stream stream, URL url, String str, boolean z, Map map, Function3 function3, int i, Object obj) {
        boolean force = (i & 4) == 0 && z;
        Map initialSettings = (i & 8) != 0 ? null : map;
        Function3 monitor = (i & 16) != 0 ? null : function3;
        return stream.renderAndSchedule(url, str, force, initialSettings, monitor);
    }

    /**
     * Starts a local Connect herder and registers one connector per sink element
     * produced by the local sink configurations.
     *
     * <p>Nothing happens (and no herder is started) when no local sink configuration
     * is registered. Connectors are named {@code conn-0}, {@code conn-1}, ... in the
     * order the elements are produced; the name is also written into each
     * connector's configuration under the {@code name} key.
     *
     * @param map the settings handed to {@code startLocalConnect}
     */
    private final void instantiateLocalSinks(Map<String, String> map) {
        if (this.localSinkConfigurations.isEmpty()) {
            // No local sinks: do not start Connect at all.
            return;
        }
        Herder herder = startLocalConnect(map);

        // Flatten the elements of all configurations, preserving registration order.
        // Element maps stay raw: their exact type arguments are not visible from this file.
        List<Map> sinkElements = new ArrayList<>();
        for (AbstractSinkConfig config : this.localSinkConfigurations) {
            CollectionsKt.addAll(sinkElements, config.instantiateSinkElements());
        }

        int index = 0;
        for (Map element : sinkElements) {
            // Copy so the generated name does not leak into the configuration's own map.
            Map connectorConfig = new LinkedHashMap();
            connectorConfig.putAll(element);
            String connectorName = "conn-" + index;
            index++;
            connectorConfig.put("name", connectorName);
            herder.putConnectorConfig(connectorName, connectorConfig, true, new Callback<Herder.Created<ConnectorInfo>>() { // from class: io.floodplain.kotlindsl.Stream$instantiateLocalSinks$2$1
                public final void onCompletion(Throwable th, Herder.Created<ConnectorInfo> created) {
                    if (th != null) {
                        StreamKt.logger.error("Error creating connector:", th);
                    }
                    // Logged on failure as well; the values are then rendered as null.
                    StreamKt.logger.info("Instantiated: " + (created != null ? Boolean.valueOf(created.created()) : null) + " result: " + (created != null ? (ConnectorInfo) created.result() : null));
                }
            });
        }
    }

    /**
     * Adds the Connect worker settings to {@code map} in place.
     *
     * <p>Sets JSON key/value converters and the REST listener address, then copies
     * every entry whose key starts with {@code security}, {@code sasl}, {@code ssl}
     * or {@code bootstrap} to {@code consumer.<key>} and {@code producer.<key>}.
     *
     * @param map the mutable worker properties to extend
     * @throws NullPointerException if one of the copied keys maps to {@code null}
     */
    private final void extendWorkerProperties(Map<String, String> map) {
        map.put("key.converter", "org.apache.kafka.connect.json.JsonConverter");
        map.put("value.converter", "org.apache.kafka.connect.json.JsonConverter");
        map.put("listeners", "http://127.0.0.1:8084");

        // Collect the matching keys first: the second loop inserts into the same map,
        // so it must not iterate over the live key set.
        List<String> inheritedKeys = new ArrayList<>();
        for (String key : map.keySet()) {
            if (key.startsWith("security") || key.startsWith("sasl") || key.startsWith("ssl") || key.startsWith("bootstrap")) {
                inheritedKeys.add(key);
            }
        }
        for (String key : inheritedKeys) {
            String value = map.get(key);
            Intrinsics.checkNotNull(value);
            map.put("consumer." + key, value);
            map.put("producer." + key, value);
        }
    }

    /**
     * Creates a {@link KafkaOffsetBackingStore}, configures it with the given worker
     * configuration and starts it.
     *
     * @param workerConfig the configuration to apply
     * @return the configured, already started store
     */
    private final KafkaOffsetBackingStore createKafkaOffsetBackingStore(WorkerConfig workerConfig) {
        KafkaOffsetBackingStore store = new KafkaOffsetBackingStore();
        store.configure(workerConfig);
        store.start();
        return store;
    }

    /**
     * Boots a Kafka Connect runtime inside this process and returns its herder.
     *
     * The given settings are copied and extended with the worker defaults from
     * extendWorkerProperties before any Connect component is built, so the caller's
     * map is not modified. The statements below are order-sensitive: each component
     * is constructed from the ones created before it.
     *
     * @param map the initial worker settings
     * @return the herder of the started Connect instance
     */
    private final Herder startLocalConnect(Map<String, String> map) {
        KLogger kLogger;
        KLogger kLogger2;
        // Work on a copy so the worker-specific additions stay local to this method.
        LinkedHashMap linkedHashMap = new LinkedHashMap();
        linkedHashMap.putAll(map);
        extendWorkerProperties(linkedHashMap);
        Plugins plugins = new Plugins(linkedHashMap);
        WorkerConfig distributedConfig = new DistributedConfig(linkedHashMap);
        Time time = Time.SYSTEM;
        Intrinsics.checkNotNullExpressionValue(time, "Time.SYSTEM");
        // Instantiate the client-config override policy named in the worker configuration.
        Object newPlugin = plugins.newPlugin(distributedConfig.getString("connector.client.config.override.policy"), (AbstractConfig) distributedConfig, ConnectorClientConfigOverridePolicy.class);
        Intrinsics.checkNotNullExpressionValue(newPlugin, "plugins.newPlugin(\n     …icy::class.java\n        )");
        ConnectorClientConfigOverridePolicy connectorClientConfigOverridePolicy = (ConnectorClientConfigOverridePolicy) newPlugin;
        String lookupKafkaClusterId = ConnectUtils.lookupKafkaClusterId(distributedConfig);
        Intrinsics.checkNotNullExpressionValue(lookupKafkaClusterId, "ConnectUtils.lookupKafkaClusterId(config)");
        kLogger = StreamKt.logger;
        kLogger.debug("Kafka cluster ID: " + lookupKafkaClusterId);
        // The offset store is configured and started by the helper before the worker is created.
        OffsetBackingStore createKafkaOffsetBackingStore = createKafkaOffsetBackingStore(distributedConfig);
        // The REST server is initialized first because its advertised URL supplies the worker id
        // (host:port) and the herder's REST URL.
        RestServer restServer = new RestServer(distributedConfig);
        restServer.initializeServer();
        URI advertisedUrl = restServer.advertisedUrl();
        Intrinsics.checkNotNullExpressionValue(advertisedUrl, "rest.advertisedUrl()");
        Worker worker = new Worker(advertisedUrl.getHost().toString() + ":" + advertisedUrl.getPort(), time, plugins, distributedConfig, createKafkaOffsetBackingStore, connectorClientConfigOverridePolicy);
        StatusBackingStore kafkaStatusBackingStore = new KafkaStatusBackingStore(time, worker.getInternalValueConverter());
        kafkaStatusBackingStore.configure(distributedConfig);
        Herder distributedHerder = new DistributedHerder(distributedConfig, time, worker, lookupKafkaClusterId, kafkaStatusBackingStore, new KafkaConfigBackingStore(worker.getInternalValueConverter(), distributedConfig, worker.configTransformer()), advertisedUrl.toString(), connectorClientConfigOverridePolicy);
        // The Connect instance is started but not retained; only the herder is returned.
        new Connect(distributedHerder, restServer).start();
        kLogger2 = StreamKt.logger;
        kLogger2.info("Connect started!!");
        return distributedHerder;
    }

    /**
     * Renders everything needed to run this stream.
     *
     * @return a triple of (topology, source connectors, sink connectors); each
     *         connector is a pair of its name and its connector JSON
     */
    private final Triple<Topology, List<Pair<String, String>>, List<Pair<String, String>>> render() {
        // The topology is rendered before any connector configuration is materialized.
        Topology topology = renderTopology();

        List<MaterializedConfig> sourceElements = new ArrayList<>();
        for (SourceConfig config : sourceConfigurations()) {
            CollectionsKt.addAll(sourceElements, config.materializeConnectorConfig());
        }
        List<Pair<String, String>> sourceConnectors = toConnectorJsonPairs(sourceElements);

        List<MaterializedConfig> sinkElements = new ArrayList<>();
        for (SinkConfig config : sinkConfigurations()) {
            CollectionsKt.addAll(sinkElements, config.materializeConnectorConfig());
        }
        List<Pair<String, String>> sinkConnectors = toConnectorJsonPairs(sinkElements);

        return new Triple<>(topology, sourceConnectors, sinkConnectors);
    }

    /** Maps each materialized configuration to a (name, connector JSON) pair, preserving order. */
    private List<Pair<String, String>> toConnectorJsonPairs(List<MaterializedConfig> configs) {
        List<Pair<String, String>> connectors = new ArrayList<>(configs.size());
        for (MaterializedConfig config : configs) {
            connectors.add(TuplesKt.to(config.getName(), FloodplainConnectorKt.constructConnectorJson(getTopologyContext(), config.getName(), config.getSettings())));
        }
        return connectors;
    }

    /**
     * Creates and starts a {@link KafkaStreams} instance for the given topology.
     *
     * <p>Side effects: writes {@code bootstrap.servers}, {@code application.id} and
     * {@code state.dir} into {@code map}, logs the topology description, and writes
     * that description to {@code topology.txt} in the working directory.
     *
     * @param topology the topology to run
     * @param str      application id
     * @param str2     Kafka bootstrap hosts
     * @param str3     state directory path
     * @param map      extra stream settings; mutated as described above
     * @return the started streams instance
     */
    private final KafkaStreams runTopology(Topology topology, String str, String str2, String str3, Map<String, Object> map) {
        map.put("bootstrap.servers", str2);
        map.put("application.id", str);
        map.put("state.dir", str3);
        final KafkaStreams kafkaStreams = new KafkaStreams(topology, createProperties(map));
        TopologyDescription describe = topology.describe();
        StreamKt.logger.info("CurrentTopology:\n " + describe);
        FilesKt.writeText$default(new File("topology.txt"), describe.toString(), (Charset) null, 2, (Object) null);
        // Any uncaught stream-thread error is logged and shuts the whole streams instance down.
        kafkaStreams.setUncaughtExceptionHandler(new Thread.UncaughtExceptionHandler() { // from class: io.floodplain.kotlindsl.Stream$runTopology$1
            @Override // java.lang.Thread.UncaughtExceptionHandler
            public final void uncaughtException(@NotNull Thread thread, @Nullable Throwable th) {
                Intrinsics.checkNotNullParameter(thread, "thread");
                StreamKt.logger.error("Error in streams. thread: " + thread.getName() + " exception: ", th);
                kafkaStreams.close();
            }
        });
        kafkaStreams.setStateListener(new KafkaStreams.StateListener() { // from class: io.floodplain.kotlindsl.Stream$runTopology$2
            public final void onChange(@Nullable KafkaStreams.State state, @Nullable KafkaStreams.State state2) {
                // onChange receives (newState, oldState), hence the swapped argument order.
                // The former third argument (kafkaStreams.state()) had no matching placeholder
                // and was never rendered, so it is no longer passed.
                StreamKt.logger.info("State moving from {} to {}", state2, state);
            }
        });
        kafkaStreams.start();
        return kafkaStreams;
    }

    /**
     * Builds the Kafka Streams properties from caller-supplied settings.
     *
     * <p>Entries added with {@code putIfAbsent} are defaults that a value in
     * {@code map} overrides; entries added with {@code put} are always enforced and
     * overwrite whatever {@code map} supplied for the same key.
     *
     * @param map caller-supplied settings, copied into the result first
     * @return the complete properties
     */
    private final Properties createProperties(Map<String, ? extends Object> map) {
        Properties properties = new Properties();
        properties.putAll(map);

        // Overridable defaults.
        properties.putIfAbsent("default.key.serde", Serdes.String().getClass());
        properties.putIfAbsent("default.value.serde", StreamOperators.replicationSerde.getClass());
        properties.putIfAbsent("auto.offset.reset", "earliest");
        properties.putIfAbsent("session.timeout.ms", 30000);
        properties.putIfAbsent("request.timeout.ms", 40000);
        properties.putIfAbsent("heartbeat.interval.ms", 5000);
        properties.putIfAbsent("max.poll.interval.ms", 7200000);
        properties.putIfAbsent("max.poll.records", 100);
        properties.putIfAbsent("compression.type", "lz4");
        properties.putIfAbsent("num.stream.threads", 1);
        properties.putIfAbsent("num.standby.replicas", 0);
        properties.putIfAbsent("replication.factor", Integer.valueOf(CoreOperators.topicReplicationCount()));
        properties.putIfAbsent("default.timestamp.extractor", WallclockTimestampExtractor.class);

        // Enforced values: these replace any caller-supplied setting.
        properties.put("retention.ms", 86400000);
        properties.put("message.timestamp.difference.max.ms", 6048000000L);
        properties.put("log.message.timestamp.difference.max.ms", 6652800000L);
        properties.put("cache.max.bytes.buffering", 10485760L);
        properties.put("commit.interval.ms", 1000);
        properties.put("max.request.size", 7900000);
        properties.put("max.partition.fetch.bytes", 7900000);
        properties.put("rocksdb.config.setter", RocksDBConfigurationSetter.class);
        return properties;
    }

    /**
     * Runs this stream with command-line style arguments, blocking the calling
     * thread until the coroutine body ({@code Stream$runWithArguments$1}) completes.
     *
     * @param strArr    the arguments handed to the coroutine body
     * @param function2 callback handed to the coroutine body
     */
    public final void runWithArguments(@NotNull String[] strArr, @NotNull Function2<? super TopologyContext, ? super Continuation<? super Unit>, ? extends Object> function2) {
        Intrinsics.checkNotNullParameter(strArr, "args");
        Intrinsics.checkNotNullParameter(function2, "after");
        Stream$runWithArguments$1 body = new Stream$runWithArguments$1(this, strArr, function2, null);
        // Flag 1 with a null context: runBlocking uses its default coroutine context.
        BuildersKt.runBlocking$default((CoroutineContext) null, body, 1, (Object) null);
    }

    /**
     * Compiler-generated bridge for the default {@code args} argument of
     * {@code runWithArguments}: bit 0 of {@code i} selects an empty array.
     */
    public static /* synthetic */ void runWithArguments$default(Stream stream, String[] strArr, Function2 function2, int i, Object obj) {
        String[] args = (i & 1) != 0 ? new String[0] : strArr;
        stream.runWithArguments(args, function2);
    }

    /**
     * Returns the root of the topology, which for a {@code Stream} is the stream itself.
     *
     * @return this instance
     */
    @Override // io.floodplain.kotlindsl.FloodplainOperator
    @NotNull
    public Stream getRootTopology() {
        Stream root = this;
        return root;
    }

    /**
     * Returns the topology context this stream was constructed with.
     *
     * @return the topology context, never {@code null}
     */
    @Override // io.floodplain.kotlindsl.FloodplainOperator
    @NotNull
    public TopologyContext getTopologyContext() {
        return topologyContext;
    }

    /**
     * Returns the topology constructor this stream was constructed with.
     *
     * @return the topology constructor, never {@code null}
     */
    @NotNull
    public final TopologyConstructor getTopologyConstructor() {
        return topologyConstructor;
    }

    /**
     * Creates an empty stream definition bound to the given context and constructor.
     *
     * <p>Tenant and deployment are read once from the context ({@code null} when
     * absent); the generation is read once and must be non-null.
     *
     * @param topologyContext     the context supplying tenant, deployment and generation
     * @param topologyConstructor the constructor state used while rendering the topology
     */
    public Stream(@NotNull TopologyContext topologyContext, @NotNull TopologyConstructor topologyConstructor) {
        Intrinsics.checkNotNullParameter(topologyContext, "topologyContext");
        Intrinsics.checkNotNullParameter(topologyConstructor, "topologyConstructor");
        this.topologyContext = topologyContext;
        this.topologyConstructor = topologyConstructor;

        // All registries start empty and are filled through the add* methods.
        this.sources = new ArrayList<>();
        this.sinkConfigurations = new ArrayList<>();
        this.sourceConfigurations = new ArrayList<>();
        this.localSinkConfigurations = new ArrayList<>();

        this.tenant = (String) topologyContext.getTenant().orElse(null);
        this.deployment = (String) topologyContext.getDeployment().orElse(null);
        String contextGeneration = topologyContext.getGeneration();
        Intrinsics.checkNotNullExpressionValue(contextGeneration, "topologyContext.generation");
        this.generation = contextGeneration;
    }
}
