package org.springframework.integration.kafka.channel;

import java.util.ArrayDeque;
import java.util.Iterator;
import java.util.List;
import org.springframework.integration.channel.AbstractMessageChannel;
import org.springframework.integration.channel.ExecutorChannelInterceptorAware;
import org.springframework.integration.kafka.inbound.KafkaMessageSource;
import org.springframework.integration.support.management.IntegrationManagement;
import org.springframework.integration.support.management.metrics.CounterFacade;
import org.springframework.integration.support.management.metrics.MetricsCaptor;
import org.springframework.kafka.core.KafkaOperations;
import org.springframework.lang.Nullable;
import org.springframework.messaging.Message;
import org.springframework.messaging.PollableChannel;
import org.springframework.messaging.support.ChannelInterceptor;
import org.springframework.messaging.support.ExecutorChannelInterceptor;
import org.springframework.util.Assert;

/* loaded from: input_file:BOOT-INF/lib/spring-integration-kafka-6.1.4.jar:org/springframework/integration/kafka/channel/PollableKafkaChannel.class */
public class PollableKafkaChannel extends AbstractKafkaChannel implements PollableChannel, ExecutorChannelInterceptorAware {
    private final KafkaMessageSource<?, ?> source;
    private CounterFacade receiveCounter;
    private volatile int executorInterceptorsSize;

    public PollableKafkaChannel(KafkaOperations<?, ?> kafkaOperations, KafkaMessageSource<?, ?> kafkaMessageSource) {
        super(kafkaOperations, topic(kafkaMessageSource));
        this.source = kafkaMessageSource;
        if (kafkaMessageSource.getConsumerProperties().getGroupId() == null) {
            String groupId = getGroupId();
            kafkaMessageSource.getConsumerProperties().setGroupId(groupId != null ? groupId : getBeanName());
        }
    }

    @Override // org.springframework.messaging.PollableChannel
    @Nullable
    public Message<?> receive() {
        return doReceive();
    }

    @Override // org.springframework.messaging.PollableChannel
    @Nullable
    public Message<?> receive(long j) {
        return doReceive();
    }

    @Nullable
    protected Message<?> doReceive() {
        AbstractMessageChannel.ChannelInterceptorList iChannelInterceptorList = getIChannelInterceptorList();
        ArrayDeque arrayDeque = null;
        boolean z = false;
        try {
            if (isLoggingEnabled()) {
                this.logger.trace(() -> {
                    return "preReceive on channel '" + this + "'";
                });
            }
            if (iChannelInterceptorList.getInterceptors().size() > 0) {
                arrayDeque = new ArrayDeque();
                if (!iChannelInterceptorList.preReceive(this, arrayDeque)) {
                    return null;
                }
            }
            Message<?> receive = this.source.receive();
            if (receive != null) {
                incrementReceiveCounter();
                z = true;
                receive = iChannelInterceptorList.postReceive(receive, this);
            }
            iChannelInterceptorList.afterReceiveCompletion(receive, this, null, arrayDeque);
            return receive;
        } catch (RuntimeException e) {
            if (!z) {
                incrementReceiveErrorCounter(e);
            }
            iChannelInterceptorList.afterReceiveCompletion(null, this, e, arrayDeque);
            throw e;
        }
    }

    private void incrementReceiveCounter() {
        MetricsCaptor metricsCaptor = getMetricsCaptor();
        if (metricsCaptor != null) {
            if (this.receiveCounter == null) {
                this.receiveCounter = buildReceiveCounter(metricsCaptor, null);
            }
            this.receiveCounter.increment();
        }
    }

    private void incrementReceiveErrorCounter(Exception exc) {
        MetricsCaptor metricsCaptor = getMetricsCaptor();
        if (metricsCaptor != null) {
            buildReceiveCounter(metricsCaptor, exc).increment();
        }
    }

    private CounterFacade buildReceiveCounter(MetricsCaptor metricsCaptor, @Nullable Exception exc) {
        CounterFacade build = metricsCaptor.counterBuilder(IntegrationManagement.RECEIVE_COUNTER_NAME).tag("name", getComponentName() == null ? "unknown" : getComponentName()).tag("type", "channel").tag("result", exc == null ? "success" : "failure").tag("exception", exc == null ? "none" : exc.getClass().getSimpleName()).description("Messages received").build();
        this.meters.add(build);
        return build;
    }

    @Override // org.springframework.integration.channel.AbstractMessageChannel, org.springframework.messaging.support.InterceptableChannel
    public void setInterceptors(List<ChannelInterceptor> list) {
        super.setInterceptors(list);
        Iterator<ChannelInterceptor> it = list.iterator();
        while (it.hasNext()) {
            if (it.next() instanceof ExecutorChannelInterceptor) {
                this.executorInterceptorsSize++;
            }
        }
    }

    @Override // org.springframework.integration.channel.AbstractMessageChannel, org.springframework.messaging.support.InterceptableChannel
    public void addInterceptor(ChannelInterceptor channelInterceptor) {
        super.addInterceptor(channelInterceptor);
        if (channelInterceptor instanceof ExecutorChannelInterceptor) {
            this.executorInterceptorsSize++;
        }
    }

    @Override // org.springframework.integration.channel.AbstractMessageChannel, org.springframework.messaging.support.InterceptableChannel
    public void addInterceptor(int i, ChannelInterceptor channelInterceptor) {
        super.addInterceptor(i, channelInterceptor);
        if (channelInterceptor instanceof ExecutorChannelInterceptor) {
            this.executorInterceptorsSize++;
        }
    }

    @Override // org.springframework.integration.channel.AbstractMessageChannel, org.springframework.messaging.support.InterceptableChannel
    public boolean removeInterceptor(ChannelInterceptor channelInterceptor) {
        boolean removeInterceptor = super.removeInterceptor(channelInterceptor);
        if (removeInterceptor && (channelInterceptor instanceof ExecutorChannelInterceptor)) {
            this.executorInterceptorsSize--;
        }
        return removeInterceptor;
    }

    @Override // org.springframework.integration.channel.AbstractMessageChannel, org.springframework.messaging.support.InterceptableChannel
    @Nullable
    public ChannelInterceptor removeInterceptor(int i) {
        ChannelInterceptor removeInterceptor = super.removeInterceptor(i);
        if (removeInterceptor instanceof ExecutorChannelInterceptor) {
            this.executorInterceptorsSize--;
        }
        return removeInterceptor;
    }

    @Override // org.springframework.integration.channel.ExecutorChannelInterceptorAware
    public boolean hasExecutorInterceptors() {
        return this.executorInterceptorsSize > 0;
    }

    private static String topic(KafkaMessageSource<?, ?> kafkaMessageSource) {
        Assert.notNull(kafkaMessageSource, "'source' cannot be null");
        String[] topics = kafkaMessageSource.getConsumerProperties().getTopics();
        Assert.isTrue(topics != null && topics.length == 1, "Only one topic is allowed");
        return topics[0];
    }
}
