package de.otto.synapse.endpoint.receiver.kafka;

import de.otto.synapse.channel.ChannelPosition;
import de.otto.synapse.channel.ChannelResponse;
import de.otto.synapse.channel.ShardResponse;
import de.otto.synapse.consumer.MessageDispatcher;
import de.otto.synapse.endpoint.MessageInterceptorRegistry;
import de.otto.synapse.endpoint.receiver.AbstractMessageLogReceiverEndpoint;
import de.otto.synapse.endpoint.sender.kafka.KafkaMessageSender;
import de.otto.synapse.info.MessageReceiverStatus;
import de.otto.synapse.logging.LogHelper;
import java.time.Duration;
import java.util.Collections;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Predicate;
import javax.annotation.Nonnull;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.errors.WakeupException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.context.ApplicationEventPublisher;

/* loaded from: input_file:de/otto/synapse/endpoint/receiver/kafka/KafkaMessageLogReceiverEndpoint.class */
public class KafkaMessageLogReceiverEndpoint extends AbstractMessageLogReceiverEndpoint {
    private static final Logger LOG = LoggerFactory.getLogger(KafkaMessageLogReceiverEndpoint.class);
    private static final long KAFKA_CONSUMER_POLLING_DURATION = 1000;
    private static final int LOG_MESSAGE_COUNTER_EVERY_NTH_MESSAGE = 10000;
    private final KafkaConsumer<String, String> kafkaConsumer;
    private final ExecutorService executorService;
    private final ApplicationEventPublisher eventPublisher;
    private final MessageInterceptorRegistry interceptorRegistry;
    final AtomicBoolean stopSignal;

    public KafkaMessageLogReceiverEndpoint(String str, MessageInterceptorRegistry messageInterceptorRegistry, KafkaConsumer<String, String> kafkaConsumer, ExecutorService executorService, ApplicationEventPublisher applicationEventPublisher) {
        super(str, messageInterceptorRegistry, applicationEventPublisher);
        this.stopSignal = new AtomicBoolean(false);
        this.kafkaConsumer = kafkaConsumer;
        this.executorService = executorService;
        this.eventPublisher = applicationEventPublisher;
        this.interceptorRegistry = messageInterceptorRegistry;
    }

    @Nonnull
    public CompletableFuture<ChannelPosition> consumeUntil(@Nonnull ChannelPosition channelPosition, @Nonnull Predicate<ShardResponse> predicate) {
        publishEvent(MessageReceiverStatus.STARTING, "Consuming messages from Kafka.", null);
        ChannelDurationBehindHandler channelDurationBehindHandler = new ChannelDurationBehindHandler(getChannelName(), channelPosition, this.eventPublisher, this.kafkaConsumer);
        ConsumerRebalanceHandler consumerRebalanceHandler = new ConsumerRebalanceHandler(getChannelName(), channelPosition, this.eventPublisher, this.kafkaConsumer);
        String channelName = getChannelName();
        MessageInterceptorRegistry messageInterceptorRegistry = this.interceptorRegistry;
        MessageDispatcher messageDispatcher = getMessageDispatcher();
        Objects.requireNonNull(consumerRebalanceHandler);
        KafkaRecordsConsumer kafkaRecordsConsumer = new KafkaRecordsConsumer(channelName, channelPosition, messageInterceptorRegistry, messageDispatcher, channelDurationBehindHandler, consumerRebalanceHandler::getCurrentPartitions, new KafkaDecoder());
        Set subscription = this.kafkaConsumer.subscription();
        if (!subscription.isEmpty()) {
            if (!subscription.contains(getChannelName())) {
                LOG.error("KafkaConsumer is already subscribed to " + subscription);
                throw new IllegalStateException("Unable to consume channel " + getChannelName() + " using KafkaConsumer that is subscribed to " + subscription);
            }
            this.kafkaConsumer.unsubscribe();
        }
        this.kafkaConsumer.subscribe(Collections.singletonList(getChannelName()), ConsumerRebalanceListeners.of(channelDurationBehindHandler, consumerRebalanceHandler));
        return CompletableFuture.supplyAsync(() -> {
            return processMessages(channelPosition, predicate, consumerRebalanceHandler, kafkaRecordsConsumer);
        }, this.executorService).thenApply(channelPosition2 -> {
            publishEvent(MessageReceiverStatus.FINISHED, "Finished consuming messages from Kafka", null);
            return channelPosition2;
        }).exceptionally(th -> {
            LOG.error("Failed to consume from Kafka stream {}: {}", getChannelName(), th.getMessage());
            publishEvent(MessageReceiverStatus.FAILED, "Failed to consume messages from Kafka: " + th.getMessage(), null);
            stop();
            throw new RuntimeException(th.getMessage(), th);
        });
    }

    private ChannelPosition processMessages(ChannelPosition channelPosition, Predicate<ShardResponse> predicate, ConsumerRebalanceHandler consumerRebalanceHandler, KafkaRecordsConsumer kafkaRecordsConsumer) {
        long currentTimeMillis = System.currentTimeMillis();
        AtomicLong atomicLong = new AtomicLong(0L);
        AtomicLong atomicLong2 = new AtomicLong(System.currentTimeMillis());
        AtomicLong atomicLong3 = new AtomicLong(0L);
        AtomicLong atomicLong4 = new AtomicLong(0L);
        AtomicBoolean atomicBoolean = new AtomicBoolean(false);
        ChannelPosition channelPosition2 = channelPosition;
        do {
            try {
                ConsumerRecords<String, String> poll = this.kafkaConsumer.poll(Duration.ofMillis(KAFKA_CONSUMER_POLLING_DURATION));
                if (consumerRebalanceHandler.shardsAssignedAndPositioned()) {
                    ChannelResponse apply = kafkaRecordsConsumer.apply(poll);
                    channelPosition2 = apply.getChannelPosition();
                    atomicBoolean.set(apply.getShardResponses().stream().allMatch(predicate));
                    this.kafkaConsumer.commitAsync();
                    int count = poll.count();
                    long addAndGet = atomicLong.addAndGet(count);
                    if ((addAndGet > 0 && addAndGet > atomicLong3.get() + KafkaMessageSender.UPDATE_PARTITION_DELAY) || atomicBoolean.get()) {
                        LOG.info("Read {} messages ({} per sec) from '{}', durationBehind={}, totalMessages={}", new Object[]{Integer.valueOf(count), String.format("%.2f", Double.valueOf(LogHelper.calculateMessagesPerSecond(atomicLong2.getAndSet(System.currentTimeMillis()), addAndGet - atomicLong4.get()))), getChannelName(), apply.getChannelDurationBehind(), Long.valueOf(addAndGet)});
                        if (atomicBoolean.get() || this.stopSignal.get()) {
                            LOG.info("Stop reading of channel={}, stopCondition={}, stopSignal={}, durationBehind={}", new Object[]{getChannelName(), atomicBoolean, Boolean.valueOf(this.stopSignal.get()), apply.getChannelDurationBehind()});
                        }
                        atomicLong3.set(addAndGet - (addAndGet % KafkaMessageSender.UPDATE_PARTITION_DELAY));
                        atomicLong4.set(addAndGet);
                    }
                }
                if (atomicBoolean.get()) {
                    break;
                }
            } catch (WakeupException e) {
                LOG.info("Shutting down Kafka consumer");
            }
        } while (!this.stopSignal.get());
        LOG.info("Read a total of {} messages from '{}', totalMessagesPerSecond={}", new Object[]{Long.valueOf(atomicLong.get()), getChannelName(), String.format("%.2f", Double.valueOf(LogHelper.calculateMessagesPerSecond(currentTimeMillis, atomicLong.get())))});
        return channelPosition2;
    }

    public void stop() {
        LOG.info("Channel {} received stop signal.", getChannelName());
        this.stopSignal.set(true);
        this.kafkaConsumer.wakeup();
    }
}
