package org.joyqueue.client.internal.consumer.support;

import org.joyqueue.client.internal.cluster.ClusterClientManager;
import org.joyqueue.client.internal.cluster.ClusterManager;
import org.joyqueue.client.internal.consumer.BaseMessageListener;
import org.joyqueue.client.internal.consumer.MessagePoller;
import org.joyqueue.client.internal.consumer.MessagePollerFactory;
import org.joyqueue.client.internal.consumer.config.ConsumerConfig;
import org.joyqueue.client.internal.consumer.coordinator.ConsumerCoordinator;
import org.joyqueue.client.internal.consumer.exception.ConsumerException;
import org.joyqueue.client.internal.consumer.interceptor.ConsumerInterceptor;
import org.joyqueue.client.internal.consumer.interceptor.ConsumerInterceptorManager;
import org.joyqueue.client.internal.consumer.transport.ConsumerClientManager;
import org.joyqueue.client.internal.metadata.domain.TopicMetadata;
import org.joyqueue.client.internal.nameserver.NameServerConfig;
import org.joyqueue.client.internal.nameserver.helper.NameServerHelper;
import org.joyqueue.domain.TopicName;
import org.joyqueue.domain.TopicType;
import org.joyqueue.exception.JoyQueueCode;
import org.joyqueue.toolkit.service.Service;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/joyqueue/client/internal/consumer/support/TopicMessageConsumer.class */
public class TopicMessageConsumer extends Service {
    protected static final Logger logger = LoggerFactory.getLogger(ConsumerCoordinator.class);
    private String topic;
    private ConsumerConfig config;
    private NameServerConfig nameServerConfig;
    private ClusterManager clusterManager;
    private ClusterClientManager clusterClientManager;
    private ConsumerClientManager consumerClientManager;
    private ConsumerInterceptorManager consumerInterceptorManager;
    private MessagePoller messagePoller;
    private TopicMessageConsumerDispatcher messageConsumerDispatcher;
    private TopicMessageConsumerScheduler messageConsumerScheduler;
    private MessageListenerManager messageListenerManager;
    private String appFullName;

    public TopicMessageConsumer(String str, ConsumerConfig consumerConfig, NameServerConfig nameServerConfig, ClusterManager clusterManager, ClusterClientManager clusterClientManager, ConsumerClientManager consumerClientManager) {
        this(str, consumerConfig, nameServerConfig, clusterManager, clusterClientManager, consumerClientManager, new ConsumerInterceptorManager());
    }

    public TopicMessageConsumer(String str, ConsumerConfig consumerConfig, NameServerConfig nameServerConfig, ClusterManager clusterManager, ClusterClientManager clusterClientManager, ConsumerClientManager consumerClientManager, ConsumerInterceptorManager consumerInterceptorManager) {
        this.messageListenerManager = new MessageListenerManager();
        this.topic = str;
        this.config = consumerConfig;
        this.nameServerConfig = nameServerConfig;
        this.clusterManager = clusterManager;
        this.clusterClientManager = clusterClientManager;
        this.consumerClientManager = consumerClientManager;
        this.consumerInterceptorManager = consumerInterceptorManager;
    }

    protected void validate() throws Exception {
        this.messagePoller = createMessageConsumer(this.topic);
        this.messageConsumerDispatcher = new TopicMessageConsumerDispatcher(this.topic, this.config, this.nameServerConfig, this.messagePoller, this.messageListenerManager, this.consumerInterceptorManager);
        this.messageConsumerScheduler = new TopicMessageConsumerScheduler(this.topic, this.config, this.messagePoller, this.messageConsumerDispatcher);
    }

    protected void doStart() throws Exception {
        this.messagePoller.start();
        if (this.messageListenerManager.isEmpty()) {
            return;
        }
        this.messageConsumerDispatcher.start();
        this.messageConsumerScheduler.start();
    }

    protected void doStop() {
        if (!this.messageListenerManager.isEmpty()) {
            if (this.messageConsumerScheduler != null) {
                this.messageConsumerScheduler.stop();
            }
            if (this.messageConsumerDispatcher != null) {
                this.messageConsumerDispatcher.stop();
            }
        }
        if (this.messagePoller != null) {
            this.messagePoller.stop();
        }
    }

    public void suspend() {
        this.messageConsumerScheduler.suspend();
    }

    public boolean isSuspend() {
        return this.messageConsumerScheduler.isSuspend();
    }

    public void resume() {
        this.messageConsumerScheduler.resume();
    }

    protected MessagePoller createMessageConsumer(String str) {
        TopicMetadata fetchTopicMetadata = this.clusterManager.fetchTopicMetadata(TopicName.parse(NameServerHelper.getTopicFullName(str, this.nameServerConfig)).getFullName(), this.config.getAppFullName());
        if (fetchTopicMetadata == null) {
            throw new ConsumerException(String.format("topic %s is not exist", str), JoyQueueCode.FW_TOPIC_NOT_EXIST.getCode());
        }
        if (fetchTopicMetadata.getConsumerPolicy() == null) {
            throw new ConsumerException(String.format("topic %s consumer %s is not exist", str, this.config.getAppFullName()), JoyQueueCode.FW_TOPIC_NOT_EXIST.getCode());
        }
        return fetchTopicMetadata.getType().equals(TopicType.BROADCAST) ? MessagePollerFactory.createBroadcastPoller(this.config, this.nameServerConfig, this.clusterManager, this.consumerClientManager) : MessagePollerFactory.create(this.config, this.nameServerConfig, this.clusterManager, this.clusterClientManager, this.consumerClientManager);
    }

    public synchronized void addInterceptor(ConsumerInterceptor consumerInterceptor) {
        this.consumerInterceptorManager.addInterceptor(consumerInterceptor);
    }

    public synchronized void removeInterceptor(ConsumerInterceptor consumerInterceptor) {
        this.consumerInterceptorManager.removeInterceptor(consumerInterceptor);
    }

    public synchronized void addListener(BaseMessageListener baseMessageListener) {
        boolean isEmpty = this.messageListenerManager.isEmpty();
        this.messageListenerManager.addListener(baseMessageListener);
        if (isStarted() && isEmpty) {
            try {
                this.messageConsumerDispatcher.start();
                this.messageConsumerScheduler.start();
            } catch (Exception e) {
                throw new ConsumerException(e);
            }
        }
    }

    public synchronized void removeListener(BaseMessageListener baseMessageListener) {
        this.messageListenerManager.removeListener(baseMessageListener);
    }

    public MessagePoller getMessagePoller() {
        return this.messagePoller;
    }
}
