package de.otto.synapse.endpoint.receiver.kafka;

import com.google.common.collect.Sets;
import de.otto.synapse.channel.ChannelPosition;
import de.otto.synapse.channel.ShardPosition;
import de.otto.synapse.channel.StartFrom;
import de.otto.synapse.info.MessageReceiverNotification;
import de.otto.synapse.info.MessageReceiverStatus;
import java.util.Collection;
import java.util.Collections;
import java.util.Set;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndTimestamp;
import org.apache.kafka.common.TopicPartition;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.context.ApplicationEventPublisher;

/* JADX INFO: Access modifiers changed from: package-private */
/* loaded from: input_file:de/otto/synapse/endpoint/receiver/kafka/ConsumerRebalanceHandler.class */
public class ConsumerRebalanceHandler implements ConsumerRebalanceListener {
    private static final Logger LOG = LoggerFactory.getLogger(ConsumerRebalanceHandler.class);
    private final String channelName;
    private final ChannelPosition channelPosition;
    private final ApplicationEventPublisher eventPublisher;
    private final KafkaConsumer<String, String> kafkaConsumer;
    private final Set<String> currentPartitions = Sets.newConcurrentHashSet();
    private final AtomicBoolean shardsAssignedAndPositioned = new AtomicBoolean(false);

    /* renamed from: de.otto.synapse.endpoint.receiver.kafka.ConsumerRebalanceHandler$1, reason: invalid class name */
    /* loaded from: input_file:de/otto/synapse/endpoint/receiver/kafka/ConsumerRebalanceHandler$1.class */
    static /* synthetic */ class AnonymousClass1 {
        static final /* synthetic */ int[] $SwitchMap$de$otto$synapse$channel$StartFrom = new int[StartFrom.values().length];

        static {
            try {
                $SwitchMap$de$otto$synapse$channel$StartFrom[StartFrom.POSITION.ordinal()] = 1;
            } catch (NoSuchFieldError e) {
            }
            try {
                $SwitchMap$de$otto$synapse$channel$StartFrom[StartFrom.AT_POSITION.ordinal()] = 2;
            } catch (NoSuchFieldError e2) {
            }
            try {
                $SwitchMap$de$otto$synapse$channel$StartFrom[StartFrom.HORIZON.ordinal()] = 3;
            } catch (NoSuchFieldError e3) {
            }
            try {
                $SwitchMap$de$otto$synapse$channel$StartFrom[StartFrom.TIMESTAMP.ordinal()] = 4;
            } catch (NoSuchFieldError e4) {
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public ConsumerRebalanceHandler(String str, ChannelPosition channelPosition, ApplicationEventPublisher applicationEventPublisher, KafkaConsumer<String, String> kafkaConsumer) {
        this.channelName = str;
        this.channelPosition = channelPosition;
        this.eventPublisher = applicationEventPublisher;
        this.kafkaConsumer = kafkaConsumer;
    }

    public void onPartitionsRevoked(Collection<TopicPartition> collection) {
        LOG.info("Revoked " + collection + " Kafka partitions: " + collection);
        collection.forEach(topicPartition -> {
            this.currentPartitions.remove("" + topicPartition.partition());
        });
        this.shardsAssignedAndPositioned.set(!this.currentPartitions.isEmpty());
    }

    public void onPartitionsAssigned(Collection<TopicPartition> collection) {
        LOG.info("Assigned " + collection + " Kafka partitions: " + collection);
        collection.forEach(topicPartition -> {
            this.currentPartitions.add("" + topicPartition.partition());
        });
        for (TopicPartition topicPartition2 : collection) {
            String str = "" + topicPartition2.partition();
            ShardPosition shard = this.channelPosition.shard(str);
            TopicPartition topicPartition3 = new TopicPartition(this.channelName, topicPartition2.partition());
            switch (AnonymousClass1.$SwitchMap$de$otto$synapse$channel$StartFrom[shard.startFrom().ordinal()]) {
                case 1:
                    this.kafkaConsumer.seek(topicPartition3, Integer.parseInt(shard.position()) + 1);
                    break;
                case 2:
                    this.kafkaConsumer.seek(topicPartition3, Integer.parseInt(shard.position()));
                    break;
                case 3:
                    this.kafkaConsumer.seekToBeginning(Collections.singletonList(topicPartition3));
                    break;
                case 4:
                    this.kafkaConsumer.seek(topicPartition3, ((OffsetAndTimestamp) this.kafkaConsumer.offsetsForTimes(Collections.singletonMap(topicPartition3, Long.valueOf(shard.timestamp().toEpochMilli()))).get(topicPartition3)).offset());
                    break;
            }
            LOG.info("Reading from channel={}, shard={}, position={}", new Object[]{this.channelName, str, shard});
        }
        this.shardsAssignedAndPositioned.set(true);
        if (this.eventPublisher != null) {
            this.eventPublisher.publishEvent(MessageReceiverNotification.builder().withChannelName(this.channelName).withStatus(MessageReceiverStatus.STARTED).withMessage("Received shards from Kafka.").build());
        }
    }

    public void onPartitionsLost(Collection<TopicPartition> collection) {
        LOG.warn("Lost " + collection + " Kafka partitions: " + collection);
        collection.forEach(topicPartition -> {
            this.currentPartitions.remove("" + topicPartition.partition());
        });
        this.shardsAssignedAndPositioned.set(!this.currentPartitions.isEmpty());
    }

    public Set<String> getCurrentPartitions() {
        return this.currentPartitions;
    }

    public boolean shardsAssignedAndPositioned() {
        return this.shardsAssignedAndPositioned.get();
    }
}
