package org.apache.iotdb.confignode.procedure.impl.subscription;

import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.iotdb.commons.subscription.meta.consumer.ConsumerGroupMeta;
import org.apache.iotdb.commons.subscription.meta.topic.TopicMeta;
import org.apache.iotdb.confignode.persistence.subscription.SubscriptionInfo;
import org.apache.iotdb.confignode.procedure.env.ConfigNodeProcedureEnv;
import org.apache.iotdb.confignode.procedure.exception.ProcedureException;
import org.apache.iotdb.confignode.procedure.exception.ProcedureSuspendedException;
import org.apache.iotdb.confignode.procedure.exception.ProcedureYieldException;
import org.apache.iotdb.confignode.procedure.impl.StateMachineProcedure;
import org.apache.iotdb.confignode.procedure.impl.node.AbstractNodeProcedure;
import org.apache.iotdb.confignode.procedure.impl.subscription.consumer.runtime.ConsumerGroupMetaSyncProcedure;
import org.apache.iotdb.confignode.procedure.impl.subscription.topic.runtime.TopicMetaSyncProcedure;
import org.apache.iotdb.confignode.procedure.state.ProcedureLockState;
import org.apache.iotdb.confignode.procedure.state.subscription.OperateSubscriptionState;
import org.apache.iotdb.mpp.rpc.thrift.TPushConsumerGroupMetaResp;
import org.apache.iotdb.mpp.rpc.thrift.TPushTopicMetaResp;
import org.apache.iotdb.rpc.TSStatusCode;
import org.apache.iotdb.rpc.subscription.exception.SubscriptionException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/iotdb/confignode/procedure/impl/subscription/AbstractOperateSubscriptionProcedure.class */
public abstract class AbstractOperateSubscriptionProcedure extends AbstractNodeProcedure<OperateSubscriptionState> {
    private static final Logger LOGGER = LoggerFactory.getLogger(AbstractOperateSubscriptionProcedure.class);
    private static final String SKIP_SUBSCRIPTION_PROCEDURE_MESSAGE = "Skip subscription-related operations and do nothing";
    private static final int RETRY_THRESHOLD = 1;
    protected boolean isRollbackFromOperateOnDataNodesSuccessful = false;
    protected boolean isRollbackFromValidateSuccessful = false;
    protected AtomicReference<SubscriptionInfo> subscriptionInfo;

    /* JADX INFO: Access modifiers changed from: package-private */
    /* renamed from: org.apache.iotdb.confignode.procedure.impl.subscription.AbstractOperateSubscriptionProcedure$1, reason: invalid class name */
    /* loaded from: input_file:org/apache/iotdb/confignode/procedure/impl/subscription/AbstractOperateSubscriptionProcedure$1.class */
    public static /* synthetic */ class AnonymousClass1 {
        static final /* synthetic */ int[] $SwitchMap$org$apache$iotdb$confignode$procedure$state$ProcedureLockState;
        static final /* synthetic */ int[] $SwitchMap$org$apache$iotdb$confignode$procedure$state$subscription$OperateSubscriptionState = new int[OperateSubscriptionState.values().length];

        static {
            try {
                $SwitchMap$org$apache$iotdb$confignode$procedure$state$subscription$OperateSubscriptionState[OperateSubscriptionState.VALIDATE.ordinal()] = AbstractOperateSubscriptionProcedure.RETRY_THRESHOLD;
            } catch (NoSuchFieldError e) {
            }
            try {
                $SwitchMap$org$apache$iotdb$confignode$procedure$state$subscription$OperateSubscriptionState[OperateSubscriptionState.OPERATE_ON_CONFIG_NODES.ordinal()] = 2;
            } catch (NoSuchFieldError e2) {
            }
            try {
                $SwitchMap$org$apache$iotdb$confignode$procedure$state$subscription$OperateSubscriptionState[OperateSubscriptionState.OPERATE_ON_DATA_NODES.ordinal()] = 3;
            } catch (NoSuchFieldError e3) {
            }
            $SwitchMap$org$apache$iotdb$confignode$procedure$state$ProcedureLockState = new int[ProcedureLockState.values().length];
            try {
                $SwitchMap$org$apache$iotdb$confignode$procedure$state$ProcedureLockState[ProcedureLockState.LOCK_ACQUIRED.ordinal()] = AbstractOperateSubscriptionProcedure.RETRY_THRESHOLD;
            } catch (NoSuchFieldError e4) {
            }
            try {
                $SwitchMap$org$apache$iotdb$confignode$procedure$state$ProcedureLockState[ProcedureLockState.LOCK_EVENT_WAIT.ordinal()] = 2;
            } catch (NoSuchFieldError e5) {
            }
        }
    }

    protected AtomicReference<SubscriptionInfo> acquireLockInternal(ConfigNodeProcedureEnv configNodeProcedureEnv) {
        return configNodeProcedureEnv.getConfigManager().getSubscriptionManager().getSubscriptionCoordinator().lock();
    }

    /* JADX INFO: Access modifiers changed from: protected */
    @Override // org.apache.iotdb.confignode.procedure.impl.node.AbstractNodeProcedure, org.apache.iotdb.confignode.procedure.Procedure
    public ProcedureLockState acquireLock(ConfigNodeProcedureEnv configNodeProcedureEnv) {
        LOGGER.info("ProcedureId {} try to acquire subscription lock.", Long.valueOf(getProcId()));
        this.subscriptionInfo = acquireLockInternal(configNodeProcedureEnv);
        if (this.subscriptionInfo == null) {
            LOGGER.warn("ProcedureId {} failed to acquire subscription lock.", Long.valueOf(getProcId()));
        } else {
            LOGGER.info("ProcedureId {} acquired subscription lock.", Long.valueOf(getProcId()));
        }
        ProcedureLockState acquireLock = super.acquireLock(configNodeProcedureEnv);
        switch (AnonymousClass1.$SwitchMap$org$apache$iotdb$confignode$procedure$state$ProcedureLockState[acquireLock.ordinal()]) {
            case RETRY_THRESHOLD /* 1 */:
                if (this.subscriptionInfo != null) {
                    LOGGER.info("ProcedureId {}: LOCK_ACQUIRED. The following procedure should be executed with subscription lock.", Long.valueOf(getProcId()));
                    break;
                } else {
                    LOGGER.warn("ProcedureId {}: LOCK_ACQUIRED. The following procedure should not be executed without subscription lock.", Long.valueOf(getProcId()));
                    break;
                }
            case 2:
                if (this.subscriptionInfo != null) {
                    LOGGER.info("ProcedureId {}: LOCK_EVENT_WAIT. Subscription lock will be released.", Long.valueOf(getProcId()));
                    configNodeProcedureEnv.getConfigManager().getSubscriptionManager().getSubscriptionCoordinator().unlock();
                    this.subscriptionInfo = null;
                    break;
                } else {
                    LOGGER.warn("ProcedureId {}: LOCK_EVENT_WAIT. Without acquiring subscription lock.", Long.valueOf(getProcId()));
                    break;
                }
            default:
                if (this.subscriptionInfo != null) {
                    LOGGER.error("ProcedureId {}: {}. Invalid lock state. Subscription lock will be released.", Long.valueOf(getProcId()), acquireLock);
                    configNodeProcedureEnv.getConfigManager().getSubscriptionManager().getSubscriptionCoordinator().unlock();
                    this.subscriptionInfo = null;
                    break;
                } else {
                    LOGGER.error("ProcedureId {}: {}. Invalid lock state. Without acquiring subscription lock.", Long.valueOf(getProcId()), acquireLock);
                    break;
                }
        }
        return acquireLock;
    }

    /* JADX INFO: Access modifiers changed from: protected */
    @Override // org.apache.iotdb.confignode.procedure.impl.node.AbstractNodeProcedure, org.apache.iotdb.confignode.procedure.Procedure
    public void releaseLock(ConfigNodeProcedureEnv configNodeProcedureEnv) {
        super.releaseLock(configNodeProcedureEnv);
        if (this.subscriptionInfo == null) {
            LOGGER.warn("ProcedureId {} release lock. No need to release subscription lock.", Long.valueOf(getProcId()));
            return;
        }
        LOGGER.info("ProcedureId {} release lock. Subscription lock will be released.", Long.valueOf(getProcId()));
        if ((this instanceof TopicMetaSyncProcedure) || (this instanceof ConsumerGroupMetaSyncProcedure)) {
            LOGGER.info("Subscription meta sync procedure finished, updating last sync version.");
            configNodeProcedureEnv.getConfigManager().getSubscriptionManager().getSubscriptionCoordinator().updateLastSyncedVersion();
        }
        configNodeProcedureEnv.getConfigManager().getSubscriptionManager().getSubscriptionCoordinator().unlock();
        this.subscriptionInfo = null;
    }

    protected abstract SubscriptionOperation getOperation();

    protected abstract boolean executeFromValidate(ConfigNodeProcedureEnv configNodeProcedureEnv) throws SubscriptionException;

    protected abstract void executeFromOperateOnConfigNodes(ConfigNodeProcedureEnv configNodeProcedureEnv) throws SubscriptionException;

    protected abstract void executeFromOperateOnDataNodes(ConfigNodeProcedureEnv configNodeProcedureEnv) throws SubscriptionException, IOException;

    /* JADX INFO: Access modifiers changed from: protected */
    @Override // org.apache.iotdb.confignode.procedure.impl.StateMachineProcedure
    public StateMachineProcedure.Flow executeFromState(ConfigNodeProcedureEnv configNodeProcedureEnv, OperateSubscriptionState operateSubscriptionState) throws ProcedureSuspendedException, ProcedureYieldException, InterruptedException {
        if (this.subscriptionInfo == null) {
            LOGGER.warn("ProcedureId {}: Subscription lock is not acquired, executeFromState({})'s execution will be skipped.", Long.valueOf(getProcId()), operateSubscriptionState);
            return StateMachineProcedure.Flow.NO_MORE_STATE;
        }
        try {
            switch (AnonymousClass1.$SwitchMap$org$apache$iotdb$confignode$procedure$state$subscription$OperateSubscriptionState[operateSubscriptionState.ordinal()]) {
                case RETRY_THRESHOLD /* 1 */:
                    if (!executeFromValidate(configNodeProcedureEnv)) {
                        LOGGER.info("ProcedureId {}: {}", Long.valueOf(getProcId()), SKIP_SUBSCRIPTION_PROCEDURE_MESSAGE);
                        setResult(SKIP_SUBSCRIPTION_PROCEDURE_MESSAGE.getBytes(StandardCharsets.UTF_8));
                        return StateMachineProcedure.Flow.NO_MORE_STATE;
                    }
                    setNextState((AbstractOperateSubscriptionProcedure) OperateSubscriptionState.OPERATE_ON_CONFIG_NODES);
                    break;
                case 2:
                    executeFromOperateOnConfigNodes(configNodeProcedureEnv);
                    setNextState((AbstractOperateSubscriptionProcedure) OperateSubscriptionState.OPERATE_ON_DATA_NODES);
                    break;
                case 3:
                    executeFromOperateOnDataNodes(configNodeProcedureEnv);
                    return StateMachineProcedure.Flow.NO_MORE_STATE;
                default:
                    throw new UnsupportedOperationException(String.format("Unknown state during executing operateSubscriptionProcedure, %s", operateSubscriptionState));
            }
        } catch (Exception e) {
            if (getCycles() < RETRY_THRESHOLD) {
                LOGGER.warn("ProcedureId {}: Encountered error when trying to {} at state [{}], retry [{}/{}]", new Object[]{Long.valueOf(getProcId()), getOperation(), operateSubscriptionState, Integer.valueOf(getCycles() + RETRY_THRESHOLD), Integer.valueOf(RETRY_THRESHOLD), e});
                TimeUnit.MILLISECONDS.sleep(3000L);
            } else {
                LOGGER.warn("ProcedureId {}: All {} retries failed when trying to {} at state [{}], will rollback...", new Object[]{Long.valueOf(getProcId()), Integer.valueOf(RETRY_THRESHOLD), getOperation(), operateSubscriptionState, e});
                setFailure(new ProcedureException(String.format("ProcedureId %s: Fail to %s because %s", Long.valueOf(getProcId()), getOperation().name(), e.getMessage())));
            }
        }
        return StateMachineProcedure.Flow.HAS_MORE_STATE;
    }

    /* JADX INFO: Access modifiers changed from: protected */
    @Override // org.apache.iotdb.confignode.procedure.impl.StateMachineProcedure
    public void rollbackState(ConfigNodeProcedureEnv configNodeProcedureEnv, OperateSubscriptionState operateSubscriptionState) throws IOException, InterruptedException, ProcedureException {
        if (this.subscriptionInfo == null) {
            LOGGER.warn("ProcedureId {}: Subscription lock is not acquired, rollbackState({})'s execution will be skipped.", Long.valueOf(getProcId()), operateSubscriptionState);
            return;
        }
        switch (AnonymousClass1.$SwitchMap$org$apache$iotdb$confignode$procedure$state$subscription$OperateSubscriptionState[operateSubscriptionState.ordinal()]) {
            case RETRY_THRESHOLD /* 1 */:
                if (this.isRollbackFromValidateSuccessful) {
                    return;
                }
                try {
                    rollbackFromValidate(configNodeProcedureEnv);
                    this.isRollbackFromValidateSuccessful = true;
                    return;
                } catch (Exception e) {
                    LOGGER.warn("ProcedureId {}: Failed to rollback from state [{}], because {}", new Object[]{Long.valueOf(getProcId()), operateSubscriptionState, e.getMessage(), e});
                    return;
                }
            case 2:
                try {
                    if (!this.isRollbackFromOperateOnDataNodesSuccessful) {
                        rollbackFromOperateOnConfigNodes(configNodeProcedureEnv);
                    }
                    return;
                } catch (Exception e2) {
                    LOGGER.warn("ProcedureId {}: Failed to rollback from state [{}], because {}", new Object[]{Long.valueOf(getProcId()), operateSubscriptionState, e2.getMessage(), e2});
                    return;
                }
            case 3:
                try {
                    rollbackFromOperateOnConfigNodes(configNodeProcedureEnv);
                    rollbackFromOperateOnDataNodes(configNodeProcedureEnv);
                    this.isRollbackFromOperateOnDataNodesSuccessful = true;
                    return;
                } catch (Exception e3) {
                    LOGGER.warn("ProcedureId {}: Failed to rollback from state [{}], because {}", new Object[]{Long.valueOf(getProcId()), operateSubscriptionState, e3.getMessage(), e3});
                    return;
                }
            default:
                throw new UnsupportedOperationException(String.format("Unknown state during rollback operateSubscriptionProcedure, %s", operateSubscriptionState));
        }
    }

    protected abstract void rollbackFromValidate(ConfigNodeProcedureEnv configNodeProcedureEnv);

    protected abstract void rollbackFromOperateOnConfigNodes(ConfigNodeProcedureEnv configNodeProcedureEnv) throws SubscriptionException;

    protected abstract void rollbackFromOperateOnDataNodes(ConfigNodeProcedureEnv configNodeProcedureEnv) throws SubscriptionException, IOException;

    /* JADX INFO: Access modifiers changed from: protected */
    public Map<Integer, TPushTopicMetaResp> pushTopicMetaToDataNodes(ConfigNodeProcedureEnv configNodeProcedureEnv) throws IOException {
        ArrayList arrayList = new ArrayList();
        Iterator<TopicMeta> it = this.subscriptionInfo.get().getAllTopicMeta().iterator();
        while (it.hasNext()) {
            arrayList.add(it.next().serialize());
        }
        return configNodeProcedureEnv.pushAllTopicMetaToDataNodes(arrayList);
    }

    public static boolean pushTopicMetaHasException(Map<Integer, TPushTopicMetaResp> map) {
        Iterator<TPushTopicMetaResp> it = map.values().iterator();
        while (it.hasNext()) {
            if (it.next().getStatus().getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
                return true;
            }
        }
        return false;
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public Map<Integer, TPushConsumerGroupMetaResp> pushConsumerGroupMetaToDataNodes(ConfigNodeProcedureEnv configNodeProcedureEnv) throws IOException {
        ArrayList arrayList = new ArrayList();
        Iterator<ConsumerGroupMeta> it = this.subscriptionInfo.get().getAllConsumerGroupMeta().iterator();
        while (it.hasNext()) {
            arrayList.add(it.next().serialize());
        }
        return configNodeProcedureEnv.pushAllConsumerGroupMetaToDataNodes(arrayList);
    }

    public static boolean pushConsumerGroupMetaHasException(Map<Integer, TPushConsumerGroupMetaResp> map) {
        Iterator<TPushConsumerGroupMetaResp> it = map.values().iterator();
        while (it.hasNext()) {
            if (it.next().getStatus().getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
                return true;
            }
        }
        return false;
    }

    /* JADX INFO: Access modifiers changed from: protected */
    @Override // org.apache.iotdb.confignode.procedure.impl.StateMachineProcedure
    public OperateSubscriptionState getState(int i) {
        return OperateSubscriptionState.values()[i];
    }

    /* JADX INFO: Access modifiers changed from: protected */
    @Override // org.apache.iotdb.confignode.procedure.impl.StateMachineProcedure
    public int getStateId(OperateSubscriptionState operateSubscriptionState) {
        return operateSubscriptionState.ordinal();
    }

    /* JADX INFO: Access modifiers changed from: protected */
    @Override // org.apache.iotdb.confignode.procedure.impl.StateMachineProcedure
    public OperateSubscriptionState getInitialState() {
        return OperateSubscriptionState.VALIDATE;
    }
}
