package io.floodplain.streams.remotejoin.ranged;

import io.floodplain.replication.api.ReplicationMessage;
import io.floodplain.streams.api.CoreOperators;
import io.floodplain.streams.remotejoin.PreJoinProcessor;
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
import java.util.function.BiFunction;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.kstream.Predicate;
import org.apache.kafka.streams.processor.AbstractProcessor;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.state.KeyValueIterator;
import org.apache.kafka.streams.state.KeyValueStore;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:io/floodplain/streams/remotejoin/ranged/ManyToManyGroupedProcessor.class */
public class ManyToManyGroupedProcessor extends AbstractProcessor<String, ReplicationMessage> {
    private static final Logger logger = LoggerFactory.getLogger(ManyToManyGroupedProcessor.class);
    private String fromProcessorName;
    private String withProcessorName;
    private boolean optional;
    private BiFunction<ReplicationMessage, List<ReplicationMessage>, ReplicationMessage> manyToManyJoinFunction = CoreOperators.getListJoinFunctionToParam(false);
    private KeyValueStore<String, ReplicationMessage> forwardLookupStore;
    private KeyValueStore<String, ReplicationMessage> reverseLookupStore;
    private final Predicate<String, ReplicationMessage> associationBypass;

    public ManyToManyGroupedProcessor(String str, String str2, Optional<Predicate<String, ReplicationMessage>> optional, boolean z) {
        this.fromProcessorName = str;
        this.withProcessorName = str2;
        this.optional = z;
        this.associationBypass = optional.orElse((str3, replicationMessage) -> {
            return true;
        });
    }

    public void init(ProcessorContext processorContext) {
        super.init(processorContext);
        this.forwardLookupStore = processorContext.getStateStore("STORE_" + this.fromProcessorName);
        this.reverseLookupStore = processorContext.getStateStore("STORE_" + this.withProcessorName);
    }

    public void process(String str, ReplicationMessage replicationMessage) {
        boolean z = false;
        if (str.endsWith(PreJoinProcessor.REVERSE_IDENTIFIER)) {
            z = true;
            str = str.substring(0, str.length() - PreJoinProcessor.REVERSE_IDENTIFIER.length());
        }
        if (z) {
            reverseJoin(str, replicationMessage);
        } else {
            forwardJoin(str, replicationMessage);
        }
    }

    private void reverseJoin(String str, ReplicationMessage replicationMessage) {
        String ungroupKeyReverse = CoreOperators.ungroupKeyReverse(str);
        if (replicationMessage == null) {
            logger.info("reverseJoin joinGrouped emitting null message with key: {} ", ungroupKeyReverse);
            context().forward(ungroupKeyReverse, (Object) null);
            return;
        }
        KeyValueIterator range = this.forwardLookupStore.range(ungroupKeyReverse + "|", ungroupKeyReverse + "}");
        while (range.hasNext()) {
            try {
                KeyValue keyValue = (KeyValue) range.next();
                forwardJoin((String) keyValue.key, (ReplicationMessage) keyValue.value);
            } catch (Throwable th) {
                if (range != null) {
                    try {
                        range.close();
                    } catch (Throwable th2) {
                        th.addSuppressed(th2);
                    }
                }
                throw th;
            }
        }
        if (range != null) {
            range.close();
        }
    }

    private void forwardJoin(String str, ReplicationMessage replicationMessage) {
        String ungroupKey = CoreOperators.ungroupKey(str);
        String ungroupKeyReverse = CoreOperators.ungroupKeyReverse(str);
        if (replicationMessage == null) {
            logger.info("forwardJoin joinGrouped emitting null message with key: {} ", ungroupKey);
            context().forward(ungroupKey, (Object) null);
            return;
        }
        try {
            if (!this.associationBypass.test(ungroupKey, replicationMessage)) {
                forwardMessage(ungroupKey, replicationMessage);
                return;
            }
        } catch (Throwable th) {
            logger.error("Error on checking filter predicate: {}", th);
        }
        if (replicationMessage.operation() == ReplicationMessage.Operation.DELETE) {
        }
        ReplicationMessage withOperation = replicationMessage.withOperation(replicationMessage.operation());
        ArrayList arrayList = new ArrayList();
        KeyValueIterator range = this.reverseLookupStore.range(ungroupKeyReverse + "|", ungroupKeyReverse + "}");
        while (range.hasNext()) {
            try {
                arrayList.add((ReplicationMessage) ((KeyValue) range.next()).value);
            } catch (Throwable th2) {
                if (range != null) {
                    try {
                        range.close();
                    } catch (Throwable th3) {
                        th2.addSuppressed(th3);
                    }
                }
                throw th2;
            }
        }
        if (range != null) {
            range.close();
        }
        ReplicationMessage apply = this.manyToManyJoinFunction.apply(withOperation, arrayList);
        if (this.optional || !arrayList.isEmpty()) {
            forwardMessage(ungroupKey, apply);
        } else {
            forwardMessage(ungroupKey, apply.withOperation(ReplicationMessage.Operation.DELETE));
        }
    }

    private void forwardMessage(String str, ReplicationMessage replicationMessage) {
        context().forward(str, replicationMessage);
        if (replicationMessage.operation() == ReplicationMessage.Operation.DELETE) {
            logger.debug("Delete forwarded, appending null forward with key: {}", str);
            context().forward(str, (Object) null);
        }
    }
}
