package io.floodplain.streams.remotejoin.ranged;

import io.floodplain.replication.api.ReplicationMessage;
import io.floodplain.streams.api.CoreOperators;
import io.floodplain.streams.remotejoin.PreJoinProcessor;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.Optional;
import java.util.function.BiFunction;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.kstream.Predicate;
import org.apache.kafka.streams.processor.AbstractProcessor;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.state.KeyValueIterator;
import org.apache.kafka.streams.state.KeyValueStore;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:io/floodplain/streams/remotejoin/ranged/ManyToOneGroupedProcessor.class */
public class ManyToOneGroupedProcessor extends AbstractProcessor<String, ReplicationMessage> {
    private static final Logger logger = LoggerFactory.getLogger((Class<?>) ManyToOneGroupedProcessor.class);
    private final String fromProcessorName;
    private final String withProcessorName;
    private final boolean optional;
    private final BiFunction<ReplicationMessage, ReplicationMessage, ReplicationMessage> joinFunction = (replicationMessage, replicationMessage2) -> {
        return replicationMessage.withParamMessage(replicationMessage2.message());
    };
    private KeyValueStore<String, ReplicationMessage> forwardLookupStore;
    private KeyValueStore<String, ReplicationMessage> reverseLookupStore;
    private final Predicate<String, ReplicationMessage> associationBypass;

    public ManyToOneGroupedProcessor(String str, String str2, Optional<Predicate<String, ReplicationMessage>> optional, boolean z) {
        this.fromProcessorName = str;
        this.withProcessorName = str2;
        this.optional = z;
        this.associationBypass = optional.orElse((str3, replicationMessage) -> {
            return true;
        });
    }

    @Override // org.apache.kafka.streams.processor.AbstractProcessor, org.apache.kafka.streams.processor.Processor
    public void init(ProcessorContext processorContext) {
        super.init(processorContext);
        this.forwardLookupStore = (KeyValueStore) processorContext.getStateStore("STORE_" + this.fromProcessorName);
        this.reverseLookupStore = (KeyValueStore) processorContext.getStateStore("STORE_" + this.withProcessorName);
    }

    @Override // org.apache.kafka.streams.processor.Processor
    public void process(String str, ReplicationMessage replicationMessage) {
        if (str.contains("{")) {
            throw new RuntimeException("Failed. bad key: " + str);
        }
        boolean z = false;
        if (str.endsWith(PreJoinProcessor.REVERSE_IDENTIFIER)) {
            z = true;
            str = str.substring(0, str.length() - PreJoinProcessor.REVERSE_IDENTIFIER.length());
        }
        if (z) {
            reverseJoin(str, replicationMessage);
        } else {
            forwardJoin(str, replicationMessage);
        }
    }

    /* JADX WARN: Multi-variable type inference failed */
    private void reverseJoin(String str, ReplicationMessage replicationMessage) {
        if (replicationMessage == null) {
            logger.debug("reverseJoin joinGrouped emitting null message with key: {} ", str);
            context().forward(str, null);
            return;
        }
        if (replicationMessage.operation() == ReplicationMessage.Operation.DELETE) {
            reverseJoinDelete(str, replicationMessage);
            return;
        }
        ReplicationMessage withOperation = replicationMessage.withOperation(replicationMessage.operation());
        KeyValueIterator<String, ReplicationMessage> range = this.forwardLookupStore.range(str + "|", str + "}");
        while (range.hasNext()) {
            KeyValue next = range.next();
            String ungroupKey = CoreOperators.ungroupKey((String) next.key);
            if (this.associationBypass.test(ungroupKey, (ReplicationMessage) next.value)) {
                forwardMessage(ungroupKey, this.joinFunction.apply((ReplicationMessage) next.value, withOperation));
            } else {
                forwardMessage(ungroupKey, (ReplicationMessage) next.value);
            }
        }
    }

    /* JADX WARN: Multi-variable type inference failed */
    private void reverseJoinDelete(String str, ReplicationMessage replicationMessage) {
        logger.debug("Delete detected for key: {}", str);
        ArrayList arrayList = new ArrayList();
        KeyValueIterator<String, ReplicationMessage> range = this.forwardLookupStore.range(str + "|", str + "}");
        while (range.hasNext()) {
            KeyValue next = range.next();
            if (this.optional) {
                forwardMessage(CoreOperators.ungroupKey((String) next.key), (ReplicationMessage) next.value);
            } else {
                forwardMessage(CoreOperators.ungroupKey((String) next.key), this.joinFunction.apply(replicationMessage, (ReplicationMessage) next.value).withOperation(ReplicationMessage.Operation.DELETE));
                arrayList.add((String) next.key);
            }
        }
        Iterator it = arrayList.iterator();
        while (it.hasNext()) {
            this.forwardLookupStore.delete((String) it.next());
        }
    }

    private void forwardJoin(String str, ReplicationMessage replicationMessage) {
        String ungroupKey = CoreOperators.ungroupKey(str);
        String ungroupKeyReverse = CoreOperators.ungroupKeyReverse(str);
        if (replicationMessage == null) {
            context().forward(ungroupKey, null);
            return;
        }
        try {
            if (!this.associationBypass.test(ungroupKey, replicationMessage)) {
                forwardMessage(ungroupKey, replicationMessage);
                return;
            }
        } catch (Throwable th) {
            logger.error("Error on checking filter predicate", th);
        }
        if (replicationMessage.operation() == ReplicationMessage.Operation.DELETE) {
        }
        ReplicationMessage withOperation = replicationMessage.withOperation(replicationMessage.operation());
        ReplicationMessage replicationMessage2 = this.reverseLookupStore.get(ungroupKeyReverse);
        if (replicationMessage2 != null) {
            forwardMessage(ungroupKey, this.joinFunction.apply(withOperation, replicationMessage2));
        } else if (this.optional) {
            forwardMessage(ungroupKey, replicationMessage);
        } else {
            forwardMessage(ungroupKey, replicationMessage.withOperation(ReplicationMessage.Operation.DELETE));
        }
    }

    private void forwardMessage(String str, ReplicationMessage replicationMessage) {
        context().forward(str, replicationMessage);
        if (replicationMessage.operation() == ReplicationMessage.Operation.DELETE) {
            logger.debug("Delete forwarded, appending null forward with key: {}", str);
            context().forward(str, null);
        }
    }
}
