package org.apache.kafka.streams.processor.internals;

import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import kotlin.jvm.internal.LongCompanionObject;
import net.sourceforge.argparse4j.ArgumentParsers;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.utils.FixedOrderMap;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.streams.errors.ProcessorStateException;
import org.apache.kafka.streams.processor.StateRestoreCallback;
import org.apache.kafka.streams.processor.StateStore;
import org.apache.kafka.streams.processor.TaskId;
import org.apache.kafka.streams.state.internals.OffsetCheckpoint;
import org.apache.kafka.streams.state.internals.RecordConverter;
import org.slf4j.Logger;

/* loaded from: input_file:org/apache/kafka/streams/processor/internals/ProcessorStateManager.class */
public class ProcessorStateManager implements StateManager {
    private static final String STATE_CHANGELOG_TOPIC_SUFFIX = "-changelog";
    private final Logger log;
    private final TaskId taskId;
    private final String logPrefix;
    private final boolean isStandby;
    private final ChangelogReader changelogReader;
    private final Map<TopicPartition, Long> offsetLimits;
    private final Map<TopicPartition, Long> standbyRestoredOffsets;
    private final Map<String, StateRestoreCallback> restoreCallbacks;
    private final Map<String, RecordConverter> recordConverters;
    private final Map<String, String> storeToChangelogTopic;
    private final boolean eosEnabled;
    private final File baseDir;
    private OffsetCheckpoint checkpointFile;
    private final Map<TopicPartition, Long> initialLoadedCheckpoints;
    private final FixedOrderMap<String, Optional<StateStore>> registeredStores = new FixedOrderMap<>();
    private final FixedOrderMap<String, Optional<StateStore>> globalStores = new FixedOrderMap<>();
    private final List<TopicPartition> changelogPartitions = new ArrayList();
    private final Map<TopicPartition, Long> checkpointFileCache = new HashMap();
    private final Map<String, TopicPartition> partitionForTopic = new HashMap();

    public ProcessorStateManager(TaskId taskId, Collection<TopicPartition> collection, boolean z, StateDirectory stateDirectory, Map<String, String> map, ChangelogReader changelogReader, boolean z2, LogContext logContext) throws IOException {
        this.eosEnabled = z2;
        this.log = logContext.logger(ProcessorStateManager.class);
        this.taskId = taskId;
        this.changelogReader = changelogReader;
        this.logPrefix = String.format("task [%s] ", taskId);
        for (TopicPartition topicPartition : collection) {
            this.partitionForTopic.put(topicPartition.topic(), topicPartition);
        }
        this.offsetLimits = new HashMap();
        this.standbyRestoredOffsets = new ConcurrentHashMap();
        this.isStandby = z;
        this.restoreCallbacks = z ? new ConcurrentHashMap() : null;
        this.recordConverters = z ? new HashMap() : null;
        this.storeToChangelogTopic = new HashMap(map);
        this.baseDir = stateDirectory.directoryForTask(taskId);
        this.checkpointFile = new OffsetCheckpoint(new File(this.baseDir, ".checkpoint"));
        this.initialLoadedCheckpoints = this.checkpointFile.read();
        this.log.trace("Checkpointable offsets read from checkpoint: {}", this.initialLoadedCheckpoints);
        if (z2) {
            this.checkpointFile.delete();
            this.checkpointFile = null;
        }
        this.log.debug("Created state store manager for task {}", taskId);
    }

    public static String storeChangelogTopic(String str, String str2) {
        return str + ArgumentParsers.DEFAULT_PREFIX_CHARS + str2 + STATE_CHANGELOG_TOPIC_SUFFIX;
    }

    @Override // org.apache.kafka.streams.processor.internals.StateManager
    public File baseDir() {
        return this.baseDir;
    }

    @Override // org.apache.kafka.streams.processor.internals.StateManager
    public void register(StateStore stateStore, StateRestoreCallback stateRestoreCallback) {
        String name = stateStore.name();
        this.log.debug("Registering state store {} to its state manager", name);
        if (".checkpoint".equals(name)) {
            throw new IllegalArgumentException(String.format("%sIllegal store name: %s", this.logPrefix, name));
        }
        if (this.registeredStores.containsKey(name) && this.registeredStores.get(name).isPresent()) {
            throw new IllegalArgumentException(String.format("%sStore %s has already been registered.", this.logPrefix, name));
        }
        String str = this.storeToChangelogTopic.get(name);
        if (str != null) {
            TopicPartition topicPartition = new TopicPartition(str, getPartition(str));
            RecordConverter converterForStore = StateManagerUtil.converterForStore(stateStore);
            if (this.isStandby) {
                this.log.trace("Preparing standby replica of persistent state store {} with changelog topic {}", name, str);
                this.restoreCallbacks.put(str, stateRestoreCallback);
                this.recordConverters.put(str, converterForStore);
            } else {
                Long l = stateStore.persistent() ? this.initialLoadedCheckpoints.get(topicPartition) : null;
                if (l != null) {
                    this.checkpointFileCache.put(topicPartition, l);
                }
                this.log.trace("Restoring state store {} from changelog topic {} at checkpoint {}", new Object[]{name, str, l});
                this.changelogReader.register(new StateRestorer(topicPartition, new CompositeRestoreListener(stateRestoreCallback), l, offsetLimit(topicPartition), stateStore.persistent(), name, converterForStore));
            }
            this.changelogPartitions.add(topicPartition);
        }
        this.registeredStores.put(name, Optional.of(stateStore));
    }

    @Override // org.apache.kafka.streams.processor.internals.StateManager
    public void reinitializeStateStoresForPartitions(Collection<TopicPartition> collection, InternalProcessorContext internalProcessorContext) {
        StateManagerUtil.reinitializeStateStoresForPartitions(this.log, this.eosEnabled, this.baseDir, this.registeredStores, this.storeToChangelogTopic, collection, internalProcessorContext, this.checkpointFile, this.checkpointFileCache);
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public void clearCheckpoints() throws IOException {
        if (this.checkpointFile != null) {
            this.checkpointFile.delete();
            this.checkpointFile = null;
            this.checkpointFileCache.clear();
        }
    }

    @Override // org.apache.kafka.streams.processor.internals.Checkpointable
    public Map<TopicPartition, Long> checkpointed() {
        updateCheckpointFileCache(Collections.emptyMap());
        HashMap hashMap = new HashMap();
        Iterator<Map.Entry<String, StateRestoreCallback>> it = this.restoreCallbacks.entrySet().iterator();
        while (it.hasNext()) {
            String key = it.next().getKey();
            TopicPartition topicPartition = new TopicPartition(key, getPartition(key));
            hashMap.put(topicPartition, this.checkpointFileCache.getOrDefault(topicPartition, -1L));
        }
        return hashMap;
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public void updateStandbyStates(TopicPartition topicPartition, List<ConsumerRecord<byte[], byte[]>> list, long j) {
        RecordBatchingStateRestoreCallback adapt = StateRestoreCallbackAdapter.adapt(this.restoreCallbacks.get(topicPartition.topic()));
        if (!list.isEmpty()) {
            RecordConverter recordConverter = this.recordConverters.get(topicPartition.topic());
            ArrayList arrayList = new ArrayList(list.size());
            Iterator<ConsumerRecord<byte[], byte[]>> it = list.iterator();
            while (it.hasNext()) {
                arrayList.add(recordConverter.convert(it.next()));
            }
            try {
                adapt.restoreBatch(arrayList);
            } catch (RuntimeException e) {
                throw new ProcessorStateException(String.format("%sException caught while trying to restore state from %s", this.logPrefix, topicPartition), e);
            }
        }
        this.standbyRestoredOffsets.put(topicPartition, Long.valueOf(j + 1));
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public void putOffsetLimits(Map<TopicPartition, Long> map) {
        this.log.trace("Updating store offset limit with {}", map);
        this.offsetLimits.putAll(map);
    }

    private long offsetLimit(TopicPartition topicPartition) {
        Long l = this.offsetLimits.get(topicPartition);
        return l != null ? l.longValue() : LongCompanionObject.MAX_VALUE;
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public ChangelogReader changelogReader() {
        return this.changelogReader;
    }

    @Override // org.apache.kafka.streams.processor.internals.StateManager
    public StateStore getStore(String str) {
        return this.registeredStores.getOrDefault(str, Optional.empty()).orElse(null);
    }

    @Override // org.apache.kafka.streams.processor.internals.StateManager
    public void flush() {
        ProcessorStateException processorStateException = null;
        if (!this.registeredStores.isEmpty()) {
            this.log.debug("Flushing all stores registered in the state manager");
            for (Map.Entry<String, Optional<StateStore>> entry : this.registeredStores.entrySet()) {
                if (!entry.getValue().isPresent()) {
                    throw new IllegalStateException("Expected " + entry.getKey() + " to have been initialized");
                }
                StateStore stateStore = entry.getValue().get();
                this.log.trace("Flushing store {}", stateStore.name());
                try {
                    stateStore.flush();
                } catch (RuntimeException e) {
                    if (processorStateException == null) {
                        processorStateException = new ProcessorStateException(String.format("%sFailed to flush state store %s", this.logPrefix, stateStore.name()), e);
                    }
                    this.log.error("Failed to flush state store {}: ", stateStore.name(), e);
                }
            }
        }
        if (processorStateException != null) {
            throw processorStateException;
        }
    }

    @Override // org.apache.kafka.streams.processor.internals.StateManager
    public void close(boolean z) throws ProcessorStateException {
        ProcessorStateException processorStateException = null;
        if (!this.registeredStores.isEmpty()) {
            this.log.debug("Closing its state manager and all the registered state stores");
            for (Map.Entry<String, Optional<StateStore>> entry : this.registeredStores.entrySet()) {
                if (entry.getValue().isPresent()) {
                    StateStore stateStore = entry.getValue().get();
                    this.log.debug("Closing storage engine {}", stateStore.name());
                    try {
                        stateStore.close();
                        this.registeredStores.put(stateStore.name(), Optional.empty());
                    } catch (RuntimeException e) {
                        if (processorStateException == null) {
                            processorStateException = new ProcessorStateException(String.format("%sFailed to close state store %s", this.logPrefix, stateStore.name()), e);
                        }
                        this.log.error("Failed to close state store {}: ", stateStore.name(), e);
                    }
                } else {
                    this.log.info("Skipping to close non-initialized store {}", entry.getKey());
                }
            }
        }
        if (!z && this.eosEnabled) {
            try {
                clearCheckpoints();
            } catch (IOException e2) {
                throw new ProcessorStateException(String.format("%sError while deleting the checkpoint file", this.logPrefix), e2);
            }
        }
        if (processorStateException != null) {
            throw processorStateException;
        }
    }

    @Override // org.apache.kafka.streams.processor.internals.Checkpointable
    public void checkpoint(Map<TopicPartition, Long> map) {
        ensureStoresRegistered();
        if (this.checkpointFile == null) {
            this.checkpointFile = new OffsetCheckpoint(new File(this.baseDir, ".checkpoint"));
        }
        updateCheckpointFileCache(map);
        this.log.debug("Writing checkpoint: {}", this.checkpointFileCache);
        try {
            this.checkpointFile.write(this.checkpointFileCache);
        } catch (IOException e) {
            this.log.warn("Failed to write offset checkpoint file to [{}]", this.checkpointFile, e);
        }
    }

    private void updateCheckpointFileCache(Map<TopicPartition, Long> map) {
        Long l;
        Set<TopicPartition> validCheckpointableTopics = validCheckpointableTopics();
        Map<TopicPartition, Long> validCheckpointableOffsets = validCheckpointableOffsets(this.changelogReader.restoredOffsets(), validCheckpointableTopics);
        this.log.trace("Checkpointable offsets updated with restored offsets: {}", this.checkpointFileCache);
        for (TopicPartition topicPartition : validCheckpointableTopics) {
            if (map.containsKey(topicPartition)) {
                this.checkpointFileCache.put(topicPartition, Long.valueOf(map.get(topicPartition).longValue() + 1));
            } else if (this.standbyRestoredOffsets.containsKey(topicPartition)) {
                this.checkpointFileCache.put(topicPartition, this.standbyRestoredOffsets.get(topicPartition));
            } else if (validCheckpointableOffsets.containsKey(topicPartition)) {
                this.checkpointFileCache.put(topicPartition, validCheckpointableOffsets.get(topicPartition));
            } else if (!this.checkpointFileCache.containsKey(topicPartition) && (l = validCheckpointableOffsets(this.initialLoadedCheckpoints, validCheckpointableTopics).get(topicPartition)) != null) {
                this.checkpointFileCache.put(topicPartition, l);
            }
        }
    }

    private int getPartition(String str) {
        TopicPartition topicPartition = this.partitionForTopic.get(str);
        return topicPartition == null ? this.taskId.partition : topicPartition.partition();
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public void registerGlobalStateStores(List<StateStore> list) {
        this.log.debug("Register global stores {}", list);
        for (StateStore stateStore : list) {
            this.globalStores.put(stateStore.name(), Optional.of(stateStore));
        }
    }

    @Override // org.apache.kafka.streams.processor.internals.StateManager
    public StateStore getGlobalStore(String str) {
        return this.globalStores.getOrDefault(str, Optional.empty()).orElse(null);
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public Collection<TopicPartition> changelogPartitions() {
        return Collections.unmodifiableList(this.changelogPartitions);
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public void ensureStoresRegistered() {
        for (Map.Entry<String, Optional<StateStore>> entry : this.registeredStores.entrySet()) {
            if (!entry.getValue().isPresent()) {
                throw new IllegalStateException("store [" + entry.getKey() + "] has not been correctly registered. This is a bug in Kafka Streams.");
            }
        }
    }

    private Set<TopicPartition> validCheckpointableTopics() {
        HashSet hashSet = new HashSet(this.storeToChangelogTopic.size());
        for (Map.Entry<String, String> entry : this.storeToChangelogTopic.entrySet()) {
            String key = entry.getKey();
            if (this.registeredStores.containsKey(key) && this.registeredStores.get(key).isPresent() && this.registeredStores.get(key).get().persistent()) {
                String value = entry.getValue();
                hashSet.add(new TopicPartition(value, getPartition(value)));
            }
        }
        return hashSet;
    }

    private static Map<TopicPartition, Long> validCheckpointableOffsets(Map<TopicPartition, Long> map, Set<TopicPartition> set) {
        HashMap hashMap = new HashMap(map.size());
        for (Map.Entry<TopicPartition, Long> entry : map.entrySet()) {
            TopicPartition key = entry.getKey();
            if (set.contains(key)) {
                hashMap.put(key, entry.getValue());
            }
        }
        return hashMap;
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public Map<TopicPartition, Long> standbyRestoredOffsets() {
        return Collections.unmodifiableMap(this.standbyRestoredOffsets);
    }
}
