package org.springframework.integration.kafka.channel;

import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.integration.dispatcher.MessageDispatcher;
import org.springframework.integration.dispatcher.RoundRobinLoadBalancingStrategy;
import org.springframework.integration.dispatcher.UnicastingDispatcher;
import org.springframework.integration.support.json.JacksonJsonUtils;
import org.springframework.integration.support.management.ManageableSmartLifecycle;
import org.springframework.kafka.config.KafkaListenerContainerFactory;
import org.springframework.kafka.core.KafkaOperations;
import org.springframework.kafka.listener.ContainerProperties;
import org.springframework.kafka.listener.MessageListenerContainer;
import org.springframework.kafka.listener.adapter.RecordMessagingMessageListenerAdapter;
import org.springframework.kafka.support.Acknowledgment;
import org.springframework.kafka.support.DefaultKafkaHeaderMapper;
import org.springframework.kafka.support.JacksonPresent;
import org.springframework.kafka.support.converter.MessagingMessageConverter;
import org.springframework.kafka.support.converter.RecordMessageConverter;
import org.springframework.messaging.MessageHandler;
import org.springframework.messaging.SubscribableChannel;
import org.springframework.util.Assert;

/* loaded from: input_file:BOOT-INF/lib/spring-integration-kafka-6.1.5.jar:org/springframework/integration/kafka/channel/SubscribableKafkaChannel.class */
public class SubscribableKafkaChannel extends AbstractKafkaChannel implements SubscribableChannel, ManageableSmartLifecycle {
    private static final int DEFAULT_PHASE = 1073741823;
    private final KafkaListenerContainerFactory<?> factory;
    private final IntegrationRecordMessageListener recordListener;
    private MessageDispatcher dispatcher;
    private MessageListenerContainer container;
    private boolean autoStartup;
    private int phase;
    private volatile boolean running;

    /* loaded from: input_file:BOOT-INF/lib/spring-integration-kafka-6.1.5.jar:org/springframework/integration/kafka/channel/SubscribableKafkaChannel$IntegrationRecordMessageListener.class */
    private class IntegrationRecordMessageListener extends RecordMessagingMessageListenerAdapter<Object, Object> {
        IntegrationRecordMessageListener() {
            super(null, null);
        }

        @Override // org.springframework.kafka.listener.adapter.RecordMessagingMessageListenerAdapter, org.springframework.kafka.listener.AcknowledgingConsumerAwareMessageListener
        public void onMessage(ConsumerRecord<Object, Object> consumerRecord, Acknowledgment acknowledgment, Consumer<?, ?> consumer) {
            SubscribableKafkaChannel.this.dispatcher.dispatch(toMessagingMessage(consumerRecord, acknowledgment, consumer));
        }

        @Override // org.springframework.kafka.listener.adapter.RecordMessagingMessageListenerAdapter, org.springframework.kafka.listener.AcknowledgingConsumerAwareMessageListener, org.springframework.kafka.listener.GenericMessageListener
        public /* bridge */ /* synthetic */ void onMessage(Object obj, Acknowledgment acknowledgment, Consumer consumer) {
            onMessage((ConsumerRecord<Object, Object>) obj, acknowledgment, (Consumer<?, ?>) consumer);
        }
    }

    public SubscribableKafkaChannel(KafkaOperations<?, ?> kafkaOperations, KafkaListenerContainerFactory<?> kafkaListenerContainerFactory, String str) {
        super(kafkaOperations, str);
        this.recordListener = new IntegrationRecordMessageListener();
        this.autoStartup = true;
        this.phase = 1073741823;
        Assert.notNull(kafkaListenerContainerFactory, "'factory' cannot be null");
        this.factory = kafkaListenerContainerFactory;
        if (JacksonPresent.isJackson2Present()) {
            MessagingMessageConverter messagingMessageConverter = new MessagingMessageConverter();
            DefaultKafkaHeaderMapper defaultKafkaHeaderMapper = new DefaultKafkaHeaderMapper();
            defaultKafkaHeaderMapper.addTrustedPackages((String[]) JacksonJsonUtils.DEFAULT_TRUSTED_PACKAGES.toArray(new String[0]));
            messagingMessageConverter.setHeaderMapper(defaultKafkaHeaderMapper);
            this.recordListener.setMessageConverter(messagingMessageConverter);
        }
    }

    public void setMessageConverter(RecordMessageConverter recordMessageConverter) {
        this.recordListener.setMessageConverter(recordMessageConverter);
    }

    @Override // org.springframework.context.SmartLifecycle, org.springframework.context.Phased
    public int getPhase() {
        return this.phase;
    }

    public void setPhase(int i) {
        this.phase = i;
    }

    @Override // org.springframework.context.Lifecycle
    public boolean isRunning() {
        return this.running;
    }

    public void setAutoStartup(boolean z) {
        this.autoStartup = z;
    }

    @Override // org.springframework.context.SmartLifecycle
    public boolean isAutoStartup() {
        return this.autoStartup;
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /* JADX WARN: Type inference failed for: r1v4, types: [org.springframework.kafka.listener.MessageListenerContainer] */
    @Override // org.springframework.integration.channel.AbstractMessageChannel, org.springframework.integration.context.IntegrationObjectSupport
    public void onInit() {
        this.dispatcher = createDispatcher();
        this.container = this.factory.createContainer(this.topic);
        String groupId = getGroupId();
        ContainerProperties containerProperties = this.container.getContainerProperties();
        containerProperties.setGroupId(groupId != null ? groupId : getBeanName());
        containerProperties.setMessageListener(this.recordListener);
    }

    protected MessageDispatcher createDispatcher() {
        UnicastingDispatcher unicastingDispatcher = new UnicastingDispatcher();
        unicastingDispatcher.setLoadBalancingStrategy(new RoundRobinLoadBalancingStrategy());
        return unicastingDispatcher;
    }

    @Override // org.springframework.context.Lifecycle
    public void start() {
        this.container.start();
        this.running = true;
    }

    @Override // org.springframework.context.Lifecycle
    public void stop() {
        this.container.stop();
        this.running = false;
    }

    @Override // org.springframework.context.SmartLifecycle
    public void stop(Runnable runnable) {
        this.container.stop(() -> {
            runnable.run();
            this.running = false;
        });
    }

    @Override // org.springframework.messaging.SubscribableChannel
    public boolean subscribe(MessageHandler messageHandler) {
        return this.dispatcher.addHandler(messageHandler);
    }

    @Override // org.springframework.messaging.SubscribableChannel
    public boolean unsubscribe(MessageHandler messageHandler) {
        return this.dispatcher.removeHandler(messageHandler);
    }
}
