package org.axonframework.extensions.kafka.eventhandling.consumer.streamable;

import com.thoughtworks.xstream.XStream;
import java.lang.invoke.MethodHandles;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.function.Supplier;
import org.apache.kafka.clients.consumer.Consumer;
import org.axonframework.common.AxonConfigurationException;
import org.axonframework.common.BuilderUtils;
import org.axonframework.common.stream.BlockingStream;
import org.axonframework.eventhandling.TrackedEventMessage;
import org.axonframework.eventhandling.TrackingToken;
import org.axonframework.extensions.kafka.KafkaProperties;
import org.axonframework.extensions.kafka.eventhandling.DefaultKafkaMessageConverter;
import org.axonframework.extensions.kafka.eventhandling.KafkaMessageConverter;
import org.axonframework.extensions.kafka.eventhandling.consumer.ConsumerFactory;
import org.axonframework.extensions.kafka.eventhandling.consumer.ConsumerSeekUtil;
import org.axonframework.extensions.kafka.eventhandling.consumer.DefaultConsumerFactory;
import org.axonframework.extensions.kafka.eventhandling.consumer.EventConsumer;
import org.axonframework.extensions.kafka.eventhandling.consumer.Fetcher;
import org.axonframework.messaging.StreamableMessageSource;
import org.axonframework.serialization.Serializer;
import org.axonframework.serialization.xml.CompactDriver;
import org.axonframework.serialization.xml.XStreamSerializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:BOOT-INF/lib/axon-kafka-4.6.0.jar:org/axonframework/extensions/kafka/eventhandling/consumer/streamable/StreamableKafkaMessageSource.class */
/**
 * A {@link StreamableMessageSource} of {@link TrackedEventMessage}s that reads from one or more Kafka topics.
 * <p>
 * Every opened stream gets its own {@link Consumer}, obtained from the configured {@link ConsumerFactory}, and its
 * own {@link Buffer}, obtained from the configured buffer factory. The configured {@link Fetcher} is handed the
 * consumer, a {@code TrackingRecordConverter} and callbacks that push results and exceptions into that buffer.
 * <p>
 * Instances are created through {@link #builder()}.
 *
 * @param <K> the key type of the Kafka {@link Consumer} in use
 * @param <V> the value type of the Kafka {@link Consumer} in use
 */
public class StreamableKafkaMessageSource<K, V> implements StreamableMessageSource<TrackedEventMessage<?>> {
    private static final Logger logger = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
    private final List<String> topics;
    private final ConsumerFactory<K, V> consumerFactory;
    private final Fetcher<K, V, KafkaEventMessage> fetcher;
    private final KafkaMessageConverter<K, V> messageConverter;
    private final Supplier<Buffer<KafkaEventMessage>> bufferFactory;

    /**
     * Builder for a {@link StreamableKafkaMessageSource}.
     * <p>
     * The {@link ConsumerFactory} and the {@link Fetcher} are hard requirements, checked in {@link #validate()}.
     * The topics default to a single entry, {@link KafkaProperties#DEFAULT_TOPIC}; the buffer factory defaults to
     * {@code SortedKafkaMessageBuffer::new}; the message converter defaults to a
     * {@link DefaultKafkaMessageConverter} built with the configured (or default) {@link Serializer}.
     *
     * @param <K> the key type of the Kafka {@link Consumer} in use
     * @param <V> the value type of the Kafka {@link Consumer} in use
     */
    public static class Builder<K, V> {
        private ConsumerFactory<K, V> consumerFactory;
        private Fetcher<K, V, KafkaEventMessage> fetcher;
        private KafkaMessageConverter<K, V> messageConverter;
        private Supplier<Serializer> serializer;
        // Must be a mutable list: addTopic(String) appends to it in place.
        private List<String> topics = new ArrayList<>(Collections.singletonList(KafkaProperties.DEFAULT_TOPIC));
        private Supplier<Buffer<KafkaEventMessage>> bufferFactory = SortedKafkaMessageBuffer::new;

        /**
         * Sets the {@link Serializer} used when a default {@link KafkaMessageConverter} has to be created.
         *
         * @param serializer the serializer to use; checked with {@code BuilderUtils.assertNonNull}
         * @return this builder, for method chaining
         */
        public Builder<K, V> serializer(Serializer serializer) {
            BuilderUtils.assertNonNull(serializer, "The Serializer may not be null");
            this.serializer = () -> serializer;
            return this;
        }

        /**
         * Replaces the configured topics with a copy of the given list.
         *
         * @param list the topics to read from; must be neither {@code null} nor empty
         * @return this builder, for method chaining
         */
        public Builder<K, V> topics(List<String> list) {
            BuilderUtils.assertThat(
                    list,
                    candidate -> Objects.nonNull(candidate) && !candidate.isEmpty(),
                    "The topics may not be null or empty"
            );
            // Copy so that later changes to the caller's list do not leak into the builder.
            this.topics = new ArrayList<>(list);
            return this;
        }

        /**
         * Appends a single topic to the topics configured so far (initially the default topic).
         *
         * @param str the topic to add; must be neither {@code null} nor the empty string
         * @return this builder, for method chaining
         */
        public Builder<K, V> addTopic(String str) {
            BuilderUtils.assertThat(
                    str,
                    candidate -> Objects.nonNull(candidate) && !"".equals(candidate),
                    "The topic may not be null or empty"
            );
            this.topics.add(str);
            return this;
        }

        /**
         * Logs a deprecation warning and validates the argument; the value itself is not stored or used.
         *
         * @param str the group id prefix; must be neither {@code null} nor the empty string
         * @return this builder, for method chaining
         * @deprecated the given value has no effect on the resulting message source
         */
        @Deprecated
        public Builder<K, V> groupIdPrefix(String str) {
            logger.warn("Using groupIdPrefix in the StreamableKafkaMessageSource.Builder has been deprecated and already effectively does nothing.");
            BuilderUtils.assertThat(
                    str,
                    candidate -> Objects.nonNull(candidate) && !"".equals(candidate),
                    "The groupIdPrefix may not be null or empty"
            );
            return this;
        }

        /**
         * Logs a deprecation warning and validates the argument; the supplier itself is not stored or used.
         *
         * @param supplier the group id suffix factory; checked with {@code BuilderUtils.assertNonNull}
         * @return this builder, for method chaining
         * @deprecated the given supplier has no effect on the resulting message source
         */
        @Deprecated
        public Builder<K, V> groupIdSuffixFactory(Supplier<String> supplier) {
            logger.warn("Using groupIdSuffixFactory in the StreamableKafkaMessageSource.Builder has been deprecated and already effectively does nothing.");
            BuilderUtils.assertNonNull(supplier, "GroupIdSuffixFactory may not be null");
            return this;
        }

        /**
         * Sets the {@link ConsumerFactory} used to create a {@link Consumer} for every opened stream.
         *
         * @param consumerFactory the factory to use; checked with {@code BuilderUtils.assertNonNull}
         * @return this builder, for method chaining
         */
        public Builder<K, V> consumerFactory(ConsumerFactory<K, V> consumerFactory) {
            BuilderUtils.assertNonNull(consumerFactory, "ConsumerFactory may not be null");
            this.consumerFactory = consumerFactory;
            return this;
        }

        /**
         * Sets the {@link ConsumerFactory} to a {@link DefaultConsumerFactory} constructed from the given map.
         * No check is performed on the map here; any validation is left to {@link DefaultConsumerFactory}.
         *
         * @param map the configuration handed to the {@link DefaultConsumerFactory} constructor
         * @return this builder, for method chaining
         */
        public Builder<K, V> consumerFactory(Map<String, Object> map) {
            this.consumerFactory = new DefaultConsumerFactory<>(map);
            return this;
        }

        /**
         * Sets the {@link Fetcher} that is asked to poll the consumer of every opened stream.
         *
         * @param fetcher the fetcher to use; checked with {@code BuilderUtils.assertNonNull}
         * @return this builder, for method chaining
         */
        public Builder<K, V> fetcher(Fetcher<K, V, KafkaEventMessage> fetcher) {
            BuilderUtils.assertNonNull(fetcher, "Fetcher may not be null");
            this.fetcher = fetcher;
            return this;
        }

        /**
         * Sets the {@link KafkaMessageConverter} handed to the record converter of every opened stream.
         *
         * @param kafkaMessageConverter the converter to use; checked with {@code BuilderUtils.assertNonNull}
         * @return this builder, for method chaining
         */
        public Builder<K, V> messageConverter(KafkaMessageConverter<K, V> kafkaMessageConverter) {
            BuilderUtils.assertNonNull(kafkaMessageConverter, "MessageConverter may not be null");
            this.messageConverter = kafkaMessageConverter;
            return this;
        }

        /**
         * Sets the factory that supplies the {@link Buffer} of every opened stream.
         *
         * @param supplier the buffer factory to use; checked with {@code BuilderUtils.assertNonNull}
         * @return this builder, for method chaining
         */
        public Builder<K, V> bufferFactory(Supplier<Buffer<KafkaEventMessage>> supplier) {
            BuilderUtils.assertNonNull(supplier, "Buffer factory may not be null");
            this.bufferFactory = supplier;
            return this;
        }

        /**
         * Creates a {@link StreamableKafkaMessageSource} from this builder's current settings.
         *
         * @return the newly constructed message source
         */
        public StreamableKafkaMessageSource<K, V> build() {
            return new StreamableKafkaMessageSource<>(this);
        }

        /**
         * Checks the hard requirements and fills in defaults for everything that was left unset.
         * <p>
         * When no {@link Serializer} was configured, a warning is logged and an {@link XStreamSerializer} using a
         * {@link CompactDriver} is used. When no {@link KafkaMessageConverter} was configured, a
         * {@link DefaultKafkaMessageConverter} is built with the serializer.
         *
         * @throws AxonConfigurationException declared by this method; presumably raised by the
         *                                    {@code BuilderUtils} assertions when the {@link ConsumerFactory} or
         *                                    {@link Fetcher} is missing
         */
        protected void validate() throws AxonConfigurationException {
            BuilderUtils.assertNonNull(this.consumerFactory, "The ConsumerFactory is a hard requirement and should be provided");
            BuilderUtils.assertNonNull(this.fetcher, "The Fetcher is a hard requirement and should be provided");
            if (this.serializer == null) {
                // The exception is only created to attach a stack trace to the warning; it is never thrown.
                logger.warn(
                        "The default XStreamSerializer is used, whereas it is strongly recommended to configure the security context of the XStream instance.",
                        new AxonConfigurationException("A default XStreamSerializer is used, without specifying the security context")
                );
                this.serializer = () -> XStreamSerializer.builder().xStream(new XStream(new CompactDriver())).build();
            }
            if (this.messageConverter == null) {
                this.messageConverter = DefaultKafkaMessageConverter.builder().serializer(this.serializer.get()).build();
            }
        }
    }

    /**
     * Creates a message source from the given builder, after running {@link Builder#validate()} on it.
     *
     * @param builder the builder holding the configuration for this instance
     */
    protected StreamableKafkaMessageSource(Builder<K, V> builder) {
        builder.validate();
        // Snapshot the topics so that later addTopic(...) calls on the builder cannot alter this instance.
        this.topics = Collections.unmodifiableList(new ArrayList<>(builder.topics));
        this.consumerFactory = builder.consumerFactory;
        this.fetcher = builder.fetcher;
        this.messageConverter = builder.messageConverter;
        this.bufferFactory = builder.bufferFactory;
    }

    /**
     * Creates a new {@link Builder} with its default settings.
     *
     * @param <K> the key type of the Kafka {@link Consumer} in use
     * @param <V> the value type of the Kafka {@link Consumer} in use
     * @return a fresh builder
     */
    public static <K, V> Builder<K, V> builder() {
        return new Builder<>();
    }

    /**
     * Opens a stream that starts from the position described by the given token.
     * <p>
     * A new {@link Consumer} is created (with a {@code null} argument to the factory), positioned through
     * {@link ConsumerSeekUtil#seekToCurrentPositions} using the token tracked by the record converter, and then
     * handed to the {@link Fetcher}. Polled results are put into a freshly supplied {@link Buffer}, and errors are
     * reported to that same buffer via {@code setException}.
     * <p>
     * NOTE(review): the method name carries a decompiler rename marker ("renamed from: openStream"); confirm
     * against the {@link StreamableMessageSource} contract before relying on this name.
     *
     * @param trackingToken the token to resume from, converted with {@code KafkaTrackingToken.from}
     * @return a {@code KafkaMessageStream} over the buffer, together with the handle returned by the fetcher
     */
    @Override // org.axonframework.messaging.StreamableMessageSource
    /* renamed from: openStream */
    public BlockingStream<TrackedEventMessage<?>> openStream2(TrackingToken trackingToken) {
        TrackingRecordConverter recordConverter =
                new TrackingRecordConverter(this.messageConverter, KafkaTrackingToken.from(trackingToken));
        logger.debug("Will start consuming from topics [{}]", this.topics);

        Consumer<K, V> consumer = this.consumerFactory.createConsumer(null);
        ConsumerSeekUtil.seekToCurrentPositions(consumer, recordConverter::currentToken, this.topics);

        Buffer<KafkaEventMessage> buffer = this.bufferFactory.get();
        // Evaluating a bound method reference null-checks its receiver, so no explicit check is needed.
        EventConsumer<KafkaEventMessage> eventConsumer = buffer::putAll;
        return new KafkaMessageStream(
                buffer,
                this.fetcher.poll(consumer, recordConverter, eventConsumer, buffer::setException)
        );
    }
}
