package io.floodplain.streams.remotejoin.ranged;

import io.floodplain.replication.api.ReplicationMessage;
import java.util.function.Function;
import org.apache.kafka.streams.processor.AbstractProcessor;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.state.KeyValueStore;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:io/floodplain/streams/remotejoin/ranged/GroupedUpdateProcessor.class */
/**
 * Processor that re-keys incoming replication messages by a "grouped key" and keeps two
 * state stores consistent with the messages it has seen.
 *
 * <ul>
 *   <li>The <em>lookup store</em> is keyed by the grouped key and holds
 *       {@code message.now()} for each non-delete message.</li>
 *   <li>The <em>mapping store</em> is keyed by the original (incoming) key and holds the last
 *       non-delete message seen for that key. It is used to find, and remove, the lookup
 *       entry that becomes stale when a message's grouped key changes.</li>
 * </ul>
 *
 * <p>The grouped key is the value returned by the key extractor, followed by {@code "|"} and
 * the original key, unless {@code ignoreOriginalKey} is set, in which case it is the extracted
 * value alone. Every non-null message is forwarded downstream under its grouped key.
 */
public class GroupedUpdateProcessor extends AbstractProcessor<String, ReplicationMessage> {
    private static final Logger logger = LoggerFactory.getLogger(GroupedUpdateProcessor.class);

    /** Separator placed between the extracted group key and the original key. */
    private static final String GROUP_SEPARATOR = "|";

    private final String mappingStoreName;
    private final String lookupStoreName;
    private final Function<ReplicationMessage, String> keyExtract;
    private final boolean ignoreOriginalKey;

    // Resolved from the processor context in init().
    private KeyValueStore<String, ReplicationMessage> lookupStore;
    private KeyValueStore<String, ReplicationMessage> mappingStore;

    /**
     * Creates the processor. The state stores themselves are resolved later, in
     * {@link #init(ProcessorContext)}.
     *
     * @param str name of the lookup store (keyed by grouped key)
     * @param function extracts the group part of the key from a message
     * @param str2 name of the mapping store (keyed by original key)
     * @param z when {@code true}, the grouped key is the extracted key only and the original
     *     key is not appended
     */
    public GroupedUpdateProcessor(String str, Function<ReplicationMessage, String> function, String str2, boolean z) {
        this.lookupStoreName = str;
        this.mappingStoreName = str2;
        this.keyExtract = function;
        this.ignoreOriginalKey = z;
    }

    /**
     * Looks up both state stores by name in the given context, then delegates to the
     * superclass initialisation.
     *
     * @param processorContext context supplying the state stores
     */
    @Override
    @SuppressWarnings("unchecked") // getStateStore returns an untyped StateStore; the type parameters cannot be checked here.
    public void init(ProcessorContext processorContext) {
        this.lookupStore = (KeyValueStore<String, ReplicationMessage>) processorContext.getStateStore(this.lookupStoreName);
        this.mappingStore = (KeyValueStore<String, ReplicationMessage>) processorContext.getStateStore(this.mappingStoreName);
        super.init(processorContext);
    }

    /** No resources are released here; this processor does not close the stores itself. */
    @Override
    public void close() {
    }

    /**
     * Applies one message to both stores and forwards it under its grouped key.
     *
     * <p>A {@code null} message is ignored entirely. A DELETE removes the grouped key from the
     * lookup store and the original key from the mapping store. Any other operation first
     * removes the lookup entry stored under the previous grouped key (if it differs), then
     * writes the new entries.
     *
     * @param str original key of the incoming message
     * @param replicationMessage incoming message, may be {@code null}
     * @throws IllegalArgumentException if the original or the extracted key already contains
     *     the {@code "|"} separator
     */
    @Override
    public void process(String str, ReplicationMessage replicationMessage) {
        if (replicationMessage == null) {
            return;
        }
        final String groupedKey = assembleGroupedKey(str, replicationMessage);
        if (logger.isDebugEnabled()) {
            logger.debug("Processor: {}, Assembling key. Ignoring original: {}, original: {} assembled: {}", this.lookupStoreName, this.ignoreOriginalKey, str, groupedKey);
        }
        if (replicationMessage.operation() == ReplicationMessage.Operation.DELETE) {
            // NOTE(review): the grouped key is derived from the delete message itself; if its
            // group value differs from the stored message's, the old lookup entry remains.
            // Confirm delete messages always carry the same group value.
            this.lookupStore.delete(groupedKey);
            this.mappingStore.delete(str);
        } else {
            removeStaleLookupEntry(str, groupedKey);
            this.lookupStore.put(groupedKey, replicationMessage.now());
            this.mappingStore.put(str, replicationMessage);
        }
        context().forward(groupedKey, replicationMessage.now());
    }

    /**
     * Deletes the lookup entry written for the previous message with this original key, when
     * that message was stored under a different grouped key than the current one.
     */
    private void removeStaleLookupEntry(String originalKey, String currentGroupedKey) {
        final ReplicationMessage previous = this.mappingStore.get(originalKey);
        if (previous == null) {
            return;
        }
        // Always rebuild the previous key the same way it was built when it was stored;
        // assembleGroupedKey already takes ignoreOriginalKey into account.
        final String previousGroupedKey = assembleGroupedKey(originalKey, previous);
        if (!currentGroupedKey.equals(previousGroupedKey)) {
            this.lookupStore.delete(previousGroupedKey);
        }
    }

    /**
     * Builds the grouped key for a message.
     *
     * @param originalKey original key of the message
     * @param message message the group key is extracted from
     * @return the extracted key alone when {@code ignoreOriginalKey} is set, otherwise
     *     {@code extracted + "|" + originalKey}
     * @throws NullPointerException if the key extractor returns {@code null}
     * @throws IllegalArgumentException if either key part already contains {@code "|"}
     */
    private String assembleGroupedKey(String originalKey, ReplicationMessage message) {
        final String groupKey = this.keyExtract.apply(message);
        if (groupKey == null) {
            throw new NullPointerException("Key extractor returned null for key: " + originalKey);
        }
        if (groupKey.contains(GROUP_SEPARATOR)) {
            throw new IllegalArgumentException("Can't prefix key. Already a grouped key: " + groupKey + " grouping with: " + originalKey);
        }
        // The original key is validated even when it is not going to be appended.
        if (originalKey.contains(GROUP_SEPARATOR)) {
            throw new IllegalArgumentException("Can't prefix with key. Already a grouped key: " + originalKey + " prepending with: " + groupKey);
        }
        return this.ignoreOriginalKey ? groupKey : groupKey + GROUP_SEPARATOR + originalKey;
    }
}
