package org.joyqueue.client.internal.consumer.support;

import com.google.common.base.Preconditions;
import com.google.common.collect.Lists;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.lang3.StringUtils;
import org.joyqueue.client.internal.cluster.ClusterClientManager;
import org.joyqueue.client.internal.cluster.ClusterManager;
import org.joyqueue.client.internal.consumer.BrokerLoadBalance;
import org.joyqueue.client.internal.consumer.ConsumerIndexManager;
import org.joyqueue.client.internal.consumer.MessageFetcher;
import org.joyqueue.client.internal.consumer.MessagePoller;
import org.joyqueue.client.internal.consumer.callback.PollerListener;
import org.joyqueue.client.internal.consumer.config.ConsumerConfig;
import org.joyqueue.client.internal.consumer.config.FetcherConfig;
import org.joyqueue.client.internal.consumer.coordinator.ConsumerCoordinator;
import org.joyqueue.client.internal.consumer.coordinator.domain.BrokerAssignments;
import org.joyqueue.client.internal.consumer.coordinator.domain.BrokerAssignmentsHolder;
import org.joyqueue.client.internal.consumer.domain.ConsumeMessage;
import org.joyqueue.client.internal.consumer.domain.ConsumeReply;
import org.joyqueue.client.internal.consumer.domain.FetchIndexData;
import org.joyqueue.client.internal.consumer.exception.ConsumerException;
import org.joyqueue.client.internal.consumer.transport.ConsumerClientManager;
import org.joyqueue.client.internal.metadata.domain.PartitionMetadata;
import org.joyqueue.client.internal.metadata.domain.TopicMetadata;
import org.joyqueue.client.internal.nameserver.NameServerConfig;
import org.joyqueue.exception.JoyQueueCode;
import org.joyqueue.toolkit.service.Service;
import org.joyqueue.toolkit.time.SystemClock;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/joyqueue/client/internal/consumer/support/DefaultMessagePoller.class */
public class DefaultMessagePoller extends Service implements MessagePoller {
    private static final int CUSTOM_BATCH_SIZE = -1;
    protected static final Logger logger = LoggerFactory.getLogger(DefaultMessagePoller.class);
    private ConsumerConfig config;
    private NameServerConfig nameServerConfig;
    private ClusterManager clusterManager;
    private ClusterClientManager clusterClientManager;
    private ConsumerClientManager consumerClientManager;
    private ConsumerCoordinator consumerCoordinator;
    private FetcherConfig fetcherConfig;
    private MessageFetcher messageFetcher;
    private ConsumerIndexManager consumerIndexManager;
    private MessagePollerInner messagePollerInner;
    private BrokerAssignmentsHolder brokerAssignmentCache;

    public DefaultMessagePoller(ConsumerConfig consumerConfig, NameServerConfig nameServerConfig, ClusterManager clusterManager, ClusterClientManager clusterClientManager, ConsumerClientManager consumerClientManager) {
        Preconditions.checkArgument(consumerConfig != null, "consumer can not be null");
        Preconditions.checkArgument(nameServerConfig != null, "nameServer can not be null");
        Preconditions.checkArgument(clusterManager != null, "clusterManager can not be null");
        Preconditions.checkArgument(clusterClientManager != null, "clusterClientManager can not be null");
        Preconditions.checkArgument(consumerClientManager != null, "consumerClientManager can not be null");
        Preconditions.checkArgument(StringUtils.isNotBlank(consumerConfig.getApp()), "consumer.app not blank");
        Preconditions.checkArgument(consumerConfig.getPollTimeout() > consumerConfig.getLongPollTimeout(), "consumer.pollTimeout must be greater than consumer.longPullTimeout");
        this.config = consumerConfig;
        this.nameServerConfig = nameServerConfig;
        this.clusterManager = clusterManager;
        this.clusterClientManager = clusterClientManager;
        this.consumerClientManager = consumerClientManager;
    }

    protected void validate() throws Exception {
        this.consumerCoordinator = new ConsumerCoordinator(this.clusterClientManager);
        this.fetcherConfig = new FetcherConfig();
        this.messageFetcher = new DefaultMessageFetcher(this.consumerClientManager, this.fetcherConfig);
        this.consumerIndexManager = new DefaultConsumerIndexManager(this.clusterManager, this.consumerClientManager);
        this.messagePollerInner = new MessagePollerInner(this.config, this.nameServerConfig, this.clusterManager, this.consumerClientManager, this.messageFetcher);
    }

    protected void doStart() throws Exception {
        this.messageFetcher.start();
        this.consumerCoordinator.start();
        this.messagePollerInner.start();
    }

    protected void doStop() {
        if (this.messagePollerInner != null) {
            this.messagePollerInner.stop();
        }
        if (this.consumerCoordinator != null) {
            this.consumerCoordinator.stop();
        }
        if (this.messageFetcher != null) {
            this.messageFetcher.stop();
        }
    }

    @Override // org.joyqueue.client.internal.consumer.MessagePoller
    public ConsumeMessage pollOnce(String str) {
        return pollOnce(str, this.config.getPollTimeout(), TimeUnit.MILLISECONDS);
    }

    @Override // org.joyqueue.client.internal.consumer.MessagePoller
    public ConsumeMessage pollOnce(String str, long j, TimeUnit timeUnit) {
        List<ConsumeMessage> doPoll = doPoll(str, 1, j, timeUnit, null);
        if (CollectionUtils.isEmpty(doPoll)) {
            return null;
        }
        return doPoll.get(0);
    }

    @Override // org.joyqueue.client.internal.consumer.MessagePoller
    public List<ConsumeMessage> poll(String str) {
        return poll(str, this.config.getPollTimeout(), TimeUnit.MILLISECONDS);
    }

    @Override // org.joyqueue.client.internal.consumer.MessagePoller
    public List<ConsumeMessage> poll(String str, long j, TimeUnit timeUnit) {
        return doPoll(str, -1, j, timeUnit, null);
    }

    @Override // org.joyqueue.client.internal.consumer.MessagePoller
    public CompletableFuture<List<ConsumeMessage>> pollAsync(String str) {
        return pollAsync(str, this.config.getPollTimeout(), TimeUnit.MILLISECONDS);
    }

    @Override // org.joyqueue.client.internal.consumer.MessagePoller
    public CompletableFuture<List<ConsumeMessage>> pollAsync(String str, long j, TimeUnit timeUnit) {
        CompletableFuture<List<ConsumeMessage>> completableFuture = new CompletableFuture<>();
        doPoll(str, -1, j, timeUnit, new CompletableFuturePollerListener(completableFuture));
        return completableFuture;
    }

    @Override // org.joyqueue.client.internal.consumer.MessagePoller
    public ConsumeMessage pollPartitionOnce(String str, short s) {
        return pollPartitionOnce(str, s, this.config.getPollTimeout(), TimeUnit.MILLISECONDS);
    }

    @Override // org.joyqueue.client.internal.consumer.MessagePoller
    public ConsumeMessage pollPartitionOnce(String str, short s, long j, TimeUnit timeUnit) {
        List<ConsumeMessage> doPollPartition = doPollPartition(str, s, 1, j, timeUnit, null);
        if (CollectionUtils.isEmpty(doPollPartition)) {
            return null;
        }
        return doPollPartition.get(0);
    }

    @Override // org.joyqueue.client.internal.consumer.MessagePoller
    public ConsumeMessage pollPartitionOnce(String str, short s, long j) {
        return pollPartitionOnce(str, s, j, this.config.getPollTimeout(), TimeUnit.MILLISECONDS);
    }

    @Override // org.joyqueue.client.internal.consumer.MessagePoller
    public ConsumeMessage pollPartitionOnce(String str, short s, long j, long j2, TimeUnit timeUnit) {
        List<ConsumeMessage> doPollPartition = doPollPartition(str, s, j, 1, this.config.getPollTimeout(), TimeUnit.MILLISECONDS, null);
        if (CollectionUtils.isEmpty(doPollPartition)) {
            return null;
        }
        return doPollPartition.get(0);
    }

    @Override // org.joyqueue.client.internal.consumer.MessagePoller
    public List<ConsumeMessage> pollPartition(String str, short s) {
        return pollPartition(str, s, this.config.getPollTimeout(), TimeUnit.MILLISECONDS);
    }

    @Override // org.joyqueue.client.internal.consumer.MessagePoller
    public List<ConsumeMessage> pollPartition(String str, short s, long j, TimeUnit timeUnit) {
        return doPollPartition(str, s, -1, j, timeUnit, null);
    }

    @Override // org.joyqueue.client.internal.consumer.MessagePoller
    public List<ConsumeMessage> pollPartition(String str, short s, long j) {
        return pollPartition(str, s, j, this.config.getPollTimeout(), TimeUnit.MILLISECONDS);
    }

    @Override // org.joyqueue.client.internal.consumer.MessagePoller
    public List<ConsumeMessage> pollPartition(String str, short s, long j, long j2, TimeUnit timeUnit) {
        return doPollPartition(str, s, j, -1, this.config.getPollTimeout(), TimeUnit.MILLISECONDS, null);
    }

    @Override // org.joyqueue.client.internal.consumer.MessagePoller
    public CompletableFuture<List<ConsumeMessage>> pollPartitionAsync(String str, short s) {
        return pollPartitionAsync(str, s, this.config.getPollTimeout(), TimeUnit.MILLISECONDS);
    }

    @Override // org.joyqueue.client.internal.consumer.MessagePoller
    public CompletableFuture<List<ConsumeMessage>> pollPartitionAsync(String str, short s, long j, TimeUnit timeUnit) {
        CompletableFuture<List<ConsumeMessage>> completableFuture = new CompletableFuture<>();
        doPollPartition(str, s, -1, j, timeUnit, new CompletableFuturePollerListener(completableFuture));
        return completableFuture;
    }

    @Override // org.joyqueue.client.internal.consumer.MessagePoller
    public CompletableFuture<List<ConsumeMessage>> pollPartitionAsync(String str, short s, long j) {
        return pollPartitionAsync(str, s, j, this.config.getPollTimeout(), TimeUnit.MILLISECONDS);
    }

    @Override // org.joyqueue.client.internal.consumer.MessagePoller
    public CompletableFuture<List<ConsumeMessage>> pollPartitionAsync(String str, short s, long j, long j2, TimeUnit timeUnit) {
        CompletableFuture<List<ConsumeMessage>> completableFuture = new CompletableFuture<>();
        doPollPartition(str, s, j, -1, j2, timeUnit, new CompletableFuturePollerListener(completableFuture));
        return completableFuture;
    }

    protected List<ConsumeMessage> doPollPartition(String str, short s, int i, long j, TimeUnit timeUnit, PollerListener pollerListener) {
        return doPollPartition(str, s, -1L, i, j, timeUnit, pollerListener);
    }

    protected List<ConsumeMessage> doPollPartition(String str, short s, long j, int i, long j2, TimeUnit timeUnit, PollerListener pollerListener) {
        checkState();
        Preconditions.checkArgument(StringUtils.isNotBlank(str), "topic not blank");
        Preconditions.checkArgument(timeUnit != null, "timeoutUnit not null");
        TopicMetadata andCheckTopicMetadata = this.messagePollerInner.getAndCheckTopicMetadata(str);
        PartitionMetadata partition = andCheckTopicMetadata.getPartition(s);
        if (partition == null) {
            throw new ConsumerException(String.format("partition not exist, topic: %s, partition: %s", str, Short.valueOf(s)), JoyQueueCode.FW_TOPIC_NO_PARTITIONGROUP.getCode());
        }
        if (partition.getLeader() == null) {
            throw new ConsumerException(String.format("partition not available, topic: %s, partition: %s", str, Short.valueOf(s)), JoyQueueCode.FW_TOPIC_NO_PARTITIONGROUP.getCode());
        }
        if (!partition.getLeader().isReadable()) {
            throw new ConsumerException(String.format("partition not readable, topic: %s, partition: %s", str, Short.valueOf(s)), JoyQueueCode.FW_TOPIC_NO_PARTITIONGROUP.getCode());
        }
        if (i == -1) {
            i = this.config.getBatchSize() == -1 ? andCheckTopicMetadata.getConsumerPolicy().getBatchSize().shortValue() : this.config.getBatchSize();
        }
        return this.messagePollerInner.fetchPartition(partition.getLeader(), andCheckTopicMetadata, s, j, i, j2, timeUnit, pollerListener);
    }

    protected List<ConsumeMessage> doPoll(String str, int i, long j, TimeUnit timeUnit, PollerListener pollerListener) {
        checkState();
        Preconditions.checkArgument(StringUtils.isNotBlank(str), "topic not blank");
        Preconditions.checkArgument(timeUnit != null, "timeoutUnit not null");
        TopicMetadata andCheckTopicMetadata = this.messagePollerInner.getAndCheckTopicMetadata(str);
        BrokerLoadBalance brokerLoadBalance = this.messagePollerInner.getBrokerLoadBalance(str);
        BrokerAssignments filterNotAvailableBrokers = this.messagePollerInner.filterNotAvailableBrokers(fetchBrokerAssignment(andCheckTopicMetadata));
        if (CollectionUtils.isEmpty(filterNotAvailableBrokers.getAssignments())) {
            logger.warn("no broker available, topic: {}", andCheckTopicMetadata.getTopic());
            return this.messagePollerInner.buildPollEmptyResult(pollerListener);
        }
        if (i == -1) {
            i = this.config.getBatchSize() != -1 ? this.config.getBatchSize() : andCheckTopicMetadata.getConsumerPolicy().getBatchSize().shortValue();
        }
        return this.messagePollerInner.fetchTopic(brokerLoadBalance.loadBalance(filterNotAvailableBrokers).getBroker(), andCheckTopicMetadata, i, j, timeUnit, pollerListener);
    }

    protected BrokerAssignments fetchBrokerAssignment(TopicMetadata topicMetadata) {
        BrokerAssignments buildAllBrokerAssignments;
        if (this.brokerAssignmentCache != null && !this.brokerAssignmentCache.isExpired(this.config.getSessionTimeout())) {
            return this.brokerAssignmentCache.getBrokerAssignments();
        }
        if (this.config.isLoadBalance()) {
            buildAllBrokerAssignments = this.messagePollerInner.filterNotAvailableBrokers(this.consumerCoordinator.fetchBrokerAssignment(topicMetadata, this.config.getAppFullName(), this.config.getSessionTimeout()));
            if ((buildAllBrokerAssignments == null || CollectionUtils.isEmpty(buildAllBrokerAssignments.getAssignments())) && this.config.isFailover()) {
                logger.debug("no assignment available, assign all broker, topic: {}", topicMetadata.getTopic());
                buildAllBrokerAssignments = this.messagePollerInner.buildAllBrokerAssignments(topicMetadata);
            }
        } else {
            buildAllBrokerAssignments = this.messagePollerInner.buildAllBrokerAssignments(topicMetadata);
        }
        BrokerAssignments filterRegionBrokers = this.messagePollerInner.filterRegionBrokers(topicMetadata, buildAllBrokerAssignments);
        if (topicMetadata.isAllAvailable()) {
            this.brokerAssignmentCache = new BrokerAssignmentsHolder(filterRegionBrokers, SystemClock.now());
        }
        return filterRegionBrokers;
    }

    @Override // org.joyqueue.client.internal.consumer.MessagePoller
    public synchronized JoyQueueCode reply(String str, List<ConsumeReply> list) {
        checkState();
        Preconditions.checkArgument(StringUtils.isNotBlank(str), "topic not blank");
        TopicMetadata andCheckTopicMetadata = this.messagePollerInner.getAndCheckTopicMetadata(str);
        if (CollectionUtils.isEmpty(list)) {
            throw new IllegalArgumentException(String.format("topic %s reply is empty", str));
        }
        JoyQueueCode commitReply = this.consumerIndexManager.commitReply(andCheckTopicMetadata.getTopic(), list, this.config.getAppFullName(), this.config.getTimeout());
        if (!commitReply.equals(JoyQueueCode.SUCCESS)) {
            logger.warn("commit ack error, topic : {}, code: {}, error: {}", new Object[]{str, Integer.valueOf(commitReply.getCode()), commitReply.getMessage(new Object[0])});
        }
        return commitReply;
    }

    @Override // org.joyqueue.client.internal.consumer.MessagePoller
    public JoyQueueCode replyOnce(String str, ConsumeReply consumeReply) {
        return reply(str, Lists.newArrayList(new ConsumeReply[]{consumeReply}));
    }

    @Override // org.joyqueue.client.internal.consumer.MessagePoller
    public FetchIndexData fetchIndex(String str, short s) {
        checkState();
        Preconditions.checkArgument(StringUtils.isNotBlank(str), "topic not blank");
        return this.consumerIndexManager.fetchIndex(this.messagePollerInner.getAndCheckTopicMetadata(str).getTopic(), this.config.getAppFullName(), s, this.config.getTimeout());
    }

    @Override // org.joyqueue.client.internal.consumer.MessagePoller
    public TopicMetadata getTopicMetadata(String str) {
        checkState();
        Preconditions.checkArgument(StringUtils.isNotBlank(str), "topic not blank");
        return this.clusterManager.fetchTopicMetadata(this.messagePollerInner.getTopicFullName(str), this.config.getAppFullName());
    }

    protected void checkState() {
        if (!isStarted()) {
            throw new ConsumerException("consumer is not started", JoyQueueCode.CN_SERVICE_NOT_AVAILABLE.getCode());
        }
    }
}
