package io.floodplain.streams.remotejoin;

import io.floodplain.replication.api.ReplicationMessage;
import org.apache.kafka.streams.processor.AbstractProcessor;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.state.KeyValueStore;

/* loaded from: input_file:io/floodplain/streams/remotejoin/StoreProcessor.class */
public class StoreProcessor extends AbstractProcessor<String, ReplicationMessage> {
    private final String lookupStoreName;
    private KeyValueStore<String, ReplicationMessage> lookupStore;

    public StoreProcessor(String str) {
        this.lookupStoreName = str;
    }

    @Override // org.apache.kafka.streams.processor.AbstractProcessor, org.apache.kafka.streams.processor.Processor
    public void init(ProcessorContext processorContext) {
        this.lookupStore = (KeyValueStore) processorContext.getStateStore(this.lookupStoreName);
        super.init(processorContext);
    }

    @Override // org.apache.kafka.streams.processor.AbstractProcessor, org.apache.kafka.streams.processor.Processor
    public void close() {
    }

    @Override // org.apache.kafka.streams.processor.Processor
    public void process(String str, ReplicationMessage replicationMessage) {
        if (replicationMessage != null && replicationMessage.operation() != ReplicationMessage.Operation.DELETE) {
            this.lookupStore.put(str, replicationMessage);
            context().forward(str, replicationMessage);
            return;
        }
        ReplicationMessage replicationMessage2 = this.lookupStore.get(str);
        if (replicationMessage2 != null) {
            this.lookupStore.delete(str);
            context().forward(str, replicationMessage2.withOperation(ReplicationMessage.Operation.DELETE));
        }
        context().forward(str, null);
    }
}
