package io.floodplain.reactive.source.topology;

import io.floodplain.reactive.source.topology.api.TopologyPipeComponent;
import io.floodplain.replication.api.ReplicationMessage;
import io.floodplain.replication.factory.ReplicationFactory;
import io.floodplain.streams.api.ProcessorName;
import io.floodplain.streams.api.Topic;
import io.floodplain.streams.api.TopologyContext;
import io.floodplain.streams.remotejoin.ReplicationTopologyParser;
import io.floodplain.streams.remotejoin.TopologyConstructor;
import java.util.Optional;
import java.util.Stack;
import org.apache.kafka.common.network.ClientInformation;
import org.apache.kafka.common.serialization.Serializer;
import org.apache.kafka.streams.Topology;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:io/floodplain/reactive/source/topology/SinkTransformer.class */
public class SinkTransformer implements TopologyPipeComponent {
    private final Optional<ProcessorName> name;
    private final Optional<Integer> partitions;
    private final Topic.FloodplainKeyFormat keyFormat;
    private final Topic.FloodplainBodyFormat valueFormat;
    private final Topic topic;
    private static final Logger logger = LoggerFactory.getLogger(SinkTransformer.class);
    private boolean materializeParent = false;
    private boolean debug = false;

    public SinkTransformer(Optional<ProcessorName> optional, Topic topic, Optional<Integer> optional2, Topic.FloodplainKeyFormat floodplainKeyFormat, Topic.FloodplainBodyFormat floodplainBodyFormat) {
        this.name = optional;
        this.topic = topic;
        this.partitions = optional2;
        this.keyFormat = floodplainKeyFormat;
        this.valueFormat = floodplainBodyFormat;
    }

    @Override // io.floodplain.reactive.source.topology.api.TopologyPipeComponent
    public void addToTopology(Stack<String> stack, int i, Topology topology, TopologyContext topologyContext, TopologyConstructor topologyConstructor) {
        String qualifiedString = this.topic.qualifiedString();
        topologyConstructor.ensureTopicExists(this.topic, this.partitions);
        String str = (String) this.name.map(processorName -> {
            return processorName.definition() + "_" + topologyContext.topicName(processorName + "_" + this.topic.qualifiedString());
        }).orElse(qualifiedString);
        topologyConstructor.addSink(str);
        logger.info("Stack top for transformer: " + stack.peek());
        Serializer<String> keySerializer = ReplicationTopologyParser.keySerializer(this.keyFormat);
        Serializer<ReplicationMessage> bodySerializer = ReplicationTopologyParser.bodySerializer(this.valueFormat);
        if (this.debug) {
            debug("debug_" + str, topology, keySerializer, bodySerializer, stack);
        }
        topology.addSink(str, qualifiedString, keySerializer, bodySerializer, stack.peek());
    }

    private void debug(String str, Topology topology, Serializer<String> serializer, Serializer<ReplicationMessage> serializer2, Stack<String> stack) {
        EachProcessor eachProcessor = new EachProcessor((str2, immutableMessage, immutableMessage2) -> {
            logger.info("DEBIG: " + new String(serializer2.serialize(ClientInformation.UNKNOWN_NAME_OR_VERSION, ReplicationFactory.standardMessage(immutableMessage))));
        });
        topology.addProcessor(str, () -> {
            return eachProcessor;
        }, stack.peek());
    }

    @Override // io.floodplain.reactive.source.topology.api.TopologyPipeComponent
    public boolean materializeParent() {
        return this.materializeParent;
    }

    @Override // io.floodplain.reactive.source.topology.api.TopologyPipeComponent
    public void setMaterialize() {
        this.materializeParent = true;
    }
}
