package org.axonframework.extensions.kafka.eventhandling.tokenstore;

import java.time.Duration;
import java.time.temporal.TemporalAmount;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executor;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;
import java.util.stream.Collectors;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.config.TopicConfig;
import org.axonframework.common.AxonException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:BOOT-INF/lib/axon-kafka-4.9.0.jar:org/axonframework/extensions/kafka/eventhandling/tokenstore/TokenStoreState.class */
class TokenStoreState {
    private static final Logger logger = LoggerFactory.getLogger(TokenStoreState.class);

    /** Message template used when an update does not carry a newer sequence number than the stored one. */
    private static final String SEQUENCE_ERROR_FORMAT = "%d is not after %d for processor %s with segment %d";

    /** Template of the Kafka record key: processor name and segment. */
    private static final String KEY_FORMAT = "%s::%d";

    /** Latest accepted token update, keyed by processor name and then by segment. */
    private final Map<String, Map<Integer, TokenUpdate>> state = new ConcurrentHashMap<>();

    /** Pending writes, keyed by update id; each future is completed once the outcome of the write is known. */
    private final Map<UUID, CompletableFuture<Boolean>> writeResult = new ConcurrentHashMap<>();

    /** Whether the consumer loop should keep running; flipped by {@code start()} and {@code close()}. */
    private final AtomicBoolean isRunning = new AtomicBoolean(false);

    /**
     * Future that readers wait on before touching {@link #state}. It is completed with {@code true} by the consumer
     * once it has caught up, and completed with {@code false} and replaced when the consumer fails.
     */
    private final AtomicReference<CompletableFuture<Boolean>> isReady = new AtomicReference<>(new CompletableFuture<>());

    /** Producer used to publish token updates; {@code null} until created and after {@code close()}. */
    private final AtomicReference<Producer<String, TokenUpdate>> producer = new AtomicReference<>(null);

    /** Executor that runs the consumer loop. */
    private final Executor executor;

    /** Name of the Kafka topic holding the token updates. */
    private final String topic;

    /** Claim timeout; the compaction lag settings of a newly created topic are derived from it. */
    private final TemporalAmount claimTimeout;

    /** Configuration for the Kafka consumer (also used to create the admin client). */
    private final Map<String, Object> consumerConfiguration;

    /** Configuration for the Kafka producer. */
    private final Map<String, Object> producerConfiguration;

    /** Maximum time, in milliseconds, to wait for Kafka to acknowledge a sent update. */
    private final long writeTimeOutMillis;

    /** Callback invoked with {@link #executor} when this instance is closed. */
    private final Consumer<Executor> shutdownAction;

    /* loaded from: input_file:BOOT-INF/lib/axon-kafka-4.9.0.jar:org/axonframework/extensions/kafka/eventhandling/tokenstore/TokenStoreState$TokenStoreUpdateException.class */
    /**
     * Signals that a token update could not be applied to the in-memory state.
     * Thrown by {@code mergeFunction} and handled inside {@code updateAddition}.
     */
    private static class TokenStoreUpdateException extends AxonException {

        /**
         * @param message description of why the update was rejected
         */
        private TokenStoreUpdateException(String message) {
            super(message);
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Creates the state holder. Nothing is started here; the consumer loop and the producer are set up by
     * {@link #start()}.
     *
     * @param executor       executor on which the consumer loop is run
     * @param str            name of the Kafka topic holding the token updates
     * @param temporalAmount claim timeout, used to derive topic settings when the topic has to be created
     * @param map            Kafka consumer configuration
     * @param map2           Kafka producer configuration
     * @param duration       maximum time to wait for Kafka when sending an update; stored as milliseconds
     * @param consumer       action invoked with {@code executor} when this instance is closed
     */
    public TokenStoreState(Executor executor, String str, TemporalAmount temporalAmount, Map<String, Object> map, Map<String, Object> map2, Duration duration, Consumer<Executor> consumer) {
        // Kafka wiring.
        this.topic = str;
        this.consumerConfiguration = map;
        this.producerConfiguration = map2;

        // Timing settings.
        this.claimTimeout = temporalAmount;
        this.writeTimeOutMillis = duration.toMillis();

        // Threading and lifecycle.
        this.executor = executor;
        this.shutdownAction = consumer;
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Publishes the given update to the token store topic and registers a future for its outcome.
     * <p>
     * The call blocks for at most the configured write timeout while waiting for Kafka to acknowledge the record.
     * When sending fails, times out or is interrupted, the returned future is completed with {@code false}.
     * Otherwise the future is left pending; it is completed by {@code update} once the consumer has read a record
     * carrying the same update id.
     *
     * @param tokenUpdate the update to publish
     * @return a future telling whether the update was accepted into the state
     */
    public Future<Boolean> send(TokenUpdate tokenUpdate) {
        String key = String.format(KEY_FORMAT, tokenUpdate.getProcessorName(), tokenUpdate.getSegment());
        CompletableFuture<Boolean> result = new CompletableFuture<>();
        this.writeResult.put(tokenUpdate.getId(), result);
        try {
            // Creates a producer on demand when none is available.
            // NOTE(review): updateAndGet may re-apply its function under contention, which could create a surplus
            // producer that is never closed - confirm whether concurrent first sends are possible.
            Producer<String, TokenUpdate> currentProducer = this.producer.updateAndGet(
                    current -> current == null ? new KafkaProducer<>(this.producerConfiguration) : current);
            currentProducer.send(new ProducerRecord<>(this.topic, key, tokenUpdate))
                           .get(this.writeTimeOutMillis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            this.writeResult.remove(tokenUpdate.getId());
            result.complete(false);
            Thread.currentThread().interrupt();
        } catch (ExecutionException | TimeoutException e) {
            logger.warn("error sending token update '{}[{}]' to Kafka", tokenUpdate.getProcessorName(), tokenUpdate.getSegment(), e);
            this.writeResult.remove(tokenUpdate.getId());
            result.complete(false);
        } catch (RuntimeException e) {
            // The future never reaches the caller in this case, so do not leave it behind in the pending map.
            this.writeResult.remove(tokenUpdate.getId());
            throw e;
        }
        return result;
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Looks up the latest known update for one segment of a processor, after waiting for the state to be ready.
     *
     * @param str name of the processor
     * @param i   segment identifier
     * @return the stored update, or an empty optional when the processor or the segment is unknown
     */
    public Optional<TokenUpdate> getCurrent(String str, int i) {
        waitTillReady();
        Map<Integer, TokenUpdate> segments = this.state.get(str);
        if (segments == null) {
            return Optional.empty();
        }
        return Optional.ofNullable(segments.get(i));
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Returns the segment identifiers currently known for a processor, after waiting for the state to be ready.
     *
     * @param str name of the processor
     * @return the segment identifiers, or an empty array when the processor is unknown
     */
    public int[] fetchSegments(String str) {
        waitTillReady();
        Map<Integer, TokenUpdate> segments = this.state.get(str);
        if (segments == null) {
            return new int[0];
        }
        return toPrimitiveIntArray(segments);
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Returns all updates currently stored for a processor, after waiting for the state to be ready.
     * <p>
     * For a known processor the result is the {@code values()} view of the internal segment map, not a copy.
     *
     * @param str name of the processor
     * @return the stored updates, or an empty list when the processor is unknown
     */
    public Collection<TokenUpdate> fetchAll(String str) {
        waitTillReady();
        Map<Integer, TokenUpdate> segments = this.state.get(str);
        if (segments == null) {
            return Collections.emptyList();
        }
        return segments.values();
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Starts the consumer loop on the executor and makes sure a producer is available.
     * Does nothing when this instance is already running.
     */
    public void start() {
        if (!this.isRunning.compareAndSet(false, true)) {
            return;
        }
        this.executor.execute(this::startConsumer);
        // send() may already have created a producer lazily; keep that one instead of overwriting it,
        // since an overwritten producer would never be closed.
        if (this.producer.get() == null) {
            Producer<String, TokenUpdate> created = new KafkaProducer<>(this.producerConfiguration);
            if (!this.producer.compareAndSet(null, created)) {
                // Another thread installed a producer in the meantime; release the surplus one.
                created.close();
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Stops the consumer loop, closes the producer if there is one, and hands the executor to the shutdown action.
     * The shutdown action is invoked even when closing the producer throws.
     */
    public void close() {
        this.isRunning.set(false);
        try {
            // Detach the producer first so that it is closed exactly once, outside of any atomic update function.
            Producer<String, TokenUpdate> current = this.producer.getAndSet(null);
            if (current != null) {
                current.close();
            }
        } finally {
            this.shutdownAction.accept(this.executor);
        }
    }

    /**
     * Copies the keys of the given segment map into a primitive array.
     * <p>
     * The keys are collected in a single traversal, so the array length always matches the number of keys that
     * were seen. Sizing the array up front and iterating separately can fail with a
     * {@link java.util.NoSuchElementException} when an entry is removed in between.
     *
     * @param map segment map whose keys are the segment identifiers
     * @return the segment identifiers, in the iteration order of the key set
     */
    private int[] toPrimitiveIntArray(Map<Integer, TokenUpdate> map) {
        return map.keySet().stream().mapToInt(Integer::intValue).toArray();
    }

    /**
     * Blocks until the current readiness future completes with {@code true}.
     * <p>
     * When the future completes with {@code false}, the future installed at that moment is awaited instead.
     * This is done in a loop rather than by recursion so that repeated failures cannot exhaust the stack.
     * When the thread is interrupted, a warning is logged, the interrupt flag is restored and the method returns
     * without the state being ready.
     *
     * @throws TokenStoreInitializationException when waiting on the readiness future fails with an
     *                                           {@link ExecutionException}
     */
    private void waitTillReady() {
        try {
            boolean ready = this.isReady.get().get();
            while (!ready) {
                // Re-read the reference on every pass: a failed future is replaced by a fresh one.
                ready = this.isReady.get().get();
            }
        } catch (InterruptedException e) {
            logger.warn("interrupted while waiting until token store state was ready", e);
            Thread.currentThread().interrupt();
        } catch (ExecutionException e) {
            logger.warn("Error waiting till token store state was ready", e);
            throw new TokenStoreInitializationException("Token store is not ready yet");
        }
    }

    /**
     * Runs the consumer loop that keeps the in-memory state in sync with the token store topic.
     * <p>
     * The topic is created when it is missing. All of its partitions are assigned and the end offsets at that
     * moment are recorded. Every polled record is applied through {@code update}. Once the consumer position has
     * reached the recorded end offset of every partition, the readiness future is completed with {@code true}.
     * The loop runs until {@code isRunning} is cleared.
     * <p>
     * On any exception the readiness future is replaced by a fresh one and the old one is completed with
     * {@code false}. While still running, the consumer is then rescheduled on the executor; the state map is kept.
     */
    private void startConsumer() {
        try (KafkaConsumer<String, TokenUpdate> consumer = new KafkaConsumer<>(this.consumerConfiguration)) {
            Map<String, List<PartitionInfo>> topics = consumer.listTopics();
            if (!topics.containsKey(this.topic)) {
                createTopic();
                topics = consumer.listTopics();
            }
            List<TopicPartition> partitions = topics.get(this.topic).stream()
                    .map(info -> new TopicPartition(info.topic(), info.partition()))
                    .collect(Collectors.toList());
            consumer.assign(partitions);

            // Partitions still to be caught up on. Copied so entries can be dropped regardless of what map
            // implementation the consumer returns.
            Map<TopicPartition, Long> remainingEndOffsets = new HashMap<>(consumer.endOffsets(partitions));
            while (this.isRunning.get()) {
                ConsumerRecords<String, TokenUpdate> records = consumer.poll(Duration.ofMillis(100L));
                logger.debug("received: {} records", records.count());
                records.forEach(this::update);
                if (!remainingEndOffsets.isEmpty()) {
                    remainingEndOffsets.entrySet()
                                       .removeIf(entry -> consumer.position(entry.getKey()) >= entry.getValue());
                    if (remainingEndOffsets.isEmpty()) {
                        this.isReady.get().complete(true);
                    }
                }
            }
        } catch (Exception e) {
            // Install the fresh future before failing the old one, so woken waiters pick up the new future.
            CompletableFuture<Boolean> failed = this.isReady.getAndSet(new CompletableFuture<>());
            failed.complete(false);
            logger.warn("Error consuming to update Kafka token store", e);
            if (this.isRunning.get()) {
                logger.info("Restarting consumer, the model is kept, so errors updating state are expected");
                this.executor.execute(this::startConsumer);
            }
        }
    }

    /**
     * Requests creation of the token store topic as a compacted, single-partition topic.
     * <p>
     * The replication factor is the number of cluster nodes, capped at 3. The minimum and maximum compaction lag
     * are set to 2 and 8 times the claim timeout. Both settings are expressed in milliseconds, so the claim
     * timeout is converted to milliseconds before scaling.
     * <p>
     * Failures to describe the cluster are logged and not rethrown. The interrupt flag is restored only when the
     * thread was actually interrupted.
     */
    private void createTopic() {
        try (AdminClient adminClient = AdminClient.create(this.consumerConfiguration)) {
            short replicationFactor = (short) Math.min(3, adminClient.describeCluster().nodes().get().size());
            NewTopic newTopic = new NewTopic(this.topic, 1, replicationFactor);

            long claimTimeoutMillis = Duration.from(this.claimTimeout).toMillis();
            Map<String, String> topicConfig = new HashMap<>();
            topicConfig.put(TopicConfig.CLEANUP_POLICY_CONFIG, "compact");
            topicConfig.put(TopicConfig.MIN_COMPACTION_LAG_MS_CONFIG, String.valueOf(2 * claimTimeoutMillis));
            topicConfig.put(TopicConfig.MAX_COMPACTION_LAG_MS_CONFIG, String.valueOf(8 * claimTimeoutMillis));
            newTopic.configs(topicConfig);

            adminClient.createTopics(Collections.singletonList(newTopic));
        } catch (InterruptedException e) {
            logger.warn("error creating topic for Kafka token store", e);
            Thread.currentThread().interrupt();
        } catch (ExecutionException e) {
            // Not an interruption: log only, and leave the thread's interrupt flag untouched.
            logger.warn("error creating topic for Kafka token store", e);
        }
    }

    /**
     * Applies one consumed record to the in-memory state and reports the outcome to a pending write, if any.
     * <p>
     * A record with a value is treated as an addition. A record without a value is treated as a deletion, for
     * which the update is rebuilt from the record headers. When a future is registered under the update's id, it
     * is removed from the pending writes and completed with the outcome.
     *
     * @param consumerRecord the record read from the token store topic
     */
    private void update(ConsumerRecord<String, TokenUpdate> consumerRecord) {
        TokenUpdate payload = consumerRecord.value();
        final TokenUpdate applied;
        final boolean accepted;
        if (payload == null) {
            applied = new TokenUpdate(consumerRecord.headers(), (byte[]) null);
            accepted = updateDeletion(applied);
        } else {
            applied = payload;
            accepted = updateAddition(applied);
        }

        CompletableFuture<Boolean> pending = this.writeResult.remove(applied.getId());
        if (pending != null) {
            pending.complete(accepted);
        }
    }

    /**
     * Tries to store the given update in the state.
     *
     * @param tokenUpdate the update to store
     * @return {@code true} when the update was stored, {@code false} when it was rejected with a
     *         {@link TokenStoreUpdateException} (which is logged at info level)
     */
    private boolean updateAddition(TokenUpdate tokenUpdate) {
        boolean stored = true;
        try {
            updateState(tokenUpdate);
        } catch (TokenStoreUpdateException rejected) {
            logger.info("failed to update state for '{}[{}]'", tokenUpdate.getProcessorName(), tokenUpdate.getSegment(), rejected);
            stored = false;
        }
        return stored;
    }

    /**
     * Removes the segment named by the given update from the state.
     *
     * @param tokenUpdate update identifying the processor and segment to remove
     * @return {@code true} when an entry was removed, {@code false} when the processor or the segment was unknown
     */
    private boolean updateDeletion(TokenUpdate tokenUpdate) {
        Map<Integer, TokenUpdate> segments = this.state.get(tokenUpdate.getProcessorName());
        if (segments == null) {
            return false;
        }
        TokenUpdate removed = segments.remove(tokenUpdate.getSegment());
        return removed != null;
    }

    /**
     * Stores the given update under its processor name and segment, creating the processor's segment map when
     * needed. When the segment already holds an update, {@code mergeFunction} decides which one is kept and
     * throws a {@link TokenStoreUpdateException} when the new update is not newer.
     *
     * @param tokenUpdate the update to store
     */
    private void updateState(TokenUpdate tokenUpdate) {
        Map<Integer, TokenUpdate> segments =
                this.state.computeIfAbsent(tokenUpdate.getProcessorName(), name -> new ConcurrentHashMap<>());
        segments.merge(tokenUpdate.getSegment(), tokenUpdate, this::mergeFunction);
    }

    /**
     * Chooses between the stored update and an incoming one for the same segment.
     *
     * @param tokenUpdate  the update currently stored
     * @param tokenUpdate2 the incoming update
     * @return the incoming update, when its sequence number is strictly greater than the stored one
     * @throws TokenStoreUpdateException when the incoming sequence number is not greater than the stored one
     */
    private TokenUpdate mergeFunction(TokenUpdate tokenUpdate, TokenUpdate tokenUpdate2) {
        long storedSequence = tokenUpdate.getSequenceNumber();
        long incomingSequence = tokenUpdate2.getSequenceNumber();
        if (incomingSequence <= storedSequence) {
            String message = String.format(SEQUENCE_ERROR_FORMAT, incomingSequence, storedSequence, tokenUpdate2.getProcessorName(), tokenUpdate2.getSegment());
            throw new TokenStoreUpdateException(message);
        }
        return tokenUpdate2;
    }
}
