package org.citrusframework.kafka.endpoint;

import java.time.Duration;
import java.util.Arrays;
import java.util.stream.StreamSupport;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.citrusframework.context.TestContext;
import org.citrusframework.exceptions.MessageTimeoutException;
import org.citrusframework.message.Message;
import org.citrusframework.messaging.AbstractMessageConsumer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/citrusframework/kafka/endpoint/KafkaMessageSingleConsumer.class */
/**
 * Message consumer that reads from Kafka through one externally supplied
 * {@code KafkaConsumer} instance.
 *
 * <p>Each {@link #receive(TestContext, long)} call subscribes the consumer if it has no
 * subscription yet, performs a single poll, converts the polled records into one
 * {@link Message} and synchronously commits the offsets.
 *
 * <p>Instances are created through {@link #builder()}.
 */
class KafkaMessageSingleConsumer extends AbstractMessageConsumer {
    private static final Logger logger = LoggerFactory.getLogger(KafkaMessageSingleConsumer.class);

    // Fully qualified on purpose in the original source; presumably this avoids a clash with a
    // same-named type in this package -- TODO confirm before shortening.
    private final org.apache.kafka.clients.consumer.KafkaConsumer<Object, Object> consumer;

    /**
     * Fluent builder for {@link KafkaMessageSingleConsumer}.
     *
     * <p>No validation is performed here; whatever was set (including {@code null}) is handed to
     * the consumer's constructor as-is.
     */
    public static class KafkaMessageSingleConsumerBuilder {
        private KafkaEndpointConfiguration endpointConfiguration;
        private org.apache.kafka.clients.consumer.KafkaConsumer<Object, Object> consumer;

        /**
         * Sets the endpoint configuration used to resolve the topic and the commit timeout.
         *
         * @param kafkaEndpointConfiguration the endpoint configuration
         * @return this builder
         */
        public KafkaMessageSingleConsumerBuilder endpointConfiguration(KafkaEndpointConfiguration kafkaEndpointConfiguration) {
            this.endpointConfiguration = kafkaEndpointConfiguration;
            return this;
        }

        /**
         * Sets the Kafka consumer that is used for subscribing, polling and committing.
         *
         * @param kafkaConsumer the underlying Kafka consumer
         * @return this builder
         */
        public KafkaMessageSingleConsumerBuilder consumer(org.apache.kafka.clients.consumer.KafkaConsumer<Object, Object> kafkaConsumer) {
            this.consumer = kafkaConsumer;
            return this;
        }

        /**
         * Creates the consumer from the values collected so far.
         *
         * @return a new {@link KafkaMessageSingleConsumer}
         */
        public KafkaMessageSingleConsumer build() {
            return new KafkaMessageSingleConsumer(this.endpointConfiguration, this.consumer);
        }
    }

    /**
     * Entry point for creating a {@link KafkaMessageSingleConsumer}.
     *
     * @return a fresh builder
     */
    public static KafkaMessageSingleConsumerBuilder builder() {
        return new KafkaMessageSingleConsumerBuilder();
    }

    private KafkaMessageSingleConsumer(KafkaEndpointConfiguration kafkaEndpointConfiguration, org.apache.kafka.clients.consumer.KafkaConsumer<Object, Object> kafkaConsumer) {
        super(KafkaMessageSingleConsumer.class.getSimpleName(), kafkaEndpointConfiguration);
        this.consumer = kafkaConsumer;
    }

    /**
     * Polls the Kafka consumer once and returns the polled records as a single message.
     *
     * <p>Offsets are committed synchronously after the records were converted, using the timeout
     * from the endpoint configuration (not the {@code timeout} argument).
     *
     * @param testContext the test context, used for topic resolution and record conversion
     * @param timeout maximum time to block in the poll, in milliseconds
     * @return the message built from the polled records
     * @throws MessageTimeoutException if the poll returned no records
     */
    public Message receive(TestContext testContext, long timeout) {
        KafkaEndpointConfiguration endpointConfiguration = m11getEndpointConfiguration();
        String topic = KafkaMessageConsumerUtils.resolveTopic(endpointConfiguration, testContext);
        logger.debug("Receiving Kafka message on topic: '{}'", topic);

        subscribeIfNecessary(topic);

        ConsumerRecords<Object, Object> records = this.consumer.poll(Duration.ofMillis(timeout));
        if (records.isEmpty()) {
            throw new MessageTimeoutException(timeout, topic);
        }

        Message received = KafkaMessageConsumerUtils.parseConsumerRecordsToMessage(
                StreamSupport.stream(records.spliterator(), false).toList(),
                endpointConfiguration,
                testContext);

        // Commit only after the records were converted successfully.
        this.consumer.commitSync(Duration.ofMillis(endpointConfiguration.getTimeout()));

        // The payload is only included in the log line when debug logging is enabled; both
        // variants are emitted at INFO level, exactly as before.
        if (logger.isDebugEnabled()) {
            logger.info("Received Kafka message on topic '{}': {}", topic, received);
        } else {
            logger.info("Received Kafka message on topic '{}'", topic);
        }
        return received;
    }

    /**
     * Subscribes the consumer to the given topics unless it already has a subscription.
     *
     * @param topic one topic name or several names separated by commas
     */
    private void subscribeIfNecessary(String topic) {
        var currentSubscription = this.consumer.subscription();
        if (currentSubscription != null && !currentSubscription.isEmpty()) {
            return;
        }
        // Strip whitespace around each entry so that "a, b" subscribes to "a" and "b" rather than
        // to " b". Blank entries are passed through unchanged in meaning, so subscribe() still
        // gets to reject them.
        this.consumer.subscribe(Arrays.stream(topic.split(",")).map(String::strip).toList());
    }

    /**
     * Returns the endpoint configuration narrowed to the Kafka-specific type.
     *
     * <p>Decompiler note: this method was renamed from {@code getEndpointConfiguration} (merged
     * with its bridge method) and its access modifier was changed from {@code protected}.
     *
     * @return the endpoint configuration held by the superclass
     */
    public KafkaEndpointConfiguration m11getEndpointConfiguration() {
        // Explicit narrowing cast: the decompiler dropped it when merging the bridge method.
        return (KafkaEndpointConfiguration) super.getEndpointConfiguration();
    }
}
