package org.axonframework.extensions.kafka.eventhandling.tokenstore;

import java.lang.management.ManagementFactory;
import java.time.Duration;
import java.time.temporal.TemporalAmount;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.function.Consumer;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.axonframework.common.AxonConfigurationException;
import org.axonframework.common.BuilderUtils;
import org.axonframework.eventhandling.Segment;
import org.axonframework.eventhandling.TrackingToken;
import org.axonframework.eventhandling.tokenstore.AbstractTokenEntry;
import org.axonframework.eventhandling.tokenstore.GenericTokenEntry;
import org.axonframework.eventhandling.tokenstore.TokenStore;
import org.axonframework.eventhandling.tokenstore.UnableToClaimTokenException;
import org.axonframework.eventhandling.tokenstore.UnableToInitializeTokenException;
import org.axonframework.lifecycle.Lifecycle;
import org.axonframework.lifecycle.Phase;
import org.axonframework.serialization.Serializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:BOOT-INF/lib/axon-kafka-4.9.0.jar:org/axonframework/extensions/kafka/eventhandling/tokenstore/KafkaTokenStore.class */
public class KafkaTokenStore implements TokenStore, Lifecycle {
    private static final Logger logger = LoggerFactory.getLogger((Class<?>) KafkaTokenStore.class);
    private static final String DEFAULT_TOPIC = "__axon_token_store_updates";
    private static final String TOKEN_STORE_CLIENT_ID = "token_store_client";
    private static final String NOT_FOUND_MSG = "Unable to claim token '%s[%s]', It has not been initialized yet";
    private final String nodeId;
    private final TemporalAmount claimTimeout;
    private final TokenStoreState tokenStoreState;
    private final Serializer serializer;
    private final long readTimeOutMillis;

    /* loaded from: input_file:BOOT-INF/lib/axon-kafka-4.9.0.jar:org/axonframework/extensions/kafka/eventhandling/tokenstore/KafkaTokenStore$Builder.class */
    public static class Builder {
        private Serializer serializer;
        private Map<String, Object> consumerConfiguration;
        private Map<String, Object> producerConfiguration;
        private String topic = KafkaTokenStore.DEFAULT_TOPIC;
        private TemporalAmount claimTimeout = Duration.ofSeconds(10);
        private String nodeId = ManagementFactory.getRuntimeMXBean().getName();
        private Supplier<Executor> executorSupplier = Executors::newSingleThreadExecutor;
        private Consumer<Executor> shutdownAction = executor -> {
            if (executor instanceof ExecutorService) {
                ((ExecutorService) executor).shutdown();
            }
        };
        private Duration readTimeOut = Duration.ofSeconds(5);
        private Duration writeTimeout = Duration.ofSeconds(3);

        public Builder topic(String str) {
            BuilderUtils.assertNonEmpty(str, "The topic may not be null");
            this.topic = str;
            return this;
        }

        public Builder serializer(Serializer serializer) {
            BuilderUtils.assertNonNull(serializer, "Serializer may not be null");
            this.serializer = serializer;
            return this;
        }

        public Builder claimTimeout(TemporalAmount temporalAmount) {
            BuilderUtils.assertNonNull(temporalAmount, "The claim timeout may not be null");
            this.claimTimeout = temporalAmount;
            return this;
        }

        public Builder nodeId(String str) {
            BuilderUtils.assertNonEmpty(str, "The nodeId may not be null or empty");
            this.nodeId = str;
            return this;
        }

        public Builder executor(Executor executor) {
            BuilderUtils.assertNonNull(executor, "The executor may not be null");
            this.executorSupplier = () -> {
                return executor;
            };
            this.shutdownAction = executor2 -> {
            };
            return this;
        }

        public Builder consumerConfiguration(Map<String, Object> map) {
            assertMinimalValidClientConfiguration(map, "The consumer configuration may not be null, and needs to contain a 'bootstrap.servers' value");
            this.consumerConfiguration = setConsumerConfig(map);
            return this;
        }

        public Builder producerConfiguration(Map<String, Object> map) {
            assertMinimalValidClientConfiguration(map, "The consumer configuration may not be null, and needs to contain a 'bootstrap.servers' value");
            this.producerConfiguration = setProducerConfig(map);
            return this;
        }

        public Builder readTimeOut(Duration duration) {
            BuilderUtils.assertNonNull(duration, "The readTimeOut may not be null");
            this.readTimeOut = duration;
            return this;
        }

        public Builder writeTimeout(Duration duration) {
            BuilderUtils.assertNonNull(duration, "The readTimeOut may not be null");
            this.writeTimeout = duration;
            return this;
        }

        public Builder onShutdown(Consumer<Executor> consumer) {
            BuilderUtils.assertNonNull(consumer, "The shutdown action may not be null");
            this.shutdownAction = consumer;
            return this;
        }

        public KafkaTokenStore build() {
            return new KafkaTokenStore(this);
        }

        protected void validate() throws AxonConfigurationException {
            BuilderUtils.assertNonEmpty(this.topic, "The topic is a hard requirement and should be provided");
            BuilderUtils.assertNonNull(this.serializer, "The Serializer is a hard requirement and should be provided");
            BuilderUtils.assertNonEmpty(this.nodeId, "The nodeId is a hard requirement and should be provided");
            assertMinimalValidClientConfiguration(this.consumerConfiguration, "Consumer configuration is a hard requirement and should at least contain a 'bootstrap.servers' value");
            assertMinimalValidClientConfiguration(this.producerConfiguration, "Producer configuration is a hard requirement and should at least contain a 'bootstrap.servers' value");
        }

        private Map<String, Object> setConsumerConfig(Map<String, Object> map) {
            HashMap hashMap = new HashMap(map);
            hashMap.remove("group.id");
            hashMap.put("client.id", KafkaTokenStore.TOKEN_STORE_CLIENT_ID);
            hashMap.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
            hashMap.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
            hashMap.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
            hashMap.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, TokenUpdateDeserializer.class);
            return hashMap;
        }

        private Map<String, Object> setProducerConfig(Map<String, Object> map) {
            HashMap hashMap = new HashMap(map);
            hashMap.put("client.id", KafkaTokenStore.TOKEN_STORE_CLIENT_ID);
            hashMap.put(ProducerConfig.LINGER_MS_CONFIG, 0);
            hashMap.put(ProducerConfig.ACKS_CONFIG, "1");
            hashMap.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
            hashMap.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, TokenUpdateSerializer.class);
            return hashMap;
        }

        private void assertMinimalValidClientConfiguration(Map<String, Object> map, String str) {
            BuilderUtils.assertNonNull(map, str);
            BuilderUtils.assertNonNull(map.get("bootstrap.servers"), str);
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:BOOT-INF/lib/axon-kafka-4.9.0.jar:org/axonframework/extensions/kafka/eventhandling/tokenstore/KafkaTokenStore$FutureWithContext.class */
    public static class FutureWithContext {
        final Future<Boolean> future;
        final String processorName;
        final int segment;

        private FutureWithContext(Future<Boolean> future, String str, int i) {
            this.future = future;
            this.processorName = str;
            this.segment = i;
        }
    }

    public static Builder builder() {
        return new Builder();
    }

    protected KafkaTokenStore(Builder builder) {
        builder.validate();
        this.nodeId = builder.nodeId;
        this.claimTimeout = builder.claimTimeout;
        this.tokenStoreState = new TokenStoreState((Executor) builder.executorSupplier.get(), builder.topic, builder.claimTimeout, builder.consumerConfiguration, builder.producerConfiguration, builder.writeTimeout, builder.shutdownAction);
        this.serializer = builder.serializer;
        this.readTimeOutMillis = builder.readTimeOut.toMillis();
    }

    @Override // org.axonframework.eventhandling.tokenstore.TokenStore
    public void storeToken(@Nullable TrackingToken trackingToken, @Nonnull String str, int i) throws UnableToClaimTokenException {
        Optional<U> map = this.tokenStoreState.getCurrent(str, i).map(this::updatableToken);
        if (!map.isPresent()) {
            throw new UnableToClaimTokenException(String.format(NOT_FOUND_MSG, str, Integer.valueOf(i)));
        }
        GenericTokenEntry genericTokenEntry = new GenericTokenEntry(trackingToken, this.serializer, byte[].class, str, i);
        if (!genericTokenEntry.claim(this.nodeId, this.claimTimeout)) {
            throw new UnableToClaimTokenException(String.format("Unable to claim token '%s[%s]'. It is owned by '%s'", str, Integer.valueOf(i), genericTokenEntry.getOwner()));
        }
        send(new TokenUpdate(genericTokenEntry, ((TokenUpdate) map.get()).getSequenceNumber() + 1));
    }

    @Override // org.axonframework.eventhandling.tokenstore.TokenStore
    public TrackingToken fetchToken(@Nonnull String str, int i) throws UnableToClaimTokenException {
        Optional<TokenUpdate> current = this.tokenStoreState.getCurrent(str, i);
        if (!current.isPresent()) {
            throw new UnableToClaimTokenException(String.format("Unable to claim token '%s[%s]'. It has not been initialized yet", str, Integer.valueOf(i)));
        }
        AbstractTokenEntry<byte[]> tokenEntry = current.get().toTokenEntry();
        if (!tokenEntry.claim(this.nodeId, this.claimTimeout)) {
            throw new UnableToClaimTokenException(String.format("Unable to claim token '%s[%s]'. It is owned by '%s'", str, Integer.valueOf(i), tokenEntry.getOwner()));
        }
        send(new TokenUpdate(tokenEntry, current.get().getSequenceNumber() + 1));
        return tokenEntry.getToken(this.serializer);
    }

    @Override // org.axonframework.eventhandling.tokenstore.TokenStore
    public boolean requiresExplicitSegmentInitialization() {
        return true;
    }

    @Override // org.axonframework.eventhandling.tokenstore.TokenStore
    public void releaseClaim(@Nonnull String str, int i) {
        Optional<U> map = this.tokenStoreState.getCurrent(str, i).map(this::updatableToken);
        if (!map.isPresent()) {
            throw new UnableToClaimTokenException(String.format(NOT_FOUND_MSG, str, Integer.valueOf(i)));
        }
        send(tokenUpdateToReleaseUpdate((TokenUpdate) map.get()));
    }

    @Override // org.axonframework.eventhandling.tokenstore.TokenStore
    public void deleteToken(@Nonnull String str, int i) throws UnableToClaimTokenException {
        Optional<U> map = this.tokenStoreState.getCurrent(str, i).map(this::deletableToken);
        if (!map.isPresent()) {
            throw new UnableToClaimTokenException(String.format(NOT_FOUND_MSG, str, Integer.valueOf(i)));
        }
        delete((TokenUpdate) map.get());
    }

    @Override // org.axonframework.eventhandling.tokenstore.TokenStore
    public void initializeTokenSegments(@Nonnull String str, int i) throws UnableToClaimTokenException {
        initializeTokenSegments(str, i, null);
    }

    @Override // org.axonframework.eventhandling.tokenstore.TokenStore
    public void initializeTokenSegments(@Nonnull String str, int i, @Nullable TrackingToken trackingToken) throws UnableToClaimTokenException {
        int[] fetchSegments = fetchSegments(str);
        if (fetchSegments.length > 0) {
            throw new UnableToClaimTokenException(String.format("Unable to initialize tokens for '%s', already %d segments exist", str, Integer.valueOf(fetchSegments.length)));
        }
        ((List) IntStream.range(0, i).mapToObj(i2 -> {
            return new GenericTokenEntry(trackingToken, this.serializer, byte[].class, str, i2);
        }).map(genericTokenEntry -> {
            return new TokenUpdate(genericTokenEntry, 0L);
        }).map(tokenUpdate -> {
            return new FutureWithContext(this.tokenStoreState.send(tokenUpdate), tokenUpdate.getProcessorName(), tokenUpdate.getSegment());
        }).collect(Collectors.toList())).forEach(this::handleFuture);
    }

    @Override // org.axonframework.eventhandling.tokenstore.TokenStore
    public void initializeSegment(@Nullable TrackingToken trackingToken, @Nonnull String str, int i) throws UnableToInitializeTokenException {
        if (this.tokenStoreState.getCurrent(str, i).isPresent()) {
            throw new UnableToInitializeTokenException(String.format("Unable to initialize token '%s[%s]', one already exist", str, Integer.valueOf(i)));
        }
        send(new TokenUpdate(new GenericTokenEntry(trackingToken, this.serializer, byte[].class, str, i), 0L));
    }

    @Override // org.axonframework.eventhandling.tokenstore.TokenStore
    public int[] fetchSegments(@Nonnull String str) {
        return this.tokenStoreState.fetchSegments(str);
    }

    @Override // org.axonframework.eventhandling.tokenstore.TokenStore
    public List<Segment> fetchAvailableSegments(@Nonnull String str) {
        int[] fetchSegments = fetchSegments(str);
        return (List) this.tokenStoreState.fetchAll(str).stream().filter(this::isAvailable).map(tokenUpdate -> {
            return Segment.computeSegment(tokenUpdate.getSegment(), fetchSegments);
        }).collect(Collectors.toList());
    }

    public void start() {
        this.tokenStoreState.start();
    }

    public void close() {
        this.tokenStoreState.close();
    }

    @Override // org.axonframework.lifecycle.Lifecycle
    public void registerLifecycleHandlers(@Nonnull Lifecycle.LifecycleRegistry lifecycleRegistry) {
        lifecycleRegistry.onStart(Phase.EXTERNAL_CONNECTIONS, this::start);
        lifecycleRegistry.onShutdown(Phase.EXTERNAL_CONNECTIONS, this::close);
    }

    private void send(TokenUpdate tokenUpdate) {
        handleFuture(new FutureWithContext(this.tokenStoreState.send(tokenUpdate), tokenUpdate.getProcessorName(), tokenUpdate.getSegment()));
    }

    private void delete(TokenUpdate tokenUpdate) {
        handleFuture(new FutureWithContext(this.tokenStoreState.send(new TokenUpdate(tokenUpdate, true)), tokenUpdate.getProcessorName(), tokenUpdate.getSegment()));
    }

    private void handleFuture(FutureWithContext futureWithContext) {
        boolean z = false;
        try {
            z = futureWithContext.future.get(this.readTimeOutMillis, TimeUnit.MILLISECONDS).booleanValue();
        } catch (InterruptedException e) {
            logger.warn("interrupted while waiting for send to '{}[{}]' to return", futureWithContext.processorName, Integer.valueOf(futureWithContext.segment), e);
            Thread.currentThread().interrupt();
        } catch (ExecutionException e2) {
            logger.warn("Error sending token '{}[{}]' to token state store", futureWithContext.processorName, Integer.valueOf(futureWithContext.segment), e2);
            throw new UnableToClaimTokenException(String.format("Unable to process update '%s[%s]', error sending token", futureWithContext.processorName, Integer.valueOf(futureWithContext.segment)));
        } catch (TimeoutException e3) {
            throw new UnableToClaimTokenException(String.format("Unable to process update '%s[%s]', timed out writing to store state", futureWithContext.processorName, Integer.valueOf(futureWithContext.segment)));
        }
        if (!z) {
            throw new UnableToClaimTokenException(String.format("Unable to process update '%s[%s]', concurrent write invalidated this one", futureWithContext.processorName, Integer.valueOf(futureWithContext.segment)));
        }
    }

    private boolean isAvailable(TokenUpdate tokenUpdate) {
        try {
            updatableToken(tokenUpdate);
            return true;
        } catch (UnableToClaimTokenException e) {
            logger.debug("token not available", (Throwable) e);
            return false;
        }
    }

    private TokenUpdate updatableToken(TokenUpdate tokenUpdate) {
        if (tokenUpdate.getOwner() == null || tokenUpdate.getOwner().equals(this.nodeId)) {
            return tokenUpdate;
        }
        if (tokenUpdate.getTimestamp().isBefore(AbstractTokenEntry.clock.instant().minus(this.claimTimeout))) {
            return tokenUpdate;
        }
        throw new UnableToClaimTokenException(String.format("Unable to claim token '%s[%s]', not the owner, and claim timeout is not expired yet", tokenUpdate.getProcessorName(), Integer.valueOf(tokenUpdate.getSegment())));
    }

    private TokenUpdate deletableToken(TokenUpdate tokenUpdate) {
        if (this.nodeId.equals(tokenUpdate.getOwner())) {
            return tokenUpdate;
        }
        throw new UnableToClaimTokenException(String.format("Unable to remove token '%s[%s]'. It is not owned by %s", tokenUpdate.getProcessorName(), Integer.valueOf(tokenUpdate.getSegment()), this.nodeId));
    }

    private TokenUpdate tokenUpdateToReleaseUpdate(TokenUpdate tokenUpdate) {
        AbstractTokenEntry<byte[]> tokenEntry = tokenUpdate.toTokenEntry();
        tokenEntry.releaseClaim(tokenUpdate.getOwner());
        return new TokenUpdate(tokenEntry, tokenUpdate.getSequenceNumber() + 1);
    }
}
