package io.floodplain.streams.remotejoin;

import io.floodplain.replication.api.ReplicationMessage;
import java.util.function.BiFunction;
import org.apache.kafka.streams.processor.api.Processor;
import org.apache.kafka.streams.processor.api.ProcessorContext;
import org.apache.kafka.streams.processor.api.Record;
import org.apache.kafka.streams.state.KeyValueStore;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:io/floodplain/streams/remotejoin/OneToOneProcessor.class */
public class OneToOneProcessor implements Processor<String, ReplicationMessage, String, ReplicationMessage> {
    private final String forwardLookupStoreName;
    private final String reverseLookupStoreName;
    private KeyValueStore<String, ReplicationMessage> forwardLookupStore;
    private KeyValueStore<String, ReplicationMessage> reverseLookupStore;
    private final BiFunction<ReplicationMessage, ReplicationMessage, ReplicationMessage> joinFunction;
    private final boolean optional;
    private static final Logger logger = LoggerFactory.getLogger(OneToOneProcessor.class);
    private ProcessorContext<String, ReplicationMessage> context;

    public OneToOneProcessor(String str, String str2, boolean z, BiFunction<ReplicationMessage, ReplicationMessage, ReplicationMessage> biFunction) {
        this.forwardLookupStoreName = str;
        this.reverseLookupStoreName = str2;
        this.optional = z;
        this.joinFunction = biFunction;
    }

    public void init(ProcessorContext<String, ReplicationMessage> processorContext) {
        this.context = processorContext;
        logger.info("inner lookup Looking up: " + this.forwardLookupStoreName);
        this.forwardLookupStore = processorContext.getStateStore(this.forwardLookupStoreName);
        logger.info("inner lookup Looking up: " + this.reverseLookupStoreName);
        this.reverseLookupStore = processorContext.getStateStore(this.reverseLookupStoreName);
        logger.info("One-to-one successfully started");
    }

    public void process(Record<String, ReplicationMessage> record) {
        boolean z = false;
        String str = (String) record.key();
        ReplicationMessage replicationMessage = (ReplicationMessage) record.value();
        if (replicationMessage == null) {
            this.context.forward(record.withValue((Object) null));
            return;
        }
        KeyValueStore<String, ReplicationMessage> keyValueStore = this.reverseLookupStore;
        if (str.endsWith(PreJoinProcessor.REVERSE_IDENTIFIER)) {
            z = true;
            str = str.substring(0, str.length() - PreJoinProcessor.REVERSE_IDENTIFIER.length());
            keyValueStore = this.forwardLookupStore;
        }
        ReplicationMessage replicationMessage2 = (ReplicationMessage) keyValueStore.get(str);
        if (replicationMessage2 == null) {
            if (!z && this.optional) {
                this.context.forward(new Record(str, replicationMessage, record.timestamp()));
                return;
            }
            return;
        }
        if (!z) {
            this.context.forward(new Record(str, this.joinFunction.apply(replicationMessage, replicationMessage2), record.timestamp()));
        } else if (replicationMessage.operation() == ReplicationMessage.Operation.DELETE && !this.optional) {
            this.context.forward(new Record(str, replicationMessage2.withOperation(ReplicationMessage.Operation.DELETE), record.timestamp()));
            this.context.forward(new Record(str, (Object) null, record.timestamp()));
        } else if (replicationMessage.operation() == ReplicationMessage.Operation.DELETE) {
            this.context.forward(new Record(str, replicationMessage2, record.timestamp()));
        } else {
            this.context.forward(new Record(str, this.joinFunction.apply(replicationMessage2, replicationMessage), record.timestamp()));
        }
    }
}
