package org.citrusframework.kafka.endpoint;

import jakarta.annotation.Nullable;
import java.time.Duration;
import java.time.Instant;
import java.time.temporal.TemporalAmount;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.function.Function;
import java.util.stream.Collectors;
import org.apache.commons.lang3.exception.ExceptionUtils;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.common.TopicPartition;
import org.citrusframework.context.TestContext;
import org.citrusframework.exceptions.CitrusRuntimeException;
import org.citrusframework.exceptions.MessageTimeoutException;
import org.citrusframework.kafka.endpoint.selector.KafkaMessageSelector;
import org.citrusframework.message.Message;
import org.citrusframework.messaging.AbstractSelectiveMessageConsumer;
import org.citrusframework.util.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* JADX INFO: Access modifiers changed from: package-private */
/* loaded from: input_file:org/citrusframework/kafka/endpoint/KafkaMessageFilteringConsumer.class */
public class KafkaMessageFilteringConsumer extends AbstractSelectiveMessageConsumer {
    private static final Logger logger = LoggerFactory.getLogger(KafkaMessageFilteringConsumer.class);
    private final org.apache.kafka.clients.consumer.KafkaConsumer<Object, Object> consumer;
    private KafkaMessageFilter kafkaMessageFilter;

    /* loaded from: input_file:org/citrusframework/kafka/endpoint/KafkaMessageFilteringConsumer$KafkaMessageFilteringConsumerBuilder.class */
    public static class KafkaMessageFilteringConsumerBuilder {
        private KafkaEndpointConfiguration endpointConfiguration;
        private org.apache.kafka.clients.consumer.KafkaConsumer<Object, Object> consumer;
        private Duration eventLookbackWindow;
        private Duration pollTimeout;
        private KafkaMessageSelector kafkaMessageSelector;

        public KafkaMessageFilteringConsumerBuilder endpointConfiguration(KafkaEndpointConfiguration kafkaEndpointConfiguration) {
            this.endpointConfiguration = kafkaEndpointConfiguration;
            return this;
        }

        public KafkaMessageFilteringConsumerBuilder consumer(org.apache.kafka.clients.consumer.KafkaConsumer<Object, Object> kafkaConsumer) {
            this.consumer = kafkaConsumer;
            return this;
        }

        public KafkaMessageFilteringConsumerBuilder eventLookbackWindow(Duration duration) {
            this.eventLookbackWindow = duration;
            return this;
        }

        public KafkaMessageFilteringConsumerBuilder pollTimeout(Duration duration) {
            this.pollTimeout = duration;
            return this;
        }

        public KafkaMessageFilteringConsumerBuilder kafkaMessageSelector(KafkaMessageSelector kafkaMessageSelector) {
            this.kafkaMessageSelector = kafkaMessageSelector;
            return this;
        }

        public KafkaMessageFilteringConsumer build() {
            return new KafkaMessageFilteringConsumer(this.endpointConfiguration, this.consumer, this.eventLookbackWindow, this.pollTimeout, this.kafkaMessageSelector);
        }
    }

    public static KafkaMessageFilteringConsumerBuilder builder() {
        return new KafkaMessageFilteringConsumerBuilder();
    }

    private KafkaMessageFilteringConsumer(KafkaEndpointConfiguration kafkaEndpointConfiguration, org.apache.kafka.clients.consumer.KafkaConsumer<Object, Object> kafkaConsumer, @Nullable Duration duration, @Nullable Duration duration2, @Nullable KafkaMessageSelector kafkaMessageSelector) {
        super(KafkaMessageSingleConsumer.class.getSimpleName(), kafkaEndpointConfiguration);
        this.consumer = kafkaConsumer;
        if (Objects.nonNull(duration) || Objects.nonNull(kafkaMessageSelector) || Objects.nonNull(duration2)) {
            this.kafkaMessageFilter = KafkaMessageFilter.kafkaMessageFilter().eventLookbackWindow(duration).kafkaMessageSelector(kafkaMessageSelector).pollTimeout(duration2).buildFilter();
        }
    }

    public org.apache.kafka.clients.consumer.KafkaConsumer<Object, Object> getConsumer() {
        return this.consumer;
    }

    public KafkaMessageFilter getKafkaMessageFilter() {
        return this.kafkaMessageFilter;
    }

    public Message receive(String str, TestContext testContext, long j) {
        if (StringUtils.isEmpty(str) && Objects.isNull(this.kafkaMessageFilter)) {
            throw new CitrusRuntimeException("Cannot invoke filtering kafka message consumer without selectors");
        }
        if (StringUtils.hasText(str)) {
            this.kafkaMessageFilter = KafkaMessageFilter.kafkaMessageFilter(str);
        }
        this.kafkaMessageFilter.sanitize();
        String resolveTopic = KafkaMessageConsumerUtils.resolveTopic(m9getEndpointConfiguration(), testContext);
        logger.debug("Receiving Kafka message on topic '{}' using selector: {}", resolveTopic, this.kafkaMessageFilter);
        if (!this.consumer.subscription().isEmpty()) {
            logger.warn("Cancelling active subscriptions of consumer before looking for Kafka events, because subscription to topics, partitions and pattern are mutually exclusive");
            this.consumer.unsubscribe();
        }
        List<ConsumerRecord<Object, Object>> findMessageWithTimeout = findMessageWithTimeout(resolveTopic, j);
        if (findMessageWithTimeout.isEmpty()) {
            throw new CitrusRuntimeException("Failed to resolve Kafka message using selector: " + str);
        }
        Message parseConsumerRecordsToMessage = KafkaMessageConsumerUtils.parseConsumerRecordsToMessage(findMessageWithTimeout, m9getEndpointConfiguration(), testContext);
        if (logger.isDebugEnabled()) {
            logger.info("Received Kafka message on topic '{}': {}", resolveTopic, parseConsumerRecordsToMessage);
        } else {
            logger.info("Received Kafka message on topic '{}'", resolveTopic);
        }
        return parseConsumerRecordsToMessage;
    }

    private List<ConsumerRecord<Object, Object>> findMessageWithTimeout(String str, long j) {
        logger.trace("Applied timeout is {} ms", Long.valueOf(j));
        ExecutorService newSingleThreadExecutor = Executors.newSingleThreadExecutor();
        Future submit = newSingleThreadExecutor.submit(() -> {
            return findMessagesSatisfyingMatcher(str);
        });
        try {
            try {
                List<ConsumerRecord<Object, Object>> list = (List) submit.get(j, TimeUnit.MILLISECONDS);
                this.consumer.unsubscribe();
                newSingleThreadExecutor.shutdownNow();
                return list;
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new CitrusRuntimeException("Thread was interrupted while waiting for Kafka message", e);
            } catch (ExecutionException e2) {
                throw new CitrusRuntimeException(String.format("Failed to receive message on Kafka topic '%s'", str), e2);
            } catch (TimeoutException e3) {
                logger.error("Failed to receive message on  Kafka topic '{}': {}", str, ExceptionUtils.getRootCause(e3).getMessage());
                submit.cancel(true);
                throw new MessageTimeoutException(j, str, e3);
            }
        } catch (Throwable th) {
            this.consumer.unsubscribe();
            newSingleThreadExecutor.shutdownNow();
            throw th;
        }
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /* renamed from: getEndpointConfiguration, reason: merged with bridge method [inline-methods] */
    public KafkaEndpointConfiguration m9getEndpointConfiguration() {
        return super.getEndpointConfiguration();
    }

    private List<ConsumerRecord<Object, Object>> findMessagesSatisfyingMatcher(String str) {
        List<TopicPartition> list = this.consumer.partitionsFor(str).stream().map(partitionInfo -> {
            return new TopicPartition(str, partitionInfo.partition());
        }).toList();
        this.consumer.assign(list);
        offsetConsumerOnTopicByLookbackWindow(list);
        Instant now = Instant.now();
        Instant minus = now.minus((TemporalAmount) this.kafkaMessageFilter.getEventLookbackWindow());
        ArrayList arrayList = new ArrayList();
        while (true) {
            ConsumerRecords poll = this.consumer.poll(this.kafkaMessageFilter.getPollTimeout());
            if (poll.isEmpty()) {
                return arrayList;
            }
            Iterator it = poll.iterator();
            while (it.hasNext()) {
                ConsumerRecord<Object, Object> consumerRecord = (ConsumerRecord) it.next();
                if (isConsumerRecordNewerThanEndTime(consumerRecord, now)) {
                    return arrayList;
                }
                if (!isConsumerRecordOlderThanStartTime(consumerRecord, minus) && this.kafkaMessageFilter.getKafkaMessageSelector().matches(consumerRecord)) {
                    arrayList.add(consumerRecord);
                }
            }
        }
    }

    private void offsetConsumerOnTopicByLookbackWindow(List<TopicPartition> list) {
        Map offsetsForTimes = this.consumer.offsetsForTimes((Map) list.stream().collect(Collectors.toMap(Function.identity(), topicPartition -> {
            return Long.valueOf(Instant.now().minusMillis(this.kafkaMessageFilter.getEventLookbackWindow().toMillis()).toEpochMilli());
        })));
        logger.trace("Applying new offsets: {}", offsetsForTimes);
        offsetsForTimes.forEach((topicPartition2, offsetAndTimestamp) -> {
            if (Objects.nonNull(offsetAndTimestamp)) {
                this.consumer.seek(topicPartition2, offsetAndTimestamp.offset());
            } else {
                this.consumer.seekToEnd(Collections.singletonList(topicPartition2));
            }
        });
    }

    private static boolean isConsumerRecordNewerThanEndTime(ConsumerRecord<Object, Object> consumerRecord, Instant instant) {
        return consumerRecord.timestamp() > instant.toEpochMilli();
    }

    private static boolean isConsumerRecordOlderThanStartTime(ConsumerRecord<Object, Object> consumerRecord, Instant instant) {
        return consumerRecord.timestamp() < instant.toEpochMilli();
    }
}
