package org.apache.iotdb.db.subscription.broker;

import com.github.benmanes.caffeine.cache.Caffeine;
import com.github.benmanes.caffeine.cache.LoadingCache;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.stream.Collectors;
import org.apache.iotdb.commons.pipe.agent.task.connection.UnboundedBlockingPendingQueue;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.db.pipe.agent.PipeDataNodeAgent;
import org.apache.iotdb.db.subscription.agent.SubscriptionAgent;
import org.apache.iotdb.db.subscription.event.SubscriptionEvent;
import org.apache.iotdb.db.subscription.metric.SubscriptionPrefetchingQueueMetrics;
import org.apache.iotdb.db.subscription.resource.SubscriptionDataNodeResourceManager;
import org.apache.iotdb.pipe.api.event.Event;
import org.apache.iotdb.rpc.subscription.exception.SubscriptionException;
import org.apache.iotdb.rpc.subscription.payload.poll.SubscriptionCommitContext;
import org.apache.iotdb.rpc.subscription.payload.poll.SubscriptionPollPayload;
import org.apache.iotdb.rpc.subscription.payload.poll.SubscriptionPollResponseType;
import org.apache.iotdb.rpc.subscription.payload.poll.TerminationPayload;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/iotdb/db/subscription/broker/SubscriptionBroker.class */
public class SubscriptionBroker {
    private static final Logger LOGGER = LoggerFactory.getLogger(SubscriptionBroker.class);
    private final String brokerId;
    private final Map<String, SubscriptionPrefetchingQueue> topicNameToPrefetchingQueue = new ConcurrentHashMap();
    private final Map<String, String> completedTopicNames = new ConcurrentHashMap();
    private final Map<String, AtomicLong> topicNameToCommitIdGenerator = new ConcurrentHashMap();
    private final LoadingCache<String, SubscriptionStates> consumerIdToSubscriptionStates = Caffeine.newBuilder().expireAfterAccess(60, TimeUnit.SECONDS).build(str -> {
        return new SubscriptionStates();
    });

    public SubscriptionBroker(String str) {
        this.brokerId = str;
    }

    public boolean isEmpty() {
        return this.topicNameToPrefetchingQueue.isEmpty() && this.completedTopicNames.isEmpty() && this.topicNameToCommitIdGenerator.isEmpty();
    }

    public List<SubscriptionEvent> poll(String str, Set<String> set, long j) {
        ArrayList arrayList = new ArrayList();
        ArrayList arrayList2 = new ArrayList(prepareCandidateTopicNames(set, arrayList));
        arrayList2.sort(Comparator.comparingLong(str2 -> {
            return ((SubscriptionStates) Objects.requireNonNull((SubscriptionStates) this.consumerIdToSubscriptionStates.get(str))).getStates(str2);
        }));
        ArrayList arrayList3 = new ArrayList();
        long j2 = 0;
        HashMap hashMap = new HashMap();
        Iterator it = arrayList2.iterator();
        while (it.hasNext()) {
            SubscriptionPrefetchingQueue subscriptionPrefetchingQueue = this.topicNameToPrefetchingQueue.get((String) it.next());
            if (!Objects.isNull(subscriptionPrefetchingQueue) && !subscriptionPrefetchingQueue.isClosed()) {
                SubscriptionEvent poll = subscriptionPrefetchingQueue.poll(str);
                if (Objects.isNull(poll)) {
                    continue;
                } else {
                    try {
                        long currentResponseSize = poll.getCurrentResponseSize();
                        arrayList.add(poll);
                        hashMap.merge(poll.getCommitContext().getTopicName(), 1L, (v0, v1) -> {
                            return Long.sum(v0, v1);
                        });
                        j2 += currentResponseSize;
                        if (j2 + currentResponseSize > j) {
                            break;
                        }
                    } catch (IOException e) {
                        arrayList3.add(poll);
                    }
                }
            }
        }
        ((SubscriptionStates) Objects.requireNonNull((SubscriptionStates) this.consumerIdToSubscriptionStates.get(str))).updateStates(hashMap);
        commit(str, (List) arrayList3.stream().map((v0) -> {
            return v0.getCommitContext();
        }).collect(Collectors.toList()), true);
        return arrayList;
    }

    private Set<String> prepareCandidateTopicNames(Set<String> set, List<SubscriptionEvent> list) {
        HashSet hashSet = new HashSet();
        for (String str : set) {
            SubscriptionPrefetchingQueue subscriptionPrefetchingQueue = this.topicNameToPrefetchingQueue.get(str);
            if (Objects.isNull(subscriptionPrefetchingQueue)) {
                if (this.completedTopicNames.containsKey(str)) {
                    LOGGER.info("Subscription: prefetching queue bound to topic [{}] for consumer group [{}] is completed, return termination response to client", str, this.brokerId);
                    list.add(new SubscriptionEvent(SubscriptionPollResponseType.TERMINATION.getType(), (SubscriptionPollPayload) new TerminationPayload(), new SubscriptionCommitContext(IoTDBDescriptor.getInstance().getConfig().getDataNodeId(), PipeDataNodeAgent.runtime().getRebootTimes(), str, this.brokerId, -1L)));
                }
            } else if (subscriptionPrefetchingQueue.isClosed()) {
                LOGGER.warn("Subscription: prefetching queue bound to topic [{}] for consumer group [{}] is closed", str, this.brokerId);
            } else {
                hashSet.add(str);
            }
        }
        return hashSet;
    }

    public List<SubscriptionEvent> pollTsFile(String str, SubscriptionCommitContext subscriptionCommitContext, long j) {
        String topicName = subscriptionCommitContext.getTopicName();
        SubscriptionPrefetchingQueue subscriptionPrefetchingQueue = this.topicNameToPrefetchingQueue.get(topicName);
        if (Objects.isNull(subscriptionPrefetchingQueue)) {
            String format = String.format("Subscription: prefetching queue bound to topic [%s] for consumer group [%s] does not exist", topicName, this.brokerId);
            LOGGER.warn(format);
            throw new SubscriptionException(format);
        }
        if (!(subscriptionPrefetchingQueue instanceof SubscriptionPrefetchingTsFileQueue)) {
            String format2 = String.format("Subscription: prefetching queue bound to topic [%s] for consumer group [%s] is invalid", topicName, this.brokerId);
            LOGGER.warn(format2);
            throw new SubscriptionException(format2);
        }
        if (subscriptionPrefetchingQueue.isClosed()) {
            LOGGER.warn("Subscription: prefetching queue bound to topic [{}] for consumer group [{}] is closed", topicName, this.brokerId);
            return Collections.emptyList();
        }
        SubscriptionEvent pollTsFile = ((SubscriptionPrefetchingTsFileQueue) subscriptionPrefetchingQueue).pollTsFile(str, subscriptionCommitContext, j);
        return Objects.nonNull(pollTsFile) ? Collections.singletonList(pollTsFile) : Collections.emptyList();
    }

    public List<SubscriptionEvent> pollTablets(String str, SubscriptionCommitContext subscriptionCommitContext, int i) {
        String topicName = subscriptionCommitContext.getTopicName();
        SubscriptionPrefetchingQueue subscriptionPrefetchingQueue = this.topicNameToPrefetchingQueue.get(topicName);
        if (Objects.isNull(subscriptionPrefetchingQueue)) {
            String format = String.format("Subscription: prefetching queue bound to topic [%s] for consumer group [%s] does not exist", topicName, this.brokerId);
            LOGGER.warn(format);
            throw new SubscriptionException(format);
        }
        if (!(subscriptionPrefetchingQueue instanceof SubscriptionPrefetchingTabletQueue)) {
            String format2 = String.format("Subscription: prefetching queue bound to topic [%s] for consumer group [%s] is invalid", topicName, this.brokerId);
            LOGGER.warn(format2);
            throw new SubscriptionException(format2);
        }
        if (subscriptionPrefetchingQueue.isClosed()) {
            LOGGER.warn("Subscription: prefetching queue bound to topic [{}] for consumer group [{}] is closed", topicName, this.brokerId);
            return Collections.emptyList();
        }
        SubscriptionEvent pollTablets = ((SubscriptionPrefetchingTabletQueue) subscriptionPrefetchingQueue).pollTablets(str, subscriptionCommitContext, i);
        return Objects.nonNull(pollTablets) ? Collections.singletonList(pollTablets) : Collections.emptyList();
    }

    public List<SubscriptionCommitContext> commit(String str, List<SubscriptionCommitContext> list, boolean z) {
        ArrayList arrayList = new ArrayList();
        for (SubscriptionCommitContext subscriptionCommitContext : list) {
            String topicName = subscriptionCommitContext.getTopicName();
            SubscriptionPrefetchingQueue subscriptionPrefetchingQueue = this.topicNameToPrefetchingQueue.get(topicName);
            if (Objects.isNull(subscriptionPrefetchingQueue)) {
                LOGGER.warn("Subscription: prefetching queue bound to topic [{}] for consumer group [{}] does not exist", topicName, this.brokerId);
            } else if (subscriptionPrefetchingQueue.isClosed()) {
                LOGGER.warn("Subscription: prefetching queue bound to topic [{}] for consumer group [{}] is closed", topicName, this.brokerId);
            } else if (z) {
                if (subscriptionPrefetchingQueue.nack(str, subscriptionCommitContext)) {
                    arrayList.add(subscriptionCommitContext);
                }
            } else if (subscriptionPrefetchingQueue.ack(str, subscriptionCommitContext)) {
                arrayList.add(subscriptionCommitContext);
            }
        }
        return arrayList;
    }

    public boolean isCommitContextOutdated(SubscriptionCommitContext subscriptionCommitContext) {
        SubscriptionPrefetchingQueue subscriptionPrefetchingQueue = this.topicNameToPrefetchingQueue.get(subscriptionCommitContext.getTopicName());
        if (Objects.isNull(subscriptionPrefetchingQueue)) {
            return true;
        }
        return subscriptionPrefetchingQueue.isCommitContextOutdated(subscriptionCommitContext);
    }

    public void bindPrefetchingQueue(String str, UnboundedBlockingPendingQueue<Event> unboundedBlockingPendingQueue) {
        if (Objects.nonNull(this.topicNameToPrefetchingQueue.get(str))) {
            LOGGER.warn("Subscription: prefetching queue bound to topic [{}] for consumer group [{}] has already existed", str, this.brokerId);
            return;
        }
        SubscriptionPrefetchingQueue subscriptionPrefetchingTsFileQueue = "TsFileHandler".equals(SubscriptionAgent.topic().getTopicFormat(str)) ? new SubscriptionPrefetchingTsFileQueue(this.brokerId, str, new TsFileDeduplicationBlockingPendingQueue(unboundedBlockingPendingQueue), this.topicNameToCommitIdGenerator.computeIfAbsent(str, str2 -> {
            return new AtomicLong();
        })) : new SubscriptionPrefetchingTabletQueue(this.brokerId, str, new TsFileDeduplicationBlockingPendingQueue(unboundedBlockingPendingQueue), this.topicNameToCommitIdGenerator.computeIfAbsent(str, str3 -> {
            return new AtomicLong();
        }));
        SubscriptionPrefetchingQueueMetrics.getInstance().register(subscriptionPrefetchingTsFileQueue);
        this.topicNameToPrefetchingQueue.put(str, subscriptionPrefetchingTsFileQueue);
        LOGGER.info("Subscription: create prefetching queue bound to topic [{}] for consumer group [{}]", str, this.brokerId);
    }

    public void unbindPrefetchingQueue(String str) {
        SubscriptionPrefetchingQueue subscriptionPrefetchingQueue = this.topicNameToPrefetchingQueue.get(str);
        if (Objects.isNull(subscriptionPrefetchingQueue)) {
            LOGGER.warn("Subscription: prefetching queue bound to topic [{}] for consumer group [{}] does not exist", str, this.brokerId);
            return;
        }
        subscriptionPrefetchingQueue.markClosed();
        if (SubscriptionAgent.topic().getTopicMode(str).equals("snapshot") && subscriptionPrefetchingQueue.isCompleted()) {
            this.completedTopicNames.put(str, str);
        }
        subscriptionPrefetchingQueue.cleanUp();
        SubscriptionPrefetchingQueueMetrics.getInstance().deregister(subscriptionPrefetchingQueue.getPrefetchingQueueId());
        this.topicNameToPrefetchingQueue.remove(str);
        LOGGER.info("Subscription: drop prefetching queue bound to topic [{}] for consumer group [{}]", str, this.brokerId);
    }

    public void removePrefetchingQueue(String str) {
        if (Objects.nonNull(this.topicNameToPrefetchingQueue.get(str))) {
            LOGGER.info("Subscription: prefetching queue bound to topic [{}] for consumer group [{}] still exists, unbind it before closing", str, this.brokerId);
            unbindPrefetchingQueue(str);
        }
        this.completedTopicNames.remove(str);
        this.topicNameToCommitIdGenerator.remove(str);
    }

    public boolean executePrefetch(String str) {
        SubscriptionPrefetchingQueue subscriptionPrefetchingQueue = this.topicNameToPrefetchingQueue.get(str);
        if (Objects.isNull(subscriptionPrefetchingQueue)) {
            SubscriptionDataNodeResourceManager.log().schedule(SubscriptionBroker.class, this.brokerId, str).ifPresent(logger -> {
                logger.warn("Subscription: prefetching queue bound to topic [{}] for consumer group [{}] does not exist", str, this.brokerId);
            });
            return false;
        }
        if (!subscriptionPrefetchingQueue.isClosed()) {
            return subscriptionPrefetchingQueue.executePrefetch();
        }
        SubscriptionDataNodeResourceManager.log().schedule(SubscriptionBroker.class, this.brokerId, str).ifPresent(logger2 -> {
            logger2.warn("Subscription: prefetching queue bound to topic [{}] for consumer group [{}] is closed", str, this.brokerId);
        });
        return false;
    }

    public int getPipeEventCount(String str) {
        SubscriptionPrefetchingQueue subscriptionPrefetchingQueue = this.topicNameToPrefetchingQueue.get(str);
        if (Objects.isNull(subscriptionPrefetchingQueue)) {
            LOGGER.warn("Subscription: prefetching queue bound to topic [{}] for consumer group [{}] does not exist", str, this.brokerId);
            return 0;
        }
        if (!subscriptionPrefetchingQueue.isClosed()) {
            return subscriptionPrefetchingQueue.getPipeEventCount();
        }
        LOGGER.warn("Subscription: prefetching queue bound to topic [{}] for consumer group [{}] is closed", str, this.brokerId);
        return 0;
    }

    public int getPrefetchingQueueCount() {
        return this.topicNameToPrefetchingQueue.size();
    }
}
