package org.axonframework.extensions.kafka.eventhandling.consumer.subscribable;

import com.thoughtworks.xstream.XStream;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArraySet;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Consumer;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;
import org.axonframework.common.AxonConfigurationException;
import org.axonframework.common.BuilderUtils;
import org.axonframework.common.Registration;
import org.axonframework.eventhandling.EventMessage;
import org.axonframework.extensions.kafka.eventhandling.DefaultKafkaMessageConverter;
import org.axonframework.extensions.kafka.eventhandling.KafkaMessageConverter;
import org.axonframework.extensions.kafka.eventhandling.consumer.ConsumerFactory;
import org.axonframework.extensions.kafka.eventhandling.consumer.DefaultConsumerFactory;
import org.axonframework.extensions.kafka.eventhandling.consumer.Fetcher;
import org.axonframework.extensions.kafka.eventhandling.consumer.RuntimeErrorHandler;
import org.axonframework.extensions.kafka.eventhandling.consumer.TopicSubscriber;
import org.axonframework.extensions.kafka.eventhandling.consumer.TopicSubscriberBuilder;
import org.axonframework.messaging.SubscribableMessageSource;
import org.axonframework.serialization.Serializer;
import org.axonframework.serialization.xml.CompactDriver;
import org.axonframework.serialization.xml.XStreamSerializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:BOOT-INF/lib/axon-kafka-4.9.0.jar:org/axonframework/extensions/kafka/eventhandling/consumer/subscribable/SubscribableKafkaMessageSource.class */
/**
 * A {@link SubscribableMessageSource} that creates Kafka consumers through a {@link ConsumerFactory}, subscribes
 * them to topics through a {@link TopicSubscriber}, and hands them to a {@link Fetcher}. Records handed back by the
 * fetcher are converted into {@link EventMessage}s with a {@link KafkaMessageConverter} and passed, as a list, to
 * every subscribed event processor.
 * <p>
 * {@link #start()} creates {@code consumerCount} consumers, all using the same consumer group id. {@link #close()}
 * closes the registrations returned by the fetcher. When the source is built with {@link Builder#autoStart()},
 * {@link #subscribe(Consumer)} calls {@link #start()} and removing the last subscribed processor calls
 * {@link #close()}.
 *
 * @param <K> the key type of the Kafka records that are consumed
 * @param <V> the value type of the Kafka records that are consumed
 */
public class SubscribableKafkaMessageSource<K, V> implements SubscribableMessageSource<EventMessage<?>> {

    private static final Logger logger = LoggerFactory.getLogger(SubscribableKafkaMessageSource.class);

    private final TopicSubscriber subscriber;
    private final String groupId;
    private final ConsumerFactory<K, V> consumerFactory;
    private final Fetcher<K, V, EventMessage<?>> fetcher;
    private final KafkaMessageConverter<K, V> messageConverter;
    private final boolean autoStart;
    private final int consumerCount;

    /** The event processors that receive every converted batch of event messages. */
    private final Set<Consumer<List<? extends EventMessage<?>>>> eventProcessors = new CopyOnWriteArraySet<>();
    /** The registration returned by the fetcher for each consumer, keyed by consumer index. */
    private final Map<Integer, Registration> fetcherRegistrations = new ConcurrentHashMap<>();
    /** Set by {@link #start()} and cleared by {@link #close()}; makes repeated start calls a no-op. */
    private final AtomicBoolean inProgress = new AtomicBoolean(false);

    /**
     * Creates a message source from the given {@code builder}, after calling {@link Builder#validate()} on it.
     *
     * @param builder the builder holding the configuration of this source
     * @throws AxonConfigurationException if the builder's validation fails
     */
    protected SubscribableKafkaMessageSource(Builder<K, V> builder) {
        builder.validate();
        this.subscriber = builder.getSubscriber();
        this.groupId = builder.groupId;
        this.consumerFactory = builder.consumerFactory;
        this.fetcher = builder.fetcher;
        this.messageConverter = builder.messageConverter;
        this.autoStart = builder.autoStart;
        this.consumerCount = builder.consumerCount;
    }

    /**
     * Creates a new {@link Builder} for a {@link SubscribableKafkaMessageSource}.
     *
     * @param <K> the key type of the Kafka records that are consumed
     * @param <V> the value type of the Kafka records that are consumed
     * @return a fresh builder instance
     */
    public static <K, V> Builder<K, V> builder() {
        return new Builder<>();
    }

    /**
     * Subscribes the given event processor to this source. If auto start is enabled, {@link #start()} is called on
     * every subscription, including one for a processor that was already subscribed.
     *
     * @param consumer the event processor that should receive the lists of converted event messages
     * @return a registration that unsubscribes the processor; it yields {@code true} when the processor was
     * removed and {@code false} when it was no longer subscribed. When auto start is enabled and the removed
     * processor was the last one, the registration also calls {@link #close()}.
     */
    @Override
    public Registration subscribe(Consumer<List<? extends EventMessage<?>>> consumer) {
        if (eventProcessors.add(consumer)) {
            logger.debug("Event Processor [{}] subscribed successfully", consumer);
        } else {
            logger.info("Event Processor [{}] not added. It was already subscribed", consumer);
        }

        if (autoStart) {
            logger.info("Starting event consumption as auto start is enabled");
            start();
        }

        return () -> {
            if (!eventProcessors.remove(consumer)) {
                logger.info("Event Processor [{}] not removed. It was already unsubscribed", consumer);
                return false;
            }
            logger.debug("Event Processor [{}] unsubscribed successfully", consumer);
            if (eventProcessors.isEmpty() && autoStart) {
                logger.info("Closing event consumption as auto start is enabled");
                close();
            }
            return true;
        };
    }

    /**
     * Starts event consumption by creating {@code consumerCount} consumers. Does nothing when consumption was
     * already started and has not been closed since.
     * <p>
     * If creating any of the consumers fails, the fetchers registered so far are closed and the started state is
     * reset before the exception is rethrown, so that a later call can try again. Without this roll back the
     * source would stay marked as started while {@link #close()} could find nothing to close and reset.
     */
    public void start() {
        if (inProgress.getAndSet(true)) {
            return;
        }
        try {
            for (int consumerIndex = 0; consumerIndex < consumerCount; consumerIndex++) {
                addConsumer(consumerIndex);
            }
        } catch (RuntimeException startFailure) {
            try {
                fetcherRegistrations.values().forEach(Registration::close);
            } catch (RuntimeException closeFailure) {
                startFailure.addSuppressed(closeFailure);
            } finally {
                fetcherRegistrations.clear();
                inProgress.set(false);
            }
            throw startFailure;
        }
    }

    /**
     * Creates a single Kafka consumer, subscribes it to the configured topics, hands it to the fetcher and stores
     * the resulting registration under the given index, replacing any registration stored there before.
     *
     * @param consumerIndex the index under which the fetcher registration is stored
     */
    private void addConsumer(int consumerIndex) {
        org.apache.kafka.clients.consumer.Consumer<K, V> kafkaConsumer = consumerFactory.createConsumer(groupId);
        Registration fetcherRegistration;
        try {
            subscriber.subscribeTopics(kafkaConsumer);
            fetcherRegistration = fetcher.poll(
                    kafkaConsumer,
                    // Lambdas instead of Optional method references: java.util.Optional is not imported here.
                    consumerRecords -> StreamSupport.stream(consumerRecords.spliterator(), false)
                                                    .map(messageConverter::readKafkaMessage)
                                                    .filter(converted -> converted.isPresent())
                                                    .map(converted -> converted.get())
                                                    .collect(Collectors.toList()),
                    eventMessages -> eventProcessors.forEach(
                            eventProcessor -> eventProcessor.accept(eventMessages)
                    ),
                    restartOnError(consumerIndex)
            );
        } catch (RuntimeException setupFailure) {
            // The consumer never reached a stored registration, so nothing else would ever close it.
            try {
                kafkaConsumer.close();
            } catch (RuntimeException closeFailure) {
                setupFailure.addSuppressed(closeFailure);
            }
            throw setupFailure;
        }
        fetcherRegistrations.put(consumerIndex, fetcherRegistration);
    }

    /**
     * Builds the error handler passed to the fetcher for the consumer with the given index. The handler logs the
     * exception and creates a replacement consumer under the same index.
     *
     * @param consumerIndex the index of the consumer this handler belongs to
     * @return the error handler for that consumer
     */
    private RuntimeErrorHandler restartOnError(int consumerIndex) {
        return fatalException -> {
            logger.warn("Consumer had a fatal exception, starting a new one", fatalException);
            addConsumer(consumerIndex);
        };
    }

    /**
     * Closes every fetcher registration, forgets them, and marks this source as no longer started. When there are
     * no registrations, only a debug message is logged and the started state is left untouched.
     */
    public void close() {
        if (fetcherRegistrations.isEmpty()) {
            logger.debug("No Event Processors have been subscribed who's Consumers should be closed");
            return;
        }
        fetcherRegistrations.values().forEach(Registration::close);
        fetcherRegistrations.clear();
        inProgress.set(false);
    }

    /**
     * Builder for a {@link SubscribableKafkaMessageSource}.
     * <p>
     * The group id, the {@link ConsumerFactory} and the {@link Fetcher} are required. The consumer count defaults
     * to {@code 1} and auto start defaults to {@code false}. When no {@link KafkaMessageConverter} is set, a
     * {@link DefaultKafkaMessageConverter} is built from the configured {@link Serializer}, or from a default
     * {@link XStreamSerializer} when no serializer is set either.
     *
     * @param <K> the key type of the Kafka records that are consumed
     * @param <V> the value type of the Kafka records that are consumed
     */
    public static class Builder<K, V> extends TopicSubscriberBuilder<Builder<K, V>> {

        private String groupId;
        private ConsumerFactory<K, V> consumerFactory;
        private Fetcher<K, V, EventMessage<?>> fetcher;
        private KafkaMessageConverter<K, V> messageConverter;
        private boolean autoStart = false;
        private int consumerCount = 1;
        private Supplier<Serializer> serializer;

        /**
         * Sets the {@link Serializer} used to build the default message converter. It is only consulted when no
         * {@link KafkaMessageConverter} has been configured.
         *
         * @param serializer the serializer to use; may not be {@code null}
         * @return this builder, for method chaining
         */
        public Builder<K, V> serializer(Serializer serializer) {
            BuilderUtils.assertNonNull(serializer, "The Serializer may not be null");
            this.serializer = () -> serializer;
            return this;
        }

        /**
         * Sets the consumer group id passed to the {@link ConsumerFactory} for every consumer that is created.
         *
         * @param groupId the consumer group id; may not be {@code null} or empty
         * @return this builder, for method chaining
         */
        public Builder<K, V> groupId(String groupId) {
            BuilderUtils.assertThat(
                    groupId,
                    candidate -> Objects.nonNull(candidate) && !"".equals(candidate),
                    "The groupId may not be null or empty"
            );
            this.groupId = groupId;
            return this;
        }

        /**
         * Sets the {@link ConsumerFactory} used to create the Kafka consumers.
         *
         * @param consumerFactory the factory to use; may not be {@code null}
         * @return this builder, for method chaining
         */
        public Builder<K, V> consumerFactory(ConsumerFactory<K, V> consumerFactory) {
            BuilderUtils.assertNonNull(consumerFactory, "ConsumerFactory may not be null");
            this.consumerFactory = consumerFactory;
            return this;
        }

        /**
         * Sets a {@link DefaultConsumerFactory} constructed from the given configuration as the consumer factory.
         *
         * @param consumerConfiguration the configuration handed to the {@link DefaultConsumerFactory}
         * @return this builder, for method chaining
         */
        public Builder<K, V> consumerFactory(Map<String, Object> consumerConfiguration) {
            this.consumerFactory = new DefaultConsumerFactory<>(consumerConfiguration);
            return this;
        }

        /**
         * Sets the {@link Fetcher} that polls the created consumers.
         *
         * @param fetcher the fetcher to use; may not be {@code null}
         * @return this builder, for method chaining
         */
        public Builder<K, V> fetcher(Fetcher<K, V, EventMessage<?>> fetcher) {
            BuilderUtils.assertNonNull(fetcher, "Fetcher may not be null");
            this.fetcher = fetcher;
            return this;
        }

        /**
         * Sets the {@link KafkaMessageConverter} used to turn Kafka records into event messages.
         *
         * @param messageConverter the converter to use; may not be {@code null}
         * @return this builder, for method chaining
         */
        public Builder<K, V> messageConverter(KafkaMessageConverter<K, V> messageConverter) {
            BuilderUtils.assertNonNull(messageConverter, "MessageConverter may not be null");
            this.messageConverter = messageConverter;
            return this;
        }

        /**
         * Enables auto start: the source starts consuming when a processor subscribes and closes when the last
         * processor unsubscribes.
         *
         * @return this builder, for method chaining
         */
        public Builder<K, V> autoStart() {
            this.autoStart = true;
            return this;
        }

        /**
         * Sets the number of consumers created when the source is started.
         *
         * @param consumerCount the number of consumers; must be greater than zero
         * @return this builder, for method chaining
         */
        public Builder<K, V> consumerCount(int consumerCount) {
            BuilderUtils.assertThat(
                    consumerCount,
                    count -> count > 0,
                    "The consumer count must be a positive, none zero number"
            );
            this.consumerCount = consumerCount;
            return this;
        }

        /**
         * Builds a {@link SubscribableKafkaMessageSource} from this builder's configuration.
         *
         * @return the configured message source
         * @throws AxonConfigurationException if a required component is missing
         */
        public SubscribableKafkaMessageSource<K, V> build() {
            return new SubscribableKafkaMessageSource<>(this);
        }

        /**
         * Checks that the group id, consumer factory and fetcher are set, and fills in defaults for the serializer
         * and message converter when they are missing. Falling back to the default {@link XStreamSerializer} logs a
         * warning that carries an {@link AxonConfigurationException} as its throwable.
         *
         * @throws AxonConfigurationException if a required component is missing
         */
        protected void validate() throws AxonConfigurationException {
            BuilderUtils.assertNonNull(
                    groupId, "The Consumer Group Id is a hard requirement and should be provided"
            );
            BuilderUtils.assertNonNull(
                    consumerFactory, "The ConsumerFactory is a hard requirement and should be provided"
            );
            BuilderUtils.assertNonNull(fetcher, "The Fetcher is a hard requirement and should be provided");

            if (serializer == null) {
                logger.warn(
                        "The default XStreamSerializer is used, whereas it is strongly recommended to configure the security context of the XStream instance.",
                        new AxonConfigurationException(
                                "A default XStreamSerializer is used, without specifying the security context"
                        )
                );
                serializer = () -> XStreamSerializer.builder()
                                                    .xStream(new XStream(new CompactDriver()))
                                                    .build();
            }
            if (messageConverter == null) {
                messageConverter = DefaultKafkaMessageConverter.builder()
                                                               .serializer(serializer.get())
                                                               .build();
            }
        }

        /**
         * Returns this builder instance.
         *
         * @return this builder
         */
        @Override
        public Builder<K, V> self() {
            return this;
        }
    }
}
