package org.springframework.cloud.stream.binder.kafka;

import java.time.Duration;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.common.PartitionInfo;
import org.springframework.beans.factory.DisposableBean;
import org.springframework.boot.actuate.health.Health;
import org.springframework.boot.actuate.health.Status;
import org.springframework.boot.actuate.health.StatusAggregator;
import org.springframework.cloud.stream.binder.kafka.KafkaMessageChannelBinder;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.listener.AbstractMessageListenerContainer;
import org.springframework.scheduling.concurrent.CustomizableThreadFactory;

/* loaded from: input_file:BOOT-INF/lib/spring-cloud-stream-binder-kafka-3.2.4.jar:org/springframework/cloud/stream/binder/kafka/KafkaBinderHealthIndicator.class */
/**
 * Health indicator for the Kafka binder.
 *
 * <p>The reported health is the aggregate of two checks:
 * <ul>
 *   <li>a topic check, which queries partition metadata for every topic the binder has in use
 *       and reports {@code DOWN} when a relevant partition has no leader;</li>
 *   <li>a listener container check, which reports {@code DOWN} when any listener container is
 *       not in its expected state.</li>
 * </ul>
 *
 * <p>The topic check runs on a dedicated single-thread executor so that it can be bounded by
 * {@link #setTimeout(int) a timeout}. The metadata consumer is created lazily and is only ever
 * touched from that executor thread.
 */
public class KafkaBinderHealthIndicator implements KafkaBinderHealth, DisposableBean {
    /** Default number of seconds to wait for the topic check. */
    private static final int DEFAULT_TIMEOUT = 60;

    private final KafkaMessageChannelBinder binder;
    private final ConsumerFactory<?, ?> consumerFactory;
    private final ExecutorService executor = Executors.newSingleThreadExecutor(new CustomizableThreadFactory("kafka-binder-health-"));

    /** Lazily created; accessed exclusively from the {@link #executor} thread. */
    private Consumer<?, ?> metadataConsumer;
    private boolean considerDownWhenAnyPartitionHasNoLeader;
    private int timeout = DEFAULT_TIMEOUT;

    /**
     * Creates the indicator.
     *
     * @param kafkaMessageChannelBinder binder whose topics and listener containers are inspected
     * @param consumerFactory factory used to create the consumer that fetches partition metadata
     */
    public KafkaBinderHealthIndicator(KafkaMessageChannelBinder kafkaMessageChannelBinder, ConsumerFactory<?, ?> consumerFactory) {
        this.binder = kafkaMessageChannelBinder;
        this.consumerFactory = consumerFactory;
    }

    /**
     * Sets how long to wait for the topic check.
     *
     * @param i the timeout in seconds
     */
    public void setTimeout(int i) {
        this.timeout = i;
    }

    /**
     * Controls whether a leaderless partition that is not among the partitions recorded for the
     * topic in use still marks the binder as down.
     *
     * @param z {@code true} to report down for any partition without a leader
     */
    public void setConsiderDownWhenAnyPartitionHasNoLeader(boolean z) {
        this.considerDownWhenAnyPartitionHasNoLeader = z;
    }

    /**
     * Builds the overall health by combining the topic check and the listener container check.
     *
     * @return the merged health
     */
    @Override
    public Health health() {
        return merge(safelyBuildTopicsHealth(), buildListenerContainersHealth());
    }

    /**
     * Aggregates the two statuses with the default {@link StatusAggregator} and unions the
     * details; on a key clash the details of {@code health2} win.
     */
    private Health merge(Health health, Health health2) {
        Status aggregateStatus = StatusAggregator.getDefault().getAggregateStatus(health.getStatus(), health2.getStatus());
        Map<String, Object> details = new HashMap<>();
        details.putAll(health.getDetails());
        details.putAll(health2.getDetails());
        return Health.status(aggregateStatus).withDetails(details).build();
    }

    /**
     * Runs the topic check on the executor and waits at most {@code timeout} seconds for it.
     *
     * <p>When the wait is abandoned (timeout or interruption) the task is cancelled; otherwise it
     * would keep occupying, or stay queued on, the single executor thread and delay every
     * subsequent health check.
     *
     * @return the topic health, or a {@code DOWN} health describing why it could not be obtained
     */
    private Health safelyBuildTopicsHealth() {
        Future<Health> future = this.executor.submit(this::buildTopicsHealth);
        try {
            return future.get(this.timeout, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            future.cancel(true);
            Thread.currentThread().interrupt();
            return Health.down().withDetail("Interrupted while waiting for partition information in", this.timeout + " seconds").build();
        } catch (ExecutionException e) {
            return Health.down(e).build();
        } catch (TimeoutException e) {
            future.cancel(true);
            return Health.down().withDetail("Failed to retrieve partition information in", this.timeout + " seconds").build();
        }
    }

    /** Creates the metadata consumer on first use. */
    private void initMetadataConsumer() {
        if (this.metadataConsumer == null) {
            this.metadataConsumer = this.consumerFactory.createConsumer();
        }
    }

    /**
     * Checks the leaders of the partitions of every non-pattern topic in use.
     *
     * @return {@code UP} with the checked topics, {@code DOWN} listing the offending partitions,
     *         {@code UNKNOWN} when there are no bindings but the broker answered, or {@code DOWN}
     *         when any exception is raised along the way
     */
    private Health buildTopicsHealth() {
        try {
            initMetadataConsumer();
            Map<String, KafkaMessageChannelBinder.TopicInformation> topicsInUse = this.binder.getTopicsInUse();
            if (topicsInUse.isEmpty()) {
                return buildHealthWithoutBindings();
            }
            Set<String> downPartitions = new HashSet<>();
            Set<String> checkedTopics = new HashSet<>();
            for (Map.Entry<String, KafkaMessageChannelBinder.TopicInformation> entry : topicsInUse.entrySet()) {
                String topic = entry.getKey();
                KafkaMessageChannelBinder.TopicInformation topicInformation = entry.getValue();
                if (topicInformation.isTopicPattern()) {
                    // Pattern subscriptions are skipped: the key is not a concrete topic name.
                    continue;
                }
                for (PartitionInfo partitionInfo : this.metadataConsumer.partitionsFor(topic)) {
                    if (isConsideredDown(topicInformation, partitionInfo)) {
                        downPartitions.add(partitionInfo.toString());
                    }
                }
                checkedTopics.add(topic);
            }
            if (downPartitions.isEmpty()) {
                return Health.up().withDetail("topicsInUse", checkedTopics).build();
            }
            return Health.down().withDetail("Following partitions in use have no leaders: ", downPartitions.toString()).build();
        } catch (Exception e) {
            return Health.down(e).build();
        }
    }

    /**
     * With no bindings there is nothing to inspect, so only broker reachability is probed by
     * listing topics.
     */
    private Health buildHealthWithoutBindings() {
        try {
            this.metadataConsumer.listTopics(Duration.ofSeconds(this.timeout));
            return Health.unknown().withDetail("No bindings found", "Kafka binder may not be bound to destinations on the broker").build();
        } catch (Exception e) {
            return Health.down().withDetail("No topic information available", "Kafka broker is not reachable").build();
        }
    }

    /**
     * Decides whether a partition counts against the binder's health.
     *
     * <p>A leader with id {@code -1} always counts. A {@code null} leader counts when the
     * partition is one of those recorded for the topic in use, or when
     * {@code considerDownWhenAnyPartitionHasNoLeader} is enabled.
     */
    private boolean isConsideredDown(KafkaMessageChannelBinder.TopicInformation topicInformation, PartitionInfo partitionInfo) {
        if (partitionInfo.leader() != null) {
            return partitionInfo.leader().id() == -1;
        }
        return this.considerDownWhenAnyPartitionHasNoLeader
                || topicInformation.getPartitionInfos().contains(partitionInfo);
    }

    /**
     * Reports the state of every listener container of the binder.
     *
     * @return {@code UNKNOWN} when there are no containers; otherwise {@code UP}, or {@code DOWN}
     *         if at least one container is not in its expected state, with per-container details
     */
    private Health buildListenerContainersHealth() {
        List<AbstractMessageListenerContainer<?, ?>> containers = this.binder.getKafkaMessageListenerContainers();
        if (containers.isEmpty()) {
            return Health.unknown().build();
        }
        Status status = Status.UP;
        List<Map<String, Object>> containersDetails = new ArrayList<>();
        for (AbstractMessageListenerContainer<?, ?> container : containers) {
            boolean isRunning = container.isRunning();
            boolean isInExpectedState = container.isInExpectedState();
            if (!isInExpectedState) {
                status = Status.DOWN;
            }
            Map<String, Object> containerDetails = new HashMap<>();
            containerDetails.put("isRunning", isRunning);
            containerDetails.put("isStoppedAbnormally", !isRunning && !isInExpectedState);
            containerDetails.put("isPaused", container.isContainerPaused());
            containerDetails.put("listenerId", container.getListenerId());
            containerDetails.put("groupId", container.getGroupId());
            containersDetails.add(containerDetails);
        }
        return Health.status(status).withDetail("listenerContainers", containersDetails).build();
    }

    /**
     * Releases the metadata consumer and stops the executor.
     *
     * <p>The consumer is closed by a task queued on the executor rather than directly, because
     * the consumer is only ever used from that thread; {@code shutdown()} still runs tasks that
     * were submitted before it.
     */
    @Override
    public void destroy() {
        if (!this.executor.isShutdown()) {
            this.executor.execute(this::closeMetadataConsumer);
        }
        this.executor.shutdown();
    }

    /** Closes the metadata consumer if one was created; must run on the executor thread. */
    private void closeMetadataConsumer() {
        Consumer<?, ?> consumer = this.metadataConsumer;
        this.metadataConsumer = null;
        if (consumer != null) {
            consumer.close();
        }
    }
}
