package org.axonframework.extensions.kafka.eventhandling.producer;

import com.thoughtworks.xstream.XStream;
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Supplier;
import javax.annotation.Nonnull;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.errors.ProducerFencedException;
import org.axonframework.common.AxonConfigurationException;
import org.axonframework.common.BuilderUtils;
import org.axonframework.eventhandling.EventMessage;
import org.axonframework.extensions.kafka.eventhandling.DefaultKafkaMessageConverter;
import org.axonframework.extensions.kafka.eventhandling.KafkaMessageConverter;
import org.axonframework.lifecycle.Lifecycle;
import org.axonframework.messaging.EventPublicationFailedException;
import org.axonframework.messaging.unitofwork.CurrentUnitOfWork;
import org.axonframework.monitoring.MessageMonitor;
import org.axonframework.monitoring.NoOpMessageMonitor;
import org.axonframework.serialization.Serializer;
import org.axonframework.serialization.xml.CompactDriver;
import org.axonframework.serialization.xml.XStreamSerializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:BOOT-INF/lib/axon-kafka-4.9.0.jar:org/axonframework/extensions/kafka/eventhandling/producer/KafkaPublisher.class */
public class KafkaPublisher<K, V> implements Lifecycle {
    private static final Logger logger = LoggerFactory.getLogger((Class<?>) KafkaPublisher.class);
    private static final String DEFAULT_TOPIC = "Axon.Events";
    private final ProducerFactory<K, V> producerFactory;
    private final KafkaMessageConverter<K, V> messageConverter;
    private final MessageMonitor<? super EventMessage<?>> messageMonitor;
    private final TopicResolver topicResolver;
    private final long publisherAckTimeout;

    /* loaded from: input_file:BOOT-INF/lib/axon-kafka-4.9.0.jar:org/axonframework/extensions/kafka/eventhandling/producer/KafkaPublisher$Builder.class */
    public static class Builder<K, V> {
        private ProducerFactory<K, V> producerFactory;
        private KafkaMessageConverter<K, V> messageConverter;
        private MessageMonitor<? super EventMessage<?>> messageMonitor = NoOpMessageMonitor.instance();
        private TopicResolver topicResolver = eventMessage -> {
            return Optional.of("Axon.Events");
        };
        private long publisherAckTimeout = 1000;
        private Supplier<Serializer> serializer;

        public Builder<K, V> serializer(Serializer serializer) {
            BuilderUtils.assertNonNull(serializer, "The Serializer may not be null");
            this.serializer = () -> {
                return serializer;
            };
            return this;
        }

        public Builder<K, V> producerFactory(ProducerFactory<K, V> producerFactory) {
            BuilderUtils.assertNonNull(producerFactory, "ProducerFactory may not be null");
            this.producerFactory = producerFactory;
            return this;
        }

        public Builder<K, V> messageConverter(KafkaMessageConverter<K, V> kafkaMessageConverter) {
            BuilderUtils.assertNonNull(kafkaMessageConverter, "MessageConverter may not be null");
            this.messageConverter = kafkaMessageConverter;
            return this;
        }

        public Builder<K, V> messageMonitor(MessageMonitor<? super EventMessage<?>> messageMonitor) {
            BuilderUtils.assertNonNull(messageMonitor, "MessageMonitor may not be null");
            this.messageMonitor = messageMonitor;
            return this;
        }

        @Deprecated
        public Builder<K, V> topic(String str) {
            BuilderUtils.assertThat(str, str2 -> {
                return Objects.nonNull(str2) && !"".equals(str2);
            }, "The topic may not be null or empty");
            this.topicResolver = eventMessage -> {
                return Optional.of(str);
            };
            return this;
        }

        public Builder<K, V> topicResolver(TopicResolver topicResolver) {
            BuilderUtils.assertNonNull(topicResolver, "The TopicResolver may not be null");
            this.topicResolver = topicResolver;
            return this;
        }

        public Builder<K, V> publisherAckTimeout(long j) {
            BuilderUtils.assertThat(Long.valueOf(j), l -> {
                return l.longValue() >= 0;
            }, "The publisherAckTimeout should be a positive number or zero");
            this.publisherAckTimeout = j;
            return this;
        }

        public KafkaPublisher<K, V> build() {
            return new KafkaPublisher<>(this);
        }

        protected void validate() throws AxonConfigurationException {
            BuilderUtils.assertNonNull(this.producerFactory, "The ProducerFactory is a hard requirement and should be provided");
            if (this.serializer == null) {
                KafkaPublisher.logger.warn("The default XStreamSerializer is used, whereas it is strongly recommended to configure the security context of the XStream instance.", (Throwable) new AxonConfigurationException("A default XStreamSerializer is used, without specifying the security context"));
                this.serializer = () -> {
                    return XStreamSerializer.builder().xStream(new XStream(new CompactDriver())).build();
                };
            }
            if (this.messageConverter == null) {
                this.messageConverter = DefaultKafkaMessageConverter.builder().serializer(this.serializer.get()).build();
            }
        }
    }

    protected KafkaPublisher(Builder<K, V> builder) {
        builder.validate();
        this.producerFactory = ((Builder) builder).producerFactory;
        this.messageConverter = ((Builder) builder).messageConverter;
        this.messageMonitor = ((Builder) builder).messageMonitor;
        this.topicResolver = ((Builder) builder).topicResolver;
        this.publisherAckTimeout = ((Builder) builder).publisherAckTimeout;
    }

    public static <K, V> Builder<K, V> builder() {
        return new Builder<>();
    }

    public <T extends EventMessage<?>> void send(T t) {
        logger.debug("Starting event producing process for [{}].", t.getPayloadType());
        Optional<String> resolve = this.topicResolver.resolve(t);
        if (!resolve.isPresent()) {
            logger.debug("Skip publishing event for [{}] since topicFunction returned empty.", t.getPayloadType());
            return;
        }
        MessageMonitor.MonitorCallback onMessageIngested = this.messageMonitor.onMessageIngested(t);
        Producer<?, ?> createProducer = this.producerFactory.createProducer();
        ConfirmationMode confirmationMode = this.producerFactory.confirmationMode();
        if (confirmationMode.isTransactional()) {
            tryBeginTxn(createProducer);
        }
        Future<RecordMetadata> send = createProducer.send(this.messageConverter.createKafkaMessage(t, resolve.get()));
        CurrentUnitOfWork.get().onRollback(unitOfWork -> {
            if (confirmationMode.isTransactional()) {
                tryRollback(createProducer);
            }
            tryClose(createProducer);
        });
        if (confirmationMode.isTransactional()) {
            tryCommit(createProducer, onMessageIngested);
        } else if (confirmationMode.isWaitForAck()) {
            waitForPublishAck(send, onMessageIngested);
        }
        tryClose(createProducer);
    }

    private void tryBeginTxn(Producer<?, ?> producer) {
        try {
            producer.beginTransaction();
        } catch (ProducerFencedException e) {
            logger.warn("Unable to begin transaction");
            throw new EventPublicationFailedException("Event publication failed, exception occurred while starting Kafka transaction.", e);
        }
    }

    private void tryCommit(Producer<?, ?> producer, MessageMonitor.MonitorCallback monitorCallback) {
        try {
            producer.commitTransaction();
            monitorCallback.reportSuccess();
        } catch (ProducerFencedException e) {
            logger.warn("Unable to commit transaction");
            monitorCallback.reportFailure(e);
            throw new EventPublicationFailedException("Event publication failed, exception occurred while committing Kafka transaction.", e);
        }
    }

    private void waitForPublishAck(Future<RecordMetadata> future, MessageMonitor.MonitorCallback monitorCallback) {
        try {
            future.get(Math.max(0L, (System.currentTimeMillis() + this.publisherAckTimeout) - System.currentTimeMillis()), TimeUnit.MILLISECONDS);
            monitorCallback.reportSuccess();
        } catch (InterruptedException e) {
            monitorCallback.reportFailure(e);
            logger.warn("Encountered error while waiting for event publication.", (Throwable) e);
            Thread.currentThread().interrupt();
        } catch (ExecutionException | TimeoutException e2) {
            monitorCallback.reportFailure(e2);
            logger.warn("Encountered error while waiting for event publication.");
            throw new EventPublicationFailedException("Event publication failed, exception occurred while waiting for event publication.", e2);
        }
    }

    private void tryRollback(Producer<?, ?> producer) {
        try {
            producer.abortTransaction();
        } catch (Exception e) {
            logger.warn("Unable to abort transaction.", (Throwable) e);
        }
    }

    private void tryClose(Producer<?, ?> producer) {
        try {
            producer.close();
        } catch (Exception e) {
            logger.debug("Unable to close producer.", (Throwable) e);
        }
    }

    public void shutDown() {
        this.producerFactory.shutDown();
    }

    @Override // org.axonframework.lifecycle.Lifecycle
    public void registerLifecycleHandlers(@Nonnull Lifecycle.LifecycleRegistry lifecycleRegistry) {
        lifecycleRegistry.onShutdown(1073741823, this::shutDown);
    }
}
