package org.springframework.cloud.stream.binder.kafka;

import io.micrometer.core.instrument.Gauge;
import io.micrometer.core.instrument.MeterRegistry;
import io.micrometer.core.instrument.binder.MeterBinder;
import io.micrometer.core.instrument.noop.NoopGauge;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.function.ToDoubleFunction;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;
import org.springframework.cloud.stream.binder.BindingCreatedEvent;
import org.springframework.cloud.stream.binder.kafka.KafkaMessageChannelBinder;
import org.springframework.cloud.stream.binder.kafka.properties.KafkaBinderConfigurationProperties;
import org.springframework.context.ApplicationListener;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.lang.Nullable;
import org.springframework.util.ObjectUtils;

/* loaded from: input_file:BOOT-INF/lib/spring-cloud-stream-binder-kafka-3.2.9.jar:org/springframework/cloud/stream/binder/kafka/KafkaBinderMetrics.class */
/**
 * Micrometer {@link MeterBinder} that publishes one gauge named
 * {@link #OFFSET_LAG_METRIC_NAME} per consumer topic/group pair reported by the binder.
 * The gauge value is the total number of unconsumed messages (end offset minus committed
 * offset, summed over all partitions of the topic).
 *
 * <p>Lag is recomputed periodically on an internal scheduler. Depending on the binder
 * metrics configuration, reading the gauge either triggers a fresh, time-bounded
 * computation or simply returns the value produced by the last periodic run.
 *
 * <p>The instance re-binds itself to the registry passed at construction time whenever a
 * {@link BindingCreatedEvent} is received. {@link #close()} stops the scheduler and
 * releases the Kafka consumers used for metadata look-ups.
 */
public class KafkaBinderMetrics implements MeterBinder, ApplicationListener<BindingCreatedEvent>, AutoCloseable {
    /** Default number of seconds a gauge read waits for an on-demand lag computation. */
    private static final int DEFAULT_TIMEOUT = 5;
    private static final Log LOG = LogFactory.getLog(KafkaBinderMetrics.class);
    /** Name of the gauge registered for every consumer topic/group pair. */
    public static final String OFFSET_LAG_METRIC_NAME = "spring.cloud.stream.binder.kafka.offset";
    private final KafkaMessageChannelBinder binder;
    private final KafkaBinderConfigurationProperties binderConfigurationProperties;
    /** Lazily created by {@link #createConsumerFactory()} when none was supplied. */
    private ConsumerFactory<?, ?> defaultConsumerFactory;
    private final MeterRegistry meterRegistry;
    /** Metadata consumers, keyed by consumer group. */
    private final Map<String, Consumer<?, ?>> metadataConsumers;
    /** Seconds to wait for an on-demand lag computation before using the last known value. */
    private int timeout;
    /** Last successfully computed lag, keyed by {@code topic + "-" + group}. */
    private final Map<String, Long> lastUnconsumedMessagesValues;
    ScheduledExecutorService scheduler;

    /**
     * Creates the metrics binder.
     *
     * @param kafkaMessageChannelBinder binder whose topics in use are monitored
     * @param kafkaBinderConfigurationProperties binder configuration (metrics settings and
     * consumer properties)
     * @param consumerFactory factory for the metadata consumers; when {@code null} one is
     * built from the binder configuration on first use
     * @param meterRegistry registry to re-bind to on {@link BindingCreatedEvent}; when
     * {@code null} those events are ignored
     */
    public KafkaBinderMetrics(KafkaMessageChannelBinder kafkaMessageChannelBinder, KafkaBinderConfigurationProperties kafkaBinderConfigurationProperties, ConsumerFactory<?, ?> consumerFactory, @Nullable MeterRegistry meterRegistry) {
        this.timeout = DEFAULT_TIMEOUT;
        this.lastUnconsumedMessagesValues = new ConcurrentHashMap<>();
        this.binder = kafkaMessageChannelBinder;
        this.binderConfigurationProperties = kafkaBinderConfigurationProperties;
        this.defaultConsumerFactory = consumerFactory;
        this.meterRegistry = meterRegistry;
        this.metadataConsumers = new ConcurrentHashMap<>();
    }

    /**
     * Creates the metrics binder without a consumer factory and without a meter registry.
     *
     * @param kafkaMessageChannelBinder binder whose topics in use are monitored
     * @param kafkaBinderConfigurationProperties binder configuration
     */
    public KafkaBinderMetrics(KafkaMessageChannelBinder kafkaMessageChannelBinder, KafkaBinderConfigurationProperties kafkaBinderConfigurationProperties) {
        this(kafkaMessageChannelBinder, kafkaBinderConfigurationProperties, null, null);
    }

    /**
     * Sets how long a gauge read waits for an on-demand lag computation.
     *
     * @param i timeout in seconds
     */
    public void setTimeout(int i) {
        this.timeout = i;
    }

    /**
     * Registers one lag gauge per consumer topic currently in use and schedules the periodic
     * lag computation for each of them. Any scheduler created by a previous call is shut
     * down and replaced.
     *
     * @param meterRegistry registry the gauges are registered with
     */
    @Override // io.micrometer.core.instrument.binder.MeterBinder
    public void bindTo(MeterRegistry meterRegistry) {
        if (this.scheduler != null) {
            // The field is only typed as the interface, so guard the cast used for the log line.
            if (this.scheduler instanceof ScheduledThreadPoolExecutor) {
                LOG.info("Try to shutdown the old scheduler with " + ((ScheduledThreadPoolExecutor) this.scheduler).getPoolSize() + " threads");
            }
            this.scheduler.shutdown();
        }
        Map<String, KafkaMessageChannelBinder.TopicInformation> topicsInUse = this.binder.getTopicsInUse();
        // A zero-sized core pool is legal but discouraged for scheduled executors; keep at least one thread.
        this.scheduler = Executors.newScheduledThreadPool(Math.max(1, topicsInUse.size()));
        for (Map.Entry<String, KafkaMessageChannelBinder.TopicInformation> topicEntry : topicsInUse.entrySet()) {
            KafkaMessageChannelBinder.TopicInformation topicInformation = topicEntry.getValue();
            if (!topicInformation.isConsumerTopic()) {
                continue;
            }
            String topic = topicEntry.getKey();
            String group = topicInformation.getConsumerGroup();
            Gauge gauge = Gauge.builder(OFFSET_LAG_METRIC_NAME, this, computeOffsetComputationFunction(topic, group))
                    .tag("group", group)
                    .tag("topic", topic)
                    .description("Unconsumed messages for a particular group and topic")
                    .register(meterRegistry);
            if (gauge instanceof NoopGauge) {
                continue;
            }
            // Seed the value only once: re-binding must not wipe the last known lag back to zero.
            this.lastUnconsumedMessagesValues.putIfAbsent(lagKey(topic, group), 0L);
            long intervalSeconds = this.binderConfigurationProperties.getMetrics().getOffsetLagMetricsInterval().getSeconds();
            this.scheduler.scheduleWithFixedDelay(() -> computeUnconsumedMessages(topic, group), 1L, intervalSeconds, TimeUnit.SECONDS);
        }
    }

    /**
     * Chooses how a gauge obtains its value: a fresh time-bounded computation when the
     * default offset lag metrics are enabled, otherwise the last periodically computed value.
     */
    private ToDoubleFunction<KafkaBinderMetrics> computeOffsetComputationFunction(String topic, String group) {
        if (this.binderConfigurationProperties.getMetrics().isDefaultOffsetLagMetricsEnabled()) {
            return metrics -> computeAndGetUnconsumedMessagesWithTimeout(topic, group);
        }
        return metrics -> lastKnownLag(topic, group);
    }

    /**
     * Computes the lag on the scheduler and waits at most {@code timeout} seconds for it.
     * Falls back to the last known value when the wait is interrupted, times out or fails.
     */
    private long computeAndGetUnconsumedMessagesWithTimeout(String topic, String group) {
        try {
            return this.scheduler.submit(() -> Long.valueOf(computeUnconsumedMessages(topic, group)))
                    .get(this.timeout, TimeUnit.SECONDS);
        } catch (InterruptedException ex) {
            Thread.currentThread().interrupt();
            return lastKnownLag(topic, group);
        } catch (ExecutionException | TimeoutException ex) {
            LOG.debug("Cannot compute lag in time for topic: " + topic + ", using last known value", ex);
            return lastKnownLag(topic, group);
        }
    }

    /**
     * Computes and records the current lag. When the computation fails, the failure is
     * logged and the last known value is returned, so that a failed look-up is not
     * reported as "no lag".
     */
    private long computeUnconsumedMessages(String topic, String group) {
        try {
            long lag = findTotalTopicGroupLag(topic, group, this.metadataConsumers);
            this.lastUnconsumedMessagesValues.put(lagKey(topic, group), lag);
            return lag;
        } catch (Exception ex) {
            LOG.debug("Cannot generate metric for topic: " + topic, ex);
            return lastKnownLag(topic, group);
        }
    }

    /**
     * Sums, over all partitions of the topic, the end offset minus the group's committed
     * offset. When a partition has no committed offset, its beginning offset is used instead.
     *
     * @param topic topic to inspect
     * @param group consumer group whose committed offsets are read
     * @param consumersByGroup cache of metadata consumers, keyed by group
     * @return total lag for the topic/group pair
     */
    private long findTotalTopicGroupLag(String topic, String group, Map<String, Consumer<?, ?>> consumersByGroup) {
        Consumer<?, ?> metadataConsumer = consumersByGroup.computeIfAbsent(group,
                g -> createConsumerFactory().createConsumer(g, "monitoring"));
        // One consumer is shared by every topic of a group, while the scheduler may run
        // several tasks in parallel. Kafka consumers are not safe for multi-threaded
        // access, so look-ups on the same instance are serialized.
        synchronized (metadataConsumer) {
            List<TopicPartition> topicPartitions = new LinkedList<>();
            for (PartitionInfo partitionInfo : metadataConsumer.partitionsFor(topic)) {
                topicPartitions.add(new TopicPartition(partitionInfo.topic(), partitionInfo.partition()));
            }
            Map<TopicPartition, Long> endOffsets = metadataConsumer.endOffsets(topicPartitions);
            Map<TopicPartition, OffsetAndMetadata> committedOffsets = metadataConsumer.committed(endOffsets.keySet());
            Map<TopicPartition, Long> beginningOffsets = metadataConsumer.beginningOffsets(endOffsets.keySet());
            long totalLag = 0;
            for (Map.Entry<TopicPartition, Long> endOffset : endOffsets.entrySet()) {
                TopicPartition topicPartition = endOffset.getKey();
                OffsetAndMetadata committedOffset = committedOffsets.get(topicPartition);
                Long beginningOffset = beginningOffsets.get(topicPartition);
                long consumedUpTo = 0;
                if (committedOffset != null) {
                    consumedUpTo = committedOffset.offset();
                } else if (beginningOffset != null) {
                    consumedUpTo = beginningOffset;
                }
                totalLag += endOffset.getValue() - consumedUpTo;
            }
            return totalLag;
        }
    }

    /**
     * Returns the consumer factory, building it from the binder configuration on first use
     * when none was supplied. Byte-array deserializers are set first and may be overridden
     * by the merged consumer configuration; the binder connection string is used when the
     * configuration carries no bootstrap servers.
     */
    private synchronized ConsumerFactory<?, ?> createConsumerFactory() {
        if (this.defaultConsumerFactory == null) {
            Map<String, Object> consumerProperties = new HashMap<>();
            consumerProperties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class);
            consumerProperties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class);
            Map<String, Object> mergedConsumerConfiguration = this.binderConfigurationProperties.mergedConsumerConfiguration();
            if (!ObjectUtils.isEmpty(mergedConsumerConfiguration)) {
                consumerProperties.putAll(mergedConsumerConfiguration);
            }
            if (!consumerProperties.containsKey(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG)) {
                consumerProperties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, this.binderConfigurationProperties.getKafkaConnectionString());
            }
            this.defaultConsumerFactory = new DefaultKafkaConsumerFactory<>(consumerProperties);
        }
        return this.defaultConsumerFactory;
    }

    /**
     * Re-binds the gauges whenever a new binding is created, provided a registry was
     * supplied at construction time.
     */
    @Override // org.springframework.context.ApplicationListener
    public void onApplicationEvent(BindingCreatedEvent bindingCreatedEvent) {
        if (this.meterRegistry != null) {
            bindTo(this.meterRegistry);
        }
    }

    /**
     * Stops the scheduler and closes every cached metadata consumer. Closing is best
     * effort: a consumer that cannot be closed (for example because a lag computation is
     * still using it) is logged and dropped from the cache without failing the shutdown.
     */
    @Override // java.lang.AutoCloseable
    public void close() throws Exception {
        if (this.scheduler != null) {
            this.scheduler.shutdown();
        }
        // Close and evict in one pass so that only consumers that were actually visited are dropped.
        this.metadataConsumers.values().removeIf(consumer -> {
            closeQuietly(consumer);
            return true;
        });
    }

    /** Closes a metadata consumer, logging instead of propagating any failure. */
    private void closeQuietly(Consumer<?, ?> consumer) {
        try {
            consumer.close();
        } catch (Exception ex) {
            LOG.debug("Cannot close metadata consumer", ex);
        }
    }

    /** Builds the key under which the lag of a topic/group pair is stored. */
    private static String lagKey(String topic, String group) {
        return topic + "-" + group;
    }

    /** Returns the last recorded lag for the pair, or 0 when none has been recorded yet. */
    private long lastKnownLag(String topic, String group) {
        return this.lastUnconsumedMessagesValues.getOrDefault(lagKey(topic, group), 0L);
    }
}
