package io.floodplain.streams.remotejoin;

import io.floodplain.immutable.api.ImmutableMessage;
import io.floodplain.replication.api.ReplicationMessage;
import io.floodplain.replication.factory.ReplicationFactory;
import java.util.Optional;
import org.apache.kafka.streams.processor.AbstractProcessor;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.state.KeyValueStore;

/* loaded from: input_file:io/floodplain/streams/remotejoin/StoreStateProcessor.class */
/**
 * Kafka Streams processor that takes the param message of every incoming
 * {@link ReplicationMessage}, writes it to a key/value state store under the record key,
 * and forwards it downstream wrapped as a standard replication message.
 */
public class StoreStateProcessor extends AbstractProcessor<String, ReplicationMessage> {
    /** Key constant exposed for other classes; it is not referenced inside this class. */
    public static final String COMMONKEY = "singlerestore";

    /** Name under which the state store is looked up in the processor context. */
    private final String lookupStoreName;

    /** Resolved in {@link #init(ProcessorContext)}; {@code null} until then. */
    private KeyValueStore<String, ImmutableMessage> lookupStore;

    /**
     * Creates a processor bound to the state store with the given name.
     *
     * @param lookupStoreName name of the state store to fetch from the context in {@code init}
     */
    public StoreStateProcessor(String lookupStoreName) {
        this.lookupStoreName = lookupStoreName;
    }

    /**
     * Resolves the state store by name and then delegates to the superclass initialisation.
     *
     * @param processorContext context supplied by Kafka Streams
     */
    @Override
    public void init(ProcessorContext processorContext) {
        // getStateStore returns the untyped StateStore; the key/value types cannot be
        // checked at runtime, so the warning is suppressed on this single assignment only.
        @SuppressWarnings("unchecked")
        KeyValueStore<String, ImmutableMessage> store =
                (KeyValueStore<String, ImmutableMessage>) processorContext.getStateStore(this.lookupStoreName);
        this.lookupStore = store;
        super.init(processorContext);
    }

    /**
     * Stores the param message of the incoming record under its key and forwards it
     * downstream as a standard replication message.
     *
     * @param key record key, also used as the key in the state store
     * @param message incoming record; must carry a param message
     * @throws RuntimeException if {@code message} has no param message
     */
    @Override
    public void process(String key, ReplicationMessage message) {
        ImmutableMessage state = message.paramMessage().orElseThrow(() -> new RuntimeException(
                "In store state there should definitely be a secondary message"
                        + " (key: " + key + ", store: " + this.lookupStoreName + ")"));
        this.lookupStore.put(key, state);
        context().forward(key, ReplicationFactory.standardMessage(state));
    }
}
