package org.springframework.kafka.config;

import java.util.Arrays;
import java.util.Collection;
import java.util.Objects;
import java.util.Properties;
import java.util.function.Function;
import java.util.function.Supplier;
import java.util.regex.Pattern;
import org.apache.commons.logging.LogFactory;
import org.springframework.beans.BeanUtils;
import org.springframework.beans.BeansException;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.context.ApplicationContext;
import org.springframework.context.ApplicationContextAware;
import org.springframework.context.ApplicationEventPublisher;
import org.springframework.context.ApplicationEventPublisherAware;
import org.springframework.core.log.LogAccessor;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.listener.AbstractMessageListenerContainer;
import org.springframework.kafka.listener.AfterRollbackProcessor;
import org.springframework.kafka.listener.BatchErrorHandler;
import org.springframework.kafka.listener.BatchInterceptor;
import org.springframework.kafka.listener.CommonErrorHandler;
import org.springframework.kafka.listener.ContainerProperties;
import org.springframework.kafka.listener.ErrorHandler;
import org.springframework.kafka.listener.GenericErrorHandler;
import org.springframework.kafka.listener.MessageListenerContainer;
import org.springframework.kafka.listener.RecordInterceptor;
import org.springframework.kafka.listener.adapter.BatchToRecordAdapter;
import org.springframework.kafka.listener.adapter.RecordFilterStrategy;
import org.springframework.kafka.listener.adapter.ReplyHeadersConfigurer;
import org.springframework.kafka.requestreply.ReplyingKafkaOperations;
import org.springframework.kafka.support.JavaUtils;
import org.springframework.kafka.support.TopicPartitionOffset;
import org.springframework.kafka.support.converter.BatchMessageConverter;
import org.springframework.kafka.support.converter.MessageConverter;
import org.springframework.kafka.support.converter.RecordMessageConverter;
import org.springframework.util.Assert;

/* loaded from: input_file:BOOT-INF/lib/spring-kafka-3.0.13.jar:org/springframework/kafka/config/AbstractKafkaListenerContainerFactory.class */
/**
 * Base {@link KafkaListenerContainerFactory} that holds factory-level defaults and applies
 * them to every container it creates.
 *
 * <p>Subclasses supply the concrete container via {@link #createContainerInstance}; this class
 * then copies the configured defaults onto the new container (and, for
 * {@link AbstractKafkaListenerEndpoint}s, onto the endpoint itself). Values supplied by the
 * endpoint take precedence where the code below says so (record filter strategy, batch listener
 * flag, auto-startup).
 *
 * @param <C> the container type
 * @param <K> the key type
 * @param <V> the value type
 */
public abstract class AbstractKafkaListenerContainerFactory<C extends AbstractMessageListenerContainer<K, V>, K, V> implements KafkaListenerContainerFactory<C>, ApplicationEventPublisherAware, InitializingBean, ApplicationContextAware {
    /** Logger named after the concrete factory class. */
    protected final LogAccessor logger = new LogAccessor(LogFactory.getLog(getClass()));

    /** Template properties; copied onto each created container in {@link #initializeContainer}. */
    private final ContainerProperties containerProperties = new ContainerProperties((Pattern) null);

    /** Legacy error handler, set through the deprecated setters; validated in {@link #afterPropertiesSet()}. */
    private GenericErrorHandler<?> errorHandler;
    private CommonErrorHandler commonErrorHandler;
    private ConsumerFactory<? super K, ? super V> consumerFactory;
    private Boolean autoStartup;
    private Integer phase;
    private RecordMessageConverter recordMessageConverter;
    private BatchMessageConverter batchMessageConverter;
    private RecordFilterStrategy<? super K, ? super V> recordFilterStrategy;
    private Boolean ackDiscarded;
    private Boolean batchListener;
    private ApplicationEventPublisher applicationEventPublisher;
    private KafkaTemplate<?, ?> replyTemplate;
    private AfterRollbackProcessor<? super K, ? super V> afterRollbackProcessor;
    private ReplyHeadersConfigurer replyHeadersConfigurer;
    private Boolean missingTopicsFatal;
    private RecordInterceptor<K, V> recordInterceptor;
    private BatchInterceptor<K, V> batchInterceptor;
    private BatchToRecordAdapter<K, V> batchToRecordAdapter;
    private ApplicationContext applicationContext;
    private ContainerCustomizer<K, V, C> containerCustomizer;
    private String correlationHeaderName;
    private Boolean changeConsumerThreadName;
    private Function<MessageListenerContainer, String> threadNameSupplier;

    /**
     * Store the application context; it is passed to each created container when non-null.
     *
     * @param applicationContext the application context
     */
    @Override // org.springframework.context.ApplicationContextAware
    public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
        this.applicationContext = applicationContext;
    }

    /**
     * Set the consumer factory exposed through {@link #getConsumerFactory()}.
     *
     * @param consumerFactory the consumer factory
     */
    public void setConsumerFactory(ConsumerFactory<? super K, ? super V> consumerFactory) {
        this.consumerFactory = consumerFactory;
    }

    /**
     * Return the configured consumer factory.
     *
     * @return the consumer factory, or {@code null} if none has been set
     */
    public ConsumerFactory<? super K, ? super V> getConsumerFactory() {
        return this.consumerFactory;
    }

    /**
     * Set the factory-level auto-startup default. It is applied to a container only when the
     * endpoint does not provide its own auto-startup value.
     *
     * @param bool the auto-startup flag; {@code null} leaves the container's value untouched
     */
    public void setAutoStartup(Boolean bool) {
        this.autoStartup = bool;
    }

    /**
     * Set the phase that is applied to each created container.
     *
     * @param i the phase
     */
    public void setPhase(int i) {
        this.phase = Integer.valueOf(i);
    }

    /**
     * Set a message converter, routing it to the record or batch converter slot based on its
     * runtime type. Anything that is not a {@link RecordMessageConverter} is cast to
     * {@link BatchMessageConverter}.
     *
     * @param messageConverter the converter
     * @deprecated use {@link #setRecordMessageConverter(RecordMessageConverter)} or
     * {@link #setBatchMessageConverter(BatchMessageConverter)} directly
     */
    @Deprecated
    public void setMessageConverter(MessageConverter messageConverter) {
        if (messageConverter instanceof RecordMessageConverter) {
            setRecordMessageConverter((RecordMessageConverter) messageConverter);
        } else {
            setBatchMessageConverter((BatchMessageConverter) messageConverter);
        }
    }

    /**
     * Set the converter handed to endpoints that are not batch listeners.
     *
     * @param recordMessageConverter the record converter
     */
    public void setRecordMessageConverter(RecordMessageConverter recordMessageConverter) {
        this.recordMessageConverter = recordMessageConverter;
    }

    /**
     * Set the converter handed to endpoints that are batch listeners.
     *
     * @param batchMessageConverter the batch converter
     */
    public void setBatchMessageConverter(BatchMessageConverter batchMessageConverter) {
        this.batchMessageConverter = batchMessageConverter;
    }

    /**
     * Set the default record filter strategy; used only for endpoints that do not already
     * have one.
     *
     * @param recordFilterStrategy the filter strategy
     */
    public void setRecordFilterStrategy(RecordFilterStrategy<? super K, ? super V> recordFilterStrategy) {
        this.recordFilterStrategy = recordFilterStrategy;
    }

    /**
     * Set the ack-discarded flag that is propagated to endpoints when non-null.
     *
     * @param bool the flag
     */
    public void setAckDiscarded(Boolean bool) {
        this.ackDiscarded = bool;
    }

    /**
     * Return the factory-level batch listener flag.
     *
     * @return the flag, or {@code null} if it has not been set
     */
    public Boolean isBatchListener() {
        return this.batchListener;
    }

    /**
     * Set the factory-level batch listener flag; propagated to endpoints whose own flag is
     * {@code null}, and used to choose the expected legacy error handler type.
     *
     * @param bool the flag
     */
    public void setBatchListener(Boolean bool) {
        this.batchListener = bool;
    }

    /**
     * Store the event publisher; it is passed to each created container when non-null.
     *
     * @param applicationEventPublisher the publisher
     */
    @Override // org.springframework.context.ApplicationEventPublisherAware
    public void setApplicationEventPublisher(ApplicationEventPublisher applicationEventPublisher) {
        this.applicationEventPublisher = applicationEventPublisher;
    }

    /**
     * Set the template propagated to endpoints for sending replies. A warning is logged when
     * the template is a {@link ReplyingKafkaOperations}; the template is stored regardless.
     *
     * @param kafkaTemplate the reply template
     */
    public void setReplyTemplate(KafkaTemplate<?, ?> kafkaTemplate) {
        if (kafkaTemplate instanceof ReplyingKafkaOperations) {
            this.logger.warn("The 'replyTemplate' should not be an implementation of 'ReplyingKafkaOperations'; such implementations are for client-side request/reply operations; here we are simply sending a reply to an incoming request so the reply container will never be used and will consume unnecessary resources.");
        }
        this.replyTemplate = kafkaTemplate;
    }

    /**
     * Set a legacy record error handler.
     *
     * @param errorHandler the error handler
     * @deprecated since 2.8, marked for removal; see
     * {@link #setCommonErrorHandler(CommonErrorHandler)}
     */
    @Deprecated(since = "2.8", forRemoval = true)
    public void setErrorHandler(ErrorHandler errorHandler) {
        this.errorHandler = errorHandler;
    }

    /**
     * Set a legacy batch error handler. Shares storage with
     * {@link #setErrorHandler(ErrorHandler)}, so the most recent call wins.
     *
     * @param batchErrorHandler the batch error handler
     * @deprecated since 2.8, marked for removal; see
     * {@link #setCommonErrorHandler(CommonErrorHandler)}
     */
    @Deprecated(since = "2.8", forRemoval = true)
    public void setBatchErrorHandler(BatchErrorHandler batchErrorHandler) {
        this.errorHandler = batchErrorHandler;
    }

    /**
     * Set the common error handler passed to each created container when non-null.
     *
     * @param commonErrorHandler the error handler
     */
    public void setCommonErrorHandler(CommonErrorHandler commonErrorHandler) {
        this.commonErrorHandler = commonErrorHandler;
    }

    /**
     * Set the after-rollback processor passed to each created container when non-null.
     *
     * @param afterRollbackProcessor the processor
     */
    public void setAfterRollbackProcessor(AfterRollbackProcessor<? super K, ? super V> afterRollbackProcessor) {
        this.afterRollbackProcessor = afterRollbackProcessor;
    }

    /**
     * Set the reply headers configurer propagated to endpoints when non-null.
     *
     * @param replyHeadersConfigurer the configurer
     */
    public void setReplyHeadersConfigurer(ReplyHeadersConfigurer replyHeadersConfigurer) {
        this.replyHeadersConfigurer = replyHeadersConfigurer;
    }

    /**
     * Set the missing-topics-fatal flag written to each created container's properties.
     *
     * @param z the flag
     */
    public void setMissingTopicsFatal(boolean z) {
        this.missingTopicsFatal = Boolean.valueOf(z);
    }

    /**
     * Return the factory's template container properties. This is the live instance, so
     * changes made through it affect containers created afterwards.
     *
     * @return the template properties
     */
    public ContainerProperties getContainerProperties() {
        return this.containerProperties;
    }

    /**
     * Set the record interceptor. It is set on every created container unconditionally,
     * including when it is {@code null}.
     *
     * @param recordInterceptor the interceptor
     */
    public void setRecordInterceptor(RecordInterceptor<K, V> recordInterceptor) {
        this.recordInterceptor = recordInterceptor;
    }

    /**
     * Set the batch interceptor. It is set on every created container unconditionally,
     * including when it is {@code null}.
     *
     * @param batchInterceptor the interceptor
     */
    public void setBatchInterceptor(BatchInterceptor<K, V> batchInterceptor) {
        this.batchInterceptor = batchInterceptor;
    }

    /**
     * Set the batch-to-record adapter propagated to endpoints when non-null.
     *
     * @param batchToRecordAdapter the adapter
     */
    public void setBatchToRecordAdapter(BatchToRecordAdapter<K, V> batchToRecordAdapter) {
        this.batchToRecordAdapter = batchToRecordAdapter;
    }

    /**
     * Set a customizer that is invoked on each container as the last creation step.
     *
     * @param containerCustomizer the customizer
     */
    public void setContainerCustomizer(ContainerCustomizer<K, V, C> containerCustomizer) {
        this.containerCustomizer = containerCustomizer;
    }

    /**
     * Set the correlation header name propagated to endpoints when non-null.
     *
     * @param str the header name
     */
    public void setCorrelationHeaderName(String str) {
        this.correlationHeaderName = str;
    }

    /**
     * Set the change-consumer-thread-name flag passed to each created container.
     *
     * @param z the flag
     */
    public void setChangeConsumerThreadName(boolean z) {
        this.changeConsumerThreadName = Boolean.valueOf(z);
    }

    /**
     * Set the function passed to each created container as its thread name supplier.
     *
     * @param function the supplier function; must not be {@code null}
     * @throws IllegalArgumentException if {@code function} is {@code null}
     */
    public void setThreadNameSupplier(Function<MessageListenerContainer, String> function) {
        Assert.notNull(function, "'threadNameSupplier' cannot be null");
        this.threadNameSupplier = function;
    }

    /**
     * Validate the legacy error handler type against the batch listener flag. The check is
     * skipped when a common error handler is configured or no legacy handler is set.
     *
     * @throws IllegalStateException if the legacy handler does not match the listener type
     */
    @Override // org.springframework.beans.factory.InitializingBean
    public void afterPropertiesSet() {
        if (this.commonErrorHandler == null && this.errorHandler != null) {
            if (Boolean.TRUE.equals(this.batchListener)) {
                Assert.state(this.errorHandler instanceof BatchErrorHandler,
                        () -> "The error handler must be a BatchErrorHandler, not " + this.errorHandler.getClass().getName());
            } else {
                Assert.state(this.errorHandler instanceof ErrorHandler,
                        () -> "The error handler must be an ErrorHandler, not " + this.errorHandler.getClass().getName());
            }
        }
    }

    /**
     * Create a container for the endpoint, configure the endpoint (if it is an
     * {@link AbstractKafkaListenerEndpoint}), let the endpoint set up the container with the
     * matching converter, then apply factory defaults and the customizer.
     *
     * @param kafkaListenerEndpoint the endpoint to create a container for
     * @return the fully configured container
     */
    @SuppressWarnings("unchecked")
    @Override // org.springframework.kafka.config.KafkaListenerContainerFactory
    public C createListenerContainer(KafkaListenerEndpoint kafkaListenerEndpoint) {
        C instance = createContainerInstance(kafkaListenerEndpoint);
        // Bound method references null-check 'instance' when evaluated, so no explicit check is needed.
        JavaUtils.INSTANCE
                .acceptIfNotNull(kafkaListenerEndpoint.getId(), instance::setBeanName)
                .acceptIfNotNull(kafkaListenerEndpoint.getMainListenerId(), instance::setMainListenerId);
        if (kafkaListenerEndpoint instanceof AbstractKafkaListenerEndpoint<?, ?>) {
            // The endpoint's type arguments cannot be verified at runtime; the cast is unchecked.
            configureEndpoint((AbstractKafkaListenerEndpoint<K, V>) kafkaListenerEndpoint);
        }
        // Must run after configureEndpoint: that step may fill in the endpoint's batch listener flag.
        if (Boolean.TRUE.equals(kafkaListenerEndpoint.getBatchListener())) {
            kafkaListenerEndpoint.setupListenerContainer(instance, this.batchMessageConverter);
        } else {
            kafkaListenerEndpoint.setupListenerContainer(instance, this.recordMessageConverter);
        }
        initializeContainer(instance, kafkaListenerEndpoint);
        customizeContainer(instance);
        return instance;
    }

    /**
     * Push factory-level defaults onto the endpoint. The record filter strategy and batch
     * listener flag are applied only when the endpoint has none of its own; the remaining
     * values are applied whenever the factory value is non-null.
     */
    private void configureEndpoint(AbstractKafkaListenerEndpoint<K, V> abstractKafkaListenerEndpoint) {
        if (abstractKafkaListenerEndpoint.getRecordFilterStrategy() == null) {
            JavaUtils.INSTANCE.acceptIfNotNull(this.recordFilterStrategy, abstractKafkaListenerEndpoint::setRecordFilterStrategy);
        }
        JavaUtils.INSTANCE
                .acceptIfNotNull(this.ackDiscarded, abstractKafkaListenerEndpoint::setAckDiscarded)
                .acceptIfNotNull(this.replyTemplate, abstractKafkaListenerEndpoint::setReplyTemplate)
                .acceptIfNotNull(this.replyHeadersConfigurer, abstractKafkaListenerEndpoint::setReplyHeadersConfigurer)
                .acceptIfNotNull(this.batchToRecordAdapter, abstractKafkaListenerEndpoint::setBatchToRecordAdapter)
                .acceptIfNotNull(this.correlationHeaderName, abstractKafkaListenerEndpoint::setCorrelationHeaderName);
        if (abstractKafkaListenerEndpoint.getBatchListener() == null) {
            JavaUtils.INSTANCE.acceptIfNotNull(this.batchListener, abstractKafkaListenerEndpoint::setBatchListener);
        }
    }

    /**
     * Create an empty container instance for the given endpoint.
     *
     * @param kafkaListenerEndpoint the endpoint
     * @return the new container
     */
    protected abstract C createContainerInstance(KafkaListenerEndpoint kafkaListenerEndpoint);

    /**
     * Apply the factory's defaults and the endpoint's settings to a freshly created container.
     *
     * <p>NOTE(review): a decompiler note on this method says its access modifier was widened
     * from {@code protected}; it is kept {@code public} so existing callers are unaffected.
     *
     * @param c the container to initialize
     * @param kafkaListenerEndpoint the endpoint supplying auto-startup, group id, client id
     * prefix, consumer properties and listener info
     */
    public void initializeContainer(C c, KafkaListenerEndpoint kafkaListenerEndpoint) {
        ContainerProperties properties = c.getContainerProperties();
        // Bulk-copy the template properties; the excluded names are either left alone or
        // applied conditionally below.
        BeanUtils.copyProperties(this.containerProperties, properties, "topics", "topicPartitions", "topicPattern", "messageListener", "ackCount", "ackTime", "subBatchPerPartition", "kafkaConsumerProperties");
        JavaUtils.INSTANCE
                .acceptIfNotNull(this.afterRollbackProcessor, c::setAfterRollbackProcessor)
                // ackCount / ackTime are copied only when the template holds a positive value.
                .acceptIfCondition(this.containerProperties.getAckCount() > 0, this.containerProperties.getAckCount(), properties::setAckCount)
                .acceptIfCondition(this.containerProperties.getAckTime() > 0, this.containerProperties.getAckTime(), properties::setAckTime)
                .acceptIfNotNull(this.containerProperties.getSubBatchPerPartition(), properties::setSubBatchPerPartition)
                .acceptIfNotNull(this.errorHandler, c::setGenericErrorHandler)
                .acceptIfNotNull(this.commonErrorHandler, c::setCommonErrorHandler)
                .acceptIfNotNull(this.missingTopicsFatal, c.getContainerProperties()::setMissingTopicsFatal)
                .acceptIfNotNull(this.changeConsumerThreadName, c::setChangeConsumerThreadName)
                .acceptIfNotNull(this.threadNameSupplier, c::setThreadNameSupplier);
        // The endpoint's auto-startup wins over the factory default; if neither is set the
        // container keeps its own value.
        Boolean endpointAutoStartup = kafkaListenerEndpoint.getAutoStartup();
        if (endpointAutoStartup != null) {
            c.setAutoStartup(endpointAutoStartup);
        } else if (this.autoStartup != null) {
            c.setAutoStartup(this.autoStartup);
        }
        // Interceptors are assigned unconditionally, so a null factory value is passed through too.
        c.setRecordInterceptor(this.recordInterceptor);
        c.setBatchInterceptor(this.batchInterceptor);
        JavaUtils.INSTANCE
                .acceptIfNotNull(this.phase, c::setPhase)
                .acceptIfNotNull(this.applicationContext, c::setApplicationContext)
                .acceptIfNotNull(this.applicationEventPublisher, c::setApplicationEventPublisher)
                .acceptIfHasText(kafkaListenerEndpoint.getGroupId(), c.getContainerProperties()::setGroupId)
                .acceptIfHasText(kafkaListenerEndpoint.getClientIdPrefix(), c.getContainerProperties()::setClientId)
                .acceptIfNotNull(kafkaListenerEndpoint.getConsumerProperties(), c.getContainerProperties()::setKafkaConsumerProperties)
                .acceptIfNotNull(kafkaListenerEndpoint.getListenerInfo(), c::setListenerInfo);
    }

    /** Invoke the configured customizer, if any, on the container. */
    private void customizeContainer(C c) {
        if (this.containerCustomizer != null) {
            this.containerCustomizer.configure(c);
        }
    }

    /** Create, initialize and customize a container for an ad-hoc endpoint adapter. */
    private C buildContainer(KafkaListenerEndpoint endpoint) {
        C instance = createContainerInstance(endpoint);
        initializeContainer(instance, endpoint);
        customizeContainer(instance);
        return instance;
    }

    /**
     * Create a container assigned to the given topic partitions.
     *
     * @param topicPartitionOffsetArr the partitions (with offsets) to assign
     * @return the configured container
     */
    @Override // org.springframework.kafka.config.KafkaListenerContainerFactory
    public C createContainer(final TopicPartitionOffset... topicPartitionOffsetArr) {
        return buildContainer(new KafkaListenerEndpointAdapter() {
            @Override // org.springframework.kafka.config.KafkaListenerEndpointAdapter, org.springframework.kafka.config.KafkaListenerEndpoint
            public TopicPartitionOffset[] getTopicPartitionsToAssign() {
                // Hand out a copy so callers cannot mutate the captured array.
                return Arrays.copyOf(topicPartitionOffsetArr, topicPartitionOffsetArr.length);
            }
        });
    }

    /**
     * Create a container subscribed to the given topics.
     *
     * @param strArr the topic names
     * @return the configured container
     */
    @Override // org.springframework.kafka.config.KafkaListenerContainerFactory
    public C createContainer(final String... strArr) {
        return buildContainer(new KafkaListenerEndpointAdapter() {
            @Override // org.springframework.kafka.config.KafkaListenerEndpointAdapter, org.springframework.kafka.config.KafkaListenerEndpoint
            public Collection<String> getTopics() {
                return Arrays.asList(strArr);
            }
        });
    }

    /**
     * Create a container subscribed to topics matching the given pattern.
     *
     * @param pattern the topic pattern
     * @return the configured container
     */
    @Override // org.springframework.kafka.config.KafkaListenerContainerFactory
    public C createContainer(final Pattern pattern) {
        return buildContainer(new KafkaListenerEndpointAdapter() {
            @Override // org.springframework.kafka.config.KafkaListenerEndpointAdapter, org.springframework.kafka.config.KafkaListenerEndpoint
            public Pattern getTopicPattern() {
                return pattern;
            }
        });
    }
}
