package io.floodplain.streams.remotejoin.ranged;

import io.floodplain.replication.api.ReplicationMessage;
import io.floodplain.streams.api.CoreOperators;
import io.floodplain.streams.remotejoin.PreJoinProcessor;
import java.util.ArrayList;
import java.util.List;
import java.util.function.BiFunction;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.processor.AbstractProcessor;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.state.KeyValueIterator;
import org.apache.kafka.streams.state.KeyValueStore;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:io/floodplain/streams/remotejoin/ranged/OneToManyGroupedProcessor.class */
public class OneToManyGroupedProcessor extends AbstractProcessor<String, ReplicationMessage> {
    private static final Logger logger = LoggerFactory.getLogger(OneToManyGroupedProcessor.class);
    private final boolean debug;
    private String storeName;
    private String groupedStoreName;
    private boolean optional;
    private KeyValueStore<String, ReplicationMessage> groupedLookupStore;
    private KeyValueStore<String, ReplicationMessage> lookupStore;
    private BiFunction<ReplicationMessage, List<ReplicationMessage>, ReplicationMessage> joinFunction;

    public OneToManyGroupedProcessor(String str, String str2, boolean z, BiFunction<ReplicationMessage, List<ReplicationMessage>, ReplicationMessage> biFunction, boolean z2) {
        this.storeName = str;
        this.groupedStoreName = str2;
        this.optional = z;
        this.joinFunction = biFunction;
        this.debug = z2;
    }

    public void init(ProcessorContext processorContext) {
        this.lookupStore = processorContext.getStateStore(this.storeName);
        this.groupedLookupStore = processorContext.getStateStore(this.groupedStoreName);
        super.init(processorContext);
    }

    public void process(String str, ReplicationMessage replicationMessage) {
        boolean z = false;
        if (str.endsWith(PreJoinProcessor.REVERSE_IDENTIFIER)) {
            z = true;
            str = str.substring(0, str.length() - PreJoinProcessor.REVERSE_IDENTIFIER.length());
        }
        if (z) {
            reverseJoin(str, replicationMessage);
        } else if (replicationMessage != null) {
            forwardJoin(str, replicationMessage);
        } else {
            logger.debug("O2M Emitting null message with key: {}", str);
            context().forward(str, (Object) null);
        }
    }

    private void forwardJoin(String str, ReplicationMessage replicationMessage) {
        ArrayList arrayList = new ArrayList();
        KeyValueIterator range = this.groupedLookupStore.range(str + "|", str + "}");
        while (range.hasNext()) {
            try {
                arrayList.add((ReplicationMessage) ((KeyValue) range.next()).value);
            } catch (Throwable th) {
                if (range != null) {
                    try {
                        range.close();
                    } catch (Throwable th2) {
                        th.addSuppressed(th2);
                    }
                }
                throw th;
            }
        }
        if (range != null) {
            range.close();
        }
        ReplicationMessage replicationMessage2 = replicationMessage;
        if (arrayList.size() > 0 || this.optional) {
            replicationMessage2 = this.joinFunction.apply(replicationMessage, arrayList);
        }
        if (this.optional || arrayList.size() > 0) {
            forwardMessage(str, replicationMessage2);
        } else {
            forwardMessage(str, replicationMessage2.withOperation(ReplicationMessage.Operation.DELETE));
        }
    }

    private void reverseJoin(String str, ReplicationMessage replicationMessage) {
        String ungroupKeyReverse = CoreOperators.ungroupKeyReverse(str);
        ReplicationMessage replicationMessage2 = (ReplicationMessage) this.lookupStore.get(ungroupKeyReverse);
        if (this.debug) {
            logger.info("# of elements in reverse store: {}", Long.valueOf(this.lookupStore.approximateNumEntries()));
        }
        if (replicationMessage2 == null) {
            return;
        }
        forwardJoin(ungroupKeyReverse, replicationMessage2);
    }

    private void forwardMessage(String str, ReplicationMessage replicationMessage) {
        context().forward(str, replicationMessage);
        if (replicationMessage.operation() == ReplicationMessage.Operation.DELETE) {
            logger.debug("Delete forwarded, appending null forward with key: {}", str);
            context().forward(str, (Object) null);
        }
    }
}
