package org.apache.iotdb.confignode.persistence.subscription;

import java.io.File;
import java.io.FileInputStream;
import java.io.FileOutputStream;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.stream.Collectors;
import java.util.stream.StreamSupport;
import org.apache.iotdb.common.rpc.thrift.TSStatus;
import org.apache.iotdb.commons.snapshot.SnapshotProcessor;
import org.apache.iotdb.commons.subscription.meta.consumer.ConsumerGroupMeta;
import org.apache.iotdb.commons.subscription.meta.consumer.ConsumerGroupMetaKeeper;
import org.apache.iotdb.commons.subscription.meta.subscription.SubscriptionMeta;
import org.apache.iotdb.commons.subscription.meta.topic.TopicMeta;
import org.apache.iotdb.commons.subscription.meta.topic.TopicMetaKeeper;
import org.apache.iotdb.confignode.consensus.request.write.subscription.consumer.AlterConsumerGroupPlan;
import org.apache.iotdb.confignode.consensus.request.write.subscription.consumer.runtime.ConsumerGroupHandleMetaChangePlan;
import org.apache.iotdb.confignode.consensus.request.write.subscription.topic.AlterMultipleTopicsPlan;
import org.apache.iotdb.confignode.consensus.request.write.subscription.topic.AlterTopicPlan;
import org.apache.iotdb.confignode.consensus.request.write.subscription.topic.CreateTopicPlan;
import org.apache.iotdb.confignode.consensus.request.write.subscription.topic.DropTopicPlan;
import org.apache.iotdb.confignode.consensus.request.write.subscription.topic.runtime.TopicHandleMetaChangePlan;
import org.apache.iotdb.confignode.consensus.response.subscription.SubscriptionTableResp;
import org.apache.iotdb.confignode.consensus.response.subscription.TopicTableResp;
import org.apache.iotdb.confignode.rpc.thrift.TCloseConsumerReq;
import org.apache.iotdb.confignode.rpc.thrift.TCreateConsumerReq;
import org.apache.iotdb.confignode.rpc.thrift.TCreateTopicReq;
import org.apache.iotdb.confignode.rpc.thrift.TSubscribeReq;
import org.apache.iotdb.confignode.rpc.thrift.TUnsubscribeReq;
import org.apache.iotdb.consensus.common.DataSet;
import org.apache.iotdb.rpc.TSStatusCode;
import org.apache.iotdb.rpc.subscription.exception.SubscriptionException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/iotdb/confignode/persistence/subscription/SubscriptionInfo.class */
/**
 * In-memory registry of subscription metadata: topics (via {@link TopicMetaKeeper}) and consumer
 * groups (via {@link ConsumerGroupMetaKeeper}).
 *
 * <p>Accesses made through this class are guarded by a single fair {@link ReentrantReadWriteLock},
 * except where a method's documentation says otherwise. The lock is reentrant, so methods of this
 * class may call each other while already holding it. Every acquisition of the write lock through
 * {@link #acquireWriteLock()} bumps an internal version counter that is consulted by
 * {@link #canSkipNextSync()}.
 *
 * <p>The class can write its state to, and restore it from, a snapshot file named
 * {@code subscription_info.bin} inside a given snapshot directory.
 */
public class SubscriptionInfo implements SnapshotProcessor {
    private static final Logger LOGGER = LoggerFactory.getLogger(SubscriptionInfo.class);
    private static final String SNAPSHOT_FILE_NAME = "subscription_info.bin";

    /** Fair read/write lock guarding both meta keepers. */
    private final ReentrantReadWriteLock subscriptionInfoLock = new ReentrantReadWriteLock(true);

    private final TopicMetaKeeper topicMetaKeeper = new TopicMetaKeeper();
    private final ConsumerGroupMetaKeeper consumerGroupMetaKeeper = new ConsumerGroupMetaKeeper();
    private final SubscriptionInfoVersion subscriptionInfoVersion = new SubscriptionInfoVersion();

    /**
     * Version bookkeeping used to decide whether a sync round can be skipped.
     *
     * <p>{@code latestVersion} is incremented on every write-lock acquisition;
     * {@code lastSyncedVersion} remembers the value observed at the last
     * {@link #updateLastSyncedVersion()} call.
     */
    public class SubscriptionInfoVersion {
        private final AtomicLong latestVersion = new AtomicLong(0);
        private long lastSyncedVersion = 0;
        private boolean isLastSyncedSubscriptionInfoEmpty = false;

        /** Increments the latest version by one. */
        public void increaseLatestVersion() {
            latestVersion.incrementAndGet();
        }

        /** Records the current version and whether both keepers are empty right now. */
        public void updateLastSyncedVersion() {
            lastSyncedVersion = latestVersion.get();
            isLastSyncedSubscriptionInfoEmpty = isSubscriptionInfoEmpty();
        }

        /**
         * @return {@code true} only if both keepers were empty at the last recorded sync, are
         *     still empty now, and the version has not changed since that sync
         */
        public boolean canSkipNextSync() {
            return isLastSyncedSubscriptionInfoEmpty
                    && isSubscriptionInfoEmpty()
                    && lastSyncedVersion == latestVersion.get();
        }

        /** @return whether both the topic keeper and the consumer group keeper are empty */
        private boolean isSubscriptionInfoEmpty() {
            return topicMetaKeeper.isEmpty() && consumerGroupMetaKeeper.isEmpty();
        }
    }

    /////////////////////////////// Lock ///////////////////////////////

    private void acquireReadLock() {
        subscriptionInfoLock.readLock().lock();
    }

    private void releaseReadLock() {
        subscriptionInfoLock.readLock().unlock();
    }

    /**
     * Acquires the write lock and bumps the latest version. Must be paired with
     * {@link #releaseWriteLock()}.
     */
    public void acquireWriteLock() {
        subscriptionInfoLock.writeLock().lock();
        subscriptionInfoVersion.increaseLatestVersion();
    }

    /** Releases the write lock previously taken with {@link #acquireWriteLock()}. */
    public void releaseWriteLock() {
        subscriptionInfoLock.writeLock().unlock();
    }

    /////////////////////////////// Version ///////////////////////////////

    /** Delegates to {@link SubscriptionInfoVersion#updateLastSyncedVersion()}. */
    public void updateLastSyncedVersion() {
        subscriptionInfoVersion.updateLastSyncedVersion();
    }

    /** Delegates to {@link SubscriptionInfoVersion#canSkipNextSync()}. */
    public boolean canSkipNextSync() {
        return subscriptionInfoVersion.canSkipNextSync();
    }

    /////////////////////////////// Topic ///////////////////////////////

    /**
     * Checks, under the read lock, whether the requested topic may be created.
     *
     * @param tCreateTopicReq the create-topic request
     * @return {@code true} if no topic with that name exists; {@code false} if one exists and the
     *     request carries a set, true "if not exists" condition
     * @throws SubscriptionException if the topic exists and the "if not exists" condition is not
     *     set or is false
     */
    public boolean validateBeforeCreatingTopic(TCreateTopicReq tCreateTopicReq) throws SubscriptionException {
        acquireReadLock();
        try {
            return checkBeforeCreateTopicInternal(tCreateTopicReq);
        } finally {
            releaseReadLock();
        }
    }

    private boolean checkBeforeCreateTopicInternal(TCreateTopicReq tCreateTopicReq) throws SubscriptionException {
        if (!isTopicExisted(tCreateTopicReq.getTopicName())) {
            return true;
        }
        // The topic already exists: tolerated only when the caller asked for IF NOT EXISTS.
        if (tCreateTopicReq.isSetIfNotExistsCondition() && tCreateTopicReq.isIfNotExistsCondition()) {
            return false;
        }
        final String exceptionMessage =
                String.format(
                        "Failed to create topic %s, the topic with the same name has been created",
                        tCreateTopicReq.getTopicName());
        LOGGER.warn(exceptionMessage);
        throw new SubscriptionException(exceptionMessage);
    }

    /**
     * Checks, under the read lock, whether the topic may be dropped.
     *
     * @param topicName name of the topic to drop
     * @throws SubscriptionException if the topic exists and still has a subscribed consumer group
     */
    public void validateBeforeDroppingTopic(String topicName) throws SubscriptionException {
        acquireReadLock();
        try {
            checkBeforeDropTopicInternal(topicName);
        } finally {
            releaseReadLock();
        }
    }

    private void checkBeforeDropTopicInternal(String topicName) throws SubscriptionException {
        if (LOGGER.isDebugEnabled()) {
            LOGGER.debug(
                    "Check before dropping topic: {}, topic exists: {}",
                    topicName,
                    isTopicExisted(topicName));
        }
        // A missing topic is not an error here; only an actively subscribed one is rejected.
        final TopicMeta topicMeta = topicMetaKeeper.getTopicMeta(topicName);
        if (Objects.nonNull(topicMeta) && topicMeta.hasSubscribedConsumerGroup()) {
            final String exceptionMessage =
                    String.format(
                            "Failed to drop topic %s, the topic is subscribed by some consumers",
                            topicMeta.getTopicName());
            LOGGER.warn(exceptionMessage);
            throw new SubscriptionException(exceptionMessage);
        }
    }

    /**
     * Checks, under the read lock, that no topic uses the given pipe plugin as its processor.
     *
     * @param pluginName name of the pipe plugin
     * @throws SubscriptionException if some topic's {@code "processor"} attribute equals
     *     {@code pluginName}
     */
    public void validatePipePluginUsageByTopic(String pluginName) throws SubscriptionException {
        acquireReadLock();
        try {
            validatePipePluginUsageByTopicInternal(pluginName);
        } finally {
            releaseReadLock();
        }
    }

    /**
     * Same check as {@link #validatePipePluginUsageByTopic(String)}. This method is public and
     * takes the (reentrant) read lock itself, so it is safe to call directly.
     *
     * @param pluginName name of the pipe plugin
     * @throws SubscriptionException if some topic's {@code "processor"} attribute equals
     *     {@code pluginName}
     */
    public void validatePipePluginUsageByTopicInternal(String pluginName) throws SubscriptionException {
        acquireReadLock();
        try {
            for (final TopicMeta topicMeta : topicMetaKeeper.getAllTopicMeta()) {
                if (pluginName.equals(topicMeta.getConfig().getAttribute().get("processor"))) {
                    final String exceptionMessage =
                            String.format(
                                    "PipePlugin '%s' is already used by Topic '%s' as a processor.",
                                    pluginName, topicMeta.getTopicName());
                    LOGGER.warn(exceptionMessage);
                    throw new SubscriptionException(exceptionMessage);
                }
            }
        } finally {
            releaseReadLock();
        }
    }

    /**
     * Checks, under the read lock, that the topic to be altered exists.
     *
     * @param topicMeta the new topic meta; only its topic name is inspected
     * @throws SubscriptionException if no topic with that name exists
     */
    public void validateBeforeAlteringTopic(TopicMeta topicMeta) throws SubscriptionException {
        acquireReadLock();
        try {
            checkBeforeAlteringTopicInternal(topicMeta);
        } finally {
            releaseReadLock();
        }
    }

    private void checkBeforeAlteringTopicInternal(TopicMeta topicMeta) throws SubscriptionException {
        if (isTopicExisted(topicMeta.getTopicName())) {
            return;
        }
        final String exceptionMessage =
                String.format(
                        "Failed to alter topic %s, the topic is not existed", topicMeta.getTopicName());
        LOGGER.warn(exceptionMessage);
        throw new SubscriptionException(exceptionMessage);
    }

    /**
     * @param topicName name of the topic
     * @return whether the topic keeper contains a topic with that name
     */
    public boolean isTopicExisted(String topicName) {
        acquireReadLock();
        try {
            return topicMetaKeeper.containsTopicMeta(topicName);
        } finally {
            releaseReadLock();
        }
    }

    /**
     * @param topicName name of the topic
     * @return whatever the topic keeper returns for that name (the stored instance, not a copy)
     */
    public TopicMeta getTopicMeta(String topicName) {
        acquireReadLock();
        try {
            return topicMetaKeeper.getTopicMeta(topicName);
        } finally {
            releaseReadLock();
        }
    }

    /**
     * Returns the keeper's iterable of all topic metas.
     *
     * <p>NOTE(review): the read lock is held only while obtaining the iterable, not while the
     * caller iterates it. Whether that is safe depends on what the keeper returns; verify.
     */
    public Iterable<TopicMeta> getAllTopicMeta() {
        acquireReadLock();
        try {
            return topicMetaKeeper.getAllTopicMeta();
        } finally {
            releaseReadLock();
        }
    }

    /**
     * @param topicName name of the topic
     * @return a deep copy of the topic meta, or {@code null} if the topic does not exist
     */
    public TopicMeta deepCopyTopicMeta(String topicName) {
        acquireReadLock();
        try {
            return topicMetaKeeper.containsTopicMeta(topicName)
                    ? topicMetaKeeper.getTopicMeta(topicName).deepCopy()
                    : null;
        } finally {
            releaseReadLock();
        }
    }

    /** @return a success response carrying a list of all topic metas collected under the read lock */
    public DataSet showTopics() {
        acquireReadLock();
        try {
            return new TopicTableResp(
                    new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode()),
                    StreamSupport.stream(topicMetaKeeper.getAllTopicMeta().spliterator(), false)
                            .collect(Collectors.toList()));
        } finally {
            releaseReadLock();
        }
    }

    /**
     * Adds the plan's topic meta to the keeper under the write lock.
     *
     * @return a success status
     */
    public TSStatus createTopic(CreateTopicPlan createTopicPlan) {
        acquireWriteLock();
        try {
            topicMetaKeeper.addTopicMeta(
                    createTopicPlan.getTopicMeta().getTopicName(), createTopicPlan.getTopicMeta());
            return new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode());
        } finally {
            releaseWriteLock();
        }
    }

    /**
     * Replaces the stored topic meta with the one in the plan (remove, then add) under the write
     * lock.
     *
     * @return a success status
     */
    public TSStatus alterTopic(AlterTopicPlan alterTopicPlan) {
        acquireWriteLock();
        try {
            topicMetaKeeper.removeTopicMeta(alterTopicPlan.getTopicMeta().getTopicName());
            topicMetaKeeper.addTopicMeta(
                    alterTopicPlan.getTopicMeta().getTopicName(), alterTopicPlan.getTopicMeta());
            return new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode());
        } finally {
            releaseWriteLock();
        }
    }

    /**
     * Applies each sub-plan in order via {@link #alterTopic(AlterTopicPlan)}, stopping at the
     * first sub-plan whose status is not success.
     *
     * @return a success status with a {@code null} sub-status list if every sub-plan succeeded;
     *     otherwise a status with code {@code ALTER_TOPIC_ERROR} whose sub-status list holds the
     *     statuses of the sub-plans executed so far (the failing one last)
     */
    public TSStatus alterMultipleTopics(AlterMultipleTopicsPlan alterMultipleTopicsPlan) {
        acquireWriteLock();
        try {
            final int successCode = TSStatusCode.SUCCESS_STATUS.getStatusCode();
            final TSStatus status = new TSStatus(successCode);
            status.setSubStatus(new ArrayList<>());
            for (final AlterTopicPlan subPlan : alterMultipleTopicsPlan.getSubPlans()) {
                final TSStatus subStatus = alterTopic(subPlan);
                status.getSubStatus().add(subStatus);
                if (subStatus.getCode() != successCode) {
                    status.setCode(TSStatusCode.ALTER_TOPIC_ERROR.getStatusCode());
                    break;
                }
            }
            // Sub-statuses are only kept to describe a failure.
            if (status.getCode() == successCode) {
                status.setSubStatus(null);
            }
            return status;
        } finally {
            releaseWriteLock();
        }
    }

    /**
     * Removes the named topic from the keeper under the write lock.
     *
     * @return a success status
     */
    public TSStatus dropTopic(DropTopicPlan dropTopicPlan) {
        acquireWriteLock();
        try {
            topicMetaKeeper.removeTopicMeta(dropTopicPlan.getTopicName());
            return new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode());
        } finally {
            releaseWriteLock();
        }
    }

    /**
     * Replaces the entire topic keeper content with the topic metas carried by the plan.
     *
     * @return a success status
     */
    public TSStatus handleTopicMetaChanges(TopicHandleMetaChangePlan topicHandleMetaChangePlan) {
        acquireWriteLock();
        try {
            LOGGER.info("Handling topic meta changes ...");
            topicMetaKeeper.clear();
            topicHandleMetaChangePlan
                    .getTopicMetaList()
                    .forEach(
                            topicMeta -> {
                                topicMetaKeeper.addTopicMeta(topicMeta.getTopicName(), topicMeta);
                                LOGGER.info("Recording topic meta: {}", topicMeta);
                            });
            return new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode());
        } finally {
            releaseWriteLock();
        }
    }

    /////////////////////////////// Consumer ///////////////////////////////

    /**
     * Checks, under the read lock, that the consumer to be created does not exist yet.
     *
     * @throws SubscriptionException if the consumer group exists and already contains the consumer
     */
    public void validateBeforeCreatingConsumer(TCreateConsumerReq tCreateConsumerReq) throws SubscriptionException {
        acquireReadLock();
        try {
            checkBeforeCreateConsumerInternal(tCreateConsumerReq);
        } finally {
            releaseReadLock();
        }
    }

    private void checkBeforeCreateConsumerInternal(TCreateConsumerReq tCreateConsumerReq) throws SubscriptionException {
        final String consumerGroupId = tCreateConsumerReq.getConsumerGroupId();
        final String consumerId = tCreateConsumerReq.getConsumerId();
        if (isConsumerGroupExisted(consumerGroupId) && isConsumerExisted(consumerGroupId, consumerId)) {
            final String exceptionMessage =
                    String.format(
                            "Failed to create pipe consumer %s in consumer group %s, the consumer with the same name has been created",
                            tCreateConsumerReq.getConsumerId(), tCreateConsumerReq.getConsumerGroupId());
            LOGGER.warn(exceptionMessage);
            throw new SubscriptionException(exceptionMessage);
        }
    }

    /**
     * Checks, under the read lock, that the consumer to be dropped exists.
     *
     * @throws SubscriptionException if the consumer does not exist in the given consumer group
     */
    public void validateBeforeDroppingConsumer(TCloseConsumerReq tCloseConsumerReq) throws SubscriptionException {
        acquireReadLock();
        try {
            checkBeforeDropConsumerInternal(tCloseConsumerReq);
        } finally {
            releaseReadLock();
        }
    }

    private void checkBeforeDropConsumerInternal(TCloseConsumerReq tCloseConsumerReq) throws SubscriptionException {
        if (isConsumerExisted(tCloseConsumerReq.getConsumerGroupId(), tCloseConsumerReq.getConsumerId())) {
            return;
        }
        final String exceptionMessage =
                String.format(
                        "Failed to drop pipe consumer %s in consumer group %s, the consumer does not exist",
                        tCloseConsumerReq.getConsumerId(), tCloseConsumerReq.getConsumerGroupId());
        LOGGER.warn(exceptionMessage);
        throw new SubscriptionException(exceptionMessage);
    }

    /**
     * Checks, under the read lock, that the consumer group to be altered exists.
     *
     * @param consumerGroupMeta the new consumer group meta; only its group id is inspected
     * @throws SubscriptionException if the consumer group does not exist
     */
    public void validateBeforeAlterConsumerGroup(ConsumerGroupMeta consumerGroupMeta) throws SubscriptionException {
        acquireReadLock();
        try {
            checkBeforeAlterConsumerGroupInternal(consumerGroupMeta);
        } finally {
            releaseReadLock();
        }
    }

    /**
     * Same check as {@link #validateBeforeAlterConsumerGroup(ConsumerGroupMeta)}; the only locking
     * here is the one done inside {@link #isConsumerGroupExisted(String)}.
     *
     * @throws SubscriptionException if the consumer group does not exist
     */
    public void checkBeforeAlterConsumerGroupInternal(ConsumerGroupMeta consumerGroupMeta) throws SubscriptionException {
        if (isConsumerGroupExisted(consumerGroupMeta.getConsumerGroupId())) {
            return;
        }
        final String exceptionMessage =
                String.format(
                        "Failed to alter consumer group because the consumer group %s does not exist",
                        consumerGroupMeta.getConsumerGroupId());
        LOGGER.warn(exceptionMessage);
        throw new SubscriptionException(exceptionMessage);
    }

    /**
     * @param consumerGroupId id of the consumer group
     * @return whether the consumer group keeper contains that group
     */
    public boolean isConsumerGroupExisted(String consumerGroupId) {
        acquireReadLock();
        try {
            return consumerGroupMetaKeeper.containsConsumerGroupMeta(consumerGroupId);
        } finally {
            releaseReadLock();
        }
    }

    /**
     * @param consumerGroupId id of the consumer group
     * @param consumerId id of the consumer
     * @return {@code true} if the group exists and contains the consumer, {@code false} otherwise
     */
    public boolean isConsumerExisted(String consumerGroupId, String consumerId) {
        acquireReadLock();
        try {
            final ConsumerGroupMeta consumerGroupMeta =
                    consumerGroupMetaKeeper.getConsumerGroupMeta(consumerGroupId);
            return consumerGroupMeta != null && consumerGroupMeta.containsConsumer(consumerId);
        } finally {
            releaseReadLock();
        }
    }

    /**
     * @param consumerGroupId id of the consumer group
     * @return whatever the keeper returns for that id (the stored instance, not a copy)
     */
    public ConsumerGroupMeta getConsumerGroupMeta(String consumerGroupId) {
        acquireReadLock();
        try {
            return consumerGroupMetaKeeper.getConsumerGroupMeta(consumerGroupId);
        } finally {
            releaseReadLock();
        }
    }

    /**
     * @param consumerGroupId id of the consumer group
     * @return a deep copy of the consumer group meta, or {@code null} if the group does not exist
     */
    public ConsumerGroupMeta deepCopyConsumerGroupMeta(String consumerGroupId) {
        acquireReadLock();
        try {
            return consumerGroupMetaKeeper.containsConsumerGroupMeta(consumerGroupId)
                    ? consumerGroupMetaKeeper.getConsumerGroupMeta(consumerGroupId).deepCopy()
                    : null;
        } finally {
            releaseReadLock();
        }
    }

    /**
     * Replaces the stored consumer group meta with the one in the plan. If the plan's meta is
     * {@code null} nothing changes; if it is empty the group is removed and not re-added.
     *
     * @return a success status
     */
    public TSStatus alterConsumerGroup(AlterConsumerGroupPlan alterConsumerGroupPlan) {
        acquireWriteLock();
        try {
            final ConsumerGroupMeta consumerGroupMeta = alterConsumerGroupPlan.getConsumerGroupMeta();
            if (Objects.nonNull(consumerGroupMeta)) {
                final String consumerGroupId = consumerGroupMeta.getConsumerGroupId();
                consumerGroupMetaKeeper.removeConsumerGroupMeta(consumerGroupId);
                if (!consumerGroupMeta.isEmpty()) {
                    consumerGroupMetaKeeper.addConsumerGroupMeta(consumerGroupId, consumerGroupMeta);
                }
            }
            return new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode());
        } finally {
            releaseWriteLock();
        }
    }

    /**
     * Replaces the entire consumer group keeper content with the metas carried by the plan.
     *
     * @return a success status
     */
    public TSStatus handleConsumerGroupMetaChanges(ConsumerGroupHandleMetaChangePlan consumerGroupHandleMetaChangePlan) {
        acquireWriteLock();
        try {
            LOGGER.info("Handling consumer group meta changes ...");
            consumerGroupMetaKeeper.clear();
            consumerGroupHandleMetaChangePlan
                    .getConsumerGroupMetaList()
                    .forEach(
                            consumerGroupMeta -> {
                                consumerGroupMetaKeeper.addConsumerGroupMeta(
                                        consumerGroupMeta.getConsumerGroupId(), consumerGroupMeta);
                                LOGGER.info("Recording consumer group meta: {}", consumerGroupMeta);
                            });
            return new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode());
        } finally {
            releaseWriteLock();
        }
    }

    /////////////////////////////// Subscription ///////////////////////////////

    /**
     * Checks, under the read lock, that a subscribe request is valid.
     *
     * @throws SubscriptionException if the consumer does not exist or any requested topic does not
     *     exist
     */
    public void validateBeforeSubscribe(TSubscribeReq tSubscribeReq) throws SubscriptionException {
        acquireReadLock();
        try {
            checkBeforeSubscribeInternal(tSubscribeReq);
        } finally {
            releaseReadLock();
        }
    }

    private void checkBeforeSubscribeInternal(TSubscribeReq tSubscribeReq) throws SubscriptionException {
        if (!isConsumerExisted(tSubscribeReq.getConsumerGroupId(), tSubscribeReq.getConsumerId())) {
            final String exceptionMessage =
                    String.format(
                            "Failed to subscribe because the consumer %s in consumer group %s does not exist",
                            tSubscribeReq.getConsumerId(), tSubscribeReq.getConsumerGroupId());
            LOGGER.warn(exceptionMessage);
            throw new SubscriptionException(exceptionMessage);
        }
        for (final String topicName : tSubscribeReq.getTopicNames()) {
            if (!isTopicExisted(topicName)) {
                final String exceptionMessage =
                        String.format("Failed to subscribe because the topic %s does not exist", topicName);
                LOGGER.warn(exceptionMessage);
                throw new SubscriptionException(exceptionMessage);
            }
        }
    }

    /**
     * Checks, under the read lock, that an unsubscribe request is valid.
     *
     * @throws SubscriptionException if the consumer does not exist or any requested topic does not
     *     exist
     */
    public void validateBeforeUnsubscribe(TUnsubscribeReq tUnsubscribeReq) throws SubscriptionException {
        acquireReadLock();
        try {
            checkBeforeUnsubscribeInternal(tUnsubscribeReq);
        } finally {
            releaseReadLock();
        }
    }

    private void checkBeforeUnsubscribeInternal(TUnsubscribeReq tUnsubscribeReq) throws SubscriptionException {
        if (!isConsumerExisted(tUnsubscribeReq.getConsumerGroupId(), tUnsubscribeReq.getConsumerId())) {
            final String exceptionMessage =
                    String.format(
                            "Failed to unsubscribe because the consumer %s in consumer group %s does not exist",
                            tUnsubscribeReq.getConsumerId(), tUnsubscribeReq.getConsumerGroupId());
            LOGGER.warn(exceptionMessage);
            throw new SubscriptionException(exceptionMessage);
        }
        for (final String topicName : tUnsubscribeReq.getTopicNames()) {
            if (!isTopicExisted(topicName)) {
                final String exceptionMessage =
                        String.format("Failed to unsubscribe because the topic %s does not exist", topicName);
                LOGGER.warn(exceptionMessage);
                throw new SubscriptionException(exceptionMessage);
            }
        }
    }

    /**
     * @return a success response carrying all subscription metas and all consumer group metas,
     *     both collected under the read lock
     */
    public DataSet showSubscriptions() {
        acquireReadLock();
        try {
            return new SubscriptionTableResp(
                    new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode()),
                    getAllSubscriptionMeta(),
                    getAllConsumerGroupMeta());
        } finally {
            releaseReadLock();
        }
    }

    /**
     * Builds one {@link SubscriptionMeta} per (topic, subscribed consumer group) pair that has at
     * least one subscribing consumer. Does not lock; the caller is expected to hold the lock.
     */
    private List<SubscriptionMeta> getAllSubscriptionMeta() {
        final List<SubscriptionMeta> allSubscriptions = new ArrayList<>();
        for (final TopicMeta topicMeta : topicMetaKeeper.getAllTopicMeta()) {
            for (final String consumerGroupId : topicMeta.getSubscribedConsumerGroupIds()) {
                final Set<String> subscribedConsumerIds =
                        consumerGroupMetaKeeper.getConsumersSubscribingTopic(
                                consumerGroupId, topicMeta.getTopicName());
                if (!subscribedConsumerIds.isEmpty()) {
                    allSubscriptions.add(
                            new SubscriptionMeta(
                                    topicMeta.getTopicName(), consumerGroupId, subscribedConsumerIds));
                }
            }
        }
        return allSubscriptions;
    }

    /**
     * Collects all consumer group metas from the keeper into a new list.
     *
     * <p>This method does not acquire the read lock itself.
     */
    public List<ConsumerGroupMeta> getAllConsumerGroupMeta() {
        return StreamSupport.stream(consumerGroupMetaKeeper.getAllConsumerGroupMeta().spliterator(), false)
                .collect(Collectors.toList());
    }

    /////////////////////////////// Snapshot ///////////////////////////////

    /**
     * Writes both keepers to {@code subscription_info.bin} inside {@code snapshotDir} while
     * holding the read lock, then syncs the file descriptor.
     *
     * @param snapshotDir directory in which the snapshot file is created
     * @return {@code false} if the snapshot file already exists as a regular file (nothing is
     *     written); {@code true} once the snapshot has been written and synced
     * @throws IOException if opening, writing or syncing the snapshot file fails
     */
    public boolean processTakeSnapshot(File snapshotDir) throws IOException {
        acquireReadLock();
        try {
            final File snapshotFile = new File(snapshotDir, SNAPSHOT_FILE_NAME);
            if (snapshotFile.exists() && snapshotFile.isFile()) {
                LOGGER.error(
                        "Failed to take subscription snapshot, because snapshot file {} is already exist.",
                        snapshotFile.getAbsolutePath());
                // The lock is released exactly once, by the finally block below.
                return false;
            }
            // try-with-resources closes the stream on both the success and the failure path.
            try (FileOutputStream fileOutputStream = new FileOutputStream(snapshotFile)) {
                topicMetaKeeper.processTakeSnapshot(fileOutputStream);
                consumerGroupMetaKeeper.processTakeSnapshot(fileOutputStream);
                fileOutputStream.getFD().sync();
            }
            return true;
        } finally {
            releaseReadLock();
        }
    }

    /**
     * Restores both keepers from {@code subscription_info.bin} inside {@code snapshotDir} while
     * holding the write lock. If the snapshot file is missing or is not a regular file, an error
     * is logged and the current state is left untouched.
     *
     * @param snapshotDir directory containing the snapshot file
     * @throws IOException if opening or reading the snapshot file fails
     */
    public void processLoadSnapshot(File snapshotDir) throws IOException {
        acquireWriteLock();
        try {
            final File snapshotFile = new File(snapshotDir, SNAPSHOT_FILE_NAME);
            if (!snapshotFile.exists() || !snapshotFile.isFile()) {
                LOGGER.error(
                        "Failed to load subscription snapshot, snapshot file {} is not exist.",
                        snapshotFile.getAbsolutePath());
                // The lock is released exactly once, by the finally block below.
                return;
            }
            // try-with-resources closes the stream on both the success and the failure path.
            try (FileInputStream fileInputStream = new FileInputStream(snapshotFile)) {
                topicMetaKeeper.processLoadSnapshot(fileInputStream);
                consumerGroupMetaKeeper.processLoadSnapshot(fileInputStream);
            }
        } finally {
            releaseWriteLock();
        }
    }
}
