package org.apache.iotdb.confignode.procedure.impl.cq;

import java.io.DataOutputStream;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.Objects;
import java.util.concurrent.ScheduledExecutorService;
import org.apache.commons.codec.digest.DigestUtils;
import org.apache.iotdb.common.rpc.thrift.TSStatus;
import org.apache.iotdb.commons.exception.IoTDBException;
import org.apache.iotdb.commons.utils.ThriftCommonsSerDeUtils;
import org.apache.iotdb.confignode.consensus.request.write.cq.ActiveCQPlan;
import org.apache.iotdb.confignode.consensus.request.write.cq.AddCQPlan;
import org.apache.iotdb.confignode.consensus.request.write.cq.DropCQPlan;
import org.apache.iotdb.confignode.manager.cq.CQScheduleTask;
import org.apache.iotdb.confignode.procedure.env.ConfigNodeProcedureEnv;
import org.apache.iotdb.confignode.procedure.exception.ProcedureException;
import org.apache.iotdb.confignode.procedure.exception.ProcedureSuspendedException;
import org.apache.iotdb.confignode.procedure.exception.ProcedureYieldException;
import org.apache.iotdb.confignode.procedure.impl.StateMachineProcedure;
import org.apache.iotdb.confignode.procedure.impl.node.AbstractNodeProcedure;
import org.apache.iotdb.confignode.procedure.state.cq.CreateCQState;
import org.apache.iotdb.confignode.procedure.store.ProcedureType;
import org.apache.iotdb.confignode.rpc.thrift.TCreateCQReq;
import org.apache.iotdb.consensus.exception.ConsensusException;
import org.apache.iotdb.rpc.TSStatusCode;
import org.apache.tsfile.utils.ReadWriteIOUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/iotdb/confignode/procedure/impl/cq/CreateCQProcedure.class */
/**
 * State-machine procedure that creates a continuous query (CQ) on the ConfigNode.
 *
 * <p>The states are handled by {@link #executeFromState} as follows:
 *
 * <ol>
 *   <li>{@code INIT}: writes an {@link AddCQPlan} through the consensus manager and, on success,
 *       moves to {@code INACTIVE}.
 *   <li>{@code INACTIVE}: builds a {@link CQScheduleTask}, calls {@code submitSelf()} on it and
 *       moves to {@code SCHEDULED}.
 *   <li>{@code SCHEDULED}: writes an {@link ActiveCQPlan} through the consensus manager and ends
 *       the procedure.
 * </ol>
 *
 * <p>Only the {@code INACTIVE} state reports rollback support; its rollback writes a {@link
 * DropCQPlan}.
 */
public class CreateCQProcedure extends AbstractNodeProcedure<CreateCQState> {
    private static final Logger LOGGER = LoggerFactory.getLogger(CreateCQProcedure.class);

    /**
     * Once {@code getCycles()} exceeds this value, an exception raised in a state without rollback
     * support marks the procedure as failed instead of leaving it to be executed again.
     */
    private static final int RETRY_THRESHOLD = 5;

    private static final String CONSENSUS_WRITE_ERROR = "Failed in the write API executing the consensus layer due to: ";

    /** Executor handed to the {@link CQScheduleTask} created in the {@code INACTIVE} state. */
    private final ScheduledExecutorService executor;

    /** The create-CQ request; set by the request constructor or by {@link #deserialize}. */
    private TCreateCQReq req;

    /**
     * Hex digest of the CQ id, passed along with every CQ plan written by this procedure.
     *
     * <p>NOTE(review): despite the field name, the value is produced by {@code
     * DigestUtils.md2Hex}. It is serialized and forwarded to the consensus plans, so the algorithm
     * is intentionally left untouched here - confirm whether MD2 is intended before changing it.
     */
    private String md5;

    /** Value returned by {@code CQScheduleTask.getFirstExecutionTime} for this request. */
    private long firstExecutionTime;

    /**
     * Creates an empty procedure whose request fields are expected to be filled in later by {@link
     * #deserialize}.
     *
     * @param scheduledExecutorService executor used when the CQ schedule task is created
     */
    public CreateCQProcedure(ScheduledExecutorService scheduledExecutorService) {
        this.executor = scheduledExecutorService;
    }

    /**
     * Creates a procedure for the given request.
     *
     * @param tCreateCQReq the create-CQ request
     * @param scheduledExecutorService executor used when the CQ schedule task is created
     */
    public CreateCQProcedure(TCreateCQReq tCreateCQReq, ScheduledExecutorService scheduledExecutorService) {
        this.req = tCreateCQReq;
        this.md5 = DigestUtils.md2Hex(tCreateCQReq.cqId);
        this.executor = scheduledExecutorService;
        this.firstExecutionTime = CQScheduleTask.getFirstExecutionTime(tCreateCQReq.boundaryTime, tCreateCQReq.everyInterval);
    }

    /**
     * Executes the step that belongs to {@code createCQState}.
     *
     * <p>Any exception raised by a step is caught here. If the state supports rollback the
     * procedure is marked as failed immediately; otherwise the error is logged and the procedure
     * is only marked as failed once {@code getCycles()} exceeds {@link #RETRY_THRESHOLD}.
     *
     * @param configNodeProcedureEnv environment giving access to the config manager
     * @param createCQState the state to execute
     * @return {@code NO_MORE_STATE} after the {@code SCHEDULED} step ran without throwing, {@code
     *     HAS_MORE_STATE} in every other case
     */
    @Override
    public StateMachineProcedure.Flow executeFromState(ConfigNodeProcedureEnv configNodeProcedureEnv, CreateCQState createCQState) throws ProcedureSuspendedException, ProcedureYieldException, InterruptedException {
        try {
            switch (createCQState) {
                case INIT:
                    addCQ(configNodeProcedureEnv);
                    return StateMachineProcedure.Flow.HAS_MORE_STATE;
                case INACTIVE:
                    CQScheduleTask scheduleTask = new CQScheduleTask(this.req, this.firstExecutionTime, this.md5, this.executor, configNodeProcedureEnv.getConfigManager());
                    scheduleTask.submitSelf();
                    setNextState(CreateCQState.SCHEDULED);
                    break;
                case SCHEDULED:
                    activeCQ(configNodeProcedureEnv);
                    return StateMachineProcedure.Flow.NO_MORE_STATE;
                default:
                    throw new IllegalArgumentException("Unknown CreateCQState: " + createCQState);
            }
        } catch (Exception e) {
            if (isRollbackSupported(createCQState)) {
                LOGGER.error("Fail in CreateCQProcedure", e);
                setFailure(new ProcedureException(e));
            } else {
                // The trailing exception argument is logged as the throwable, not as a placeholder value.
                LOGGER.error("Retrievable error trying to create cq [{}], state [{}]", this.req.getCqId(), createCQState, e);
                if (getCycles() > RETRY_THRESHOLD) {
                    setFailure(new ProcedureException(String.format("Fail to create cq [%s] at STATE [%s]", this.req.getCqId(), createCQState)));
                }
            }
        }
        return StateMachineProcedure.Flow.HAS_MORE_STATE;
    }

    /**
     * Logs a failed consensus write and converts it into a status carrying {@code
     * EXECUTE_STATEMENT_ERROR} and the exception message.
     */
    private static TSStatus toFailureStatus(ConsensusException e) {
        LOGGER.warn(CONSENSUS_WRITE_ERROR, e);
        TSStatus failure = new TSStatus(TSStatusCode.EXECUTE_STATEMENT_ERROR.getStatusCode());
        failure.setMessage(e.getMessage());
        return failure;
    }

    /**
     * {@code INIT} step: writes an {@link AddCQPlan}. On success the next state is {@code
     * INACTIVE}; any other status marks the procedure as failed with the returned message and code.
     */
    private void addCQ(ConfigNodeProcedureEnv configNodeProcedureEnv) {
        TSStatus status;
        try {
            status = configNodeProcedureEnv.getConfigManager().getConsensusManager().write(new AddCQPlan(this.req, this.md5, this.firstExecutionTime));
        } catch (ConsensusException e) {
            status = toFailureStatus(e);
        }
        if (status.code == TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
            LOGGER.debug("Finish init CQ {} successfully", this.req.cqId);
            setNextState(CreateCQState.INACTIVE);
            return;
        }
        // Both failure kinds fail the procedure identically; only the log line differs.
        if (status.code == TSStatusCode.CQ_ALREADY_EXIST.getStatusCode()) {
            LOGGER.info("Failed to init CQ {} because such cq already exists", this.req.cqId);
        } else {
            LOGGER.warn("Failed to init CQ {} because of unknown reasons {}", this.req.cqId, status);
        }
        setFailure(new ProcedureException(new IoTDBException(status.message, status.code)));
    }

    /**
     * {@code SCHEDULED} step: writes an {@link ActiveCQPlan}. A non-success status is only logged;
     * this method neither throws for it nor marks the procedure as failed.
     */
    private void activeCQ(ConfigNodeProcedureEnv configNodeProcedureEnv) {
        TSStatus status;
        try {
            status = configNodeProcedureEnv.getConfigManager().getConsensusManager().write(new ActiveCQPlan(this.req.cqId, this.md5));
        } catch (ConsensusException e) {
            status = toFailureStatus(e);
        }
        if (status.code == TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
            LOGGER.debug("Finish Scheduling CQ {} successfully", this.req.cqId);
        } else if (status.code == TSStatusCode.NO_SUCH_CQ.getStatusCode()) {
            LOGGER.warn("Failed to active CQ {} because of no such cq: {}", this.req.cqId, status.message);
        } else if (status.code == TSStatusCode.CQ_ALREADY_ACTIVE.getStatusCode()) {
            LOGGER.warn("Failed to active CQ {} because this cq has already been active", this.req.cqId);
        } else {
            LOGGER.warn("Failed to active CQ {} successfully because of unknown reasons {}", this.req.cqId, status);
        }
    }

    /**
     * Rolls back the given state. {@code INIT} and {@code SCHEDULED} are no-ops; {@code INACTIVE}
     * writes a {@link DropCQPlan} and logs the outcome without throwing on a non-success status.
     *
     * @param configNodeProcedureEnv environment giving access to the config manager
     * @param createCQState the state to roll back
     * @throws IllegalArgumentException if the state is not one of the three handled states
     */
    @Override
    public void rollbackState(ConfigNodeProcedureEnv configNodeProcedureEnv, CreateCQState createCQState) throws IOException, InterruptedException, ProcedureException {
        switch (createCQState) {
            case INIT:
            case SCHEDULED:
                return;
            case INACTIVE:
                LOGGER.info("Start [INACTIVE] rollback of CQ {}", this.req.cqId);
                TSStatus status;
                try {
                    status = configNodeProcedureEnv.getConfigManager().getConsensusManager().write(new DropCQPlan(this.req.cqId, this.md5));
                } catch (ConsensusException e) {
                    status = toFailureStatus(e);
                }
                if (status.code == TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
                    LOGGER.info("Finish [INACTIVE] rollback of CQ {} successfully", this.req.cqId);
                } else if (status.code == TSStatusCode.NO_SUCH_CQ.getStatusCode()) {
                    LOGGER.warn("Failed to do [INACTIVE] rollback of CQ {} because of no such cq: {}", this.req.cqId, status.message);
                } else {
                    LOGGER.warn("Failed to do [INACTIVE] rollback of CQ {} because of unknown reasons {}", this.req.cqId, status);
                }
                return;
            default:
                throw new IllegalArgumentException("Unknown CreateCQState: " + createCQState);
        }
    }

    /** Returns {@code true} only for {@code INACTIVE}. */
    @Override
    public boolean isRollbackSupported(CreateCQState createCQState) {
        return createCQState == CreateCQState.INACTIVE;
    }

    /** Maps a state id back to its enum constant; the id is the constant's ordinal. */
    @Override
    public CreateCQState getState(int i) {
        return CreateCQState.values()[i];
    }

    /** Returns the ordinal of the given state as its id. */
    @Override
    public int getStateId(CreateCQState createCQState) {
        return createCQState.ordinal();
    }

    /** Returns {@code INIT}, the state the procedure starts in. */
    @Override
    public CreateCQState getInitialState() {
        return CreateCQState.INIT;
    }

    /**
     * Writes the procedure type code, the superclass state, the request, the digest and the first
     * execution time, in that order.
     */
    @Override
    public void serialize(DataOutputStream dataOutputStream) throws IOException {
        dataOutputStream.writeShort(ProcedureType.CREATE_CQ_PROCEDURE.getTypeCode());
        super.serialize(dataOutputStream);
        ThriftCommonsSerDeUtils.serializeTCreateCQReq(this.req, dataOutputStream);
        ReadWriteIOUtils.write(this.md5, dataOutputStream);
        ReadWriteIOUtils.write(this.firstExecutionTime, dataOutputStream);
    }

    /**
     * Reads the superclass state, the request, the digest and the first execution time, in that
     * order. The type code written by {@link #serialize} is not read here.
     */
    @Override
    public void deserialize(ByteBuffer byteBuffer) {
        super.deserialize(byteBuffer);
        this.req = ThriftCommonsSerDeUtils.deserializeTCreateCQReq(byteBuffer);
        this.md5 = ReadWriteIOUtils.readString(byteBuffer);
        this.firstExecutionTime = ReadWriteIOUtils.readLong(byteBuffer);
    }

    /**
     * Two procedures are equal when they have the same class, procedure id, current state, cycle
     * count, pipe flag, first execution time, request and digest.
     */
    @Override
    public boolean equals(Object obj) {
        if (this == obj) {
            return true;
        }
        if (obj == null || getClass() != obj.getClass()) {
            return false;
        }
        CreateCQProcedure other = (CreateCQProcedure) obj;
        return getProcId() == other.getProcId()
                && Objects.equals(getCurrentState(), other.getCurrentState())
                && getCycles() == other.getCycles()
                && this.isGeneratedByPipe == other.isGeneratedByPipe
                && this.firstExecutionTime == other.firstExecutionTime
                && Objects.equals(this.req, other.req)
                && Objects.equals(this.md5, other.md5);
    }

    /** Hashes the same components that {@link #equals} compares. */
    @Override
    public int hashCode() {
        return Objects.hash(getProcId(), getCurrentState(), getCycles(), this.isGeneratedByPipe, this.req, this.md5, this.firstExecutionTime);
    }
}
