package io.floodplain.streams.remotejoin;

import io.floodplain.immutable.api.ImmutableMessage;
import io.floodplain.reactive.source.topology.api.TopologyPipeComponent;
import io.floodplain.replication.api.ReplicationMessage;
import io.floodplain.streams.api.Topic;
import io.floodplain.streams.api.TopologyContext;
import io.floodplain.streams.remotejoin.ranged.GroupedUpdateProcessor;
import io.floodplain.streams.remotejoin.ranged.ManyToOneGroupedProcessor;
import io.floodplain.streams.remotejoin.ranged.OneToManyGroupedProcessor;
import io.floodplain.streams.serializer.ConnectReplicationMessageSerde;
import io.floodplain.streams.serializer.ImmutableMessageSerde;
import io.floodplain.streams.serializer.ReplicationMessageSerde;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Stack;
import java.util.function.BiFunction;
import java.util.function.Function;
import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.serialization.Serializer;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.kstream.Predicate;
import org.apache.kafka.streams.processor.Processor;
import org.apache.kafka.streams.processor.ProcessorSupplier;
import org.apache.kafka.streams.state.KeyValueStore;
import org.apache.kafka.streams.state.StoreBuilder;
import org.apache.kafka.streams.state.Stores;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:io/floodplain/streams/remotejoin/ReplicationTopologyParser.class */
public class ReplicationTopologyParser {
    public static final String STORE_PREFIX = "STORE_";
    private static final Serde<ReplicationMessage> messageSerde = new ReplicationMessageSerde();
    private static final Serde<ImmutableMessage> immutableMessageSerde = new ImmutableMessageSerde();
    private static final ReplicationMessageSerde replicationMessageSerder = new ReplicationMessageSerde();
    private static final ConnectReplicationMessageSerde connectReplicationMessageSerder = new ConnectReplicationMessageSerde();
    private static final Logger logger = LoggerFactory.getLogger(ReplicationTopologyParser.class);

    /* loaded from: input_file:io/floodplain/streams/remotejoin/ReplicationTopologyParser$Flatten.class */
    public enum Flatten {
        FIRST,
        LAST,
        NONE
    }

    private ReplicationTopologyParser() {
    }

    public static void addStateStoreMapping(Map<String, List<String>> map, String str, String str2) {
        logger.info("Adding processor: {} with statestore: {}", str, str2);
        map.computeIfAbsent(str2, str3 -> {
            return new ArrayList();
        }).add(str);
    }

    public static void materializeStateStores(TopologyConstructor topologyConstructor, Topology topology) {
        for (Map.Entry<String, List<String>> entry : topologyConstructor.processorStateStoreMapper.entrySet()) {
            String key = entry.getKey();
            StoreBuilder<KeyValueStore<String, ReplicationMessage>> storeBuilder = topologyConstructor.stateStoreSupplier.get(key);
            if (storeBuilder == null) {
                StoreBuilder<KeyValueStore<String, ImmutableMessage>> storeBuilder2 = topologyConstructor.immutableStoreSupplier.get(key);
                if (storeBuilder2 == null) {
                    logger.error("Missing supplier for: {}\nStore mappings: {} available suppliers: {}", new Object[]{entry.getKey(), topologyConstructor.processorStateStoreMapper, topologyConstructor.immutableStoreSupplier});
                    logger.error("Available state stores: {}\nimm: {}", topologyConstructor.stateStoreSupplier.keySet(), topologyConstructor.immutableStoreSupplier.keySet());
                    throw new RuntimeException("Missing supplier for: " + entry.getKey());
                }
                topology = topology.addStateStore(storeBuilder2, (String[]) entry.getValue().toArray(new String[0]));
                logger.info("Added processor: {} with sttstatestores: {} mappings: {}", new Object[]{entry.getKey(), entry.getValue(), topologyConstructor.processorStateStoreMapper.get(entry.getKey())});
            } else {
                topology = topology.addStateStore(storeBuilder, (String[]) entry.getValue().toArray(new String[0]));
                logger.info("Added processor: {} with sttstatestores: {} mappings: {}", new Object[]{entry.getKey(), entry.getValue(), topologyConstructor.processorStateStoreMapper.get(entry.getKey())});
            }
        }
    }

    public static void addDiffProcessor(Topology topology, TopologyContext topologyContext, TopologyConstructor topologyConstructor, String str, String str2) {
        topology.addProcessor(str2, () -> {
            return new DiffProcessor(str2);
        }, str);
        addStateStoreMapping(topologyConstructor.processorStateStoreMapper, str2, str2);
        logger.info("Granting access for processor: {} to store: {}", str2, str2);
        topologyConstructor.stateStoreSupplier.put(str2, createMessageStoreSupplier(str2, true));
    }

    public static void addLazySourceStore(Topology topology, TopologyContext topologyContext, TopologyConstructor topologyConstructor, Topic topic, Deserializer<?> deserializer, Deserializer<?> deserializer2) {
        topologyConstructor.addDesiredTopic(topic, Optional.empty());
        if (topologyConstructor.sources.containsKey(topic)) {
            return;
        }
        topology.addSource(topic.qualifiedString(topologyContext), deserializer, deserializer2, topic.qualifiedString(topologyContext));
        topologyConstructor.sources.put(topic, topic.qualifiedString(topologyContext));
    }

    public static String addMaterializeStore(Topology topology, TopologyContext topologyContext, TopologyConstructor topologyConstructor, String str, String str2) {
        topology.addProcessor(str, () -> {
            return new StoreProcessor("STORE_" + str);
        }, str2);
        addStateStoreMapping(topologyConstructor.processorStateStoreMapper, str, "STORE_" + str);
        topologyConstructor.stores.add("STORE_" + str);
        topologyConstructor.stateStoreSupplier.put("STORE_" + str, createMessageStoreSupplier("STORE_" + str, true));
        return str;
    }

    public static Deserializer<String> keyDeserializer(Topic.FloodplainKeyFormat floodplainKeyFormat) {
        switch (floodplainKeyFormat) {
            case CONNECT_KEY_JSON:
                return ConnectReplicationMessageSerde.keyDeserialize();
            case FLOODPLAIN_STRING:
                return Serdes.String().deserializer();
            default:
                throw new IllegalArgumentException("Weird key format: " + floodplainKeyFormat);
        }
    }

    public static Serializer<String> keySerializer(Topic.FloodplainKeyFormat floodplainKeyFormat) {
        switch (floodplainKeyFormat) {
            case CONNECT_KEY_JSON:
                return ConnectReplicationMessageSerde.keySerialize();
            case FLOODPLAIN_STRING:
                return Serdes.String().serializer();
            default:
                throw new IllegalArgumentException("Weird key format: " + floodplainKeyFormat);
        }
    }

    public static Deserializer<ReplicationMessage> bodyDeserializer(Topic.FloodplainBodyFormat floodplainBodyFormat) {
        switch (floodplainBodyFormat) {
            case CONNECT_JSON:
                return connectReplicationMessageSerder.deserializer();
            case FLOODPLAIN_JSON:
                return replicationMessageSerder.deserializer();
            default:
                throw new IllegalArgumentException("Weird body format: " + floodplainBodyFormat);
        }
    }

    public static Serializer<ReplicationMessage> bodySerializer(Topic.FloodplainBodyFormat floodplainBodyFormat) {
        switch (floodplainBodyFormat) {
            case CONNECT_JSON:
                return connectReplicationMessageSerder.serializer();
            case FLOODPLAIN_JSON:
                return replicationMessageSerder.serializer();
            default:
                throw new IllegalArgumentException("Weird body format: " + floodplainBodyFormat);
        }
    }

    public static String addSourceStore(Topology topology, TopologyContext topologyContext, TopologyConstructor topologyConstructor, Topic topic, Topic.FloodplainKeyFormat floodplainKeyFormat, Topic.FloodplainBodyFormat floodplainBodyFormat, boolean z) {
        String prefixedString = topic.prefixedString("SOURCE", topologyContext);
        if (!topologyConstructor.sources.containsKey(topic)) {
            String str = prefixedString + "_src";
            topology.addSource(str, keyDeserializer(floodplainKeyFormat), bodyDeserializer(floodplainBodyFormat), topic.qualifiedString(topologyContext));
            topologyConstructor.sources.put(topic, str);
            if (z) {
                topology.addProcessor(prefixedString, () -> {
                    return new StoreProcessor("STORE_" + prefixedString);
                }, str);
            } else {
                topology.addProcessor(prefixedString, IdentityProcessor::new, str);
            }
        }
        if (z) {
            addStateStoreMapping(topologyConstructor.processorStateStoreMapper, prefixedString, "STORE_" + prefixedString);
            topologyConstructor.stores.add("STORE_" + prefixedString);
            topologyConstructor.stateStoreSupplier.put("STORE_" + prefixedString, createMessageStoreSupplier("STORE_" + prefixedString, true));
        }
        logger.info("Granting access for processor: {} to store: {}", prefixedString, prefixedString);
        return prefixedString;
    }

    private static Flatten parseFlatten(String str) {
        return str == null ? Flatten.NONE : ("true".equals(str) || "first".equals(str)) ? Flatten.FIRST : "last".equals(str) ? Flatten.LAST : Flatten.NONE;
    }

    public static String addSingleJoinGrouped(Topology topology, TopologyContext topologyContext, TopologyConstructor topologyConstructor, String str, String str2, Optional<Predicate<String, ReplicationMessage>> optional, String str3, boolean z, boolean z2) {
        String str4 = str2 + "-forwardpre";
        String str5 = str2 + "-reversepre";
        String str6 = str2 + "-joined";
        topology.addProcessor(str4, () -> {
            return new PreJoinProcessor(false);
        }, str).addProcessor(str5, () -> {
            return new PreJoinProcessor(true);
        }, str3).addProcessor(str6, () -> {
            return new ManyToOneGroupedProcessor(str, str3, optional, z);
        }, str4, str5);
        addStateStoreMapping(topologyConstructor.processorStateStoreMapper, str6, "STORE_" + str3);
        addStateStoreMapping(topologyConstructor.processorStateStoreMapper, str6, "STORE_" + str);
        addStateStoreMapping(topologyConstructor.processorStateStoreMapper, str2, "STORE_" + str2);
        topologyConstructor.stores.add("STORE_" + str3);
        topologyConstructor.stores.add("STORE_" + str);
        topologyConstructor.stores.add("STORE_" + str2);
        topologyConstructor.stateStoreSupplier.put("STORE_" + str2, createMessageStoreSupplier("STORE_" + str2, true));
        topology.addProcessor(str2, () -> {
            return new StoreProcessor("STORE_" + str2);
        }, str6);
        return str6;
    }

    public static String addGroupedProcessor(Topology topology, TopologyContext topologyContext, TopologyConstructor topologyConstructor, String str, String str2, boolean z, Function<ReplicationMessage, String> function, Optional<ProcessorSupplier<String, ReplicationMessage>> optional) {
        if (!topologyConstructor.stores.contains("STORE_" + str2)) {
            logger.error("Adding grouped with from, no source processor present for: " + str2 + " created: " + topologyConstructor.stateStoreSupplier.keySet() + " and from: " + str2);
        }
        String str3 = str2 + "_mapping";
        String str4 = str + "_transform";
        topology.addProcessor(str4, optional.orElse(IdentityProcessor::new), str2);
        addStateStoreMapping(topologyConstructor.processorStateStoreMapper, str, "STORE_" + str);
        topologyConstructor.stores.add("STORE_" + str);
        addStateStoreMapping(topologyConstructor.processorStateStoreMapper, str, "STORE_" + str3);
        topologyConstructor.stores.add("STORE_" + str3);
        topologyConstructor.stateStoreSupplier.put("STORE_" + str, createMessageStoreSupplier("STORE_" + str, true));
        topologyConstructor.stateStoreSupplier.put("STORE_" + str3, createMessageStoreSupplier("STORE_" + str3, true));
        topology.addProcessor(str, () -> {
            return new GroupedUpdateProcessor("STORE_" + str, function, "STORE_" + str3, z);
        }, str4);
        return str;
    }

    public static void addPersistentCache(Topology topology, TopologyContext topologyContext, TopologyConstructor topologyConstructor, String str, String str2, Duration duration, int i, boolean z) {
        topology.addProcessor(str, () -> {
            return new CacheProcessor(str, duration, i, z);
        }, str2);
        logger.info("Buffer using statestore: {}", "STORE_" + str);
        addStateStoreMapping(topologyConstructor.processorStateStoreMapper, str, "STORE_" + str);
        topologyConstructor.stateStoreSupplier.put("STORE_" + str, createMessageStoreSupplier("STORE_" + str, true));
    }

    public static String addReducer(Topology topology, TopologyContext topologyContext, TopologyConstructor topologyConstructor, Stack<String> stack, int i, List<TopologyPipeComponent> list, List<TopologyPipeComponent> list2, Function<ImmutableMessage, ImmutableMessage> function, boolean z, Optional<BiFunction<ImmutableMessage, ImmutableMessage, String>> optional) {
        String peek = stack.peek();
        String qualifiedName = topologyContext.qualifiedName("reduce", stack.size(), i);
        stack.push(qualifiedName);
        String qualifiedName2 = topologyContext.qualifiedName("ifelse", stack.size(), i);
        stack.push(qualifiedName2);
        int generateNewStreamId = topologyConstructor.generateNewStreamId();
        int generateNewStreamId2 = topologyConstructor.generateNewStreamId();
        String qualifiedName3 = topologyContext.qualifiedName("addbranch", stack.size(), i);
        String qualifiedName4 = topologyContext.qualifiedName("removeBranch", stack.size(), i);
        String qualifiedName5 = topologyContext.qualifiedName("reduce", stack.size(), i);
        String str = "STORE_accumulator_" + qualifiedName5;
        String str2 = "STORE_" + peek + "_reduce_inputstore";
        topology.addProcessor(qualifiedName, () -> {
            return new ReduceReadProcessor(str2, str, function, optional);
        }, peek);
        topology.addProcessor(qualifiedName2, () -> {
            return new IfElseProcessor(replicationMessage -> {
                return replicationMessage.operation() != ReplicationMessage.Operation.DELETE;
            }, qualifiedName3, Optional.of(qualifiedName4));
        }, qualifiedName);
        Stack<String> stack2 = new Stack<>();
        stack2.addAll(stack);
        topology.addProcessor(qualifiedName3, IdentityProcessor::new, stack2.peek());
        stack2.push(qualifiedName3);
        Stack<String> stack3 = new Stack<>();
        stack3.addAll(stack);
        topology.addProcessor(qualifiedName4, IdentityProcessor::new, stack3.peek());
        stack3.push(qualifiedName4);
        Iterator<TopologyPipeComponent> it = list.iterator();
        while (it.hasNext()) {
            it.next().addToTopology(stack2, generateNewStreamId, topology, topologyContext, topologyConstructor);
        }
        String qualifiedName6 = topologyContext.qualifiedName("primToSecondaryAdd", stack.size(), i);
        topology.addProcessor(qualifiedName6, PrimaryToSecondaryProcessor::new, stack2.peek());
        stack2.push(qualifiedName6);
        Iterator<TopologyPipeComponent> it2 = list2.iterator();
        while (it2.hasNext()) {
            it2.next().addToTopology(stack3, generateNewStreamId2, topology, topologyContext, topologyConstructor);
        }
        String qualifiedName7 = topologyContext.qualifiedName("primToSecondaryRemove", stack.size(), i);
        topology.addProcessor(qualifiedName7, PrimaryToSecondaryProcessor::new, stack3.peek());
        stack3.push(qualifiedName7);
        topology.addProcessor(z ? "_proc" + qualifiedName5 : qualifiedName5, () -> {
            return new StoreStateProcessor(str, optional);
        }, stack2.peek(), stack3.peek());
        addStateStoreMapping(topologyConstructor.processorStateStoreMapper, z ? "_proc" + qualifiedName5 : qualifiedName5, str);
        addStateStoreMapping(topologyConstructor.processorStateStoreMapper, qualifiedName, str);
        addStateStoreMapping(topologyConstructor.processorStateStoreMapper, qualifiedName, str2);
        if (!topologyConstructor.immutableStoreSupplier.containsKey(str)) {
            topologyConstructor.immutableStoreSupplier.put(str, createImmutableMessageSupplier(str, false));
        }
        if (!topologyConstructor.stateStoreSupplier.containsKey(str2)) {
            topologyConstructor.stateStoreSupplier.put(str2, createMessageStoreSupplier(str2, false));
        }
        if (z) {
            addMaterializeStore(topology, topologyContext, topologyConstructor, qualifiedName5, "_proc" + qualifiedName5);
        }
        return qualifiedName5;
    }

    public static Topology addJoin(Topology topology, TopologyContext topologyContext, TopologyConstructor topologyConstructor, String str, String str2, String str3, boolean z, boolean z2, boolean z3, boolean z4) {
        String str4 = str3 + "-forwardpre";
        String str5 = str3 + "-reversepre";
        topology.addProcessor(str4, () -> {
            return new PreJoinProcessor(false);
        }, str).addProcessor(str5, () -> {
            return new PreJoinProcessor(true);
        }, str2);
        Processor oneToManyGroupedProcessor = z2 ? new OneToManyGroupedProcessor("STORE_" + str, "STORE_" + str2, z, z4) : new OneToOneProcessor("STORE_" + str, "STORE_" + str2, z, (replicationMessage, replicationMessage2) -> {
            return replicationMessage.withParamMessage(replicationMessage2.message());
        }, z4);
        String str6 = z3 ? "proc_" + str3 : str3;
        Processor processor = oneToManyGroupedProcessor;
        topology.addProcessor(str6, () -> {
            return processor;
        }, str4, str5);
        addStateStoreMapping(topologyConstructor.processorStateStoreMapper, str6, "STORE_" + str2);
        addStateStoreMapping(topologyConstructor.processorStateStoreMapper, str6, "STORE_" + str);
        if (z3) {
            topologyConstructor.stores.add("STORE_" + str3);
            topologyConstructor.stateStoreSupplier.put("STORE_" + str3, createMessageStoreSupplier("STORE_" + str3, true));
            addStateStoreMapping(topologyConstructor.processorStateStoreMapper, str3, "STORE_" + str3);
            topology.addProcessor(str3, () -> {
                return new StoreProcessor("STORE_" + str3);
            }, str6);
        }
        return topology;
    }

    public static StoreBuilder<KeyValueStore<String, ReplicationMessage>> createMessageStoreSupplier(String str, boolean z) {
        logger.info("Creating messagestore supplier: {}", str);
        return Stores.keyValueStoreBuilder(z ? Stores.persistentKeyValueStore(str) : Stores.inMemoryKeyValueStore(str), Serdes.String(), messageSerde);
    }

    public static StoreBuilder<KeyValueStore<String, ImmutableMessage>> createImmutableMessageSupplier(String str, boolean z) {
        logger.info("Creating messagestore supplier: {}", str);
        return Stores.keyValueStoreBuilder(z ? Stores.persistentKeyValueStore(str) : Stores.inMemoryKeyValueStore(str), Serdes.String(), immutableMessageSerde);
    }

    public static StoreBuilder<KeyValueStore<String, Long>> createKeyRowStoreSupplier(String str) {
        logger.info("Creating key/long supplier: {}", str);
        return Stores.keyValueStoreBuilder(Stores.inMemoryKeyValueStore(str), Serdes.String(), Serdes.Long());
    }

    public static String addKeyRowProcessor(Topology topology, TopologyContext topologyContext, TopologyConstructor topologyConstructor, String str, String str2, boolean z) {
        topology.addProcessor(str2, () -> {
            return new RowNumberProcessor("STORE_" + str2);
        }, str);
        addStateStoreMapping(topologyConstructor.processorStateStoreMapper, str2, "STORE_" + str2);
        logger.info("Granting access for processor: {} to store: {}", str2, "STORE_" + str2);
        topologyConstructor.stateStoreSupplier.put("STORE_" + str2, createMessageStoreSupplier("STORE_" + str2, false));
        if (z) {
            throw new UnsupportedOperationException("Sorry, didn't implement materialization yet");
        }
        return str2;
    }
}
