package org.apache.iotdb.db.pipe.receiver.protocol.pipeconsensus;

import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.commons.lang3.function.TriFunction;
import org.apache.iotdb.common.rpc.thrift.TSStatus;
import org.apache.iotdb.commons.consensus.ConsensusGroupId;
import org.apache.iotdb.commons.pipe.connector.payload.pipeconsensus.request.PipeConsensusRequestVersion;
import org.apache.iotdb.consensus.pipe.PipeConsensus;
import org.apache.iotdb.consensus.pipe.consensuspipe.ConsensusPipeName;
import org.apache.iotdb.consensus.pipe.consensuspipe.ConsensusPipeReceiver;
import org.apache.iotdb.consensus.pipe.thrift.TPipeConsensusTransferReq;
import org.apache.iotdb.consensus.pipe.thrift.TPipeConsensusTransferResp;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.db.consensus.DataRegionConsensusImpl;
import org.apache.iotdb.rpc.RpcUtils;
import org.apache.iotdb.rpc.TSStatusCode;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/iotdb/db/pipe/receiver/protocol/pipeconsensus/PipeConsensusReceiverAgent.class */
/**
 * Routes incoming {@link TPipeConsensusTransferReq}s to a {@link PipeConsensusReceiver}.
 *
 * <p>Receivers are kept in a two-level map: consensus group id, then {@link ConsensusPipeName}
 * (built from the group id, the data node id carried by the request and this node's id). Each slot
 * is an {@link AtomicReference} that holds {@code null} until a receiver has been constructed.
 * The thread that inserts a slot constructs the receiver; other threads that find an empty slot
 * poll until it is filled.
 */
public class PipeConsensusReceiverAgent implements ConsensusPipeReceiver {
    private static final Logger LOGGER = LoggerFactory.getLogger(PipeConsensusReceiverAgent.class);

    /** Receiver constructors keyed by request version byte. */
    private static final Map<Byte, TriFunction<PipeConsensus, ConsensusGroupId, ConsensusPipeName, PipeConsensusReceiver>> RECEIVER_CONSTRUCTORS = new HashMap<>();

    /** Sleep interval, in milliseconds, between polls while waiting for a receiver to be created. */
    private static final long WAIT_INITIALIZE_RECEIVER_INTERVAL_IN_MS = 100;

    private final int thisNodeId = IoTDBDescriptor.getInstance().getConfig().getDataNodeId();

    /** consensus group id -> (consensus pipe name -> receiver slot). */
    private final Map<ConsensusGroupId, Map<ConsensusPipeName, AtomicReference<PipeConsensusReceiver>>> replicaReceiverMap = new ConcurrentHashMap<>();

    /** Set by {@link #initConsensusInRuntime()}; {@code null} until then or when unavailable. */
    private PipeConsensus pipeConsensus;

    /** Registers the receiver constructor for {@code VERSION_1} requests. */
    public PipeConsensusReceiverAgent() {
        RECEIVER_CONSTRUCTORS.put(PipeConsensusRequestVersion.VERSION_1.getVersion(), PipeConsensusReceiver::new);
    }

    /**
     * Captures the data-region consensus implementation if it is a {@link PipeConsensus};
     * otherwise clears the field.
     */
    public void initConsensusInRuntime() {
        // Held as Object so the instanceof check and cast are valid whatever static type
        // getInstance() declares.
        final Object consensusImpl = DataRegionConsensusImpl.getInstance();
        this.pipeConsensus = consensusImpl instanceof PipeConsensus ? (PipeConsensus) consensusImpl : null;
    }

    /**
     * Dispatches a transfer request to the receiver that matches its consensus group, data node
     * id and version.
     *
     * @param tPipeConsensusTransferReq the incoming request
     * @return the receiver's response, or a {@code PIPE_CONSENSUS_VERSION_ERROR} response when no
     *     constructor is registered for the request version
     */
    public TPipeConsensusTransferResp receive(TPipeConsensusTransferReq tPipeConsensusTransferReq) {
        final byte reqVersion = tPipeConsensusTransferReq.getVersion();
        if (RECEIVER_CONSTRUCTORS.containsKey(reqVersion)) {
            final ConsensusGroupId consensusGroupId =
                    ConsensusGroupId.Factory.createFromTConsensusGroupId(tPipeConsensusTransferReq.getConsensusGroupId());
            // NOTE(review): getReceiver can return null if the waiting thread was interrupted
            // (see waitUntilReceiverGetInitiated); that case is not handled here.
            return getReceiver(consensusGroupId, tPipeConsensusTransferReq.getDataNodeId(), reqVersion)
                    .receive(tPipeConsensusTransferReq);
        }
        final TSStatus status = RpcUtils.getStatus(
                TSStatusCode.PIPE_CONSENSUS_VERSION_ERROR,
                String.format("Unknown PipeConsensusRequestVersion %s.", reqVersion));
        LOGGER.warn("PipeConsensus: Unknown PipeConsensusRequestVersion, response status = {}.", status);
        return new TPipeConsensusTransferResp(status);
    }

    /**
     * Finds, creates or re-creates the receiver for the given group / data node / version.
     *
     * @param consensusGroupId consensus group the request belongs to
     * @param reqDataNodeId data node id carried by the request
     * @param reqVersion request version byte
     * @return the receiver stored in the slot after lookup or (re)initialisation
     */
    private PipeConsensusReceiver getReceiver(ConsensusGroupId consensusGroupId, int reqDataNodeId, byte reqVersion) {
        final Map<ConsensusPipeName, AtomicReference<PipeConsensusReceiver>> pipeToReceiver =
                this.replicaReceiverMap.computeIfAbsent(consensusGroupId, key -> new ConcurrentHashMap<>());
        final ConsensusPipeName consensusPipeName = new ConsensusPipeName(consensusGroupId, reqDataNodeId, this.thisNodeId);

        // Flipped to true only for the thread whose computeIfAbsent actually inserted the slot;
        // that thread is the one responsible for constructing the receiver.
        final AtomicBoolean isFirstGetReceiver = new AtomicBoolean(false);
        final AtomicReference<PipeConsensusReceiver> receiverReference =
                pipeToReceiver.computeIfAbsent(consensusPipeName, key -> {
                    isFirstGetReceiver.set(true);
                    return new AtomicReference<>(null);
                });

        // Read the slot once: a concurrent drop may null it between a check and a later re-read.
        final PipeConsensusReceiver currentReceiver = receiverReference.get();
        if (currentReceiver == null) {
            return internalSetAndGetReceiver(consensusGroupId, consensusPipeName, reqVersion, isFirstGetReceiver);
        }

        final byte receiverVersion = currentReceiver.getVersion().getVersion();
        if (receiverVersion == reqVersion) {
            return currentReceiver;
        }

        LOGGER.warn("The pipeConsensus request version {} is different from the sender request version {}, the receiver will be reset to the sender request version.", receiverVersion, reqVersion);
        receiverReference.set(null);
        // This thread emptied the slot, so it must also refill it. Reusing isFirstGetReceiver
        // (false here, because the slot already existed) would make this thread wait for an
        // initialisation that nobody is going to perform.
        return internalSetAndGetReceiver(consensusGroupId, consensusPipeName, reqVersion, new AtomicBoolean(true));
    }

    /**
     * Fills the receiver slot when this thread is the designated initialiser, otherwise waits for
     * another thread to fill it.
     *
     * @param consensusGroupId consensus group of the slot
     * @param consensusPipeName pipe name of the slot
     * @param reqVersion version used to select the receiver constructor
     * @param isFirstGetReceiver {@code true} if the calling thread must construct the receiver
     * @return the slot's content; may be {@code null} if the wait was interrupted
     * @throws UnsupportedOperationException if no constructor is registered for {@code reqVersion}
     */
    private PipeConsensusReceiver internalSetAndGetReceiver(ConsensusGroupId consensusGroupId, ConsensusPipeName consensusPipeName, byte reqVersion, AtomicBoolean isFirstGetReceiver) {
        final AtomicReference<PipeConsensusReceiver> receiverReference =
                this.replicaReceiverMap.get(consensusGroupId).get(consensusPipeName);
        if (isFirstGetReceiver.get()) {
            final TriFunction<PipeConsensus, ConsensusGroupId, ConsensusPipeName, PipeConsensusReceiver> constructor =
                    RECEIVER_CONSTRUCTORS.get(reqVersion);
            if (constructor == null) {
                throw new UnsupportedOperationException(String.format("Unsupported pipeConsensus request version %d", reqVersion));
            }
            receiverReference.set(constructor.apply(this.pipeConsensus, consensusGroupId, consensusPipeName));
        } else {
            waitUntilReceiverGetInitiated(receiverReference);
        }
        return receiverReference.get();
    }

    /**
     * Polls the slot every {@link #WAIT_INITIALIZE_RECEIVER_INTERVAL_IN_MS} ms until it is
     * non-null. On interruption the interrupt flag is restored and the method returns early,
     * leaving the slot possibly still empty.
     *
     * @param receiverReference slot to wait on
     */
    private void waitUntilReceiverGetInitiated(AtomicReference<PipeConsensusReceiver> receiverReference) {
        while (receiverReference.get() == null) {
            try {
                Thread.sleep(WAIT_INITIALIZE_RECEIVER_INTERVAL_IN_MS);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                LOGGER.warn("PipeConsensusReceiver thread is interrupted when waiting for receiver get initiated, may because system exit.", e);
                return;
            }
        }
    }

    /**
     * Shuts down and unregisters the receiver of a dropped consensus pipe. Does nothing when no
     * slot is registered for the pipe; a slot that is registered but still empty is simply
     * removed.
     *
     * @param consensusPipeName name of the consensus pipe being dropped
     */
    public final void handleDropPipeConsensusTask(ConsensusPipeName consensusPipeName) {
        final Map<ConsensusPipeName, AtomicReference<PipeConsensusReceiver>> pipeToReceiver =
                this.replicaReceiverMap.get(consensusPipeName.getConsensusGroupId());
        if (pipeToReceiver == null) {
            return;
        }
        final AtomicReference<PipeConsensusReceiver> receiverReference = pipeToReceiver.get(consensusPipeName);
        if (receiverReference == null) {
            return;
        }
        // The slot can exist while still holding null (receiver never created, or just reset).
        final PipeConsensusReceiver receiver = receiverReference.get();
        if (receiver != null) {
            receiver.handleExit();
        }
        receiverReference.set(null);
        pipeToReceiver.remove(consensusPipeName);
    }
}
