package org.apache.iotdb.db.pipe.consensus;

import java.util.Map;
import org.apache.iotdb.common.rpc.thrift.TSStatus;
import org.apache.iotdb.commons.client.IClientManager;
import org.apache.iotdb.commons.consensus.ConfigRegionId;
import org.apache.iotdb.confignode.rpc.thrift.TCreatePipeReq;
import org.apache.iotdb.consensus.pipe.consensuspipe.ConsensusPipeDispatcher;
import org.apache.iotdb.consensus.pipe.consensuspipe.ConsensusPipeName;
import org.apache.iotdb.db.pipe.agent.PipeDataNodeAgent;
import org.apache.iotdb.db.protocol.client.ConfigNodeClient;
import org.apache.iotdb.db.protocol.client.ConfigNodeClientManager;
import org.apache.iotdb.db.protocol.client.ConfigNodeInfo;
import org.apache.iotdb.pipe.api.exception.PipeException;
import org.apache.iotdb.rpc.TSStatusCode;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/iotdb/db/pipe/consensus/ConsensusPipeDataNodeDispatcher.class */
/**
 * {@link ConsensusPipeDispatcher} implementation that forwards consensus pipe lifecycle requests
 * (create / start / stop / drop) to a ConfigNode through a {@link ConfigNodeClient} borrowed from
 * {@link #CONFIG_NODE_CLIENT_MANAGER}.
 *
 * <p>Every method follows the same pattern: borrow a client, issue the RPC, and treat any status
 * code other than {@link TSStatusCode#SUCCESS_STATUS} as a failure. Any exception raised while
 * doing so (including the one thrown for a non-success status) is logged and rethrown wrapped in a
 * {@link PipeException} that keeps the original exception as its cause.
 *
 * <p>The borrowed client is managed with try-with-resources so that it is closed on both the
 * success path and every failure path.
 */
public class ConsensusPipeDataNodeDispatcher implements ConsensusPipeDispatcher {
    private static final Logger LOGGER = LoggerFactory.getLogger(ConsensusPipeDataNodeDispatcher.class);
    private static final IClientManager<ConfigRegionId, ConfigNodeClient> CONFIG_NODE_CLIENT_MANAGER = ConfigNodeClientManager.getInstance();

    /**
     * Asks the ConfigNode to create a consensus pipe.
     *
     * @param str the pipe name sent in the create request
     * @param map the extractor attributes sent in the create request
     * @param map2 the processor attributes sent in the create request
     * @param map3 the connector attributes sent in the create request
     * @throws PipeException if the client cannot be borrowed, the RPC fails, or the returned status
     *     is not {@link TSStatusCode#SUCCESS_STATUS}
     */
    public void createPipe(String str, Map<String, String> map, Map<String, String> map2, Map<String, String> map3) throws Exception {
        // try-with-resources closes the borrowed client even when the RPC or status check throws.
        try (ConfigNodeClient configNodeClient = CONFIG_NODE_CLIENT_MANAGER.borrowClient(ConfigNodeInfo.CONFIG_REGION_ID)) {
            final TCreatePipeReq request = new TCreatePipeReq()
                    .setPipeName(str)
                    .setExtractorAttributes(map)
                    .setProcessorAttributes(map2)
                    .setConnectorAttributes(map3);
            final TSStatus status = configNodeClient.createPipe(request);
            if (TSStatusCode.SUCCESS_STATUS.getStatusCode() != status.getCode()) {
                LOGGER.warn("Failed to create consensus pipe-{}, status: {}", str, status);
                // Caught below and wrapped, so callers always see the generic failure message
                // with this exception as the cause.
                throw new PipeException(status.getMessage());
            }
        } catch (Exception e) {
            // The trailing throwable argument makes SLF4J log the cause along with the message.
            LOGGER.warn("Failed to create consensus pipe-{}", str, e);
            throw new PipeException("Failed to create consensus pipe", e);
        }
    }

    /**
     * Asks the ConfigNode to start a consensus pipe.
     *
     * @param str the name of the pipe to start
     * @throws PipeException if the client cannot be borrowed, the RPC fails, or the returned status
     *     is not {@link TSStatusCode#SUCCESS_STATUS}
     */
    public void startPipe(String str) throws Exception {
        // try-with-resources closes the borrowed client even when the RPC or status check throws.
        try (ConfigNodeClient configNodeClient = CONFIG_NODE_CLIENT_MANAGER.borrowClient(ConfigNodeInfo.CONFIG_REGION_ID)) {
            final TSStatus status = configNodeClient.startPipe(str);
            if (TSStatusCode.SUCCESS_STATUS.getStatusCode() != status.getCode()) {
                LOGGER.warn("Failed to start consensus pipe-{}, status: {}", str, status);
                throw new PipeException(status.getMessage());
            }
        } catch (Exception e) {
            LOGGER.warn("Failed to start consensus pipe-{}", str, e);
            throw new PipeException("Failed to start consensus pipe", e);
        }
    }

    /**
     * Asks the ConfigNode to stop a consensus pipe.
     *
     * @param str the name of the pipe to stop
     * @throws PipeException if the client cannot be borrowed, the RPC fails, or the returned status
     *     is not {@link TSStatusCode#SUCCESS_STATUS}
     */
    public void stopPipe(String str) throws Exception {
        // try-with-resources closes the borrowed client even when the RPC or status check throws.
        try (ConfigNodeClient configNodeClient = CONFIG_NODE_CLIENT_MANAGER.borrowClient(ConfigNodeInfo.CONFIG_REGION_ID)) {
            final TSStatus status = configNodeClient.stopPipe(str);
            if (TSStatusCode.SUCCESS_STATUS.getStatusCode() != status.getCode()) {
                LOGGER.warn("Failed to stop consensus pipe-{}, status: {}", str, status);
                throw new PipeException(status.getMessage());
            }
        } catch (Exception e) {
            LOGGER.warn("Failed to stop consensus pipe-{}", str, e);
            throw new PipeException("Failed to stop consensus pipe", e);
        }
    }

    /**
     * Asks the ConfigNode to drop a consensus pipe and, once that succeeds, invokes
     * {@code handleDropPipeConsensusTask} on the local pipe-consensus receiver for the same pipe.
     *
     * @param consensusPipeName the pipe to drop; its {@code toString()} value is sent as the pipe
     *     name in the RPC
     * @throws PipeException if the client cannot be borrowed, the RPC fails, the returned status is
     *     not {@link TSStatusCode#SUCCESS_STATUS}, or the receiver-side handling throws
     */
    public void dropPipe(ConsensusPipeName consensusPipeName) throws Exception {
        try {
            // Inner try-with-resources: the client is closed before the receiver is notified,
            // and is also closed when the RPC or the status check throws.
            try (ConfigNodeClient configNodeClient = CONFIG_NODE_CLIENT_MANAGER.borrowClient(ConfigNodeInfo.CONFIG_REGION_ID)) {
                final TSStatus status = configNodeClient.dropPipe(consensusPipeName.toString());
                if (TSStatusCode.SUCCESS_STATUS.getStatusCode() != status.getCode()) {
                    LOGGER.warn("Failed to drop consensus pipe-{}, status: {}", consensusPipeName, status);
                    throw new PipeException(status.getMessage());
                }
            }
            // Only reached when the ConfigNode reported success and the client closed cleanly.
            PipeDataNodeAgent.receiver().pipeConsensus().handleDropPipeConsensusTask(consensusPipeName);
        } catch (Exception e) {
            LOGGER.warn("Failed to drop consensus pipe-{}", consensusPipeName, e);
            throw new PipeException("Failed to drop consensus pipe", e);
        }
    }
}
