package org.apache.iotdb.db.pipe.connector.protocol.pipeconsensus;

import java.io.FileNotFoundException;
import java.io.IOException;
import java.util.Iterator;
import java.util.Objects;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import org.apache.iotdb.common.rpc.thrift.TConsensusGroupId;
import org.apache.iotdb.common.rpc.thrift.TConsensusGroupType;
import org.apache.iotdb.common.rpc.thrift.TEndPoint;
import org.apache.iotdb.commons.client.IClientManager;
import org.apache.iotdb.commons.client.async.AsyncPipeConsensusServiceClient;
import org.apache.iotdb.commons.client.container.PipeConsensusClientMgrContainer;
import org.apache.iotdb.commons.consensus.ConsensusGroupId;
import org.apache.iotdb.commons.consensus.index.ProgressIndex;
import org.apache.iotdb.commons.exception.pipe.PipeRuntimeConnectorRetryTimesConfigurableException;
import org.apache.iotdb.commons.pipe.agent.task.progress.PipeEventCommitManager;
import org.apache.iotdb.commons.pipe.connector.protocol.IoTDBConnector;
import org.apache.iotdb.commons.pipe.event.EnrichedEvent;
import org.apache.iotdb.commons.service.metric.MetricService;
import org.apache.iotdb.consensus.pipe.consensuspipe.ConsensusPipeConnector;
import org.apache.iotdb.consensus.pipe.consensuspipe.ConsensusPipeName;
import org.apache.iotdb.consensus.pipe.metric.PipeConsensusSyncLagManager;
import org.apache.iotdb.consensus.pipe.thrift.TCommitId;
import org.apache.iotdb.db.conf.IoTDBConfig;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.db.pipe.agent.PipeDataNodeAgent;
import org.apache.iotdb.db.pipe.connector.protocol.pipeconsensus.handler.PipeConsensusTabletBatchEventHandler;
import org.apache.iotdb.db.pipe.connector.protocol.pipeconsensus.handler.PipeConsensusTabletInsertNodeEventHandler;
import org.apache.iotdb.db.pipe.connector.protocol.pipeconsensus.handler.PipeConsensusTsFileInsertionEventHandler;
import org.apache.iotdb.db.pipe.connector.protocol.pipeconsensus.payload.builder.PipeConsensusAsyncBatchReqBuilder;
import org.apache.iotdb.db.pipe.connector.protocol.pipeconsensus.payload.request.PipeConsensusTabletBinaryReq;
import org.apache.iotdb.db.pipe.connector.protocol.pipeconsensus.payload.request.PipeConsensusTabletInsertNodeReq;
import org.apache.iotdb.db.pipe.consensus.metric.PipeConsensusConnectorMetrics;
import org.apache.iotdb.db.pipe.event.common.deletion.PipeDeleteDataNodeEvent;
import org.apache.iotdb.db.pipe.event.common.heartbeat.PipeHeartbeatEvent;
import org.apache.iotdb.db.pipe.event.common.tablet.PipeInsertNodeTabletInsertionEvent;
import org.apache.iotdb.db.pipe.event.common.tablet.PipeRawTabletInsertionEvent;
import org.apache.iotdb.db.pipe.event.common.tsfile.PipeTsFileInsertionEvent;
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertNode;
import org.apache.iotdb.pipe.api.annotation.TableModel;
import org.apache.iotdb.pipe.api.annotation.TreeModel;
import org.apache.iotdb.pipe.api.customizer.configuration.PipeConnectorRuntimeConfiguration;
import org.apache.iotdb.pipe.api.customizer.parameter.PipeParameterValidator;
import org.apache.iotdb.pipe.api.customizer.parameter.PipeParameters;
import org.apache.iotdb.pipe.api.event.Event;
import org.apache.iotdb.pipe.api.event.dml.insertion.TabletInsertionEvent;
import org.apache.iotdb.pipe.api.event.dml.insertion.TsFileInsertionEvent;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@TreeModel
@TableModel
/* loaded from: input_file:org/apache/iotdb/db/pipe/connector/protocol/pipeconsensus/PipeConsensusAsyncConnector.class */
public class PipeConsensusAsyncConnector extends IoTDBConnector implements ConsensusPipeConnector {
    private static final String ENQUEUE_EXCEPTION_MSG = "Timeout: PipeConsensusConnector offers an event into transferBuffer failed, because transferBuffer is full.";
    private static final String THRIFT_ERROR_FORMATTER_WITHOUT_ENDPOINT = "Failed to borrow client from client pool or exception occurred when sending to receiver.";
    private static final String THRIFT_ERROR_FORMATTER_WITH_ENDPOINT = "Failed to borrow client from client pool or exception occurred when sending to receiver %s:%s.";
    private PipeConsensusConnectorMetrics pipeConsensusConnectorMetrics;
    private String consensusPipeName;
    private int consensusGroupId;
    private PipeConsensusSyncConnector retryConnector;
    private IClientManager<TEndPoint, AsyncPipeConsensusServiceClient> asyncTransferClientManager;
    private PipeConsensusAsyncBatchReqBuilder tabletBatchBuilder;
    private static final Logger LOGGER = LoggerFactory.getLogger(PipeConsensusAsyncConnector.class);
    private static final IoTDBConfig IOTDB_CONFIG = IoTDBDescriptor.getInstance().getConfig();
    private static final long PIPE_CONSENSUS_EVENT_ENQUEUE_TIMEOUT_IN_MS = IOTDB_CONFIG.getConnectionTimeoutInMS() / 6;
    private final BlockingQueue<Event> retryEventQueue = new LinkedBlockingQueue();
    private final BlockingQueue<EnrichedEvent> transferBuffer = new LinkedBlockingDeque(IOTDB_CONFIG.getIotConsensusV2PipelineSize());
    private final AtomicBoolean isClosed = new AtomicBoolean(false);
    private final int thisDataNodeId = IoTDBDescriptor.getInstance().getConfig().getDataNodeId();
    private volatile long currentReplicateProgress = 0;
    private final Lock lock = new ReentrantLock();

    public void validate(PipeParameterValidator pipeParameterValidator) throws Exception {
        super.validate(pipeParameterValidator);
        PipeParameters parameters = pipeParameterValidator.getParameters();
        pipeParameterValidator.validate(objArr -> {
            return ((Boolean) objArr[0]).booleanValue() || ((Boolean) objArr[1]).booleanValue();
        }, String.format("One of %s, %s must be specified in consensus pipe", "connector.consensus.group-id", "connector.consensus.pipe-name"), new Object[]{Boolean.valueOf(parameters.hasAttribute("connector.consensus.group-id")), Boolean.valueOf(parameters.hasAttribute("connector.consensus.pipe-name"))});
    }

    public void customize(PipeParameters pipeParameters, PipeConnectorRuntimeConfiguration pipeConnectorRuntimeConfiguration) throws Exception {
        super.customize(pipeParameters, pipeConnectorRuntimeConfiguration);
        this.consensusGroupId = pipeParameters.getInt("connector.consensus.group-id").intValue();
        this.consensusPipeName = pipeParameters.getString("connector.consensus.pipe-name");
        this.pipeConsensusConnectorMetrics = new PipeConsensusConnectorMetrics(this);
        PipeConsensusSyncLagManager.getInstance(getConsensusGroupIdStr()).addConsensusPipeConnector(new ConsensusPipeName(this.consensusPipeName), this);
        MetricService.getInstance().addMetricSet(this.pipeConsensusConnectorMetrics);
        this.retryConnector = new PipeConsensusSyncConnector(this.nodeUrls, this.consensusGroupId, this.thisDataNodeId, this.pipeConsensusConnectorMetrics);
        this.retryConnector.customize(pipeParameters, pipeConnectorRuntimeConfiguration);
        this.asyncTransferClientManager = PipeConsensusClientMgrContainer.getInstance().getGlobalAsyncClientManager();
        if (this.isTabletBatchModeEnabled) {
            this.tabletBatchBuilder = new PipeConsensusAsyncBatchReqBuilder(pipeParameters, new TConsensusGroupId(TConsensusGroupType.DataRegion, this.consensusGroupId), this.thisDataNodeId);
        }
        this.isTabletBatchModeEnabled = false;
    }

    private boolean addEvent2Buffer(EnrichedEvent enrichedEvent) {
        try {
            if (LOGGER.isDebugEnabled()) {
                LOGGER.debug("PipeConsensus-ConsensusGroup-{}: no.{} event-{} added to connector buffer", new Object[]{Integer.valueOf(this.consensusGroupId), Long.valueOf(enrichedEvent.getReplicateIndexForIoTV2()), enrichedEvent});
            }
            if (this.transferBuffer.contains(enrichedEvent)) {
                return true;
            }
            long nanoTime = System.nanoTime();
            boolean offer = this.transferBuffer.offer(enrichedEvent, PIPE_CONSENSUS_EVENT_ENQUEUE_TIMEOUT_IN_MS, TimeUnit.MILLISECONDS);
            this.pipeConsensusConnectorMetrics.recordConnectorEnqueueTimer(System.nanoTime() - nanoTime);
            if (offer) {
                enrichedEvent.increaseReferenceCount(PipeConsensusAsyncConnector.class.getName());
            }
            if (this.isClosed.get()) {
                enrichedEvent.clearReferenceCount(PipeConsensusAsyncConnector.class.getName());
            }
            return offer;
        } catch (InterruptedException e) {
            LOGGER.info("PipeConsensusConnector transferBuffer queue offer is interrupted.", e);
            Thread.currentThread().interrupt();
            return false;
        }
    }

    public void removeEventFromBuffer(EnrichedEvent enrichedEvent) {
        try {
            this.lock.lockInterruptibly();
            try {
                if (LOGGER.isDebugEnabled()) {
                    LOGGER.debug("PipeConsensus-ConsensusGroup-{}: one event-{} successfully received by the follower, will be removed from queue, queue size = {}, limit size = {}", new Object[]{Integer.valueOf(this.consensusGroupId), enrichedEvent, Integer.valueOf(this.transferBuffer.size()), Integer.valueOf(IOTDB_CONFIG.getIotConsensusV2PipelineSize())});
                }
                if (this.transferBuffer.isEmpty()) {
                    LOGGER.info("PipeConsensus-ConsensusGroup-{}: try to remove event-{} after pipeConsensusAsyncConnector being closed. Ignore it.", Integer.valueOf(this.consensusGroupId), enrichedEvent);
                    this.lock.unlock();
                    return;
                }
                Iterator it = this.transferBuffer.iterator();
                for (EnrichedEvent enrichedEvent2 = (EnrichedEvent) it.next(); !enrichedEvent2.equalsInPipeConsensus(enrichedEvent) && it.hasNext(); enrichedEvent2 = (EnrichedEvent) it.next()) {
                }
                it.remove();
                this.currentReplicateProgress = Math.max(this.currentReplicateProgress, enrichedEvent.getReplicateIndexForIoTV2());
                enrichedEvent.decreaseReferenceCount(PipeConsensusAsyncConnector.class.getName(), true);
                this.lock.unlock();
            } catch (Throwable th) {
                this.lock.unlock();
                throw th;
            }
        } catch (InterruptedException e) {
            LOGGER.info("PipeConsensusAsyncConnector try to lock interrupted. Will exit directly", e);
            Thread.currentThread().interrupt();
        }
    }

    public void handshake() throws Exception {
    }

    public void heartbeat() throws Exception {
    }

    public void transfer(TabletInsertionEvent tabletInsertionEvent) throws Exception {
        syncTransferQueuedEventsIfNecessary();
        if (!addEvent2Buffer((EnrichedEvent) tabletInsertionEvent)) {
            throw new PipeRuntimeConnectorRetryTimesConfigurableException(ENQUEUE_EXCEPTION_MSG, Integer.MAX_VALUE);
        }
        if (this.isTabletBatchModeEnabled) {
            if (this.tabletBatchBuilder.onEvent(tabletInsertionEvent)) {
                transfer(new PipeConsensusTabletBatchEventHandler(this.tabletBatchBuilder, this, this.pipeConsensusConnectorMetrics));
                this.tabletBatchBuilder.onSuccess();
                return;
            }
            return;
        }
        TConsensusGroupId tConsensusGroupId = new TConsensusGroupId(TConsensusGroupType.DataRegion, this.consensusGroupId);
        PipeInsertNodeTabletInsertionEvent pipeInsertNodeTabletInsertionEvent = (PipeInsertNodeTabletInsertionEvent) tabletInsertionEvent;
        TCommitId tCommitId = new TCommitId(pipeInsertNodeTabletInsertionEvent.getReplicateIndexForIoTV2(), pipeInsertNodeTabletInsertionEvent.getCommitterKey().getRestartTimes(), pipeInsertNodeTabletInsertionEvent.getRebootTimes());
        if (pipeInsertNodeTabletInsertionEvent.increaseReferenceCount(PipeConsensusAsyncConnector.class.getName())) {
            InsertNode insertNodeViaCacheIfPossible = pipeInsertNodeTabletInsertionEvent.getInsertNodeViaCacheIfPossible();
            ProgressIndex progressIndex = pipeInsertNodeTabletInsertionEvent.getProgressIndex();
            transfer(new PipeConsensusTabletInsertNodeEventHandler(pipeInsertNodeTabletInsertionEvent, Objects.isNull(insertNodeViaCacheIfPossible) ? PipeConsensusTabletBinaryReq.toTPipeConsensusTransferReq(pipeInsertNodeTabletInsertionEvent.getByteBuffer(), tCommitId, tConsensusGroupId, progressIndex, this.thisDataNodeId) : PipeConsensusTabletInsertNodeReq.toTPipeConsensusTransferReq(insertNodeViaCacheIfPossible, tCommitId, tConsensusGroupId, progressIndex, this.thisDataNodeId), this, this.pipeConsensusConnectorMetrics));
        }
    }

    private void transfer(PipeConsensusTabletBatchEventHandler pipeConsensusTabletBatchEventHandler) {
        AsyncPipeConsensusServiceClient asyncPipeConsensusServiceClient = null;
        try {
            asyncPipeConsensusServiceClient = (AsyncPipeConsensusServiceClient) this.asyncTransferClientManager.borrowClient(getFollowerUrl());
            pipeConsensusTabletBatchEventHandler.transfer(asyncPipeConsensusServiceClient);
        } catch (Exception e) {
            logOnClientException(asyncPipeConsensusServiceClient, e);
            pipeConsensusTabletBatchEventHandler.onError(e);
        }
    }

    private void transfer(PipeConsensusTabletInsertNodeEventHandler pipeConsensusTabletInsertNodeEventHandler) {
        AsyncPipeConsensusServiceClient asyncPipeConsensusServiceClient = null;
        try {
            asyncPipeConsensusServiceClient = (AsyncPipeConsensusServiceClient) this.asyncTransferClientManager.borrowClient(getFollowerUrl());
            pipeConsensusTabletInsertNodeEventHandler.transfer(asyncPipeConsensusServiceClient);
        } catch (Exception e) {
            logOnClientException(asyncPipeConsensusServiceClient, e);
            pipeConsensusTabletInsertNodeEventHandler.onError(e);
        }
    }

    public void transfer(TsFileInsertionEvent tsFileInsertionEvent) throws Exception {
        syncTransferQueuedEventsIfNecessary();
        transferBatchedEventsIfNecessary();
        if (!(tsFileInsertionEvent instanceof PipeTsFileInsertionEvent)) {
            LOGGER.warn("PipeConsensusAsyncConnector only support PipeTsFileInsertionEvent. Current event: {}.", tsFileInsertionEvent);
            return;
        }
        if (!addEvent2Buffer((EnrichedEvent) tsFileInsertionEvent)) {
            throw new PipeRuntimeConnectorRetryTimesConfigurableException(ENQUEUE_EXCEPTION_MSG, Integer.MAX_VALUE);
        }
        PipeTsFileInsertionEvent pipeTsFileInsertionEvent = (PipeTsFileInsertionEvent) tsFileInsertionEvent;
        TCommitId tCommitId = new TCommitId(pipeTsFileInsertionEvent.getReplicateIndexForIoTV2(), pipeTsFileInsertionEvent.getCommitterKey().getRestartTimes(), pipeTsFileInsertionEvent.getRebootTimes());
        TConsensusGroupId tConsensusGroupId = new TConsensusGroupId(TConsensusGroupType.DataRegion, this.consensusGroupId);
        if (pipeTsFileInsertionEvent.increaseReferenceCount(PipeConsensusAsyncConnector.class.getName())) {
            try {
                if (!pipeTsFileInsertionEvent.getTsFile().exists()) {
                    throw new FileNotFoundException(pipeTsFileInsertionEvent.getTsFile().getAbsolutePath());
                }
                transfer(new PipeConsensusTsFileInsertionEventHandler(pipeTsFileInsertionEvent, this, tCommitId, tConsensusGroupId, this.consensusPipeName, this.thisDataNodeId, this.pipeConsensusConnectorMetrics));
            } catch (Exception e) {
                pipeTsFileInsertionEvent.decreaseReferenceCount(PipeConsensusAsyncConnector.class.getName(), false);
                throw e;
            }
        }
    }

    private void transfer(PipeConsensusTsFileInsertionEventHandler pipeConsensusTsFileInsertionEventHandler) {
        AsyncPipeConsensusServiceClient asyncPipeConsensusServiceClient = null;
        try {
            asyncPipeConsensusServiceClient = (AsyncPipeConsensusServiceClient) this.asyncTransferClientManager.borrowClient(getFollowerUrl());
            pipeConsensusTsFileInsertionEventHandler.transfer(asyncPipeConsensusServiceClient);
        } catch (Exception e) {
            logOnClientException(asyncPipeConsensusServiceClient, e);
            pipeConsensusTsFileInsertionEventHandler.onError(e);
        }
    }

    public void transfer(Event event) throws Exception {
        syncTransferQueuedEventsIfNecessary();
        transferBatchedEventsIfNecessary();
        if (!(event instanceof PipeDeleteDataNodeEvent)) {
            if (event instanceof PipeHeartbeatEvent) {
                return;
            }
            LOGGER.warn("PipeConsensusAsyncConnector does not support transferring generic event: {}.", event);
        } else {
            PipeDeleteDataNodeEvent pipeDeleteDataNodeEvent = (PipeDeleteDataNodeEvent) event;
            if (!addEvent2Buffer(pipeDeleteDataNodeEvent)) {
                throw new PipeRuntimeConnectorRetryTimesConfigurableException(ENQUEUE_EXCEPTION_MSG, Integer.MAX_VALUE);
            }
            this.retryConnector.transfer(event);
            removeEventFromBuffer(pipeDeleteDataNodeEvent);
        }
    }

    private void transferBatchedEventsIfNecessary() throws IOException {
        if (!this.isTabletBatchModeEnabled || this.tabletBatchBuilder.isEmpty()) {
            return;
        }
        transfer(new PipeConsensusTabletBatchEventHandler(this.tabletBatchBuilder, this, this.pipeConsensusConnectorMetrics));
        this.tabletBatchBuilder.onSuccess();
    }

    private void syncTransferQueuedEventsIfNecessary() throws Exception {
        while (!this.retryEventQueue.isEmpty()) {
            try {
                this.lock.lockInterruptibly();
                try {
                    if (this.isClosed.get() || this.retryEventQueue.isEmpty()) {
                        return;
                    }
                    TabletInsertionEvent tabletInsertionEvent = (Event) this.retryEventQueue.peek();
                    if (tabletInsertionEvent instanceof PipeInsertNodeTabletInsertionEvent) {
                        this.retryConnector.transfer((PipeInsertNodeTabletInsertionEvent) tabletInsertionEvent);
                    } else if (tabletInsertionEvent instanceof PipeRawTabletInsertionEvent) {
                        this.retryConnector.transfer((PipeRawTabletInsertionEvent) tabletInsertionEvent);
                    } else if (tabletInsertionEvent instanceof PipeTsFileInsertionEvent) {
                        this.retryConnector.transfer((PipeTsFileInsertionEvent) tabletInsertionEvent);
                    } else if (LOGGER.isWarnEnabled()) {
                        LOGGER.warn("PipeConsensusAsyncConnector does not support transfer generic event: {}.", tabletInsertionEvent);
                    }
                    if (tabletInsertionEvent instanceof EnrichedEvent) {
                        ((EnrichedEvent) tabletInsertionEvent).decreaseReferenceCount(PipeConsensusAsyncConnector.class.getName(), true);
                    }
                    TabletInsertionEvent tabletInsertionEvent2 = (Event) this.retryEventQueue.poll();
                    if (tabletInsertionEvent2 != tabletInsertionEvent && LOGGER.isErrorEnabled()) {
                        LOGGER.error("The event polled from the queue is not the same as the event peeked from the queue. Peeked event: {}, polled event: {}.", tabletInsertionEvent, tabletInsertionEvent2);
                    }
                    if (tabletInsertionEvent2 != null) {
                        if (LOGGER.isDebugEnabled()) {
                            LOGGER.debug("Polled event {} from retry queue.", tabletInsertionEvent2);
                        }
                        removeEventFromBuffer((EnrichedEvent) tabletInsertionEvent2);
                    }
                } finally {
                    this.lock.unlock();
                }
            } catch (InterruptedException e) {
                LOGGER.info("PipeConsensusAsyncConnector try to lock interrupted. Will exit directly", e);
                Thread.currentThread().interrupt();
                return;
            }
        }
    }

    public void addFailureEventToRetryQueue(Event event) {
        if (this.isClosed.get()) {
            if (event instanceof EnrichedEvent) {
                ((EnrichedEvent) event).clearReferenceCount(PipeConsensusAsyncConnector.class.getName());
                return;
            }
            return;
        }
        this.retryEventQueue.offer(event);
        if (LOGGER.isDebugEnabled()) {
            LOGGER.debug("PipeConsensus-ConsensusGroup-{}: Event {} transfer failed, will be added to retry queue.", Integer.valueOf(this.consensusGroupId), event);
        }
        if (this.isClosed.get() && (event instanceof EnrichedEvent)) {
            ((EnrichedEvent) event).clearReferenceCount(PipeConsensusAsyncConnector.class.getName());
        }
    }

    public void addFailureEventsToRetryQueue(Iterable<Event> iterable) {
        Iterator<Event> it = iterable.iterator();
        while (it.hasNext()) {
            addFailureEventToRetryQueue(it.next());
        }
    }

    public void clearRetryEventsReferenceCount() {
        boolean z = true;
        try {
            this.lock.lockInterruptibly();
        } catch (InterruptedException e) {
            LOGGER.info("PipeConsensusAsyncConnector try to lock interrupted.", e);
            Thread.currentThread().interrupt();
            z = false;
        }
        while (!this.retryEventQueue.isEmpty()) {
            try {
                EnrichedEvent enrichedEvent = (Event) this.retryEventQueue.poll();
                if (enrichedEvent instanceof EnrichedEvent) {
                    enrichedEvent.clearReferenceCount(PipeConsensusAsyncConnector.class.getName());
                }
            } finally {
                if (z) {
                    this.lock.unlock();
                }
            }
        }
    }

    public void clearTransferBufferReferenceCount() {
        boolean z = true;
        try {
            this.lock.lockInterruptibly();
        } catch (InterruptedException e) {
            LOGGER.info("PipeConsensusAsyncConnector try to lock interrupted.", e);
            Thread.currentThread().interrupt();
            z = false;
        }
        while (!this.transferBuffer.isEmpty()) {
            try {
                this.transferBuffer.poll().clearReferenceCount(PipeConsensusAsyncConnector.class.getName());
            } finally {
                if (z) {
                    this.lock.unlock();
                }
            }
        }
    }

    private void logOnClientException(AsyncPipeConsensusServiceClient asyncPipeConsensusServiceClient, Exception exc) {
        if (asyncPipeConsensusServiceClient == null) {
            LOGGER.warn(THRIFT_ERROR_FORMATTER_WITHOUT_ENDPOINT, exc);
        } else {
            LOGGER.warn(String.format(THRIFT_ERROR_FORMATTER_WITH_ENDPOINT, asyncPipeConsensusServiceClient.getTEndpoint().getIp(), Integer.valueOf(asyncPipeConsensusServiceClient.getTEndpoint().getPort())), exc);
        }
    }

    private TEndPoint getFollowerUrl() {
        return (TEndPoint) this.nodeUrls.get(0);
    }

    public void close() {
        boolean z = true;
        try {
            this.lock.lockInterruptibly();
        } catch (InterruptedException e) {
            LOGGER.info("PipeConsensusAsyncConnector try to lock interrupted.", e);
            Thread.currentThread().interrupt();
            z = false;
        }
        try {
            super.close();
            this.isClosed.set(true);
            this.retryConnector.close();
            clearRetryEventsReferenceCount();
            clearTransferBufferReferenceCount();
            if (this.tabletBatchBuilder != null) {
                this.tabletBatchBuilder.close();
            }
            PipeConsensusSyncLagManager.getInstance(getConsensusGroupIdStr()).removeConsensusPipeConnector(new ConsensusPipeName(this.consensusPipeName));
            MetricService.getInstance().removeMetricSet(this.pipeConsensusConnectorMetrics);
            if (z) {
                this.lock.unlock();
            }
        } catch (Throwable th) {
            if (z) {
                this.lock.unlock();
            }
            throw th;
        }
    }

    public int getTransferBufferSize() {
        return this.transferBuffer.size();
    }

    public int getRetryBufferSize() {
        return this.retryEventQueue.size();
    }

    public long getConsensusPipeCommitProgress() {
        return PipeEventCommitManager.getInstance().getGivenConsensusPipeCommitId(this.consensusPipeName, PipeDataNodeAgent.task().getPipeCreationTime(this.consensusPipeName), this.consensusGroupId);
    }

    public long getConsensusPipeReplicateProgress() {
        return this.currentReplicateProgress;
    }

    public String getConsensusGroupIdStr() {
        return ConsensusGroupId.Factory.create(TConsensusGroupType.DataRegion.getValue(), this.consensusGroupId).toString();
    }
}
