package org.apache.iotdb.confignode.procedure.impl.subscription.subscription;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.iotdb.commons.pipe.agent.task.meta.PipeMeta;
import org.apache.iotdb.confignode.persistence.pipe.PipeTaskInfo;
import org.apache.iotdb.confignode.procedure.env.ConfigNodeProcedureEnv;
import org.apache.iotdb.confignode.procedure.impl.pipe.AbstractOperatePipeProcedureV2;
import org.apache.iotdb.confignode.procedure.impl.subscription.AbstractOperateSubscriptionProcedure;
import org.apache.iotdb.confignode.procedure.state.ProcedureLockState;
import org.apache.iotdb.mpp.rpc.thrift.TPushPipeMetaResp;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/iotdb/confignode/procedure/impl/subscription/subscription/AbstractOperateSubscriptionAndPipeProcedure.class */
public abstract class AbstractOperateSubscriptionAndPipeProcedure extends AbstractOperateSubscriptionProcedure {
    private static final Logger LOGGER = LoggerFactory.getLogger(AbstractOperateSubscriptionAndPipeProcedure.class);
    protected AtomicReference<PipeTaskInfo> pipeTaskInfo;

    /* JADX INFO: Access modifiers changed from: protected */
    @Override // org.apache.iotdb.confignode.procedure.impl.subscription.AbstractOperateSubscriptionProcedure, org.apache.iotdb.confignode.procedure.impl.node.AbstractNodeProcedure, org.apache.iotdb.confignode.procedure.Procedure
    public ProcedureLockState acquireLock(ConfigNodeProcedureEnv configNodeProcedureEnv) {
        LOGGER.info("ProcedureId {} try to acquire subscription and pipe lock.", Long.valueOf(getProcId()));
        this.pipeTaskInfo = configNodeProcedureEnv.getConfigManager().getPipeManager().getPipeTaskCoordinator().lock();
        if (this.pipeTaskInfo == null) {
            LOGGER.warn("ProcedureId {} failed to acquire pipe lock.", Long.valueOf(getProcId()));
        } else {
            LOGGER.info("ProcedureId {} acquired pipe lock.", Long.valueOf(getProcId()));
        }
        ProcedureLockState acquireLock = super.acquireLock(configNodeProcedureEnv);
        switch (acquireLock) {
            case LOCK_ACQUIRED:
                if (this.pipeTaskInfo != null) {
                    LOGGER.info("ProcedureId {}: LOCK_ACQUIRED. The following procedure should be executed with subscription and pipe lock.", Long.valueOf(getProcId()));
                    break;
                } else {
                    LOGGER.warn("ProcedureId {}: LOCK_ACQUIRED. The following procedure should not be executed without pipe lock.", Long.valueOf(getProcId()));
                    break;
                }
            case LOCK_EVENT_WAIT:
                if (this.pipeTaskInfo != null) {
                    LOGGER.info("ProcedureId {}: LOCK_EVENT_WAIT. Pipe lock will be released.", Long.valueOf(getProcId()));
                    configNodeProcedureEnv.getConfigManager().getPipeManager().getPipeTaskCoordinator().unlock();
                    this.pipeTaskInfo = null;
                    break;
                } else {
                    LOGGER.warn("ProcedureId {}: LOCK_EVENT_WAIT. Without acquiring pipe lock.", Long.valueOf(getProcId()));
                    break;
                }
            default:
                if (this.pipeTaskInfo != null) {
                    LOGGER.error("ProcedureId {}: {}. Invalid lock state. Pipe lock will be released.", Long.valueOf(getProcId()), acquireLock);
                    configNodeProcedureEnv.getConfigManager().getPipeManager().getPipeTaskCoordinator().unlock();
                    this.pipeTaskInfo = null;
                    break;
                } else {
                    LOGGER.error("ProcedureId {}: {}. Invalid lock state. Without acquiring pipe lock.", Long.valueOf(getProcId()), acquireLock);
                    break;
                }
        }
        return acquireLock;
    }

    /* JADX INFO: Access modifiers changed from: protected */
    @Override // org.apache.iotdb.confignode.procedure.impl.subscription.AbstractOperateSubscriptionProcedure, org.apache.iotdb.confignode.procedure.impl.node.AbstractNodeProcedure, org.apache.iotdb.confignode.procedure.Procedure
    public void releaseLock(ConfigNodeProcedureEnv configNodeProcedureEnv) {
        super.releaseLock(configNodeProcedureEnv);
        if (this.pipeTaskInfo == null) {
            LOGGER.warn("ProcedureId {} release lock. No need to release pipe lock.", Long.valueOf(getProcId()));
            return;
        }
        LOGGER.info("ProcedureId {} release lock. Pipe lock will be released.", Long.valueOf(getProcId()));
        configNodeProcedureEnv.getConfigManager().getPipeManager().getPipeTaskCoordinator().unlock();
        this.pipeTaskInfo = null;
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public Map<Integer, TPushPipeMetaResp> pushMultiPipeMetaToDataNodes(List<String> list, ConfigNodeProcedureEnv configNodeProcedureEnv) throws IOException {
        ArrayList arrayList = new ArrayList();
        for (String str : list) {
            PipeMeta pipeMetaByPipeName = this.pipeTaskInfo.get().getPipeMetaByPipeName(str);
            if (pipeMetaByPipeName == null) {
                LOGGER.warn("Pipe {} not found in PipeTaskInfo, can not push its meta.", str);
            } else {
                arrayList.add(AbstractOperatePipeProcedureV2.copyAndFilterOutNonWorkingDataRegionPipeTasks(pipeMetaByPipeName).serialize());
            }
        }
        return configNodeProcedureEnv.pushMultiPipeMetaToDataNodes(arrayList);
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public Map<Integer, TPushPipeMetaResp> dropMultiPipeOnDataNodes(List<String> list, ConfigNodeProcedureEnv configNodeProcedureEnv) {
        return configNodeProcedureEnv.dropMultiPipeOnDataNodes(list);
    }
}
