package org.apache.iotdb.confignode.procedure.impl.pipe;

import java.io.DataOutputStream;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.iotdb.commons.pipe.agent.task.meta.PipeMeta;
import org.apache.iotdb.confignode.manager.pipe.metric.PipeProcedureMetrics;
import org.apache.iotdb.confignode.persistence.pipe.PipeTaskInfo;
import org.apache.iotdb.confignode.procedure.env.ConfigNodeProcedureEnv;
import org.apache.iotdb.confignode.procedure.exception.ProcedureException;
import org.apache.iotdb.confignode.procedure.exception.ProcedureSuspendedException;
import org.apache.iotdb.confignode.procedure.exception.ProcedureYieldException;
import org.apache.iotdb.confignode.procedure.impl.StateMachineProcedure;
import org.apache.iotdb.confignode.procedure.impl.node.AbstractNodeProcedure;
import org.apache.iotdb.confignode.procedure.impl.pipe.runtime.PipeMetaSyncProcedure;
import org.apache.iotdb.confignode.procedure.state.ProcedureLockState;
import org.apache.iotdb.confignode.procedure.state.pipe.task.OperatePipeTaskState;
import org.apache.iotdb.mpp.rpc.thrift.TPushPipeMetaResp;
import org.apache.iotdb.pipe.api.exception.PipeException;
import org.apache.iotdb.rpc.TSStatusCode;
import org.apache.tsfile.utils.ReadWriteIOUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/iotdb/confignode/procedure/impl/pipe/AbstractOperatePipeProcedureV2.class */
public abstract class AbstractOperatePipeProcedureV2 extends AbstractNodeProcedure<OperatePipeTaskState> {
    /** Logger used by all log statements in this class. */
    private static final Logger LOGGER = LoggerFactory.getLogger(AbstractOperatePipeProcedureV2.class);
    /** Limit that {@code executeFromState} compares {@code getCycles()} against: below it a failed state is retried, otherwise the procedure is failed. */
    private static final int RETRY_THRESHOLD = 1;
    /**
     * Set to {@code true} by {@code rollbackState} once the OPERATE_ON_DATA_NODES rollback has run both the
     * config-node-consensus rollback and the data-node rollback; the WRITE_CONFIG_NODE_CONSENSUS rollback is
     * skipped when it is set. The flag is written by {@code serialize} and restored by {@code deserialize}.
     */
    protected boolean isRollbackFromOperateOnDataNodesSuccessful = false;
    /**
     * Result of {@code acquireLockInternal}; assigned in {@code acquireLock} and reset to {@code null} whenever this
     * class releases the pipe lock. {@code executeFromState} and {@code rollbackState} skip their work while it is
     * {@code null}.
     */
    protected AtomicReference<PipeTaskInfo> pipeTaskInfo;
    /** Message logged and stored (UTF-8 encoded) as the procedure result when {@code executeFromValidateTask} returns {@code false}. */
    private static final String SKIP_PIPE_PROCEDURE_MESSAGE = "Try to start a RUNNING pipe or stop a STOPPED pipe, do nothing.";

    /* JADX INFO: Access modifiers changed from: package-private */
    /* renamed from: org.apache.iotdb.confignode.procedure.impl.pipe.AbstractOperatePipeProcedureV2$1, reason: invalid class name */
    /* loaded from: input_file:org/apache/iotdb/confignode/procedure/impl/pipe/AbstractOperatePipeProcedureV2$1.class */
    /**
     * Lookup tables that translate enum ordinals into the small integers used as {@code case} labels by
     * switch statements over {@link OperatePipeTaskState} and {@link ProcedureLockState}.
     *
     * <p>Each table has one slot per enum constant. A slot that is never assigned keeps the value 0, which
     * matches no {@code case} label and therefore falls through to {@code default}.
     *
     * <p>The slot values are written as plain literals on purpose: they are switch labels, not retry counts,
     * and must not change if {@code RETRY_THRESHOLD} is ever tuned.
     */
    public static /* synthetic */ class AnonymousClass1 {
        static final /* synthetic */ int[] $SwitchMap$org$apache$iotdb$confignode$procedure$state$ProcedureLockState;
        static final /* synthetic */ int[] $SwitchMap$org$apache$iotdb$confignode$procedure$state$pipe$task$OperatePipeTaskState = new int[OperatePipeTaskState.values().length];

        static {
            // OperatePipeTaskState -> case label. A NoSuchFieldError means the constant is absent from the
            // enum class loaded at runtime; its slot is deliberately left at 0 (the "default" branch).
            try {
                $SwitchMap$org$apache$iotdb$confignode$procedure$state$pipe$task$OperatePipeTaskState[OperatePipeTaskState.VALIDATE_TASK.ordinal()] = 1;
            } catch (NoSuchFieldError ignored) {
                // intentionally ignored, see above
            }
            try {
                $SwitchMap$org$apache$iotdb$confignode$procedure$state$pipe$task$OperatePipeTaskState[OperatePipeTaskState.CALCULATE_INFO_FOR_TASK.ordinal()] = 2;
            } catch (NoSuchFieldError ignored) {
                // intentionally ignored, see above
            }
            try {
                $SwitchMap$org$apache$iotdb$confignode$procedure$state$pipe$task$OperatePipeTaskState[OperatePipeTaskState.WRITE_CONFIG_NODE_CONSENSUS.ordinal()] = 3;
            } catch (NoSuchFieldError ignored) {
                // intentionally ignored, see above
            }
            try {
                $SwitchMap$org$apache$iotdb$confignode$procedure$state$pipe$task$OperatePipeTaskState[OperatePipeTaskState.OPERATE_ON_DATA_NODES.ordinal()] = 4;
            } catch (NoSuchFieldError ignored) {
                // intentionally ignored, see above
            }

            // ProcedureLockState -> case label, same convention as above.
            $SwitchMap$org$apache$iotdb$confignode$procedure$state$ProcedureLockState = new int[ProcedureLockState.values().length];
            try {
                $SwitchMap$org$apache$iotdb$confignode$procedure$state$ProcedureLockState[ProcedureLockState.LOCK_ACQUIRED.ordinal()] = 1;
            } catch (NoSuchFieldError ignored) {
                // intentionally ignored, see above
            }
            try {
                $SwitchMap$org$apache$iotdb$confignode$procedure$state$ProcedureLockState[ProcedureLockState.LOCK_EVENT_WAIT.ordinal()] = 2;
            } catch (NoSuchFieldError ignored) {
                // intentionally ignored, see above
            }
        }
    }

    /**
     * Takes the pipe lock through the pipe task coordinator reachable from the given environment.
     *
     * @param configNodeProcedureEnv environment giving access to the config manager
     * @return whatever the coordinator's {@code lock()} call returns; {@code acquireLock} treats
     *     {@code null} as "pipe lock not acquired"
     */
    protected AtomicReference<PipeTaskInfo> acquireLockInternal(ConfigNodeProcedureEnv configNodeProcedureEnv) {
        return configNodeProcedureEnv
                .getConfigManager()
                .getPipeManager()
                .getPipeTaskCoordinator()
                .lock();
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /**
     * Acquires the pipe lock first and then the lock of the parent procedure, and reconciles the two.
     *
     * <p>The pipe lock is kept only when the parent reports {@code LOCK_ACQUIRED}. For every other
     * parent state a pipe lock that was obtained is released again and {@code pipeTaskInfo} is reset
     * to {@code null}, so the pipe lock is never held while the procedure is not going to run.
     *
     * @param configNodeProcedureEnv environment giving access to the pipe task coordinator
     * @return the lock state reported by the parent implementation, unchanged
     */
    @Override // org.apache.iotdb.confignode.procedure.impl.node.AbstractNodeProcedure, org.apache.iotdb.confignode.procedure.Procedure
    public ProcedureLockState acquireLock(ConfigNodeProcedureEnv configNodeProcedureEnv) {
        LOGGER.info("ProcedureId {} try to acquire pipe lock.", getProcId());
        this.pipeTaskInfo = acquireLockInternal(configNodeProcedureEnv);
        if (this.pipeTaskInfo == null) {
            LOGGER.warn("ProcedureId {} failed to acquire pipe lock.", getProcId());
        } else {
            LOGGER.info("ProcedureId {} acquired pipe lock.", getProcId());
        }

        final ProcedureLockState parentLockState = super.acquireLock(configNodeProcedureEnv);
        final boolean pipeLockHeld = this.pipeTaskInfo != null;

        // Switch on the enum itself rather than on a hand-maintained ordinal table.
        switch (parentLockState) {
            case LOCK_ACQUIRED:
                if (pipeLockHeld) {
                    LOGGER.info("ProcedureId {}: LOCK_ACQUIRED. The following procedure should be executed with pipe lock.", getProcId());
                } else {
                    LOGGER.warn("ProcedureId {}: LOCK_ACQUIRED. The following procedure should not be executed without pipe lock.", getProcId());
                }
                break;
            case LOCK_EVENT_WAIT:
                if (pipeLockHeld) {
                    LOGGER.info("ProcedureId {}: LOCK_EVENT_WAIT. Pipe lock will be released.", getProcId());
                    configNodeProcedureEnv.getConfigManager().getPipeManager().getPipeTaskCoordinator().unlock();
                    this.pipeTaskInfo = null;
                } else {
                    LOGGER.warn("ProcedureId {}: LOCK_EVENT_WAIT. Without acquiring pipe lock.", getProcId());
                }
                break;
            default:
                if (pipeLockHeld) {
                    LOGGER.error("ProcedureId {}: {}. Invalid lock state. Pipe lock will be released.", getProcId(), parentLockState);
                    configNodeProcedureEnv.getConfigManager().getPipeManager().getPipeTaskCoordinator().unlock();
                    this.pipeTaskInfo = null;
                } else {
                    LOGGER.error("ProcedureId {}: {}. Invalid lock state. Without acquiring pipe lock.", getProcId(), parentLockState);
                }
                break;
        }
        return parentLockState;
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /**
     * Releases the parent procedure's lock and then, if this procedure holds it, the pipe lock.
     *
     * <p>When the pipe lock is held, the last synced version is updated first for
     * {@link PipeMetaSyncProcedure} instances, the elapsed time is reported to the procedure metrics
     * under the operation's name, and only then is the pipe lock given back.
     *
     * @param configNodeProcedureEnv environment giving access to the pipe task coordinator
     */
    @Override // org.apache.iotdb.confignode.procedure.impl.node.AbstractNodeProcedure, org.apache.iotdb.confignode.procedure.Procedure
    public void releaseLock(ConfigNodeProcedureEnv configNodeProcedureEnv) {
        super.releaseLock(configNodeProcedureEnv);

        if (this.pipeTaskInfo != null) {
            LOGGER.info("ProcedureId {} release lock. Pipe lock will be released.", getProcId());

            final boolean isMetaSync = this instanceof PipeMetaSyncProcedure;
            if (isMetaSync) {
                configNodeProcedureEnv.getConfigManager().getPipeManager().getPipeTaskCoordinator().updateLastSyncedVersion();
            }

            PipeProcedureMetrics.getInstance().updateTimer(getOperation().getName(), elapsedTime());

            configNodeProcedureEnv.getConfigManager().getPipeManager().getPipeTaskCoordinator().unlock();
            this.pipeTaskInfo = null;
        } else {
            // Nothing of ours to give back: the pipe lock was never taken or was already released.
            LOGGER.warn("ProcedureId {} release lock. No need to release pipe lock.", getProcId());
        }
    }

    /**
     * Identifies the operation this procedure performs. Its name is used as the metrics timer key in
     * {@code releaseLock} and appears in the retry/failure messages of {@code executeFromState}.
     */
    protected abstract PipeTaskOperation getOperation();

    /**
     * Step run by {@code executeFromState} in state VALIDATE_TASK.
     *
     * @return {@code true} to continue with CALCULATE_INFO_FOR_TASK; {@code false} makes
     *     {@code executeFromState} store the skip message as the result and finish the procedure
     * @throws PipeException handled by the retry/failure logic of {@code executeFromState}
     */
    public abstract boolean executeFromValidateTask(ConfigNodeProcedureEnv configNodeProcedureEnv) throws PipeException;

    /** Step run by {@code executeFromState} in state CALCULATE_INFO_FOR_TASK; on normal return the next state is WRITE_CONFIG_NODE_CONSENSUS. */
    public abstract void executeFromCalculateInfoForTask(ConfigNodeProcedureEnv configNodeProcedureEnv);

    /**
     * Step run by {@code executeFromState} in state WRITE_CONFIG_NODE_CONSENSUS; on normal return the next
     * state is OPERATE_ON_DATA_NODES.
     *
     * @throws PipeException handled by the retry/failure logic of {@code executeFromState}
     */
    public abstract void executeFromWriteConfigNodeConsensus(ConfigNodeProcedureEnv configNodeProcedureEnv) throws PipeException;

    /**
     * Step run by {@code executeFromState} in state OPERATE_ON_DATA_NODES; on normal return the procedure
     * has no more states.
     *
     * @throws PipeException handled by the retry/failure logic of {@code executeFromState}
     * @throws IOException handled by the retry/failure logic of {@code executeFromState}
     */
    public abstract void executeFromOperateOnDataNodes(ConfigNodeProcedureEnv configNodeProcedureEnv) throws PipeException, IOException;

    /* JADX INFO: Access modifiers changed from: protected */
    /**
     * Runs the step that belongs to {@code operatePipeTaskState} and selects the following state.
     *
     * <p>State order: VALIDATE_TASK, CALCULATE_INFO_FOR_TASK, WRITE_CONFIG_NODE_CONSENSUS,
     * OPERATE_ON_DATA_NODES. Validation may end the procedure early by returning {@code false}, in which
     * case the skip message becomes the procedure result.
     *
     * <p>Any exception thrown by a step is caught here. While {@code getCycles()} is below
     * {@code RETRY_THRESHOLD} the method logs, sleeps three seconds and returns {@code HAS_MORE_STATE} so
     * the same state is executed again; otherwise the procedure is marked as failed.
     *
     * @param configNodeProcedureEnv environment handed to the step implementations
     * @param operatePipeTaskState state to execute
     * @return {@code NO_MORE_STATE} when the pipe lock is not held, when validation asks to skip, or after
     *     OPERATE_ON_DATA_NODES succeeded; {@code HAS_MORE_STATE} otherwise
     * @throws InterruptedException if the thread is interrupted during the retry back-off
     */
    @Override // org.apache.iotdb.confignode.procedure.impl.StateMachineProcedure
    public StateMachineProcedure.Flow executeFromState(ConfigNodeProcedureEnv configNodeProcedureEnv, OperatePipeTaskState operatePipeTaskState) throws ProcedureSuspendedException, ProcedureYieldException, InterruptedException {
        if (this.pipeTaskInfo == null) {
            LOGGER.warn("ProcedureId {}: Pipe lock is not acquired, executeFromState's execution will be skipped.", getProcId());
            return StateMachineProcedure.Flow.NO_MORE_STATE;
        }
        try {
            // Switch on the enum directly; the next state is passed as the enum constant itself.
            switch (operatePipeTaskState) {
                case VALIDATE_TASK:
                    if (!executeFromValidateTask(configNodeProcedureEnv)) {
                        LOGGER.info("ProcedureId {}: {}", getProcId(), SKIP_PIPE_PROCEDURE_MESSAGE);
                        setResult(SKIP_PIPE_PROCEDURE_MESSAGE.getBytes(StandardCharsets.UTF_8));
                        return StateMachineProcedure.Flow.NO_MORE_STATE;
                    }
                    setNextState(OperatePipeTaskState.CALCULATE_INFO_FOR_TASK);
                    break;
                case CALCULATE_INFO_FOR_TASK:
                    executeFromCalculateInfoForTask(configNodeProcedureEnv);
                    setNextState(OperatePipeTaskState.WRITE_CONFIG_NODE_CONSENSUS);
                    break;
                case WRITE_CONFIG_NODE_CONSENSUS:
                    executeFromWriteConfigNodeConsensus(configNodeProcedureEnv);
                    setNextState(OperatePipeTaskState.OPERATE_ON_DATA_NODES);
                    break;
                case OPERATE_ON_DATA_NODES:
                    executeFromOperateOnDataNodes(configNodeProcedureEnv);
                    return StateMachineProcedure.Flow.NO_MORE_STATE;
                default:
                    throw new UnsupportedOperationException(String.format("Unknown state during executing operatePipeProcedure, %s", operatePipeTaskState));
            }
        } catch (Exception e) {
            if (getCycles() < RETRY_THRESHOLD) {
                // The attempt number is 1-based: cycles already completed plus the one that just failed.
                LOGGER.warn("ProcedureId {}: Encountered error when trying to {} at state [{}], retry [{}/{}]", getProcId(), getOperation(), operatePipeTaskState, getCycles() + 1, RETRY_THRESHOLD, e);
                // Back off for 3s before the same state is executed again.
                TimeUnit.MILLISECONDS.sleep(3000L);
            } else {
                LOGGER.warn("ProcedureId {}: All {} retries failed when trying to {} at state [{}], will rollback...", getProcId(), RETRY_THRESHOLD, getOperation(), operatePipeTaskState, e);
                setFailure(new ProcedureException(String.format("ProcedureId %s: Fail to %s because %s", getProcId(), getOperation().name(), e.getMessage())));
            }
        }
        return StateMachineProcedure.Flow.HAS_MORE_STATE;
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /**
     * Reports rollback as supported for every state; the per-state work is done in {@code rollbackState}.
     *
     * @param operatePipeTaskState state being asked about (not inspected)
     * @return always {@code true}
     */
    @Override // org.apache.iotdb.confignode.procedure.impl.StateMachineProcedure
    public boolean isRollbackSupported(OperatePipeTaskState operatePipeTaskState) {
        return true;
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /**
     * Undoes the work of the given state. Rollback is best-effort: an exception thrown by a rollback step
     * is logged and swallowed here rather than propagated.
     *
     * <p>The OPERATE_ON_DATA_NODES rollback runs the config-node-consensus rollback first and the data-node
     * rollback second, then records success in {@code isRollbackFromOperateOnDataNodesSuccessful}. When
     * WRITE_CONFIG_NODE_CONSENSUS is rolled back afterwards, that flag prevents the consensus rollback from
     * being executed a second time.
     *
     * @param configNodeProcedureEnv environment handed to the rollback steps
     * @param operatePipeTaskState state to roll back
     */
    @Override // org.apache.iotdb.confignode.procedure.impl.StateMachineProcedure
    public void rollbackState(ConfigNodeProcedureEnv configNodeProcedureEnv, OperatePipeTaskState operatePipeTaskState) throws IOException, InterruptedException, ProcedureException {
        if (this.pipeTaskInfo == null) {
            LOGGER.warn("ProcedureId {}: Pipe lock is not acquired, rollbackState({})'s execution will be skipped.", getProcId(), operatePipeTaskState);
            return;
        }
        // Switch on the enum directly rather than on a hand-maintained ordinal table.
        switch (operatePipeTaskState) {
            case VALIDATE_TASK:
                try {
                    rollbackFromValidateTask(configNodeProcedureEnv);
                } catch (Exception e) {
                    LOGGER.warn("ProcedureId {}: Failed to rollback from validate task.", getProcId(), e);
                }
                break;
            case CALCULATE_INFO_FOR_TASK:
                try {
                    rollbackFromCalculateInfoForTask(configNodeProcedureEnv);
                } catch (Exception e) {
                    LOGGER.warn("ProcedureId {}: Failed to rollback from calculate info for task.", getProcId(), e);
                }
                break;
            case WRITE_CONFIG_NODE_CONSENSUS:
                try {
                    // Already covered if the OPERATE_ON_DATA_NODES rollback completed.
                    if (!this.isRollbackFromOperateOnDataNodesSuccessful) {
                        rollbackFromWriteConfigNodeConsensus(configNodeProcedureEnv);
                    }
                } catch (Exception e) {
                    LOGGER.warn("ProcedureId {}: Failed to rollback from write config node consensus.", getProcId(), e);
                }
                break;
            case OPERATE_ON_DATA_NODES:
                try {
                    // Order matters: consensus state is reverted before the data nodes are touched.
                    rollbackFromWriteConfigNodeConsensus(configNodeProcedureEnv);
                    rollbackFromOperateOnDataNodes(configNodeProcedureEnv);
                    this.isRollbackFromOperateOnDataNodesSuccessful = true;
                } catch (Exception e) {
                    LOGGER.warn("ProcedureId {}: Failed to rollback from operate on data nodes.", getProcId(), e);
                }
                break;
            default:
                LOGGER.error("Unsupported roll back STATE [{}]", operatePipeTaskState);
                break;
        }
    }

    /** Rollback step run by {@code rollbackState} for state VALIDATE_TASK; exceptions it throws are logged and swallowed there. */
    public abstract void rollbackFromValidateTask(ConfigNodeProcedureEnv configNodeProcedureEnv);

    /** Rollback step run by {@code rollbackState} for state CALCULATE_INFO_FOR_TASK; exceptions it throws are logged and swallowed there. */
    public abstract void rollbackFromCalculateInfoForTask(ConfigNodeProcedureEnv configNodeProcedureEnv);

    /**
     * Rollback step run by {@code rollbackState} for state WRITE_CONFIG_NODE_CONSENSUS (unless the
     * OPERATE_ON_DATA_NODES rollback already succeeded) and as the first part of the OPERATE_ON_DATA_NODES
     * rollback; exceptions it throws are logged and swallowed there.
     */
    public abstract void rollbackFromWriteConfigNodeConsensus(ConfigNodeProcedureEnv configNodeProcedureEnv);

    /**
     * Rollback step run by {@code rollbackState} for state OPERATE_ON_DATA_NODES, after the
     * config-node-consensus rollback.
     *
     * @throws IOException logged and swallowed by {@code rollbackState}
     */
    public abstract void rollbackFromOperateOnDataNodes(ConfigNodeProcedureEnv configNodeProcedureEnv) throws IOException;

    /* JADX INFO: Access modifiers changed from: protected */
    /**
     * Maps a numeric state id back to its {@link OperatePipeTaskState}.
     *
     * @param i index into {@code OperatePipeTaskState.values()}
     * @return the enum constant at that index
     */
    @Override // org.apache.iotdb.confignode.procedure.impl.StateMachineProcedure
    public OperatePipeTaskState getState(int i) {
        final OperatePipeTaskState[] allStates = OperatePipeTaskState.values();
        return allStates[i];
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /**
     * Maps a state to its numeric id, which is the enum ordinal (the inverse of {@code getState}).
     *
     * @param operatePipeTaskState state to convert
     * @return {@code operatePipeTaskState.ordinal()}
     */
    @Override // org.apache.iotdb.confignode.procedure.impl.StateMachineProcedure
    public int getStateId(OperatePipeTaskState operatePipeTaskState) {
        return operatePipeTaskState.ordinal();
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /**
     * Names the state the procedure starts in.
     *
     * @return {@link OperatePipeTaskState#VALIDATE_TASK}
     */
    @Override // org.apache.iotdb.confignode.procedure.impl.StateMachineProcedure
    public OperatePipeTaskState getInitialState() {
        return OperatePipeTaskState.VALIDATE_TASK;
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /**
     * Serializes every pipe meta of this procedure's {@code pipeTaskInfo} and pushes the full list to the
     * data nodes.
     *
     * <p>Delegates to the static overload so the serialization loop exists only once.
     *
     * @param configNodeProcedureEnv environment used to reach the data nodes
     * @return the map returned by {@code pushAllPipeMetaToDataNodes} (keyed by Integer, presumably the
     *     data node id; see {@code parsePushPipeMetaExceptionForPipe})
     * @throws IOException if serializing a pipe meta fails
     */
    public Map<Integer, TPushPipeMetaResp> pushPipeMetaToDataNodes(ConfigNodeProcedureEnv configNodeProcedureEnv) throws IOException {
        return pushPipeMetaToDataNodes(configNodeProcedureEnv, this.pipeTaskInfo);
    }

    /**
     * Serializes every pipe meta held by the given task info and pushes the full list to the data nodes.
     *
     * @param configNodeProcedureEnv environment used to reach the data nodes
     * @param atomicReference holder of the pipe task info whose pipe metas are pushed; must hold a value
     * @return the map returned by {@code pushAllPipeMetaToDataNodes} (keyed by Integer, presumably the
     *     data node id; see {@code parsePushPipeMetaExceptionForPipe})
     * @throws IOException if serializing a pipe meta fails
     */
    public static Map<Integer, TPushPipeMetaResp> pushPipeMetaToDataNodes(ConfigNodeProcedureEnv configNodeProcedureEnv, AtomicReference<PipeTaskInfo> atomicReference) throws IOException {
        final ArrayList<ByteBuffer> serializedPipeMetas = new ArrayList<>();
        for (PipeMeta pipeMeta : atomicReference.get().getPipeMetaList()) {
            serializedPipeMetas.add(pipeMeta.serialize());
        }
        return configNodeProcedureEnv.pushAllPipeMetaToDataNodes(serializedPipeMetas);
    }

    /**
     * Builds a human-readable summary of the failures contained in push-pipe-meta responses.
     *
     * <p>Only responses whose status code is {@code PIPE_PUSH_META_ERROR} contribute. For such a response:
     * <ul>
     *   <li>without exception messages, a generic "internal error" entry is added for the data node;</li>
     *   <li>with exception messages, every message is added when {@code str} is {@code null}, otherwise
     *       only the messages whose pipe name equals {@code str}. If at least one message was added, the
     *       data node's section is prefixed with its id and terminated with {@code ". "}.</li>
     * </ul>
     *
     * @param str pipe name to filter on, or {@code null} to report all pipes
     * @param map responses keyed by data node id
     * @return the concatenated report; empty when nothing matched
     */
    public static String parsePushPipeMetaExceptionForPipe(String str, Map<Integer, TPushPipeMetaResp> map) {
        final StringBuilder report = new StringBuilder();
        for (Map.Entry<Integer, TPushPipeMetaResp> entry : map.entrySet()) {
            final int dataNodeId = entry.getKey();
            final TPushPipeMetaResp resp = entry.getValue();
            if (resp.getStatus().getCode() != TSStatusCode.PIPE_PUSH_META_ERROR.getStatusCode()) {
                continue;
            }
            if (!resp.isSetExceptionMessages()) {
                report.append(String.format("DataNodeId: %s, Message: Internal error while processing pushPipeMeta on dataNodes.", dataNodeId));
                continue;
            }

            // Remember where this data node's section begins so that its id is inserted in front of its
            // own messages and not at the very start of the report, where it would be attached to the
            // section of a previously processed data node.
            final int sectionStart = report.length();
            final AtomicBoolean matched = new AtomicBoolean(false);
            resp.getExceptionMessages().forEach(exceptionMessage -> {
                if (str == null) {
                    matched.set(true);
                    report.append(String.format("PipeName: %s, Message: %s", exceptionMessage.getPipeName(), exceptionMessage.getMessage()));
                } else if (str.equals(exceptionMessage.getPipeName())) {
                    matched.set(true);
                    report.append(String.format("Message: %s", exceptionMessage.getMessage()));
                }
            });
            // The data node id is only reported when at least one of its messages was included.
            if (matched.get()) {
                report.insert(sectionStart, String.format("DataNodeId: %s, ", dataNodeId));
                report.append(". ");
            }
        }
        return report.toString();
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /**
     * Best-effort variant of {@code pushPipeMetaToDataNodes}: the response map is discarded and any
     * exception is logged at INFO level instead of being propagated.
     *
     * @param configNodeProcedureEnv environment used to reach the data nodes
     */
    public void pushPipeMetaToDataNodesIgnoreException(ConfigNodeProcedureEnv configNodeProcedureEnv) {
        try {
            this.pushPipeMetaToDataNodes(configNodeProcedureEnv);
        } catch (Exception pushFailure) {
            // Deliberately not rethrown: callers use this method when a failed push must not abort them.
            LOGGER.info("Failed to push pipe meta list to data nodes, will retry later.", pushFailure);
        }
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /**
     * Looks up one pipe's meta in {@code pipeTaskInfo}, serializes it and pushes it to the data nodes.
     *
     * @param str name of the pipe whose meta is pushed
     * @param configNodeProcedureEnv environment used to reach the data nodes
     * @return the map returned by {@code pushSinglePipeMetaToDataNodes} on the environment
     * @throws IOException if serializing the pipe meta fails
     */
    public Map<Integer, TPushPipeMetaResp> pushSinglePipeMetaToDataNodes(String str, ConfigNodeProcedureEnv configNodeProcedureEnv) throws IOException {
        final PipeTaskInfo currentTaskInfo = this.pipeTaskInfo.get();
        final PipeMeta targetPipeMeta = currentTaskInfo.getPipeMetaByPipeName(str);
        return configNodeProcedureEnv.pushSinglePipeMetaToDataNodes(targetPipeMeta.serialize());
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /**
     * Forwards to {@code dropSinglePipeOnDataNodes} on the environment for the named pipe.
     *
     * @param str name of the pipe to drop
     * @param configNodeProcedureEnv environment the call is forwarded to
     * @return the map returned by the environment call
     */
    public Map<Integer, TPushPipeMetaResp> dropSinglePipeOnDataNodes(String str, ConfigNodeProcedureEnv configNodeProcedureEnv) {
        return configNodeProcedureEnv.dropSinglePipeOnDataNodes(str);
    }

    /**
     * Writes the parent procedure's state followed by the
     * {@code isRollbackFromOperateOnDataNodesSuccessful} flag. {@code deserialize} reads them back in the
     * same order.
     *
     * @param dataOutputStream stream to write to
     * @throws IOException if writing fails
     */
    @Override // org.apache.iotdb.confignode.procedure.impl.StateMachineProcedure, org.apache.iotdb.confignode.procedure.Procedure
    public void serialize(DataOutputStream dataOutputStream) throws IOException {
        super.serialize(dataOutputStream);
        final Boolean rollbackFlag = Boolean.valueOf(this.isRollbackFromOperateOnDataNodesSuccessful);
        ReadWriteIOUtils.write(rollbackFlag, dataOutputStream);
    }

    /**
     * Restores the parent procedure's state and then the
     * {@code isRollbackFromOperateOnDataNodesSuccessful} flag, mirroring the order used by
     * {@code serialize}.
     *
     * @param byteBuffer buffer positioned at the start of this procedure's serialized form
     */
    @Override // org.apache.iotdb.confignode.procedure.impl.StateMachineProcedure, org.apache.iotdb.confignode.procedure.Procedure
    public void deserialize(ByteBuffer byteBuffer) {
        super.deserialize(byteBuffer);
        final boolean rollbackFlag = ReadWriteIOUtils.readBool(byteBuffer);
        this.isRollbackFromOperateOnDataNodesSuccessful = rollbackFlag;
    }
}
