package org.apache.iotdb.db.pipe.processor.twostage.exchange.sender;

import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.iotdb.common.rpc.thrift.TEndPoint;
import org.apache.iotdb.commons.client.exception.ClientManagerException;
import org.apache.iotdb.commons.client.property.ThriftClientProperty;
import org.apache.iotdb.commons.pipe.config.PipeConfig;
import org.apache.iotdb.commons.pipe.connector.client.IoTDBSyncClient;
import org.apache.iotdb.confignode.rpc.thrift.TDataNodeInfo;
import org.apache.iotdb.confignode.rpc.thrift.TShowDataNodesResp;
import org.apache.iotdb.db.pipe.processor.twostage.combiner.PipeCombineHandlerManager;
import org.apache.iotdb.db.protocol.client.ConfigNodeClient;
import org.apache.iotdb.db.protocol.client.ConfigNodeClientManager;
import org.apache.iotdb.db.protocol.client.ConfigNodeInfo;
import org.apache.iotdb.pipe.api.exception.PipeException;
import org.apache.iotdb.service.rpc.thrift.TPipeTransferReq;
import org.apache.iotdb.service.rpc.thrift.TPipeTransferResp;
import org.apache.thrift.TException;
import org.apache.thrift.transport.TTransportException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/iotdb/db/pipe/processor/twostage/exchange/sender/TwoStageAggregateSender.class */
public class TwoStageAggregateSender implements AutoCloseable {
    private final String pipeName;
    private final long creationTime;
    private TEndPoint[] endPoints;
    private final Map<TEndPoint, IoTDBSyncClient> endPointIoTDBSyncClientMap = new ConcurrentHashMap();
    private static final Logger LOGGER = LoggerFactory.getLogger(TwoStageAggregateSender.class);
    private static final PipeConfig PIPE_CONFIG = PipeConfig.getInstance();
    private static final AtomicLong DATANODE_ID_2_END_POINTS_LAST_UPDATE_TIME = new AtomicLong(0);
    private static final AtomicReference<Map<Integer, TEndPoint>> DATANODE_ID_2_END_POINTS = new AtomicReference<>();

    public TwoStageAggregateSender(String str, long j) {
        this.pipeName = str;
        this.creationTime = j;
    }

    public synchronized TPipeTransferResp request(long j, TPipeTransferReq tPipeTransferReq) throws TException {
        tryConstructClients(tryFetchEndPointsIfNecessary());
        TEndPoint tEndPoint = this.endPoints[((int) j) % this.endPoints.length];
        IoTDBSyncClient ioTDBSyncClient = this.endPointIoTDBSyncClientMap.get(tEndPoint);
        if (ioTDBSyncClient == null) {
            ioTDBSyncClient = reconstructIoTDBSyncClient(tEndPoint);
        }
        LOGGER.info("Sending request {} (watermark = {}) to {}", new Object[]{tPipeTransferReq, Long.valueOf(j), tEndPoint});
        try {
            return ioTDBSyncClient.pipeTransfer(tPipeTransferReq);
        } catch (Exception e) {
            LOGGER.warn("Failed to send request {} (watermark = {}) to {}", new Object[]{tPipeTransferReq, Long.valueOf(j), tEndPoint, e});
            try {
                reconstructIoTDBSyncClient(tEndPoint);
            } catch (Exception e2) {
                LOGGER.warn("Failed to reconstruct IoTDBSyncClient {} after failure to send request {} (watermark = {})", new Object[]{tEndPoint, tPipeTransferReq, Long.valueOf(j), e2});
            }
            throw e;
        }
    }

    private static boolean tryFetchEndPointsIfNecessary() {
        long currentTimeMillis = System.currentTimeMillis();
        if (DATANODE_ID_2_END_POINTS.get() != null && currentTimeMillis - DATANODE_ID_2_END_POINTS_LAST_UPDATE_TIME.get() < PIPE_CONFIG.getTwoStageAggregateSenderEndPointsCacheInMs()) {
            return false;
        }
        synchronized (DATANODE_ID_2_END_POINTS) {
            if (DATANODE_ID_2_END_POINTS.get() != null && currentTimeMillis - DATANODE_ID_2_END_POINTS_LAST_UPDATE_TIME.get() < PIPE_CONFIG.getTwoStageAggregateSenderEndPointsCacheInMs()) {
                return false;
            }
            HashMap hashMap = new HashMap();
            try {
                ConfigNodeClient configNodeClient = (ConfigNodeClient) ConfigNodeClientManager.getInstance().borrowClient(ConfigNodeInfo.CONFIG_REGION_ID);
                try {
                    TShowDataNodesResp showDataNodes = configNodeClient.showDataNodes();
                    if (showDataNodes == null || showDataNodes.getDataNodesInfoList() == null) {
                        throw new PipeException("Failed to fetch data nodes");
                    }
                    for (TDataNodeInfo tDataNodeInfo : showDataNodes.getDataNodesInfoList()) {
                        hashMap.put(Integer.valueOf(tDataNodeInfo.getDataNodeId()), new TEndPoint(tDataNodeInfo.getRpcAddresss(), tDataNodeInfo.getRpcPort()));
                    }
                    if (configNodeClient != null) {
                        configNodeClient.close();
                    }
                    if (hashMap.isEmpty()) {
                        throw new PipeException("No data nodes' endpoints fetched");
                    }
                    DATANODE_ID_2_END_POINTS.set(hashMap);
                    DATANODE_ID_2_END_POINTS_LAST_UPDATE_TIME.set(currentTimeMillis);
                    LOGGER.info("Data nodes' endpoints for two-stage aggregation: {}", DATANODE_ID_2_END_POINTS);
                    return true;
                } catch (Throwable th) {
                    if (configNodeClient != null) {
                        try {
                            configNodeClient.close();
                        } catch (Throwable th2) {
                            th.addSuppressed(th2);
                        }
                    }
                    throw th;
                }
            } catch (ClientManagerException | TException e) {
                throw new PipeException("Failed to fetch data nodes", e);
            }
        }
    }

    private void tryConstructClients(boolean z) {
        if (!Objects.nonNull(this.endPoints) || z) {
            Set<Integer> expectedDataNodeIdSet = PipeCombineHandlerManager.getInstance().getExpectedDataNodeIdSet(this.pipeName, this.creationTime);
            if (expectedDataNodeIdSet.isEmpty()) {
                throw new PipeException("No expected region id set fetched");
            }
            this.endPoints = (TEndPoint[]) DATANODE_ID_2_END_POINTS.get().entrySet().stream().filter(entry -> {
                return expectedDataNodeIdSet.contains(entry.getKey());
            }).map((v0) -> {
                return v0.getValue();
            }).toArray(i -> {
                return new TEndPoint[i];
            });
            LOGGER.info("End points for two-stage aggregation pipe (pipeName={}, creationTime={}) were updated to {}", new Object[]{this.pipeName, Long.valueOf(this.creationTime), this.endPoints});
            for (TEndPoint tEndPoint : this.endPoints) {
                if (!this.endPointIoTDBSyncClientMap.containsKey(tEndPoint)) {
                    try {
                        this.endPointIoTDBSyncClientMap.put(tEndPoint, constructIoTDBSyncClient(tEndPoint));
                    } catch (TTransportException e) {
                        LOGGER.warn("Failed to construct IoTDBSyncClient", e);
                    }
                }
            }
            Iterator it = new HashSet(this.endPointIoTDBSyncClientMap.keySet()).iterator();
            while (it.hasNext()) {
                TEndPoint tEndPoint2 = (TEndPoint) it.next();
                if (!DATANODE_ID_2_END_POINTS.get().containsValue(tEndPoint2)) {
                    try {
                        this.endPointIoTDBSyncClientMap.remove(tEndPoint2).close();
                    } catch (Exception e2) {
                        LOGGER.warn("Failed to close IoTDBSyncClient", e2);
                    }
                }
            }
        }
    }

    private IoTDBSyncClient reconstructIoTDBSyncClient(TEndPoint tEndPoint) throws TTransportException {
        IoTDBSyncClient remove = this.endPointIoTDBSyncClientMap.remove(tEndPoint);
        if (remove != null) {
            try {
                remove.close();
            } catch (Exception e) {
                LOGGER.warn("Failed to close old IoTDBSyncClient", e);
            }
        }
        IoTDBSyncClient constructIoTDBSyncClient = constructIoTDBSyncClient(tEndPoint);
        this.endPointIoTDBSyncClientMap.put(tEndPoint, constructIoTDBSyncClient);
        return constructIoTDBSyncClient;
    }

    private IoTDBSyncClient constructIoTDBSyncClient(TEndPoint tEndPoint) throws TTransportException {
        return new IoTDBSyncClient(new ThriftClientProperty.Builder().setConnectionTimeoutMs(PIPE_CONFIG.getPipeConnectorHandshakeTimeoutMs()).setRpcThriftCompressionEnabled(PIPE_CONFIG.isPipeConnectorRPCThriftCompressionEnabled()).build(), tEndPoint.getIp(), tEndPoint.getPort(), false, (String) null, (String) null);
    }

    @Override // java.lang.AutoCloseable
    public void close() {
        Iterator<IoTDBSyncClient> it = this.endPointIoTDBSyncClientMap.values().iterator();
        while (it.hasNext()) {
            try {
                it.next().close();
            } catch (Exception e) {
                LOGGER.warn("Failed to close IoTDBSyncClient", e);
            }
        }
        this.endPointIoTDBSyncClientMap.clear();
        this.endPoints = null;
    }
}
