package io.floodplain.kotlindsl;

import io.debezium.data.Envelope;
import io.floodplain.reactive.topology.ReactivePipe;
import io.floodplain.reactive.topology.ReactivePipeParser;
import io.floodplain.streams.api.CoreOperators;
import io.floodplain.streams.api.Topic;
import io.floodplain.streams.api.TopologyContext;
import io.floodplain.streams.base.RocksDBConfigurationSetter;
import io.floodplain.streams.base.StreamOperators;
import io.floodplain.streams.remotejoin.ReplicationTopologyParser;
import io.floodplain.streams.remotejoin.TopologyConstructor;
import java.lang.Thread;
import java.net.URL;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Properties;
import java.util.Set;
import java.util.Stack;
import java.util.UUID;
import kotlin.Metadata;
import kotlin.Pair;
import kotlin.Triple;
import kotlin.TuplesKt;
import kotlin.Unit;
import kotlin.collections.CollectionsKt;
import kotlin.coroutines.Continuation;
import kotlin.jvm.functions.Function0;
import kotlin.jvm.functions.Function2;
import kotlin.jvm.internal.Intrinsics;
import kotlinx.coroutines.BuildersKt;
import mu.KLogger;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.config.TopicConfig;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.processor.WallclockTimestampExtractor;
import org.apache.log4j.Priority;
import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;
import org.testcontainers.shaded.org.apache.commons.lang.time.DateUtils;

/* compiled from: Stream.kt */
@Metadata(mv = {1, 4, 0}, bv = {1, 0, 3}, k = 1, d1 = {"��\u009c\u0001\n\u0002\u0018\u0002\n\u0002\u0018\u0002\n��\n\u0002\u0018\u0002\n\u0002\b\u0005\n\u0002\u0010!\n\u0002\u0018\u0002\n��\n\u0002\u0018\u0002\n��\n\u0002\u0018\u0002\n\u0002\b\u0005\n\u0002\u0010\u0002\n\u0002\b\u0003\n\u0002\u0018\u0002\n��\n\u0002\u0010\u000e\n\u0002\b\u0003\n\u0002\u0018\u0002\n\u0002\u0018\u0002\n\u0002\u0010 \n\u0002\u0018\u0002\n��\n\u0002\u0018\u0002\n\u0002\b\u0002\n\u0002\u0010\b\n��\n\u0002\u0018\u0002\n\u0002\u0018\u0002\n\u0002\u0018\u0002\n\u0002\u0010��\n\u0002\u0018\u0002\n\u0002\b\u0002\n\u0002\u0018\u0002\n��\n\u0002\u0018\u0002\n\u0002\b\u0002\n\u0002\u0010\u000b\n\u0002\b\u0005\n\u0002\u0010\u0011\n��\n\u0002\u0018\u0002\n\u0002\b\u0003\u0018��2\u00020\u0001B\r\u0012\u0006\u0010\u0002\u001a\u00020\u0003¢\u0006\u0002\u0010\u0004J\u000e\u0010\u0011\u001a\u00020\n2\u0006\u0010\u0012\u001a\u00020\nJ\u0010\u0010\u0013\u001a\u00020\u00142\u0006\u0010\u0015\u001a\u00020\u000eH\u0016J\u000e\u0010\u0016\u001a\u00020\u00142\u0006\u0010\u0012\u001a\u00020\fJ \u0010\u0017\u001a\u00020\u00182\u0006\u0010\u0019\u001a\u00020\u001a2\u0006\u0010\u001b\u001a\u00020\u001a2\u0006\u0010\u001c\u001a\u00020\u001aH\u0002JF\u0010\u001d\u001a8\u0012\u0004\u0012\u00020\u001f\u0012\u0016\u0012\u0014\u0012\u0010\u0012\u000e\u0012\u0004\u0012\u00020\u001a\u0012\u0004\u0012\u00020\u001a0!0 \u0012\u0016\u0012\u0014\u0012\u0010\u0012\u000e\u0012\u0004\u0012\u00020\u001a\u0012\u0004\u0012\u00020\u001a0!0 
0\u001e2\u0006\u0010\"\u001a\u00020#H\u0002JO\u0010$\u001a\u00020\u00142\n\b\u0002\u0010\u0019\u001a\u0004\u0018\u00010\u001a2\n\b\u0002\u0010%\u001a\u0004\u0018\u00010&2'\u0010'\u001a#\b\u0001\u0012\u0004\u0012\u00020)\u0012\n\u0012\b\u0012\u0004\u0012\u00020\u00140*\u0012\u0006\u0012\u0004\u0018\u00010+0(¢\u0006\u0002\b,ø\u0001��¢\u0006\u0002\u0010-J\"\u0010.\u001a\u00020/2\b\u00100\u001a\u0004\u0018\u0001012\u0006\u00102\u001a\u00020\u001a2\b\b\u0002\u00103\u001a\u000204J\u0010\u00105\u001a\u00020\u001f2\u0006\u0010\"\u001a\u00020#H\u0002J(\u00106\u001a\u00020/2\u0006\u00107\u001a\u00020\u001f2\u0006\u0010\u0019\u001a\u00020\u001a2\u0006\u00102\u001a\u00020\u001a2\u0006\u0010\u001c\u001a\u00020\u001aH\u0002JU\u00108\u001a\u00020\u00142\u0012\b\u0002\u00109\u001a\f\u0012\b\b\u0001\u0012\u0004\u0018\u00010\u001a0:21\u0010;\u001a-\b\u0001\u0012\u0013\u0012\u00110\u0003¢\u0006\f\b<\u0012\b\b=\u0012\u0004\b\b(\u0002\u0012\n\u0012\b\u0012\u0004\u0012\u00020\u00140*\u0012\u0006\u0012\u0004\u0018\u00010+0(ø\u0001��¢\u0006\u0002\u0010>J\u000e\u0010\b\u001a\b\u0012\u0004\u0012\u00020\n0 H\u0002J\u000e\u0010\u000b\u001a\b\u0012\u0004\u0012\u00020\f0 H\u0002R\u0014\u0010\u0005\u001a\u00020��8VX\u0096\u0004¢\u0006\u0006\u001a\u0004\b\u0006\u0010\u0007R\u0014\u0010\b\u001a\b\u0012\u0004\u0012\u00020\n0\tX\u0082\u0004¢\u0006\u0002\n��R\u0014\u0010\u000b\u001a\b\u0012\u0004\u0012\u00020\f0\tX\u0082\u0004¢\u0006\u0002\n��R\u0014\u0010\r\u001a\b\u0012\u0004\u0012\u00020\u000e0\tX\u0082\u0004¢\u0006\u0002\n��R\u0014\u0010\u0002\u001a\u00020\u0003X\u0096\u0004¢\u0006\b\n��\u001a\u0004\b\u000f\u0010\u0010\u0082\u0002\u0004\n\u0002\b\u0019¨\u0006?"}, d2 = {"Lio/floodplain/kotlindsl/Stream;", "Lio/floodplain/kotlindsl/FloodplainSourceContainer;", "topologyContext", "Lio/floodplain/streams/api/TopologyContext;", "(Lio/floodplain/streams/api/TopologyContext;)V", "rootTopology", "getRootTopology", "()Lio/floodplain/kotlindsl/Stream;", "sinkConfigurations", "", 
"Lio/floodplain/kotlindsl/SinkConfig;", "sourceConfigurations", "Lio/floodplain/kotlindsl/SourceConfig;", "sources", "Lio/floodplain/kotlindsl/Source;", "getTopologyContext", "()Lio/floodplain/streams/api/TopologyContext;", "addSinkConfiguration", "c", "addSource", "", "source", "addSourceConfiguration", "createProperties", "Ljava/util/Properties;", "applicationId", "", "brokers", "storagePath", "render", "Lkotlin/Triple;", "Lorg/apache/kafka/streams/Topology;", "", "Lkotlin/Pair;", "topologyConstructor", "Lio/floodplain/streams/remotejoin/TopologyConstructor;", "renderAndExecute", "bufferTime", "", "localCmds", "Lkotlin/Function2;", "Lio/floodplain/kotlindsl/LocalContext;", "Lkotlin/coroutines/Continuation;", "", "Lkotlin/ExtensionFunctionType;", "(Ljava/lang/String;Ljava/lang/Integer;Lkotlin/jvm/functions/Function2;)V", "renderAndSchedule", "Lorg/apache/kafka/streams/KafkaStreams;", "connectorURL", "Ljava/net/URL;", "kafkaHosts", "force", "", "renderTopology", "runTopology", "topology", "runWithArguments", "args", "", Envelope.FieldName.AFTER, "Lkotlin/ParameterName;", "name", "([Ljava/lang/String;Lkotlin/jvm/functions/Function2;)V", "floodplain-dsl"})
/* loaded from: input_file:io/floodplain/kotlindsl/Stream.class */
/*
 * Root container of a topology definition written with the Floodplain Kotlin DSL.
 *
 * A Stream collects Source definitions plus source/sink connector configurations and can
 * render them into a Kafka Streams Topology. The rendered topology is then either handed to
 * the local runtime (renderAndExecute) or started as a KafkaStreams instance
 * (renderAndSchedule).
 *
 * The registration lists are plain ArrayLists with no synchronization in this class.
 */
public final class Stream implements FloodplainSourceContainer {
    /**
     * Value stored under {@code request.timeout.ms}. Declared locally so the streams
     * configuration does not depend on constants borrowed from unrelated libraries.
     */
    private static final int REQUEST_TIMEOUT_MS = 40000;

    /** Value stored under the topic retention key: 24 hours expressed in milliseconds. */
    private static final int RETENTION_MS = 24 * 60 * 60 * 1000;

    /** State directory value that {@link #renderAndSchedule} passes on to Kafka Streams. */
    private static final String DEFAULT_STORAGE_PATH = "storagePath";

    private final List<Source> sources;
    private final List<SinkConfig> sinkConfigurations;
    private final List<SourceConfig> sourceConfigurations;

    @NotNull
    private final TopologyContext topologyContext;

    /**
     * Creates an empty stream definition bound to the given context.
     *
     * @param topologyContext context passed to every parser, connector and runtime call made
     *                        by this class; must not be null
     */
    public Stream(@NotNull TopologyContext topologyContext) {
        Intrinsics.checkNotNullParameter(topologyContext, "topologyContext");
        this.topologyContext = topologyContext;
        this.sources = new ArrayList<>();
        this.sinkConfigurations = new ArrayList<>();
        this.sourceConfigurations = new ArrayList<>();
    }

    /**
     * Registers a source; it is converted to a reactive pipe when the topology is rendered.
     *
     * @param source the source to register; must not be null
     */
    @Override // io.floodplain.kotlindsl.FloodplainSourceContainer
    public void addSource(@NotNull Source source) {
        Intrinsics.checkNotNullParameter(source, "source");
        this.sources.add(source);
    }

    /**
     * Registers a sink connector configuration.
     *
     * @param c the configuration to register; must not be null
     * @return the same instance that was passed in
     */
    @NotNull
    public final SinkConfig addSinkConfiguration(@NotNull SinkConfig c) {
        Intrinsics.checkNotNullParameter(c, "c");
        this.sinkConfigurations.add(c);
        return c;
    }

    /**
     * Registers a source connector configuration.
     *
     * @param c the configuration to register; must not be null
     */
    public final void addSourceConfiguration(@NotNull SourceConfig c) {
        Intrinsics.checkNotNullParameter(c, "c");
        this.sourceConfigurations.add(c);
    }

    /** Returns a copy of the sink configurations registered so far. */
    private List<SinkConfig> sinkConfigurations() {
        return CollectionsKt.toList(this.sinkConfigurations);
    }

    /** Returns a copy of the source configurations registered so far. */
    private List<SourceConfig> sourceConfigurations() {
        return CollectionsKt.toList(this.sourceConfigurations);
    }

    /**
     * Builds the Kafka Streams topology for all registered sources.
     *
     * Every source is converted to a reactive pipe first; only then are the pipes processed,
     * each with a freshly generated stream id and one stack shared by all pipes. State stores
     * are materialized after the last pipe.
     *
     * @param topologyConstructor constructor that hands out stream ids and receives the
     *                            parser's output
     * @return the populated topology
     */
    // The Stack stays raw: its element type is fixed by ReactivePipeParser.processPipe, whose
    // signature is not visible from this file.
    @SuppressWarnings({"rawtypes", "unchecked"})
    private Topology renderTopology(TopologyConstructor topologyConstructor) {
        Topology topology = new Topology();
        List<ReactivePipe> pipes = new ArrayList<>(this.sources.size());
        for (Source source : this.sources) {
            pipes.add(source.toReactivePipe());
        }
        Stack pipeStack = new Stack();
        for (ReactivePipe pipe : pipes) {
            ReactivePipeParser.processPipe(getTopologyContext(), topologyConstructor, topology, topologyConstructor.generateNewStreamId(), pipeStack, pipe, false);
        }
        ReplicationTopologyParser.materializeStateStores(topologyConstructor, topology);
        return topology;
    }

    /**
     * Renders the topology, logs a description of it, and hands it to the local runtime
     * ({@code LocalRuntimeKt.runLocalTopology}) together with {@code localCmds}.
     *
     * @param applicationId application id to run under; when null a random UUID string is used
     * @param bufferTime    passed through to the local runtime unchanged; may be null
     * @param localCmds     suspending block passed through to the local runtime; must not be null
     */
    public final void renderAndExecute(@Nullable String applicationId, @Nullable Integer bufferTime, @NotNull Function2<? super LocalContext, ? super Continuation<? super Unit>, ? extends Object> localCmds) {
        Intrinsics.checkNotNullParameter(localCmds, "localCmds");
        TopologyConstructor topologyConstructor = new TopologyConstructor();
        Triple<Topology, List<Pair<String, String>>, List<Pair<String, String>>> rendered = render(topologyConstructor);
        Topology topology = rendered.getFirst();
        List<Pair<String, String>> sourceConnectors = rendered.getSecond();
        List<Pair<String, String>> sinkConnectors = rendered.getThird();

        KLogger log = StreamKt.access$getLogger$p();
        log.info("Testing topology:\n" + topology.describe());
        log.info("Testing sources:\n" + sourceConnectors);
        log.info("Testing sinks:\n" + sinkConnectors);

        Set<Topic> desiredTopicNames = topologyConstructor.desiredTopicNames();
        Intrinsics.checkNotNullExpressionValue(desiredTopicNames, "topologyConstructor.desiredTopicNames()");
        List<String> qualifiedNames = new ArrayList<>(desiredTopicNames.size());
        for (Topic topic : desiredTopicNames) {
            qualifiedNames.add(topic.qualifiedString());
        }
        log.info("Sourcetopics: \n" + qualifiedNames);

        String effectiveId = applicationId;
        if (effectiveId == null) {
            effectiveId = UUID.randomUUID().toString();
            Intrinsics.checkNotNullExpressionValue(effectiveId, "UUID.randomUUID().toString()");
        }
        // The live registration lists are passed on here, not the copies used by render().
        LocalRuntimeKt.runLocalTopology(effectiveId, bufferTime, topology, localCmds, topologyConstructor, getTopologyContext(), this.sourceConfigurations, this.sinkConfigurations);
    }

    /**
     * Kotlin default-argument bridge for {@link #renderAndExecute}: bit 0 of {@code mask}
     * replaces the application id with null, bit 1 replaces the buffer time with null.
     */
    public static /* synthetic */ void renderAndExecute$default(Stream stream, String applicationId, Integer bufferTime, Function2 localCmds, int mask, Object marker) {
        if ((mask & 1) != 0) {
            applicationId = null;
        }
        if ((mask & 2) != 0) {
            bufferTime = null;
        }
        stream.renderAndExecute(applicationId, bufferTime, localCmds);
    }

    /**
     * Renders the topology, creates topics as needed, starts the source and then the sink
     * connectors (only when a connector URL is given), and finally starts a Kafka Streams
     * instance.
     *
     * The application id is obtained from {@code topicName("@applicationId")} on the topology
     * context.
     *
     * @param connectorURL URL handed to the connector start-up calls; when null no connector
     *                     is started
     * @param kafkaHosts   bootstrap servers for topic creation and the streams instance; must
     *                     not be null
     * @param force        passed through to every connector start-up call
     * @return the started streams instance
     */
    @NotNull
    public final KafkaStreams renderAndSchedule(@Nullable URL connectorURL, @NotNull String kafkaHosts, boolean force) {
        Intrinsics.checkNotNullParameter(kafkaHosts, "kafkaHosts");
        TopologyConstructor topologyConstructor = new TopologyConstructor();
        Triple<Topology, List<Pair<String, String>>, List<Pair<String, String>>> rendered = render(topologyConstructor);
        Topology topology = rendered.getFirst();

        topologyConstructor.createTopicsAsNeeded(getTopologyContext(), kafkaHosts);
        startConnectors(connectorURL, rendered.getSecond(), force);
        startConnectors(connectorURL, rendered.getThird(), force);

        String appId = getTopologyContext().topicName("@applicationId");
        Intrinsics.checkNotNullExpressionValue(appId, "appId");
        KafkaStreams streams = runTopology(topology, appId, kafkaHosts, DEFAULT_STORAGE_PATH);
        StreamKt.access$getLogger$p().info(new Function0<Object>() {
            @Override // kotlin.jvm.functions.Function0
            @Nullable
            public final Object invoke() {
                return "Topology running!";
            }
        });
        return streams;
    }

    /**
     * Starts one connector per (name, connector JSON) pair, in list order.
     * Does nothing when {@code connectorURL} is null.
     */
    private void startConnectors(@Nullable URL connectorURL, List<Pair<String, String>> connectors, boolean force) {
        if (connectorURL == null) {
            return;
        }
        for (Pair<String, String> connector : connectors) {
            FloodplainConnectorKt.startConstructor(connector.getFirst(), getTopologyContext(), connectorURL, connector.getSecond(), force);
        }
    }

    /**
     * Kotlin default-argument bridge for {@link #renderAndSchedule}: bit 2 of {@code mask}
     * replaces {@code force} with false.
     */
    public static /* synthetic */ KafkaStreams renderAndSchedule$default(Stream stream, URL connectorURL, String kafkaHosts, boolean force, int mask, Object marker) {
        if ((mask & 4) != 0) {
            force = false;
        }
        return stream.renderAndSchedule(connectorURL, kafkaHosts, force);
    }

    /**
     * Renders the topology and materializes all registered connector configurations.
     *
     * @param topologyConstructor constructor used while rendering the topology
     * @return the topology, the (name, connector JSON) pairs of the source connectors, and the
     *         (name, connector JSON) pairs of the sink connectors, in that order
     */
    private Triple<Topology, List<Pair<String, String>>, List<Pair<String, String>>> render(TopologyConstructor topologyConstructor) {
        Topology topology = renderTopology(topologyConstructor);

        List<MaterializedConfig> sourceConfigs = new ArrayList<>();
        for (SourceConfig config : sourceConfigurations()) {
            CollectionsKt.addAll(sourceConfigs, config.materializeConnectorConfig(getTopologyContext()));
        }
        List<Pair<String, String>> sourceConnectors = toConnectorDefinitions(sourceConfigs);

        List<MaterializedConfig> sinkConfigs = new ArrayList<>();
        for (SinkConfig config : sinkConfigurations()) {
            CollectionsKt.addAll(sinkConfigs, config.materializeConnectorConfig(getTopologyContext()));
        }
        List<Pair<String, String>> sinkConnectors = toConnectorDefinitions(sinkConfigs);

        return new Triple<>(topology, sourceConnectors, sinkConnectors);
    }

    /** Maps each materialized config to a pair of its name and its connector JSON. */
    private List<Pair<String, String>> toConnectorDefinitions(List<MaterializedConfig> configs) {
        List<Pair<String, String>> definitions = new ArrayList<>(configs.size());
        for (MaterializedConfig config : configs) {
            String json = FloodplainConnectorKt.constructConnectorJson(getTopologyContext(), config.getName(), config.getSettings());
            definitions.add(TuplesKt.to(config.getName(), json));
        }
        return definitions;
    }

    /**
     * Creates a Kafka Streams instance for the topology, installs an uncaught-exception
     * handler and a state listener, and starts it.
     *
     * @param topology      topology to run
     * @param applicationId streams application id
     * @param kafkaHosts    bootstrap servers
     * @param storagePath   state directory
     * @return the started streams instance
     */
    private KafkaStreams runTopology(Topology topology, String applicationId, String kafkaHosts, String storagePath) {
        final KafkaStreams kafkaStreams = new KafkaStreams(topology, createProperties(applicationId, kafkaHosts, storagePath));
        StreamKt.access$getLogger$p().info("CurrentTopology:\n " + topology.describe());
        kafkaStreams.setUncaughtExceptionHandler(new Thread.UncaughtExceptionHandler() {
            @Override // java.lang.Thread.UncaughtExceptionHandler
            public final void uncaughtException(@NotNull Thread thread, @Nullable Throwable th) {
                Intrinsics.checkNotNullParameter(thread, "thread");
                StreamKt.access$getLogger$p().error("Error in streams. thread: " + thread.getName() + " exception: ", th);
                // An uncaught error in a stream thread closes the whole streams instance.
                kafkaStreams.close();
            }
        });
        kafkaStreams.setStateListener(new KafkaStreams.StateListener() {
            @Override // org.apache.kafka.streams.KafkaStreams.StateListener
            public final void onChange(@Nullable KafkaStreams.State newState, @Nullable KafkaStreams.State oldState) {
                // Kafka passes the new state first; the message reads "from old to new".
                StreamKt.access$getLogger$p().info("State moving from {} to {}", oldState, newState);
            }
        });
        kafkaStreams.start();
        return kafkaStreams;
    }

    /**
     * Assembles the configuration passed to the {@link KafkaStreams} constructor.
     *
     * @param applicationId value for the streams application id
     * @param brokers       value for {@code bootstrap.servers}
     * @param storagePath   value for the streams state directory
     * @return a new, fully populated {@link Properties} instance
     */
    private Properties createProperties(String applicationId, String brokers, String storagePath) {
        KLogger log = StreamKt.access$getLogger$p();
        log.info("Creating application with name: {}", applicationId);
        log.info("Creating application id: {}", applicationId);
        log.info("Starting instance in storagePath: {}", storagePath);

        Properties properties = new Properties();

        // Identity, connection and default serdes.
        properties.put(StreamsConfig.APPLICATION_ID_CONFIG, applicationId);
        properties.put("bootstrap.servers", brokers);
        properties.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        properties.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, StreamOperators.replicationSerde.getClass());

        // Consumer / producer client settings.
        properties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        properties.put("session.timeout.ms", 30000);
        properties.put("request.timeout.ms", REQUEST_TIMEOUT_MS);
        properties.put("heartbeat.interval.ms", 5000);
        properties.put("max.poll.interval.ms", 7200000);
        properties.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 100);
        properties.put("compression.type", "lz4");

        // Streams runtime settings.
        properties.put(StreamsConfig.STATE_DIR_CONFIG, storagePath);
        properties.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, 1);
        properties.put(StreamsConfig.NUM_STANDBY_REPLICAS_CONFIG, 0);
        properties.put("retries", 50);
        properties.put("replication.factor", CoreOperators.topicReplicationCount());
        properties.put(StreamsConfig.DEFAULT_TIMESTAMP_EXTRACTOR_CLASS_CONFIG, WallclockTimestampExtractor.class);

        // Topic-level settings.
        properties.put(TopicConfig.RETENTION_MS_CONFIG, RETENTION_MS);
        properties.put(TopicConfig.MESSAGE_TIMESTAMP_DIFFERENCE_MAX_MS_CONFIG, 6048000000L);
        properties.put("log.message.timestamp.difference.max.ms", 6652800000L);

        // Caching, commit interval, message size limits and RocksDB tuning.
        properties.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 10485760L);
        properties.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 1000);
        properties.put(ProducerConfig.MAX_REQUEST_SIZE_CONFIG, 7900000);
        properties.put(ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG, 7900000);
        properties.put(StreamsConfig.ROCKSDB_CONFIG_SETTER_CLASS_CONFIG, RocksDBConfigurationSetter.class);
        return properties;
    }

    /**
     * Runs the compiled coroutine body {@code Stream$runWithArguments$1} inside
     * {@code runBlocking}, handing it this stream, the arguments and the {@code after} block.
     * What that body does with them is defined in that class, not here.
     *
     * @param args  arguments forwarded to the coroutine body; must not be null
     * @param after suspending block forwarded to the coroutine body; must not be null
     */
    public final void runWithArguments(@NotNull String[] args, @NotNull Function2<? super TopologyContext, ? super Continuation<? super Unit>, ? extends Object> after) {
        Intrinsics.checkNotNullParameter(args, "args");
        Intrinsics.checkNotNullParameter(after, "after");
        BuildersKt.runBlocking$default(null, new Stream$runWithArguments$1(this, args, after, null), 1, null);
    }

    /**
     * Kotlin default-argument bridge for {@link #runWithArguments}: bit 0 of {@code mask}
     * replaces the arguments with an empty array.
     */
    public static /* synthetic */ void runWithArguments$default(Stream stream, String[] args, Function2 after, int mask, Object marker) {
        if ((mask & 1) != 0) {
            args = new String[0];
        }
        stream.runWithArguments(args, after);
    }

    /** Returns this instance: a Stream is its own root topology. */
    @Override // io.floodplain.kotlindsl.FloodplainOperator
    @NotNull
    public Stream getRootTopology() {
        return this;
    }

    /** Returns the context supplied at construction time. */
    @Override // io.floodplain.kotlindsl.FloodplainOperator
    @NotNull
    public TopologyContext getTopologyContext() {
        return this.topologyContext;
    }
}
