package io.floodplain.streams.remotejoin.ranged;

import io.floodplain.replication.api.ReplicationMessage;
import io.floodplain.streams.api.CoreOperators;
import io.floodplain.streams.remotejoin.PreJoinProcessor;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.function.BiFunction;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.processor.api.Processor;
import org.apache.kafka.streams.processor.api.ProcessorContext;
import org.apache.kafka.streams.processor.api.Record;
import org.apache.kafka.streams.state.KeyValueIterator;
import org.apache.kafka.streams.state.KeyValueStore;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:io/floodplain/streams/remotejoin/ranged/ManyToOneGroupedProcessor.class */
public class ManyToOneGroupedProcessor implements Processor<String, ReplicationMessage, String, ReplicationMessage> {
    private static final Logger logger = LoggerFactory.getLogger(ManyToOneGroupedProcessor.class);
    private final String fromProcessorName;
    private final String withProcessorName;
    private final boolean optional;
    private final BiFunction<ReplicationMessage, ReplicationMessage, ReplicationMessage> joinFunction = (replicationMessage, replicationMessage2) -> {
        return replicationMessage.withParamMessage(replicationMessage2.message());
    };
    private KeyValueStore<String, ReplicationMessage> forwardLookupStore;
    private KeyValueStore<String, ReplicationMessage> reverseLookupStore;
    private ProcessorContext<String, ReplicationMessage> context;

    public ManyToOneGroupedProcessor(String str, String str2, boolean z) {
        this.fromProcessorName = str;
        this.withProcessorName = str2;
        this.optional = z;
    }

    public void init(ProcessorContext processorContext) {
        this.context = processorContext;
        this.forwardLookupStore = processorContext.getStateStore("STORE_" + this.fromProcessorName);
        this.reverseLookupStore = processorContext.getStateStore("STORE_" + this.withProcessorName);
    }

    public void process(Record<String, ReplicationMessage> record) {
        String str = (String) record.key();
        ReplicationMessage replicationMessage = (ReplicationMessage) record.value();
        if (str.contains("{")) {
            throw new RuntimeException("Failed. bad key: " + str);
        }
        boolean z = false;
        if (str.endsWith(PreJoinProcessor.REVERSE_IDENTIFIER)) {
            z = true;
            str = str.substring(0, str.length() - PreJoinProcessor.REVERSE_IDENTIFIER.length());
        }
        if (z) {
            reverseJoin(str, replicationMessage, record.timestamp());
        } else {
            forwardJoin(str, replicationMessage, record.timestamp());
        }
    }

    private void reverseJoin(String str, ReplicationMessage replicationMessage, long j) {
        if (replicationMessage == null) {
            logger.debug("reverseJoin joinGrouped emitting null message with key: {} ", str);
            this.context.forward(new Record(str, (Object) null, j));
            return;
        }
        if (replicationMessage.operation() == ReplicationMessage.Operation.DELETE) {
            reverseJoinDelete(str, replicationMessage, j);
            return;
        }
        ReplicationMessage withOperation = replicationMessage.withOperation(replicationMessage.operation());
        KeyValueIterator range = this.forwardLookupStore.range(str + "|", str + "}");
        while (range.hasNext()) {
            try {
                KeyValue keyValue = (KeyValue) range.next();
                forwardMessage(CoreOperators.ungroupKey((String) keyValue.key), this.joinFunction.apply((ReplicationMessage) keyValue.value, withOperation), j);
            } catch (Throwable th) {
                if (range != null) {
                    try {
                        range.close();
                    } catch (Throwable th2) {
                        th.addSuppressed(th2);
                    }
                }
                throw th;
            }
        }
        if (range != null) {
            range.close();
        }
    }

    private void reverseJoinDelete(String str, ReplicationMessage replicationMessage, long j) {
        logger.debug("Delete detected for key: {}", str);
        ArrayList arrayList = new ArrayList();
        KeyValueIterator range = this.forwardLookupStore.range(str + "|", str + "}");
        while (range.hasNext()) {
            try {
                KeyValue keyValue = (KeyValue) range.next();
                if (this.optional) {
                    forwardMessage(CoreOperators.ungroupKey((String) keyValue.key), (ReplicationMessage) keyValue.value, j);
                } else {
                    forwardMessage(CoreOperators.ungroupKey((String) keyValue.key), this.joinFunction.apply(replicationMessage, (ReplicationMessage) keyValue.value).withOperation(ReplicationMessage.Operation.DELETE), j);
                    arrayList.add((String) keyValue.key);
                }
            } catch (Throwable th) {
                if (range != null) {
                    try {
                        range.close();
                    } catch (Throwable th2) {
                        th.addSuppressed(th2);
                    }
                }
                throw th;
            }
        }
        if (range != null) {
            range.close();
        }
        Iterator it = arrayList.iterator();
        while (it.hasNext()) {
            this.forwardLookupStore.delete((String) it.next());
        }
    }

    private void forwardJoin(String str, ReplicationMessage replicationMessage, long j) {
        String ungroupKey = CoreOperators.ungroupKey(str);
        String ungroupKeyReverse = CoreOperators.ungroupKeyReverse(str);
        if (replicationMessage == null) {
            this.context.forward(new Record(ungroupKey, (Object) null, j));
            return;
        }
        ReplicationMessage withOperation = replicationMessage.withOperation(replicationMessage.operation());
        ReplicationMessage replicationMessage2 = (ReplicationMessage) this.reverseLookupStore.get(ungroupKeyReverse);
        if (replicationMessage2 != null) {
            forwardMessage(ungroupKey, this.joinFunction.apply(withOperation, replicationMessage2), j);
        } else if (this.optional) {
            forwardMessage(ungroupKey, replicationMessage, j);
        } else {
            forwardMessage(ungroupKey, replicationMessage.withOperation(ReplicationMessage.Operation.DELETE), j);
        }
    }

    private void forwardMessage(String str, ReplicationMessage replicationMessage, long j) {
        this.context.forward(new Record(str, replicationMessage, j));
        if (replicationMessage.operation() == ReplicationMessage.Operation.DELETE) {
            logger.debug("Delete forwarded, appending null forward with key: {}", str);
            this.context.forward(new Record(str, (Object) null, j));
        }
    }
}
