package org.apache.kafka.streams.kstream.internals;

import java.util.Objects;
import org.apache.kafka.common.metrics.Sensor;
import org.apache.kafka.streams.processor.AbstractProcessor;
import org.apache.kafka.streams.processor.Processor;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.processor.ProcessorSupplier;
import org.apache.kafka.streams.processor.StateStore;
import org.apache.kafka.streams.processor.internals.InternalProcessorContext;
import org.apache.kafka.streams.processor.internals.metrics.ProcessorNodeMetrics;
import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
import org.apache.kafka.streams.processor.internals.metrics.TaskMetrics;
import org.apache.kafka.streams.state.ValueAndTimestamp;
import org.apache.kafka.streams.state.internals.MeteredTimestampedKeyValueStore;
import org.apache.kafka.streams.state.internals.WrappedStateStore;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/kafka/streams/kstream/internals/KTableSource.class */
public class KTableSource<K, V> implements ProcessorSupplier<K, V> {
    private static final Logger LOG = LoggerFactory.getLogger((Class<?>) KTableSource.class);
    private final String storeName;
    private String queryableName;
    private boolean sendOldValues;

    /* loaded from: input_file:org/apache/kafka/streams/kstream/internals/KTableSource$KTableSourceProcessor.class */
    private class KTableSourceProcessor extends AbstractProcessor<K, V> {
        private MeteredTimestampedKeyValueStore<K, V> store;
        private TimestampedTupleForwarder<K, V> tupleForwarder;
        private StreamsMetricsImpl metrics;
        private Sensor droppedRecordsSensor;
        private Sensor skippedIdempotentUpdatesSensor;

        private KTableSourceProcessor() {
            this.skippedIdempotentUpdatesSensor = null;
        }

        @Override // org.apache.kafka.streams.processor.AbstractProcessor, org.apache.kafka.streams.processor.Processor
        public void init(ProcessorContext processorContext) {
            super.init(processorContext);
            this.metrics = (StreamsMetricsImpl) processorContext.metrics();
            this.droppedRecordsSensor = TaskMetrics.droppedRecordsSensorOrSkippedRecordsSensor(Thread.currentThread().getName(), processorContext.taskId().toString(), this.metrics);
            if (KTableSource.this.queryableName != null) {
                StateStore stateStore = processorContext.getStateStore(KTableSource.this.queryableName);
                try {
                    this.store = (MeteredTimestampedKeyValueStore) ((WrappedStateStore) stateStore).wrapped();
                    this.tupleForwarder = new TimestampedTupleForwarder<>(this.store, processorContext, new TimestampedCacheFlushListener(processorContext), KTableSource.this.sendOldValues);
                    this.skippedIdempotentUpdatesSensor = ProcessorNodeMetrics.skippedIdempotentUpdatesSensor(Thread.currentThread().getName(), processorContext.taskId().toString(), ((InternalProcessorContext) processorContext).currentNode().name(), this.metrics);
                } catch (ClassCastException e) {
                    throw new IllegalStateException("Unexpected store type: " + stateStore.getClass() + " for store: " + KTableSource.this.queryableName, e);
                }
            }
        }

        @Override // org.apache.kafka.streams.processor.Processor
        public void process(K k, V v) {
            V v2;
            if (k == null) {
                KTableSource.LOG.warn("Skipping record due to null key. topic=[{}] partition=[{}] offset=[{}]", context().topic(), Integer.valueOf(context().partition()), Long.valueOf(context().offset()));
                this.droppedRecordsSensor.record();
                return;
            }
            if (KTableSource.this.queryableName == null) {
                context().forward(k, new Change(v, null));
                return;
            }
            MeteredTimestampedKeyValueStore.RawAndDeserializedValue<V> withBinary = this.store.getWithBinary(k);
            ValueAndTimestamp<V> valueAndTimestamp = withBinary.value;
            if (valueAndTimestamp != null) {
                v2 = valueAndTimestamp.value();
                if (context().timestamp() < valueAndTimestamp.timestamp()) {
                    KTableSource.LOG.warn("Detected out-of-order KTable update for {} at offset {}, partition {}.", this.store.name(), Long.valueOf(context().offset()), Integer.valueOf(context().partition()));
                }
            } else {
                v2 = null;
            }
            if (this.store.putIfDifferentValues(k, ValueAndTimestamp.make(v, context().timestamp()), withBinary.serializedValue)) {
                this.tupleForwarder.maybeForward(k, v, v2);
            } else {
                this.skippedIdempotentUpdatesSensor.record();
            }
        }
    }

    public KTableSource(String str, String str2) {
        Objects.requireNonNull(str, "storeName can't be null");
        this.storeName = str;
        this.queryableName = str2;
        this.sendOldValues = false;
    }

    public String queryableName() {
        return this.queryableName;
    }

    @Override // org.apache.kafka.streams.processor.ProcessorSupplier
    public Processor<K, V> get() {
        return new KTableSourceProcessor();
    }

    public void enableSendingOldValues() {
        this.sendOldValues = true;
        this.queryableName = this.storeName;
    }

    public void materialize() {
        this.queryableName = this.storeName;
    }

    public boolean materialized() {
        return this.queryableName != null;
    }
}
