package de.otto.synapse.endpoint.receiver.kafka;

import de.otto.synapse.channel.ChannelDurationBehind;
import de.otto.synapse.channel.ChannelPosition;
import de.otto.synapse.info.MessageReceiverNotification;
import de.otto.synapse.info.MessageReceiverStatus;
import java.time.Clock;
import java.time.Duration;
import java.time.Instant;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.context.ApplicationEventPublisher;

/* loaded from: input_file:de/otto/synapse/endpoint/receiver/kafka/ChannelDurationBehindHandler.class */
class ChannelDurationBehindHandler implements ConsumerRebalanceListener {
    private static final Logger LOG = LoggerFactory.getLogger(ChannelDurationBehindHandler.class);
    private final List<TopicPartition> partitions;
    private final AtomicReference<ChannelDurationBehind> channelDurationBehind;
    private final String channelName;
    private final ChannelPosition startFrom;
    private final ApplicationEventPublisher eventPublisher;
    private final KafkaConsumer<String, String> kafkaConsumer;
    private final Clock clock;

    /* JADX INFO: Access modifiers changed from: package-private */
    public ChannelDurationBehindHandler(String str, ChannelPosition channelPosition, ApplicationEventPublisher applicationEventPublisher, KafkaConsumer<String, String> kafkaConsumer) {
        this(str, channelPosition, applicationEventPublisher, Clock.systemDefaultZone(), kafkaConsumer);
    }

    ChannelDurationBehindHandler(String str, ChannelPosition channelPosition, ApplicationEventPublisher applicationEventPublisher, Clock clock, KafkaConsumer<String, String> kafkaConsumer) {
        this.partitions = new CopyOnWriteArrayList();
        this.channelDurationBehind = new AtomicReference<>();
        this.channelName = str;
        this.startFrom = channelPosition;
        this.eventPublisher = applicationEventPublisher;
        this.kafkaConsumer = kafkaConsumer;
        this.channelDurationBehind.set(ChannelDurationBehind.unknown());
        this.clock = clock;
    }

    public void onPartitionsRevoked(Collection<TopicPartition> collection) {
        LOG.info("Revoked " + collection + " Kafka partitions: " + collection);
        this.partitions.removeAll(collection);
        collection.forEach(topicPartition -> {
            String str = "" + topicPartition.partition();
            this.channelDurationBehind.getAndUpdate(channelDurationBehind -> {
                return ChannelDurationBehind.copyOf(channelDurationBehind).without(str).build();
            });
        });
    }

    public void onPartitionsAssigned(Collection<TopicPartition> collection) {
        this.partitions.addAll(collection);
        this.channelDurationBehind.updateAndGet(channelDurationBehind -> {
            return updateOnPartitionChanged(this.startFrom, collection, channelDurationBehind);
        });
    }

    public void update(TopicPartition topicPartition, long j, Instant instant) {
        Duration between = ((Long) this.kafkaConsumer.endOffsets(Collections.singletonList(topicPartition)).get(topicPartition)).longValue() - 1 > j ? Duration.between(instant, this.clock.instant()) : Duration.ZERO;
        Duration duration = between;
        this.channelDurationBehind.updateAndGet(channelDurationBehind -> {
            return ChannelDurationBehind.copyOf(channelDurationBehind).with("" + topicPartition.partition(), duration).build();
        });
        LOG.debug("Read from '{}:{}', durationBehind={}", new Object[]{this.channelName, Integer.valueOf(topicPartition.partition()), between});
        if (this.eventPublisher != null) {
            this.eventPublisher.publishEvent(MessageReceiverNotification.builder().withChannelName(this.channelName).withChannelDurationBehind(this.channelDurationBehind.get()).withStatus(MessageReceiverStatus.RUNNING).withMessage("Reading from Kafka stream.").build());
        }
    }

    private ChannelDurationBehind updateOnPartitionChanged(ChannelPosition channelPosition, Collection<TopicPartition> collection, ChannelDurationBehind channelDurationBehind) {
        Map endOffsets = this.kafkaConsumer.endOffsets(collection);
        ChannelDurationBehind.Builder copyOf = ChannelDurationBehind.copyOf(channelDurationBehind);
        endOffsets.forEach((topicPartition, l) -> {
            String str = "" + topicPartition.partition();
            if (startPosToLong(channelPosition.shard(str).position()) >= l.longValue()) {
                copyOf.with(str, Duration.ZERO);
            } else {
                copyOf.with(str, Duration.ofMillis(Long.MAX_VALUE));
            }
        });
        return copyOf.build();
    }

    private long startPosToLong(String str) {
        if (str == null || str.equals("")) {
            return 0L;
        }
        return Long.parseLong(str);
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public ChannelDurationBehind getChannelDurationBehind() {
        return this.channelDurationBehind.get();
    }
}
