package org.apache.iotdb.confignode.manager;

import com.google.common.eventbus.AllowConcurrentEvents;
import com.google.common.eventbus.Subscribe;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.stream.Collectors;
import org.apache.iotdb.common.rpc.thrift.TSStatus;
import org.apache.iotdb.commons.cluster.NodeStatus;
import org.apache.iotdb.commons.exception.sync.PipeException;
import org.apache.iotdb.commons.exception.sync.PipeSinkException;
import org.apache.iotdb.commons.exception.sync.PipeSinkNotExistException;
import org.apache.iotdb.commons.sync.pipe.PipeInfo;
import org.apache.iotdb.commons.sync.pipe.PipeMessage;
import org.apache.iotdb.commons.sync.pipe.PipeStatus;
import org.apache.iotdb.commons.sync.pipe.SyncOperation;
import org.apache.iotdb.commons.utils.StatusUtils;
import org.apache.iotdb.confignode.client.DataNodeRequestType;
import org.apache.iotdb.confignode.client.async.AsyncDataNodeClientPool;
import org.apache.iotdb.confignode.client.async.handlers.AsyncClientHandler;
import org.apache.iotdb.confignode.consensus.request.write.sync.CreatePipeSinkPlan;
import org.apache.iotdb.confignode.consensus.request.write.sync.DropPipePlan;
import org.apache.iotdb.confignode.consensus.request.write.sync.DropPipeSinkPlan;
import org.apache.iotdb.confignode.consensus.request.write.sync.GetPipeSinkPlan;
import org.apache.iotdb.confignode.consensus.request.write.sync.PreCreatePipePlan;
import org.apache.iotdb.confignode.consensus.request.write.sync.RecordPipeMessagePlan;
import org.apache.iotdb.confignode.consensus.request.write.sync.SetPipeStatusPlan;
import org.apache.iotdb.confignode.consensus.request.write.sync.ShowPipePlan;
import org.apache.iotdb.confignode.consensus.response.pipe.PipeResp;
import org.apache.iotdb.confignode.consensus.response.pipe.PipeSinkResp;
import org.apache.iotdb.confignode.manager.consensus.ConsensusManager;
import org.apache.iotdb.confignode.manager.node.NodeManager;
import org.apache.iotdb.confignode.manager.observer.NodeStatisticsEvent;
import org.apache.iotdb.confignode.persistence.sync.ClusterSyncInfo;
import org.apache.iotdb.confignode.rpc.thrift.TGetAllPipeInfoResp;
import org.apache.iotdb.confignode.rpc.thrift.TGetPipeSinkResp;
import org.apache.iotdb.confignode.rpc.thrift.TShowPipeResp;
import org.apache.iotdb.mpp.rpc.thrift.TCreatePipeOnDataNodeReq;
import org.apache.iotdb.mpp.rpc.thrift.TOperatePipeOnDataNodeReq;
import org.apache.iotdb.rpc.TSStatusCode;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/iotdb/confignode/manager/SyncManager.class */
public class SyncManager {
    private static final Logger LOGGER = LoggerFactory.getLogger(SyncManager.class);
    private final IManager configManager;
    private final ClusterSyncInfo clusterSyncInfo;

    public SyncManager(IManager iManager, ClusterSyncInfo clusterSyncInfo) {
        this.configManager = iManager;
        this.clusterSyncInfo = clusterSyncInfo;
    }

    public void lockSyncMetadata() {
        this.clusterSyncInfo.lockSyncMetadata();
    }

    public void unlockSyncMetadata() {
        this.clusterSyncInfo.unlockSyncMetadata();
    }

    public TSStatus createPipeSink(CreatePipeSinkPlan createPipeSinkPlan) {
        try {
            this.clusterSyncInfo.checkAddPipeSink(createPipeSinkPlan);
            return getConsensusManager().write(createPipeSinkPlan).getStatus();
        } catch (PipeSinkException e) {
            LOGGER.error(e.getMessage());
            return new TSStatus(TSStatusCode.CREATE_PIPE_SINK_ERROR.getStatusCode()).setMessage(e.getMessage());
        }
    }

    public TSStatus dropPipeSink(DropPipeSinkPlan dropPipeSinkPlan) {
        try {
            this.clusterSyncInfo.checkDropPipeSink(dropPipeSinkPlan.getPipeSinkName());
            return getConsensusManager().write(dropPipeSinkPlan).getStatus();
        } catch (PipeSinkException e) {
            LOGGER.error(e.getMessage());
            return new TSStatus(TSStatusCode.CREATE_PIPE_SINK_ERROR.getStatusCode()).setMessage(e.getMessage());
        }
    }

    public TGetPipeSinkResp getPipeSink(String str) {
        PipeSinkResp pipeSinkResp = (PipeSinkResp) getConsensusManager().read(new GetPipeSinkPlan(str)).getDataset();
        TGetPipeSinkResp tGetPipeSinkResp = new TGetPipeSinkResp();
        tGetPipeSinkResp.setStatus(pipeSinkResp.getStatus());
        if (pipeSinkResp.getStatus().getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
            tGetPipeSinkResp.setPipeSinkInfoList((List) pipeSinkResp.getPipeSinkList().stream().map((v0) -> {
                return v0.getTPipeSinkInfo();
            }).collect(Collectors.toList()));
        }
        return tGetPipeSinkResp;
    }

    public void checkAddPipe(PipeInfo pipeInfo) throws PipeException, PipeSinkNotExistException {
        this.clusterSyncInfo.checkAddPipe(pipeInfo);
    }

    public TSStatus preCreatePipe(PipeInfo pipeInfo) {
        pipeInfo.setStatus(PipeStatus.PARTIAL_CREATE);
        return getConsensusManager().write(new PreCreatePipePlan(pipeInfo)).getStatus();
    }

    public TSStatus setPipeStatus(String str, PipeStatus pipeStatus) {
        return getConsensusManager().write(new SetPipeStatusPlan(str, pipeStatus)).getStatus();
    }

    public TSStatus dropPipe(String str) {
        return getConsensusManager().write(new DropPipePlan(str)).getStatus();
    }

    public TSStatus recordPipeMessage(String str, PipeMessage pipeMessage) {
        return getConsensusManager().write(new RecordPipeMessagePlan(str, pipeMessage)).getStatus();
    }

    public TShowPipeResp showPipe(String str) {
        PipeResp pipeResp = (PipeResp) getConsensusManager().read(new ShowPipePlan(str)).getDataset();
        TShowPipeResp tShowPipeResp = new TShowPipeResp();
        tShowPipeResp.setStatus(pipeResp.getStatus());
        if (pipeResp.getStatus().getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
            tShowPipeResp.setPipeInfoList((List) pipeResp.getPipeInfoList().stream().map((v0) -> {
                return v0.getTShowPipeInfo();
            }).collect(Collectors.toList()));
        }
        return tShowPipeResp;
    }

    public PipeInfo getPipeInfo(String str) throws PipeException {
        return this.clusterSyncInfo.getPipeInfo(str);
    }

    public TGetAllPipeInfoResp getAllPipeInfo() {
        try {
            lockSyncMetadata();
            TGetAllPipeInfoResp tGetAllPipeInfoResp = new TGetAllPipeInfoResp();
            tGetAllPipeInfoResp.setStatus(StatusUtils.OK);
            tGetAllPipeInfoResp.setAllPipeInfo((List) this.clusterSyncInfo.getAllPipeInfos().stream().map((v0) -> {
                return v0.serializeToByteBuffer();
            }).collect(Collectors.toList()));
            return tGetAllPipeInfoResp;
        } finally {
            unlockSyncMetadata();
        }
    }

    public Map<Integer, TSStatus> operatePipeOnDataNodes(String str, SyncOperation syncOperation) {
        NodeManager nodeManager = this.configManager.getNodeManager();
        ConcurrentHashMap concurrentHashMap = new ConcurrentHashMap();
        nodeManager.filterDataNodeThroughStatus(NodeStatus.Running).forEach(tDataNodeConfiguration -> {
            concurrentHashMap.put(Integer.valueOf(tDataNodeConfiguration.getLocation().getDataNodeId()), tDataNodeConfiguration.getLocation());
        });
        AsyncClientHandler<?, ?> asyncClientHandler = new AsyncClientHandler<>(DataNodeRequestType.OPERATE_PIPE, new TOperatePipeOnDataNodeReq(str, (byte) syncOperation.ordinal()), concurrentHashMap);
        AsyncDataNodeClientPool.getInstance().sendAsyncRequestToDataNodeWithRetry(asyncClientHandler);
        return asyncClientHandler.getResponseMap();
    }

    public void operatePipeOnDataNodesForRollback(String str, long j, SyncOperation syncOperation, Set<Integer> set) {
        NodeManager nodeManager = this.configManager.getNodeManager();
        ConcurrentHashMap concurrentHashMap = new ConcurrentHashMap();
        nodeManager.filterDataNodeThroughStatus(NodeStatus.Running).stream().filter(tDataNodeConfiguration -> {
            return set.contains(Integer.valueOf(tDataNodeConfiguration.getLocation().dataNodeId));
        }).forEach(tDataNodeConfiguration2 -> {
            concurrentHashMap.put(Integer.valueOf(tDataNodeConfiguration2.getLocation().getDataNodeId()), tDataNodeConfiguration2.getLocation());
        });
        TOperatePipeOnDataNodeReq createTime = new TOperatePipeOnDataNodeReq(str, (byte) syncOperation.ordinal()).setCreateTime(j);
        AsyncClientHandler<?, ?> asyncClientHandler = new AsyncClientHandler<>(DataNodeRequestType.ROLLBACK_OPERATE_PIPE, createTime, concurrentHashMap);
        AsyncDataNodeClientPool.getInstance().sendAsyncRequestToDataNodeWithRetry(asyncClientHandler);
        ArrayList arrayList = new ArrayList();
        for (Map.Entry<Integer, ?> entry : asyncClientHandler.getResponseMap().entrySet()) {
            if (((TSStatus) entry.getValue()).getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
                arrayList.add(entry.getKey());
            }
        }
        this.configManager.getRetryFailedTasksThread().retryRollbackReq(arrayList, createTime);
    }

    public Map<Integer, TSStatus> preCreatePipeOnDataNodes(PipeInfo pipeInfo) {
        NodeManager nodeManager = this.configManager.getNodeManager();
        ConcurrentHashMap concurrentHashMap = new ConcurrentHashMap();
        nodeManager.filterDataNodeThroughStatus(NodeStatus.Running).forEach(tDataNodeConfiguration -> {
            concurrentHashMap.put(Integer.valueOf(tDataNodeConfiguration.getLocation().getDataNodeId()), tDataNodeConfiguration.getLocation());
        });
        AsyncClientHandler<?, ?> asyncClientHandler = new AsyncClientHandler<>(DataNodeRequestType.PRE_CREATE_PIPE, new TCreatePipeOnDataNodeReq(pipeInfo.serializeToByteBuffer()), concurrentHashMap);
        AsyncDataNodeClientPool.getInstance().sendAsyncRequestToDataNodeWithRetry(asyncClientHandler);
        return asyncClientHandler.getResponseMap();
    }

    @Subscribe
    @AllowConcurrentEvents
    public void handleNodeStatistics(NodeStatisticsEvent nodeStatisticsEvent) {
    }

    private ConsensusManager getConsensusManager() {
        return this.configManager.getConsensusManager();
    }
}
