package io.floodplain.streams.remotejoin;

import io.debezium.connector.mysql.SourceInfo;
import io.floodplain.immutable.api.ImmutableMessage;
import io.floodplain.replication.api.ReplicationMessage;
import io.floodplain.replication.factory.ReplicationFactory;
import org.apache.kafka.streams.processor.AbstractProcessor;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.state.KeyValueStore;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:io/floodplain/streams/remotejoin/RowNumberProcessor.class */
/**
 * Kafka Streams processor that annotates every incoming message with a {@code _row} number.
 *
 * <p>The first time a key is seen, a row number is derived from the current size of the lookup
 * store and persisted there under that key. Later messages with the same key reuse the persisted
 * number. Deletes (null messages) are not supported: they are logged and dropped.
 */
public class RowNumberProcessor extends AbstractProcessor<String, ReplicationMessage> {
    private static final Logger logger = LoggerFactory.getLogger(RowNumberProcessor.class);

    /** Field name under which the assigned row number is persisted in the lookup store. */
    private static final String STORED_ROW_FIELD = SourceInfo.BINLOG_ROW_IN_EVENT_OFFSET_KEY;

    /** Field name under which the row number is attached to forwarded messages. */
    private static final String OUTPUT_ROW_FIELD = "_row";

    private final String lookupStoreName;
    private KeyValueStore<String, ReplicationMessage> lookupStore;

    /**
     * @param str name of the state store used to remember the row number assigned to each key
     */
    public RowNumberProcessor(String str) {
        this.lookupStoreName = str;
    }

    /**
     * Resolves the lookup store by name from the processor context, then delegates to the
     * superclass so that {@link #context()} becomes available.
     *
     * @param processorContext the context supplied by Kafka Streams
     */
    @Override
    @SuppressWarnings("unchecked") // getStateStore is untyped; the store is used as <String, ReplicationMessage> throughout.
    public void init(ProcessorContext processorContext) {
        this.lookupStore = (KeyValueStore<String, ReplicationMessage>) processorContext.getStateStore(this.lookupStoreName);
        super.init(processorContext);
    }

    /** Nothing to release: this processor does not close the lookup store itself. */
    @Override
    public void close() {
    }

    /**
     * Forwards {@code message} downstream under the same key, with a {@code _row} field added.
     *
     * @param key     the record key; also the key used in the lookup store
     * @param message the incoming message, or {@code null} for a delete (logged and ignored)
     */
    @Override
    public void process(String key, ReplicationMessage message) {
        if (message == null) {
            logger.warn("RowNumber processor does not support deletes yet");
            return;
        }

        final long rowNumber;
        final ReplicationMessage stored = this.lookupStore.get(key);
        if (stored == null) {
            // NOTE(review): approximateNumEntries() is documented as an approximate count, so
            // the numbers handed out here are not guaranteed to be unique or gap-free.
            rowNumber = this.lookupStore.approximateNumEntries() + 1;
            this.lookupStore.put(
                    key,
                    ReplicationFactory.empty().with(STORED_ROW_FIELD, Long.valueOf(rowNumber), ImmutableMessage.ValueType.LONG));
        } else {
            // Default with a long literal: an int default would box to Integer and make a
            // (Long) cast throw ClassCastException whenever the stored field is missing.
            rowNumber = ((Number) stored.value(STORED_ROW_FIELD).orElse(0L)).longValue();
        }

        context().forward(key, message.with(OUTPUT_ROW_FIELD, Long.valueOf(rowNumber), ImmutableMessage.ValueType.LONG));
    }
}
