package org.springframework.kafka.listener;

import java.util.Arrays;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Properties;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;
import java.util.function.Supplier;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
import org.apache.commons.logging.LogFactory;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.apache.kafka.common.KafkaFuture;
import org.apache.kafka.common.TopicPartition;
import org.springframework.beans.BeanUtils;
import org.springframework.beans.BeansException;
import org.springframework.beans.factory.BeanNameAware;
import org.springframework.context.ApplicationContext;
import org.springframework.context.ApplicationContextAware;
import org.springframework.context.ApplicationEvent;
import org.springframework.context.ApplicationEventPublisher;
import org.springframework.context.ApplicationEventPublisherAware;
import org.springframework.core.log.LogAccessor;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.KafkaAdmin;
import org.springframework.kafka.event.ContainerStoppedEvent;
import org.springframework.kafka.support.TopicPartitionOffset;
import org.springframework.lang.NonNull;
import org.springframework.lang.Nullable;
import org.springframework.util.Assert;
import org.springframework.util.StringUtils;

/* loaded from: input_file:BOOT-INF/lib/spring-kafka-3.0.12.jar:org/springframework/kafka/listener/AbstractMessageListenerContainer.class */
/**
 * Abstract base for message listener containers.
 *
 * <p>Holds the container configuration ({@link ContainerProperties}), the lifecycle flags
 * (running / paused / stopped normally), the optional interceptors and error handlers, and
 * implements the start/stop entry points. The actual work of starting and stopping is
 * delegated to subclasses through {@link #doStart()} and {@link #doStop(Runnable, boolean)}.
 *
 * @param <K> the record key type
 * @param <V> the record value type
 */
public abstract class AbstractMessageListenerContainer<K, V> implements GenericMessageListenerContainer<K, V>, BeanNameAware, ApplicationEventPublisherAware, ApplicationContextAware {

    /** Value used for the {@code since} attribute of the deprecation annotations in this class. */
    private static final String VERSION_2_8 = "2.8";

    /** Default lifecycle phase ({@code Integer.MAX_VALUE - 100}). */
    public static final int DEFAULT_PHASE = Integer.MAX_VALUE - 100;

    /** Default time, in seconds, to wait for each topic description in {@link #checkTopics()}. */
    private static final int DEFAULT_TOPIC_CHECK_TIMEOUT = 30;

    /** The consumer factory supplied at construction time; never {@code null}. */
    @NonNull
    protected final ConsumerFactory<K, V> consumerFactory;

    /** Private copy of the properties supplied at construction time. */
    private final ContainerProperties containerProperties;

    private ApplicationEventPublisher applicationEventPublisher;

    /** Legacy error handler; written by all three deprecated error handler setters. */
    private GenericErrorHandler<?> errorHandler;

    private CommonErrorHandler commonErrorHandler;

    private RecordInterceptor<K, V> recordInterceptor;

    private BatchInterceptor<K, V> batchInterceptor;

    /** Defensive copy of the listener info bytes; see {@link #setListenerInfo(byte[])}. */
    private byte[] listenerInfo;

    private ApplicationContext applicationContext;

    /** Container-level pause flag toggled by {@link #pause()} and {@link #resume()}. */
    private volatile boolean paused;

    @Nullable
    private String mainListenerId;

    private boolean changeConsumerThreadName;

    @Nullable
    private KafkaAdmin kafkaAdmin;

    /** Logger named after the concrete (runtime) container class. */
    protected final LogAccessor logger = new LogAccessor(LogFactory.getLog(getClass()));

    /** Monitor guarding the start/stop transitions. */
    protected final Object lifecycleMonitor = new Object();

    /** Partitions for which a pause has been requested via {@link #pausePartition(TopicPartition)}. */
    private final Set<TopicPartition> pauseRequestedPartitions = ConcurrentHashMap.newKeySet();

    @NonNull
    private String beanName = "noBeanNameSet";

    private boolean autoStartup = true;

    private int phase = DEFAULT_PHASE;

    private AfterRollbackProcessor<? super K, ? super V> afterRollbackProcessor = new DefaultAfterRollbackProcessor<>();

    /** Per-topic wait, in seconds, used by {@link #checkTopics()}. */
    private int topicCheckTimeout = DEFAULT_TOPIC_CHECK_TIMEOUT;

    private boolean interceptBeforeTx = true;

    private volatile boolean running = false;

    private volatile boolean stoppedNormally = true;

    /** Produces the consumer thread name; defaults to the container's listener id. */
    @NonNull
    private Function<MessageListenerContainer, String> threadNameSupplier = MessageListenerContainer::getListenerId;

    /* JADX INFO: Access modifiers changed from: protected */
    /**
     * Create a container backed by the given consumer factory.
     *
     * <p>The supplied {@code containerProperties} are not retained. A new instance is created
     * for the same subscription (topics, topic pattern, or topic partitions, checked in that
     * order) and the remaining bean properties are copied onto it. {@code ackCount} and
     * {@code ackTime} are copied only when positive, and {@code subBatchPerPartition} only
     * when non-null. If no consumer rebalance listener is configured, a simple logging one
     * is installed.
     *
     * @param consumerFactory the consumer factory; must not be {@code null}
     * @param containerProperties the properties to copy; must not be {@code null}
     * @throws IllegalArgumentException if either argument is {@code null}
     * @throws IllegalStateException if the properties define no topics, topic pattern, or
     * topic partitions
     */
    @SuppressWarnings("unchecked") // the factory is only used to create consumers of <K, V>
    public AbstractMessageListenerContainer(ConsumerFactory<? super K, ? super V> consumerFactory, ContainerProperties containerProperties) {
        Assert.notNull(containerProperties, "'containerProperties' cannot be null");
        Assert.notNull(consumerFactory, "'consumerFactory' cannot be null");
        this.consumerFactory = (ConsumerFactory<K, V>) consumerFactory;
        this.containerProperties = newPropertiesWithSameSubscription(containerProperties);
        // The subscription was handled above; the three ack/batch properties are copied
        // conditionally below, so they are excluded from the bulk copy.
        BeanUtils.copyProperties(containerProperties, this.containerProperties, "topics", "topicPartitions", "topicPattern", "ackCount", "ackTime", "subBatchPerPartition");
        if (containerProperties.getAckCount() > 0) {
            this.containerProperties.setAckCount(containerProperties.getAckCount());
        }
        if (containerProperties.getAckTime() > 0) {
            this.containerProperties.setAckTime(containerProperties.getAckTime());
        }
        Boolean subBatchPerPartition = containerProperties.getSubBatchPerPartition();
        if (subBatchPerPartition != null) {
            this.containerProperties.setSubBatchPerPartition(subBatchPerPartition);
        }
        if (this.containerProperties.getConsumerRebalanceListener() == null) {
            this.containerProperties.setConsumerRebalanceListener(createSimpleLoggingConsumerRebalanceListener());
        }
    }

    /**
     * Create a fresh {@link ContainerProperties} for the same subscription as {@code source}.
     *
     * @param source the properties whose subscription is replicated
     * @return a new instance built from the topics, topic pattern, or topic partitions
     * @throws IllegalStateException if none of the three is set
     */
    private static ContainerProperties newPropertiesWithSameSubscription(ContainerProperties source) {
        String[] topics = source.getTopics();
        if (topics != null) {
            return new ContainerProperties(topics);
        }
        Pattern topicPattern = source.getTopicPattern();
        if (topicPattern != null) {
            return new ContainerProperties(topicPattern);
        }
        TopicPartitionOffset[] topicPartitions = source.getTopicPartitions();
        if (topicPartitions == null) {
            throw new IllegalStateException("topics, topicPattern, or topicPartitions must be provided");
        }
        return new ContainerProperties(topicPartitions);
    }

    /**
     * Store the application context.
     *
     * @param applicationContext the application context
     */
    @Override // org.springframework.context.ApplicationContextAware
    public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
        this.applicationContext = applicationContext;
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /**
     * Return the application context.
     *
     * @return the context, or {@code null} if none has been set
     */
    @Nullable
    public ApplicationContext getApplicationContext() {
        return this.applicationContext;
    }

    /**
     * Set the bean name; it is also returned by {@link #getListenerId()}.
     *
     * @param name the bean name
     */
    @Override // org.springframework.beans.factory.BeanNameAware
    public void setBeanName(String name) {
        this.beanName = name;
    }

    /**
     * Return the bean name.
     *
     * @return the bean name; {@code "noBeanNameSet"} until {@link #setBeanName(String)} is called
     */
    @Nullable
    public String getBeanName() {
        return this.beanName;
    }

    /**
     * Store the publisher used by {@link #publishContainerStoppedEvent()}.
     *
     * @param applicationEventPublisher the event publisher
     */
    @Override // org.springframework.context.ApplicationEventPublisherAware
    public void setApplicationEventPublisher(ApplicationEventPublisher applicationEventPublisher) {
        this.applicationEventPublisher = applicationEventPublisher;
    }

    /**
     * Return the event publisher.
     *
     * @return the publisher, or {@code null} if none has been set
     */
    @Nullable
    public ApplicationEventPublisher getApplicationEventPublisher() {
        return this.applicationEventPublisher;
    }

    /**
     * Set the legacy error handler.
     *
     * @param errorHandler the error handler
     * @deprecated since 2.8 and marked for removal; {@link #setCommonErrorHandler(CommonErrorHandler)}
     * is the non-deprecated handler property of this class.
     */
    @Deprecated(since = VERSION_2_8, forRemoval = true)
    public void setErrorHandler(ErrorHandler errorHandler) {
        this.errorHandler = errorHandler;
    }

    /**
     * Set the legacy generic error handler.
     *
     * @param genericErrorHandler the error handler, may be {@code null}
     * @deprecated since 2.8 and marked for removal; {@link #setCommonErrorHandler(CommonErrorHandler)}
     * is the non-deprecated handler property of this class.
     */
    @Deprecated(since = VERSION_2_8, forRemoval = true)
    public void setGenericErrorHandler(@Nullable GenericErrorHandler<?> genericErrorHandler) {
        this.errorHandler = genericErrorHandler;
    }

    /**
     * Set the legacy batch error handler.
     *
     * @param batchErrorHandler the error handler
     * @deprecated since 2.8 and marked for removal; {@link #setCommonErrorHandler(CommonErrorHandler)}
     * is the non-deprecated handler property of this class.
     */
    @Deprecated(since = VERSION_2_8, forRemoval = true)
    public void setBatchErrorHandler(BatchErrorHandler batchErrorHandler) {
        this.errorHandler = batchErrorHandler;
    }

    /**
     * Return the legacy error handler, whichever deprecated setter installed it.
     *
     * @return the error handler, or {@code null} if none has been set
     * @deprecated since 2.8 and marked for removal; see {@link #getCommonErrorHandler()}.
     */
    @Nullable
    @Deprecated(since = VERSION_2_8, forRemoval = true)
    public GenericErrorHandler<?> getGenericErrorHandler() {
        return this.errorHandler;
    }

    /**
     * Return the common error handler.
     *
     * @return the handler, or {@code null} if none has been set
     */
    @Nullable
    public CommonErrorHandler getCommonErrorHandler() {
        return this.commonErrorHandler;
    }

    /**
     * Set the common error handler.
     *
     * @param commonErrorHandler the handler, may be {@code null}
     */
    public void setCommonErrorHandler(@Nullable CommonErrorHandler commonErrorHandler) {
        this.commonErrorHandler = commonErrorHandler;
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /**
     * Return the "stopped normally" flag.
     *
     * @return the flag; {@code true} unless changed via {@link #setStoppedNormally(boolean)}
     */
    public boolean isStoppedNormally() {
        return this.stoppedNormally;
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /**
     * Set the "stopped normally" flag.
     *
     * @param stoppedNormally the new value
     */
    public void setStoppedNormally(boolean stoppedNormally) {
        this.stoppedNormally = stoppedNormally;
    }

    /**
     * Return whether the container is flagged for automatic startup.
     *
     * @return the auto-startup flag ({@code true} by default)
     */
    @Override // org.springframework.context.SmartLifecycle
    public boolean isAutoStartup() {
        return this.autoStartup;
    }

    /**
     * Set the auto-startup flag.
     *
     * @param autoStartup the new value
     */
    @Override // org.springframework.kafka.listener.MessageListenerContainer
    public void setAutoStartup(boolean autoStartup) {
        this.autoStartup = autoStartup;
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /**
     * Set the running flag reported by {@link #isRunning()}.
     *
     * @param running the new value
     */
    public void setRunning(boolean running) {
        this.running = running;
    }

    /**
     * Return the running flag.
     *
     * @return {@code true} if the container is marked as running
     */
    @Override // org.springframework.context.Lifecycle
    public boolean isRunning() {
        return this.running;
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /**
     * Return the container-level pause flag.
     *
     * @return {@code true} after {@link #pause()}, {@code false} after {@link #resume()}
     */
    public boolean isPaused() {
        return this.paused;
    }

    /**
     * Return whether a pause has been requested for the given partition.
     *
     * @param topicPartition the partition to look up
     * @return {@code true} if the partition is in the set of pause requests
     */
    @Override // org.springframework.kafka.listener.MessageListenerContainer
    public boolean isPartitionPauseRequested(TopicPartition topicPartition) {
        return this.pauseRequestedPartitions.contains(topicPartition);
    }

    /**
     * Record a pause request for the given partition.
     *
     * @param topicPartition the partition to pause
     */
    @Override // org.springframework.kafka.listener.MessageListenerContainer
    public void pausePartition(TopicPartition topicPartition) {
        this.pauseRequestedPartitions.add(topicPartition);
    }

    /**
     * Remove the pause request for the given partition, if any.
     *
     * @param topicPartition the partition to resume
     */
    @Override // org.springframework.kafka.listener.MessageListenerContainer
    public void resumePartition(TopicPartition topicPartition) {
        this.pauseRequestedPartitions.remove(topicPartition);
    }

    /**
     * Return whether a container-level pause has been requested.
     *
     * @return the same flag as {@link #isPaused()}
     */
    @Override // org.springframework.kafka.listener.MessageListenerContainer
    public boolean isPauseRequested() {
        return this.paused;
    }

    /**
     * Set the lifecycle phase.
     *
     * @param phase the phase; defaults to {@link #DEFAULT_PHASE}
     */
    public void setPhase(int phase) {
        this.phase = phase;
    }

    /**
     * Return the lifecycle phase.
     *
     * @return the phase
     */
    @Override // org.springframework.context.SmartLifecycle, org.springframework.context.Phased
    public int getPhase() {
        return this.phase;
    }

    /**
     * Return the after-rollback processor.
     *
     * @return the processor; a {@link DefaultAfterRollbackProcessor} unless replaced
     */
    public AfterRollbackProcessor<? super K, ? super V> getAfterRollbackProcessor() {
        return this.afterRollbackProcessor;
    }

    /**
     * Replace the after-rollback processor.
     *
     * @param afterRollbackProcessor the processor; must not be {@code null}
     * @throws IllegalArgumentException if the processor is {@code null}
     */
    public void setAfterRollbackProcessor(AfterRollbackProcessor<? super K, ? super V> afterRollbackProcessor) {
        Assert.notNull(afterRollbackProcessor, "'afterRollbackProcessor' cannot be null");
        this.afterRollbackProcessor = afterRollbackProcessor;
    }

    /**
     * Return this container's own copy of the container properties.
     *
     * @return the container properties
     */
    @Override // org.springframework.kafka.listener.MessageListenerContainer
    public ContainerProperties getContainerProperties() {
        return this.containerProperties;
    }

    /**
     * Return the consumer group id.
     *
     * @return the group id from the container properties if set, otherwise the
     * {@code group.id} entry of the consumer factory configuration (possibly {@code null})
     */
    @Override // org.springframework.kafka.listener.MessageListenerContainer
    @Nullable
    public String getGroupId() {
        String configuredGroupId = this.containerProperties.getGroupId();
        if (configuredGroupId != null) {
            return configuredGroupId;
        }
        return (String) this.consumerFactory.getConfigurationProperties().get("group.id");
    }

    /**
     * Return the listener id.
     *
     * @return the bean name
     */
    @Override // org.springframework.kafka.listener.MessageListenerContainer
    public String getListenerId() {
        return this.beanName;
    }

    /**
     * Set the main listener id.
     *
     * @param id the main listener id
     */
    public void setMainListenerId(String id) {
        this.mainListenerId = id;
    }

    /**
     * Return the main listener id.
     *
     * @return the id, or {@code null} if none has been set
     */
    @Override // org.springframework.kafka.listener.MessageListenerContainer
    @Nullable
    public String getMainListenerId() {
        return this.mainListenerId;
    }

    /**
     * Return a copy of the listener info bytes.
     *
     * @return a fresh copy of the bytes, or {@code null} if none have been set
     */
    @Override // org.springframework.kafka.listener.MessageListenerContainer
    @Nullable
    public byte[] getListenerInfo() {
        byte[] info = this.listenerInfo;
        return info != null ? Arrays.copyOf(info, info.length) : null;
    }

    /**
     * Set the listener info bytes; the array is copied, so later changes by the caller
     * are not seen by this container.
     *
     * @param listenerInfo the bytes, may be {@code null}
     */
    public void setListenerInfo(@Nullable byte[] listenerInfo) {
        this.listenerInfo = listenerInfo != null ? Arrays.copyOf(listenerInfo, listenerInfo.length) : null;
    }

    /**
     * Set how long {@link #checkTopics()} waits for each topic description.
     *
     * @param topicCheckTimeout the timeout in seconds (default 30)
     */
    public void setTopicCheckTimeout(int topicCheckTimeout) {
        this.topicCheckTimeout = topicCheckTimeout;
    }

    /**
     * Return the "change consumer thread name" flag.
     *
     * @return the flag ({@code false} by default)
     */
    public boolean isChangeConsumerThreadName() {
        return this.changeConsumerThreadName;
    }

    /**
     * Set the "change consumer thread name" flag.
     *
     * @param changeConsumerThreadName the new value
     */
    public void setChangeConsumerThreadName(boolean changeConsumerThreadName) {
        this.changeConsumerThreadName = changeConsumerThreadName;
    }

    /**
     * Return the function that derives a thread name from a container.
     *
     * @return the function; by default it returns the container's listener id
     */
    public Function<MessageListenerContainer, String> getThreadNameSupplier() {
        return this.threadNameSupplier;
    }

    /**
     * Replace the function that derives a thread name from a container.
     *
     * @param threadNameSupplier the function; must not be {@code null}
     * @throws IllegalArgumentException if the function is {@code null}
     */
    public void setThreadNameSupplier(Function<MessageListenerContainer, String> threadNameSupplier) {
        Assert.notNull(threadNameSupplier, "'threadNameSupplier' cannot be null");
        this.threadNameSupplier = threadNameSupplier;
    }

    /**
     * Return the Kafka admin.
     *
     * @return the admin, or {@code null} if none has been set
     */
    @Nullable
    public KafkaAdmin getKafkaAdmin() {
        return this.kafkaAdmin;
    }

    /**
     * Set the Kafka admin.
     *
     * @param kafkaAdmin the admin
     */
    public void setKafkaAdmin(KafkaAdmin kafkaAdmin) {
        this.kafkaAdmin = kafkaAdmin;
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /**
     * Return the record interceptor.
     *
     * @return the interceptor, or {@code null} if none has been set
     */
    public RecordInterceptor<K, V> getRecordInterceptor() {
        return this.recordInterceptor;
    }

    /**
     * Set the record interceptor.
     *
     * @param recordInterceptor the interceptor
     */
    public void setRecordInterceptor(RecordInterceptor<K, V> recordInterceptor) {
        this.recordInterceptor = recordInterceptor;
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /**
     * Return the batch interceptor.
     *
     * @return the interceptor, or {@code null} if none has been set
     */
    public BatchInterceptor<K, V> getBatchInterceptor() {
        return this.batchInterceptor;
    }

    /**
     * Set the batch interceptor.
     *
     * @param batchInterceptor the interceptor
     */
    public void setBatchInterceptor(BatchInterceptor<K, V> batchInterceptor) {
        this.batchInterceptor = batchInterceptor;
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /**
     * Return the "intercept before transaction" flag.
     *
     * @return the flag ({@code true} by default)
     */
    public boolean isInterceptBeforeTx() {
        return this.interceptBeforeTx;
    }

    /**
     * Set the "intercept before transaction" flag.
     *
     * @param interceptBeforeTx the new value
     */
    public void setInterceptBeforeTx(boolean interceptBeforeTx) {
        this.interceptBeforeTx = interceptBeforeTx;
    }

    /**
     * Install the message listener on the container properties.
     *
     * @param messageListener the listener; {@link #start()} requires it to be a
     * {@link GenericMessageListener}
     */
    @Override // org.springframework.kafka.listener.MessageListenerContainer
    public void setupMessageListener(Object messageListener) {
        this.containerProperties.setMessageListener(messageListener);
    }

    /**
     * Start the container if it is not already running.
     *
     * <p>The group id is validated first (see {@link #checkGroupId()}); then, under the
     * lifecycle monitor, the configured listener type is verified and {@link #doStart()}
     * is invoked.
     *
     * @throws IllegalStateException if no group id is available when one is required, or
     * if the configured message listener is not a {@link GenericMessageListener}
     */
    @Override // org.springframework.context.Lifecycle
    public final void start() {
        checkGroupId();
        synchronized (this.lifecycleMonitor) {
            if (isRunning()) {
                return;
            }
            Assert.state(this.containerProperties.getMessageListener() instanceof GenericMessageListener,
                    () -> "A " + GenericMessageListener.class.getName() + " implementation must be provided");
            doStart();
        }
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /**
     * Verify that the configured topics exist.
     *
     * <p>Does nothing unless {@code missingTopicsFatal} is set on the container properties
     * and no topic pattern is configured. Otherwise an {@link AdminClient} is created from
     * the admin-relevant entries of the consumer configuration, overlaid with the
     * string-keyed consumer property overrides, and each configured topic (or the topic of
     * each configured partition) is described. A topic whose description cannot be obtained
     * within the topic check timeout, for any reason, is treated as missing.
     *
     * <p>A failure to create or use the admin client itself is logged and otherwise ignored.
     *
     * @throws IllegalStateException if at least one topic is missing
     */
    public void checkTopics() {
        if (!this.containerProperties.isMissingTopicsFatal() || this.containerProperties.getTopicPattern() != null) {
            return;
        }
        Set<String> adminConfigNames = AdminClientConfig.configNames();
        Map<String, Object> adminConfigs = this.consumerFactory.getConfigurationProperties().entrySet().stream()
                .filter(entry -> adminConfigNames.contains(entry.getKey()))
                .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
        propertiesFromConsumerPropertyOverrides().forEach((key, value) -> {
            if (key instanceof String) {
                adminConfigs.put((String) key, value);
            }
        });
        List<String> missing = null;
        // try-with-resources guarantees the client is closed even when the lookup throws.
        try (AdminClient client = AdminClient.create(adminConfigs)) {
            if (client != null) {
                String[] topics = this.containerProperties.getTopics();
                if (topics == null) {
                    topics = Arrays.stream(this.containerProperties.getTopicPartitions())
                            .map(TopicPartitionOffset::getTopic)
                            .toArray(String[]::new);
                }
                missing = client.describeTopics(Arrays.asList(topics)).topicNameValues().entrySet().stream()
                        .filter(entry -> isTopicMissing(entry.getValue()))
                        .map(Map.Entry::getKey)
                        .collect(Collectors.toList());
            }
        } catch (Exception ex) {
            this.logger.error(ex, "Failed to check topic existence");
        }
        if (missing != null && !missing.isEmpty()) {
            throw new IllegalStateException("Topic(s) " + missing + " is/are not present and missingTopicsFatal is true");
        }
    }

    /**
     * Wait for a topic description and report whether it could not be obtained.
     *
     * @param description the pending description of a single topic
     * @return {@code false} if the description completed within the topic check timeout,
     * {@code true} on interruption (the interrupt flag is restored) or any other failure
     */
    private boolean isTopicMissing(KafkaFuture<?> description) {
        try {
            description.get(this.topicCheckTimeout, TimeUnit.SECONDS);
            return false;
        } catch (InterruptedException ex) {
            Thread.currentThread().interrupt();
            return true;
        } catch (Exception ex) {
            // Timeout or execution failure: the topic is reported as missing.
            return true;
        }
    }

    /**
     * Verify that a group id is available when no explicit topic partitions are configured.
     *
     * <p>The group id may come from the {@code group.id} entry of the consumer factory
     * configuration (it must be a non-blank string) or from the container properties.
     *
     * @throws IllegalStateException if neither source provides a group id
     */
    public void checkGroupId() {
        if (this.containerProperties.getTopicPartitions() != null) {
            return;
        }
        boolean hasGroupIdInConsumerConfig = true;
        if (this.consumerFactory != null) {
            Object groupIdConfig = this.consumerFactory.getConfigurationProperties().get("group.id");
            hasGroupIdInConsumerConfig = (groupIdConfig instanceof String) && StringUtils.hasText((String) groupIdConfig);
        }
        Assert.state(hasGroupIdInConsumerConfig || StringUtils.hasText(this.containerProperties.getGroupId()), "No group.id found in consumer config, container properties, or @KafkaListener annotation; a group.id is required when group management is used.");
    }

    /**
     * Perform the subclass-specific start; called by {@link #start()} under the lifecycle
     * monitor when the container is not running.
     */
    protected abstract void doStart();

    /**
     * Stop the container and wait for the stop to complete; equivalent to {@code stop(true)}.
     */
    @Override // org.springframework.context.Lifecycle
    public final void stop() {
        stop(true);
    }

    /**
     * Stop the container if it is running.
     *
     * <p>When {@code wait} is {@code true}, the calling thread blocks (holding the lifecycle
     * monitor) until the stop callback fires or the shutdown timeout from the container
     * properties elapses, and then publishes the stopped event. If the thread is interrupted
     * while waiting, the interrupt flag is restored and that event is not published.
     *
     * <p>NOTE(review): {@link #doStop(Runnable)} already publishes the stopped event after
     * delegating, so both branches here appear to publish it a second time - confirm this
     * is intended before changing it.
     *
     * @param wait whether to block until the container has stopped
     */
    public final void stop(boolean wait) {
        synchronized (this.lifecycleMonitor) {
            if (!isRunning()) {
                return;
            }
            if (!wait) {
                doStop(this::publishContainerStoppedEvent);
                return;
            }
            CountDownLatch stopped = new CountDownLatch(1);
            doStop(stopped::countDown);
            try {
                stopped.await(this.containerProperties.getShutdownTimeout(), TimeUnit.MILLISECONDS);
                publishContainerStoppedEvent();
            } catch (InterruptedException ex) {
                Thread.currentThread().interrupt();
            }
        }
    }

    /**
     * Request a container-level pause by setting the pause flag.
     */
    @Override // org.springframework.kafka.listener.MessageListenerContainer
    public void pause() {
        this.paused = true;
    }

    /**
     * Clear the container-level pause flag.
     */
    @Override // org.springframework.kafka.listener.MessageListenerContainer
    public void resume() {
        this.paused = false;
    }

    /**
     * Stop the container, handing the callback to {@link #doStop(Runnable)}; if the
     * container is not running the callback is run immediately instead.
     *
     * @param callback the callback to run once stopped
     */
    @Override // org.springframework.context.SmartLifecycle
    public void stop(Runnable callback) {
        synchronized (this.lifecycleMonitor) {
            if (isRunning()) {
                doStop(callback);
            } else {
                callback.run();
            }
        }
    }

    /**
     * Stop the container via {@code doStop(callback, false)} and publish the stopped event.
     * Unlike the other stop methods, this one neither checks the running flag nor takes the
     * lifecycle monitor.
     *
     * @param callback the callback handed to {@link #doStop(Runnable, boolean)}
     */
    @Override // org.springframework.kafka.listener.MessageListenerContainer
    public void stopAbnormally(Runnable callback) {
        doStop(callback, false);
        publishContainerStoppedEvent();
    }

    /**
     * Stop the container via {@code doStop(callback, true)} and publish the stopped event.
     *
     * @param callback the callback handed to {@link #doStop(Runnable, boolean)}
     */
    protected void doStop(Runnable callback) {
        doStop(callback, true);
        publishContainerStoppedEvent();
    }

    /**
     * Perform the subclass-specific stop.
     *
     * @param callback the callback supplied by the caller of the stop operation
     * @param normal {@code true} when invoked from {@link #doStop(Runnable)}, {@code false}
     * when invoked from {@link #stopAbnormally(Runnable)}
     */
    protected abstract void doStop(Runnable callback, boolean normal);

    /**
     * Create a rebalance listener that logs revoked, assigned and lost partitions at INFO
     * level, each line prefixed with the container's group id.
     *
     * @return the logging listener
     */
    protected final ConsumerRebalanceListener createSimpleLoggingConsumerRebalanceListener() {
        return new ConsumerRebalanceListener() { // from class: org.springframework.kafka.listener.AbstractMessageListenerContainer.1
            @Override // org.apache.kafka.clients.consumer.ConsumerRebalanceListener
            public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
                logRebalance("revoked", partitions);
            }

            @Override // org.apache.kafka.clients.consumer.ConsumerRebalanceListener
            public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
                logRebalance("assigned", partitions);
            }

            @Override // org.apache.kafka.clients.consumer.ConsumerRebalanceListener
            public void onPartitionsLost(Collection<TopicPartition> partitions) {
                logRebalance("lost", partitions);
            }
        };
    }

    /**
     * Log a rebalance notification at INFO level; the message is built lazily.
     *
     * @param event the kind of change ("revoked", "assigned" or "lost")
     * @param partitions the affected partitions
     */
    private void logRebalance(String event, Collection<TopicPartition> partitions) {
        this.logger.info(() -> getGroupId() + ": partitions " + event + ": " + partitions);
    }

    /**
     * Publish a {@link ContainerStoppedEvent} for this container and {@link #parentOrThis()};
     * does nothing when no event publisher has been set.
     */
    protected void publishContainerStoppedEvent() {
        ApplicationEventPublisher publisher = getApplicationEventPublisher();
        if (publisher != null) {
            publisher.publishEvent(new ContainerStoppedEvent(this, parentOrThis()));
        }
    }

    /**
     * Return the container to report as the parent in published events.
     *
     * @return this container; subclasses may override
     */
    protected AbstractMessageListenerContainer<?, ?> parentOrThis() {
        return this;
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /**
     * Build a flat {@link Properties} from the consumer property overrides held by the
     * container properties.
     *
     * <p>Entries set directly on the overrides are copied as they are. String properties
     * that are only reachable through the overrides' default list are then added, without
     * replacing an entry that was set directly.
     *
     * @return a new {@link Properties} with no default list of its own
     */
    public Properties propertiesFromConsumerPropertyOverrides() {
        Properties overrides = this.containerProperties.getKafkaConsumerProperties();
        Properties flattened = new Properties();
        flattened.putAll(overrides);
        for (String name : overrides.stringPropertyNames()) {
            // containsKey, not contains: Hashtable.contains(Object) tests values, so a
            // defaulted property whose name equals some value would otherwise be skipped.
            if (!flattened.containsKey(name)) {
                flattened.setProperty(name, overrides.getProperty(name));
            }
        }
        return flattened;
    }
}
