package io.floodplain.streams.remotejoin;

import io.floodplain.immutable.api.ImmutableMessage;
import io.floodplain.replication.api.ReplicationMessage;
import io.floodplain.replication.factory.ReplicationFactory;
import java.util.Collections;
import java.util.HashMap;
import org.apache.kafka.streams.processor.AbstractProcessor;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.state.KeyValueStore;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:io/floodplain/streams/remotejoin/DiffProcessor.class */
/**
 * Kafka Streams processor that compares every incoming {@link ReplicationMessage} with the
 * previous message stored under the same key and forwards a "diff" message downstream.
 *
 * <p>The forwarded message carries a single string field {@code "key"} (also its primary key)
 * and, depending on the situation, the sub-messages:
 * <ul>
 *   <li>{@code "new"} only: the key was not present in the lookup store (operation UPDATE);</li>
 *   <li>{@code "old"} and {@code "new"}: the key was present and the content differs
 *       (operation UPDATE);</li>
 *   <li>{@code "old"} only: the key was deleted and a previous value existed
 *       (operation DELETE).</li>
 * </ul>
 * Messages whose content equals the stored one are not forwarded.
 */
public class DiffProcessor extends AbstractProcessor<String, ReplicationMessage> {
    private static final Logger logger = LoggerFactory.getLogger(DiffProcessor.class);

    private final String lookupStoreName;
    private KeyValueStore<String, ReplicationMessage> lookupStore;

    /**
     * @param lookupStoreName name of the key/value state store that holds the last seen message
     *                        per key; it is resolved from the processor context in {@link #init}
     */
    public DiffProcessor(String lookupStoreName) {
        this.lookupStoreName = lookupStoreName;
    }

    /**
     * Resolves the lookup store from the context, then delegates to the superclass.
     *
     * @param processorContext context supplied by Kafka Streams
     */
    @Override
    public void init(ProcessorContext processorContext) {
        // getStateStore returns the untyped StateStore; the key/value types cannot be checked
        // at runtime, so the cast is unchecked by necessity.
        @SuppressWarnings("unchecked")
        KeyValueStore<String, ReplicationMessage> store =
                (KeyValueStore<String, ReplicationMessage>) processorContext.getStateStore(this.lookupStoreName);
        this.lookupStore = store;
        super.init(processorContext);
    }

    /** Nothing to release: this processor does not close the state store itself. */
    @Override
    public void close() {
    }

    /**
     * Builds the envelope of a diff message: one string field {@code "key"} holding the record
     * key, declared as the primary key.
     *
     * @param key the record key
     * @return a message without sub-messages; callers attach {@code "old"} / {@code "new"}
     */
    private ReplicationMessage createMessage(String key) {
        HashMap<String, Object> values = new HashMap<>();
        values.put("key", key);
        HashMap<String, ImmutableMessage.ValueType> types = new HashMap<>();
        types.put("key", ImmutableMessage.ValueType.STRING);
        return ReplicationFactory.fromMap(key, values, types).withPrimaryKeys(Collections.singletonList("key"));
    }

    /**
     * Handles one record: a {@code null} value or a DELETE operation is treated as a delete,
     * anything else is compared with the stored message for the same key.
     *
     * @param key     the record key
     * @param message the incoming message, may be {@code null}
     */
    @Override
    public void process(String key, ReplicationMessage message) {
        if (message == null || message.operation() == ReplicationMessage.Operation.DELETE) {
            processDelete(key);
            return;
        }

        ReplicationMessage previous = this.lookupStore.get(key);
        ReplicationMessage diffMessage = null;
        if (previous == null) {
            diffMessage = createMessage(key)
                    .withSubMessage("new", message.message())
                    .withOperation(ReplicationMessage.Operation.UPDATE);
        } else if (diff(previous, message)) {
            diffMessage = createMessage(key)
                    .withSubMessage("old", previous.message())
                    .withSubMessage("new", message.message())
                    .withOperation(ReplicationMessage.Operation.UPDATE);
        } else {
            logger.debug("Ignoring identical message for key: {} for store: {}", key, this.lookupStoreName);
        }

        // Store the incoming message exactly once, and before forwarding, so the store already
        // holds the latest message while downstream processors run. Identical messages are
        // stored as well, as before (presumably to keep non-content metadata current - confirm).
        this.lookupStore.put(key, message);
        if (diffMessage != null) {
            context().forward(key, diffMessage);
        }
    }

    /**
     * Removes the key from the lookup store and forwards a DELETE diff carrying the previous
     * content as {@code "old"}. Does nothing when the key is not in the store.
     */
    private void processDelete(String key) {
        logger.debug("Delete detected in store: {} with key: {}", this.lookupStoreName, key);
        ReplicationMessage previous = this.lookupStore.get(key);
        if (previous == null) {
            return;
        }
        this.lookupStore.delete(key);
        context().forward(key, createMessage(key)
                .withSubMessage("old", previous.message())
                .withOperation(ReplicationMessage.Operation.DELETE));
    }

    /**
     * @return {@code true} when the two messages differ according to
     *         {@link ReplicationMessage#equalsToMessage}
     */
    private boolean diff(ReplicationMessage previous, ReplicationMessage current) {
        return !previous.equalsToMessage(current);
    }
}
