package org.axonframework.extensions.kafka.eventhandling.consumer.streamable;

import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.axonframework.common.Assert;
import org.axonframework.extensions.kafka.eventhandling.KafkaMessageConverter;
import org.axonframework.extensions.kafka.eventhandling.consumer.RecordConverter;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:BOOT-INF/lib/axon-kafka-4.6.0.jar:org/axonframework/extensions/kafka/eventhandling/consumer/streamable/TrackingRecordConverter.class */
public class TrackingRecordConverter<K, V> implements RecordConverter<K, V, KafkaEventMessage> {
    private static final Logger logger = LoggerFactory.getLogger((Class<?>) TrackingRecordConverter.class);
    private final KafkaMessageConverter<K, V> messageConverter;
    private KafkaTrackingToken currentToken;

    public TrackingRecordConverter(KafkaMessageConverter<K, V> kafkaMessageConverter, KafkaTrackingToken kafkaTrackingToken) {
        this.messageConverter = kafkaMessageConverter;
        this.currentToken = (KafkaTrackingToken) Assert.nonNull(kafkaTrackingToken, () -> {
            return "Token may not be null";
        });
    }

    @Override // org.axonframework.extensions.kafka.eventhandling.consumer.RecordConverter
    public List<KafkaEventMessage> convert(ConsumerRecords<K, V> consumerRecords) {
        ArrayList arrayList = new ArrayList(consumerRecords.count());
        Iterator<ConsumerRecord<K, V>> it = consumerRecords.iterator();
        while (it.hasNext()) {
            ConsumerRecord<K, V> next = it.next();
            this.messageConverter.readKafkaMessage(next).ifPresent(eventMessage -> {
                KafkaTrackingToken advancedTo = this.currentToken.advancedTo(next.topic(), next.partition(), next.offset());
                logger.debug("Advancing token from [{}] to [{}]", this.currentToken, advancedTo);
                this.currentToken = advancedTo;
                arrayList.add(KafkaEventMessage.from(eventMessage, next, this.currentToken));
            });
        }
        return arrayList;
    }

    public KafkaTrackingToken currentToken() {
        return this.currentToken;
    }
}
