package org.apache.iotdb.db.pipe.connector.protocol.thrift.async;

import com.google.common.collect.ImmutableSet;
import java.io.File;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.util.Arrays;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.iotdb.common.rpc.thrift.TEndPoint;
import org.apache.iotdb.commons.client.async.AsyncPipeDataTransferServiceClient;
import org.apache.iotdb.commons.pipe.config.PipeConfig;
import org.apache.iotdb.commons.pipe.connector.protocol.IoTDBConnector;
import org.apache.iotdb.commons.pipe.event.EnrichedEvent;
import org.apache.iotdb.db.pipe.connector.client.IoTDBDataNodeAsyncClientManager;
import org.apache.iotdb.db.pipe.connector.payload.evolvable.batch.PipeTabletEventBatch;
import org.apache.iotdb.db.pipe.connector.payload.evolvable.batch.PipeTabletEventPlainBatch;
import org.apache.iotdb.db.pipe.connector.payload.evolvable.batch.PipeTabletEventTsFileBatch;
import org.apache.iotdb.db.pipe.connector.payload.evolvable.batch.PipeTransferBatchReqBuilder;
import org.apache.iotdb.db.pipe.connector.payload.evolvable.request.PipeTransferTabletBinaryReqV2;
import org.apache.iotdb.db.pipe.connector.payload.evolvable.request.PipeTransferTabletInsertNodeReqV2;
import org.apache.iotdb.db.pipe.connector.payload.evolvable.request.PipeTransferTabletRawReqV2;
import org.apache.iotdb.db.pipe.connector.protocol.thrift.async.handler.PipeTransferTabletBatchEventHandler;
import org.apache.iotdb.db.pipe.connector.protocol.thrift.async.handler.PipeTransferTabletInsertNodeEventHandler;
import org.apache.iotdb.db.pipe.connector.protocol.thrift.async.handler.PipeTransferTabletRawEventHandler;
import org.apache.iotdb.db.pipe.connector.protocol.thrift.async.handler.PipeTransferTrackableHandler;
import org.apache.iotdb.db.pipe.connector.protocol.thrift.async.handler.PipeTransferTsFileHandler;
import org.apache.iotdb.db.pipe.connector.protocol.thrift.sync.IoTDBDataRegionSyncConnector;
import org.apache.iotdb.db.pipe.event.common.deletion.PipeDeleteDataNodeEvent;
import org.apache.iotdb.db.pipe.event.common.heartbeat.PipeHeartbeatEvent;
import org.apache.iotdb.db.pipe.event.common.tablet.PipeInsertNodeTabletInsertionEvent;
import org.apache.iotdb.db.pipe.event.common.tablet.PipeRawTabletInsertionEvent;
import org.apache.iotdb.db.pipe.event.common.terminate.PipeTerminateEvent;
import org.apache.iotdb.db.pipe.event.common.tsfile.PipeTsFileInsertionEvent;
import org.apache.iotdb.db.pipe.metric.source.PipeDataRegionEventCounter;
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertNode;
import org.apache.iotdb.pipe.api.annotation.TableModel;
import org.apache.iotdb.pipe.api.annotation.TreeModel;
import org.apache.iotdb.pipe.api.customizer.configuration.PipeConnectorRuntimeConfiguration;
import org.apache.iotdb.pipe.api.customizer.parameter.PipeParameterValidator;
import org.apache.iotdb.pipe.api.customizer.parameter.PipeParameters;
import org.apache.iotdb.pipe.api.event.Event;
import org.apache.iotdb.pipe.api.event.dml.insertion.TabletInsertionEvent;
import org.apache.iotdb.pipe.api.event.dml.insertion.TsFileInsertionEvent;
import org.apache.iotdb.pipe.api.exception.PipeException;
import org.apache.tsfile.exception.write.WriteProcessException;
import org.apache.tsfile.utils.Pair;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@TreeModel
@TableModel
/* loaded from: input_file:org/apache/iotdb/db/pipe/connector/protocol/thrift/async/IoTDBDataRegionAsyncConnector.class */
public class IoTDBDataRegionAsyncConnector extends IoTDBConnector {
    private static final Logger LOGGER = LoggerFactory.getLogger(IoTDBDataRegionAsyncConnector.class);
    private static final String THRIFT_ERROR_FORMATTER_WITHOUT_ENDPOINT = "Failed to borrow client from client pool when sending to receiver.";
    private static final String THRIFT_ERROR_FORMATTER_WITH_ENDPOINT = "Exception occurred while sending to receiver %s:%s.";
    private IoTDBDataNodeAsyncClientManager clientManager;
    private PipeTransferBatchReqBuilder tabletBatchBuilder;
    private final IoTDBDataRegionSyncConnector syncConnector = new IoTDBDataRegionSyncConnector();
    private final BlockingQueue<Event> retryEventQueue = new LinkedBlockingQueue();
    private final PipeDataRegionEventCounter retryEventQueueEventCounter = new PipeDataRegionEventCounter();
    private final int forcedRetryTsFileEventQueueSizeThreshold = PipeConfig.getInstance().getPipeAsyncConnectorForcedRetryTsFileEventQueueSizeThreshold();
    private final int forcedRetryTabletEventQueueSizeThreshold = PipeConfig.getInstance().getPipeAsyncConnectorForcedRetryTabletEventQueueSizeThreshold();
    private final int forcedRetryTotalEventQueueSizeThreshold = PipeConfig.getInstance().getPipeAsyncConnectorForcedRetryTotalEventQueueSizeThreshold();
    private final long maxRetryExecutionTimeMsPerCall = PipeConfig.getInstance().getPipeAsyncConnectorMaxRetryExecutionTimeMsPerCall();
    private final AtomicBoolean isClosed = new AtomicBoolean(false);
    private final Map<PipeTransferTrackableHandler, PipeTransferTrackableHandler> pendingHandlers = new ConcurrentHashMap();

    public void validate(PipeParameterValidator pipeParameterValidator) throws Exception {
        super.validate(pipeParameterValidator);
        this.syncConnector.validate(pipeParameterValidator);
        PipeParameters parameters = pipeParameterValidator.getParameters();
        pipeParameterValidator.validate(objArr -> {
            return (((Boolean) objArr[0]).booleanValue() || ((Boolean) objArr[1]).booleanValue() || ((Boolean) objArr[2]).booleanValue()) ? false : true;
        }, "Only 'iotdb-thrift-ssl-sink' supports SSL transmission currently.", new Object[]{Boolean.valueOf(parameters.getBooleanOrDefault("sink.ssl.enable", false)), Boolean.valueOf(parameters.hasAttribute("sink.ssl.trust-store-path")), Boolean.valueOf(parameters.hasAttribute("sink.ssl.trust-store-pwd"))});
    }

    public void customize(PipeParameters pipeParameters, PipeConnectorRuntimeConfiguration pipeConnectorRuntimeConfiguration) throws Exception {
        super.customize(pipeParameters, pipeConnectorRuntimeConfiguration);
        this.syncConnector.customize(pipeParameters, pipeConnectorRuntimeConfiguration);
        this.clientManager = new IoTDBDataNodeAsyncClientManager(this.nodeUrls, pipeParameters.getBooleanOrDefault(Arrays.asList("sink.leader-cache.enable", "connector.leader-cache.enable"), true), this.loadBalanceStrategy, this.username, this.password, this.shouldReceiverConvertOnTypeMismatch, this.loadTsFileStrategy, this.loadTsFileValidation, this.shouldMarkAsPipeRequest);
        if (this.isTabletBatchModeEnabled) {
            this.tabletBatchBuilder = new PipeTransferBatchReqBuilder(pipeParameters);
        }
    }

    public synchronized void handshake() throws Exception {
        this.syncConnector.handshake();
    }

    public void heartbeat() throws Exception {
        this.syncConnector.heartbeat();
    }

    public void transfer(TabletInsertionEvent tabletInsertionEvent) throws Exception {
        transferQueuedEventsIfNecessary(false);
        if (!(tabletInsertionEvent instanceof PipeInsertNodeTabletInsertionEvent) && !(tabletInsertionEvent instanceof PipeRawTabletInsertionEvent)) {
            LOGGER.warn("IoTDBThriftAsyncConnector only support PipeInsertNodeTabletInsertionEvent and PipeRawTabletInsertionEvent. Current event: {}.", tabletInsertionEvent);
        } else if (this.isTabletBatchModeEnabled) {
            transferInBatchWithoutCheck(this.tabletBatchBuilder.onEvent(tabletInsertionEvent));
        } else {
            transferInEventWithoutCheck(tabletInsertionEvent);
        }
    }

    private void transferInBatchWithoutCheck(Pair<TEndPoint, PipeTabletEventBatch> pair) throws IOException, WriteProcessException, InterruptedException {
        if (Objects.isNull(pair)) {
            return;
        }
        PipeTabletEventBatch pipeTabletEventBatch = (PipeTabletEventBatch) pair.getRight();
        if (pipeTabletEventBatch instanceof PipeTabletEventPlainBatch) {
            transfer((TEndPoint) pair.getLeft(), new PipeTransferTabletBatchEventHandler((PipeTabletEventPlainBatch) pipeTabletEventBatch, this));
        } else if (pipeTabletEventBatch instanceof PipeTabletEventTsFileBatch) {
            PipeTabletEventTsFileBatch pipeTabletEventTsFileBatch = (PipeTabletEventTsFileBatch) pipeTabletEventBatch;
            List<Pair<String, File>> sealTsFiles = pipeTabletEventTsFileBatch.sealTsFiles();
            Map<Pair<String, Long>, Double> deepCopyPipe2WeightMap = pipeTabletEventTsFileBatch.deepCopyPipe2WeightMap();
            List<EnrichedEvent> deepCopyEvents = pipeTabletEventTsFileBatch.deepCopyEvents();
            AtomicInteger atomicInteger = new AtomicInteger(sealTsFiles.size());
            AtomicBoolean atomicBoolean = new AtomicBoolean(false);
            try {
                for (Pair<String, File> pair2 : sealTsFiles) {
                    transfer(new PipeTransferTsFileHandler(this, deepCopyPipe2WeightMap, deepCopyEvents, atomicInteger, atomicBoolean, (File) pair2.right, null, false, (String) pair2.left));
                }
            } catch (Throwable th) {
                LOGGER.warn("Failed to transfer tsfile batch ({}).", sealTsFiles, th);
                if (atomicBoolean.compareAndSet(false, true)) {
                    addFailureEventsToRetryQueue(deepCopyEvents);
                }
            }
        } else {
            LOGGER.warn("Unsupported batch type {} when transferring tablet insertion event.", pipeTabletEventBatch.getClass());
        }
        ((PipeTabletEventBatch) pair.getRight()).onSuccess();
    }

    private boolean transferInEventWithoutCheck(TabletInsertionEvent tabletInsertionEvent) throws Exception {
        if (!(tabletInsertionEvent instanceof PipeInsertNodeTabletInsertionEvent)) {
            PipeRawTabletInsertionEvent pipeRawTabletInsertionEvent = (PipeRawTabletInsertionEvent) tabletInsertionEvent;
            if (!pipeRawTabletInsertionEvent.increaseReferenceCount(IoTDBDataRegionAsyncConnector.class.getName())) {
                return false;
            }
            transfer(pipeRawTabletInsertionEvent.getDeviceId(), new PipeTransferTabletRawEventHandler(pipeRawTabletInsertionEvent, compressIfNeeded(PipeTransferTabletRawReqV2.toTPipeTransferReq(pipeRawTabletInsertionEvent.convertToTablet(), pipeRawTabletInsertionEvent.isAligned(), pipeRawTabletInsertionEvent.isTableModelEvent() ? pipeRawTabletInsertionEvent.getTableModelDatabaseName() : null)), this));
            return true;
        }
        PipeInsertNodeTabletInsertionEvent pipeInsertNodeTabletInsertionEvent = (PipeInsertNodeTabletInsertionEvent) tabletInsertionEvent;
        if (!pipeInsertNodeTabletInsertionEvent.increaseReferenceCount(IoTDBDataRegionAsyncConnector.class.getName())) {
            return false;
        }
        InsertNode insertNodeViaCacheIfPossible = pipeInsertNodeTabletInsertionEvent.getInsertNodeViaCacheIfPossible();
        String tableModelDatabaseName = pipeInsertNodeTabletInsertionEvent.isTableModelEvent() ? pipeInsertNodeTabletInsertionEvent.getTableModelDatabaseName() : null;
        transfer(pipeInsertNodeTabletInsertionEvent.getDeviceId(), new PipeTransferTabletInsertNodeEventHandler(pipeInsertNodeTabletInsertionEvent, compressIfNeeded(Objects.isNull(insertNodeViaCacheIfPossible) ? PipeTransferTabletBinaryReqV2.toTPipeTransferReq(pipeInsertNodeTabletInsertionEvent.getByteBuffer(), tableModelDatabaseName) : PipeTransferTabletInsertNodeReqV2.toTPipeTransferReq(insertNodeViaCacheIfPossible, tableModelDatabaseName)), this));
        return true;
    }

    private void transfer(TEndPoint tEndPoint, PipeTransferTabletBatchEventHandler pipeTransferTabletBatchEventHandler) {
        AsyncPipeDataTransferServiceClient asyncPipeDataTransferServiceClient = null;
        try {
            asyncPipeDataTransferServiceClient = this.clientManager.borrowClient(tEndPoint);
            pipeTransferTabletBatchEventHandler.transfer(asyncPipeDataTransferServiceClient);
        } catch (Exception e) {
            logOnClientException(asyncPipeDataTransferServiceClient, e);
            pipeTransferTabletBatchEventHandler.onError(e);
        }
    }

    private void transfer(String str, PipeTransferTabletInsertNodeEventHandler pipeTransferTabletInsertNodeEventHandler) {
        AsyncPipeDataTransferServiceClient asyncPipeDataTransferServiceClient = null;
        try {
            asyncPipeDataTransferServiceClient = this.clientManager.borrowClient(str);
            pipeTransferTabletInsertNodeEventHandler.transfer(asyncPipeDataTransferServiceClient);
        } catch (Exception e) {
            logOnClientException(asyncPipeDataTransferServiceClient, e);
            pipeTransferTabletInsertNodeEventHandler.onError(e);
        }
    }

    private void transfer(String str, PipeTransferTabletRawEventHandler pipeTransferTabletRawEventHandler) {
        AsyncPipeDataTransferServiceClient asyncPipeDataTransferServiceClient = null;
        try {
            asyncPipeDataTransferServiceClient = this.clientManager.borrowClient(str);
            pipeTransferTabletRawEventHandler.transfer(asyncPipeDataTransferServiceClient);
        } catch (Exception e) {
            logOnClientException(asyncPipeDataTransferServiceClient, e);
            pipeTransferTabletRawEventHandler.onError(e);
        }
    }

    public void transfer(TsFileInsertionEvent tsFileInsertionEvent) throws Exception {
        transferQueuedEventsIfNecessary(false);
        transferBatchedEventsIfNecessary();
        if (tsFileInsertionEvent instanceof PipeTsFileInsertionEvent) {
            transferWithoutCheck(tsFileInsertionEvent);
        } else {
            LOGGER.warn("IoTDBThriftAsyncConnector only support PipeTsFileInsertionEvent. Current event: {}.", tsFileInsertionEvent);
        }
    }

    private boolean transferWithoutCheck(TsFileInsertionEvent tsFileInsertionEvent) throws Exception {
        PipeTsFileInsertionEvent pipeTsFileInsertionEvent = (PipeTsFileInsertionEvent) tsFileInsertionEvent;
        if (!pipeTsFileInsertionEvent.increaseReferenceCount(IoTDBDataRegionAsyncConnector.class.getName())) {
            return false;
        }
        try {
            if (!pipeTsFileInsertionEvent.getTsFile().exists()) {
                throw new FileNotFoundException(pipeTsFileInsertionEvent.getTsFile().getAbsolutePath());
            }
            transfer(new PipeTransferTsFileHandler(this, Collections.singletonMap(new Pair(pipeTsFileInsertionEvent.getPipeName(), Long.valueOf(pipeTsFileInsertionEvent.getCreationTime())), Double.valueOf(1.0d)), Collections.singletonList(pipeTsFileInsertionEvent), new AtomicInteger(1), new AtomicBoolean(false), pipeTsFileInsertionEvent.getTsFile(), pipeTsFileInsertionEvent.getModFile(), pipeTsFileInsertionEvent.isWithMod() && this.clientManager.supportModsIfIsDataNodeReceiver(), pipeTsFileInsertionEvent.isTableModelEvent() ? pipeTsFileInsertionEvent.getTableModelDatabaseName() : null));
            return true;
        } catch (Exception e) {
            pipeTsFileInsertionEvent.decreaseReferenceCount(IoTDBDataRegionAsyncConnector.class.getName(), false);
            throw e;
        }
    }

    private void transfer(PipeTransferTsFileHandler pipeTransferTsFileHandler) {
        AsyncPipeDataTransferServiceClient asyncPipeDataTransferServiceClient = null;
        try {
            asyncPipeDataTransferServiceClient = this.clientManager.borrowClient();
            pipeTransferTsFileHandler.transfer(this.clientManager, asyncPipeDataTransferServiceClient);
        } catch (Exception e) {
            logOnClientException(asyncPipeDataTransferServiceClient, e);
            pipeTransferTsFileHandler.onError(e);
        }
    }

    public void transfer(Event event) throws Exception {
        transferQueuedEventsIfNecessary(true);
        transferBatchedEventsIfNecessary();
        if ((event instanceof PipeHeartbeatEvent) || (event instanceof PipeDeleteDataNodeEvent) || (event instanceof PipeTerminateEvent)) {
            this.syncConnector.transfer(event);
        } else {
            LOGGER.warn("IoTDBThriftAsyncConnector does not support transferring generic event: {}.", event);
        }
    }

    private void transferBatchedEventsIfNecessary() throws IOException, WriteProcessException, InterruptedException {
        if (!this.isTabletBatchModeEnabled || this.tabletBatchBuilder.isEmpty()) {
            return;
        }
        Iterator<Pair<TEndPoint, PipeTabletEventBatch>> it = this.tabletBatchBuilder.getAllNonEmptyBatches().iterator();
        while (it.hasNext()) {
            transferInBatchWithoutCheck(it.next());
        }
    }

    public void updateLeaderCache(String str, TEndPoint tEndPoint) {
        this.clientManager.updateLeaderCache(str, tEndPoint);
    }

    private void logOnClientException(AsyncPipeDataTransferServiceClient asyncPipeDataTransferServiceClient, Exception exc) {
        if (asyncPipeDataTransferServiceClient == null) {
            LOGGER.warn(THRIFT_ERROR_FORMATTER_WITHOUT_ENDPOINT);
        } else {
            asyncPipeDataTransferServiceClient.resetMethodStateIfStopped();
            LOGGER.warn(String.format(THRIFT_ERROR_FORMATTER_WITH_ENDPOINT, asyncPipeDataTransferServiceClient.getIp(), Integer.valueOf(asyncPipeDataTransferServiceClient.getPort())), exc);
        }
    }

    private void transferQueuedEventsIfNecessary(boolean z) {
        if (this.retryEventQueue.isEmpty()) {
            return;
        }
        if (z || this.retryEventQueueEventCounter.getTabletInsertionEventCount() >= this.forcedRetryTabletEventQueueSizeThreshold || this.retryEventQueueEventCounter.getTsFileInsertionEventCount() >= this.forcedRetryTsFileEventQueueSizeThreshold || this.retryEventQueue.size() >= this.forcedRetryTotalEventQueueSizeThreshold) {
            long currentTimeMillis = System.currentTimeMillis();
            int size = this.retryEventQueue.size();
            while (!this.retryEventQueue.isEmpty()) {
                synchronized (this) {
                    if (this.isClosed.get()) {
                        return;
                    }
                    if (this.retryEventQueue.isEmpty()) {
                        return;
                    }
                    Event peek = this.retryEventQueue.peek();
                    if (peek instanceof PipeInsertNodeTabletInsertionEvent) {
                        retryTransfer((PipeInsertNodeTabletInsertionEvent) peek);
                    } else if (peek instanceof PipeRawTabletInsertionEvent) {
                        retryTransfer((PipeRawTabletInsertionEvent) peek);
                    } else if (peek instanceof PipeTsFileInsertionEvent) {
                        retryTransfer((PipeTsFileInsertionEvent) peek);
                    } else {
                        LOGGER.warn("IoTDBThriftAsyncConnector does not support transfer generic event: {}.", peek);
                    }
                    Event poll = this.retryEventQueue.poll();
                    this.retryEventQueueEventCounter.decreaseEventCount(poll);
                    if (poll != peek) {
                        LOGGER.error("The event polled from the queue is not the same as the event peeked from the queue. Peeked event: {}, polled event: {}.", peek, poll);
                    }
                    if (poll != null && LOGGER.isDebugEnabled()) {
                        LOGGER.debug("Polled event {} from retry queue.", poll);
                    }
                    if (System.currentTimeMillis() - currentTimeMillis > this.maxRetryExecutionTimeMsPerCall) {
                        if (this.retryEventQueueEventCounter.getTabletInsertionEventCount() < this.forcedRetryTabletEventQueueSizeThreshold && this.retryEventQueueEventCounter.getTsFileInsertionEventCount() < this.forcedRetryTsFileEventQueueSizeThreshold && this.retryEventQueue.size() < this.forcedRetryTotalEventQueueSizeThreshold) {
                            return;
                        }
                        if (size <= this.retryEventQueue.size()) {
                            throw new PipeException("Failed to retry transferring events in the retry queue. Remaining events: " + this.retryEventQueue.size() + " (tablet events: " + this.retryEventQueueEventCounter.getTabletInsertionEventCount() + ", tsfile events: " + this.retryEventQueueEventCounter.getTsFileInsertionEventCount() + ").");
                        }
                    }
                }
            }
        }
    }

    private void retryTransfer(TabletInsertionEvent tabletInsertionEvent) {
        if (this.isTabletBatchModeEnabled) {
            try {
                transferInBatchWithoutCheck(this.tabletBatchBuilder.onEvent(tabletInsertionEvent));
                if (tabletInsertionEvent instanceof EnrichedEvent) {
                    ((EnrichedEvent) tabletInsertionEvent).decreaseReferenceCount(IoTDBDataRegionAsyncConnector.class.getName(), false);
                }
                return;
            } catch (Exception e) {
                addFailureEventToRetryQueue(tabletInsertionEvent);
                return;
            }
        }
        try {
            if (!transferInEventWithoutCheck(tabletInsertionEvent)) {
                addFailureEventToRetryQueue(tabletInsertionEvent);
            } else if (tabletInsertionEvent instanceof EnrichedEvent) {
                ((EnrichedEvent) tabletInsertionEvent).decreaseReferenceCount(IoTDBDataRegionAsyncConnector.class.getName(), false);
            }
        } catch (Exception e2) {
            if (tabletInsertionEvent instanceof EnrichedEvent) {
                ((EnrichedEvent) tabletInsertionEvent).decreaseReferenceCount(IoTDBDataRegionAsyncConnector.class.getName(), false);
            }
            addFailureEventToRetryQueue(tabletInsertionEvent);
        }
    }

    private void retryTransfer(PipeTsFileInsertionEvent pipeTsFileInsertionEvent) {
        try {
            if (transferWithoutCheck(pipeTsFileInsertionEvent)) {
                pipeTsFileInsertionEvent.decreaseReferenceCount(IoTDBDataRegionAsyncConnector.class.getName(), false);
            } else {
                addFailureEventToRetryQueue(pipeTsFileInsertionEvent);
            }
        } catch (Exception e) {
            pipeTsFileInsertionEvent.decreaseReferenceCount(IoTDBDataRegionAsyncConnector.class.getName(), false);
            addFailureEventToRetryQueue(pipeTsFileInsertionEvent);
        }
    }

    public void addFailureEventToRetryQueue(Event event) {
        if ((event instanceof EnrichedEvent) && ((EnrichedEvent) event).isReleased()) {
            return;
        }
        if (this.isClosed.get()) {
            if (event instanceof EnrichedEvent) {
                ((EnrichedEvent) event).clearReferenceCount(IoTDBDataRegionAsyncConnector.class.getName());
                return;
            }
            return;
        }
        this.retryEventQueue.offer(event);
        this.retryEventQueueEventCounter.increaseEventCount(event);
        if (LOGGER.isDebugEnabled()) {
            LOGGER.debug("Added event {} to retry queue.", event);
        }
        if (this.isClosed.get() && (event instanceof EnrichedEvent)) {
            ((EnrichedEvent) event).clearReferenceCount(IoTDBDataRegionAsyncConnector.class.getName());
        }
    }

    public void addFailureEventsToRetryQueue(Iterable<EnrichedEvent> iterable) {
        iterable.forEach((v1) -> {
            addFailureEventToRetryQueue(v1);
        });
    }

    public synchronized void discardEventsOfPipe(String str, int i) {
        if (this.isTabletBatchModeEnabled) {
            this.tabletBatchBuilder.discardEventsOfPipe(str, i);
        }
        this.retryEventQueue.removeIf(event -> {
            if (!(event instanceof EnrichedEvent) || !str.equals(((EnrichedEvent) event).getPipeName()) || i != ((EnrichedEvent) event).getRegionId()) {
                return false;
            }
            ((EnrichedEvent) event).clearReferenceCount(IoTDBDataRegionAsyncConnector.class.getName());
            this.retryEventQueueEventCounter.decreaseEventCount(event);
            return true;
        });
    }

    public synchronized void close() {
        this.isClosed.set(true);
        this.syncConnector.close();
        if (this.tabletBatchBuilder != null) {
            this.tabletBatchBuilder.close();
        }
        if (hasPendingHandlers()) {
            ImmutableSet.copyOf(this.pendingHandlers.keySet()).forEach(pipeTransferTrackableHandler -> {
                pipeTransferTrackableHandler.clearEventsReferenceCount();
                eliminateHandler(pipeTransferTrackableHandler);
            });
        }
        try {
            if (this.clientManager != null) {
                this.clientManager.close();
            }
        } catch (Exception e) {
            LOGGER.warn("Failed to close client manager.", e);
        }
        clearRetryEventsReferenceCount();
        super.close();
    }

    public synchronized void clearRetryEventsReferenceCount() {
        while (!this.retryEventQueue.isEmpty()) {
            EnrichedEvent enrichedEvent = (Event) this.retryEventQueue.poll();
            this.retryEventQueueEventCounter.decreaseEventCount(enrichedEvent);
            if (enrichedEvent instanceof EnrichedEvent) {
                enrichedEvent.clearReferenceCount(IoTDBDataRegionAsyncConnector.class.getName());
            }
        }
    }

    public int getRetryEventQueueSize() {
        return this.retryEventQueue.size();
    }

    public boolean isClosed() {
        return this.isClosed.get();
    }

    public void trackHandler(PipeTransferTrackableHandler pipeTransferTrackableHandler) {
        this.pendingHandlers.put(pipeTransferTrackableHandler, pipeTransferTrackableHandler);
    }

    public void eliminateHandler(PipeTransferTrackableHandler pipeTransferTrackableHandler) {
        pipeTransferTrackableHandler.close();
        this.pendingHandlers.remove(pipeTransferTrackableHandler);
    }

    public boolean hasPendingHandlers() {
        return !this.pendingHandlers.isEmpty();
    }
}
