package org.apache.iotdb.db.pipe.connector.protocol.pipeconsensus.handler;

import java.io.IOException;
import java.util.Iterator;
import java.util.List;
import java.util.stream.Collectors;
import org.apache.iotdb.commons.client.async.AsyncPipeConsensusServiceClient;
import org.apache.iotdb.commons.pipe.event.EnrichedEvent;
import org.apache.iotdb.consensus.pipe.thrift.TPipeConsensusBatchTransferReq;
import org.apache.iotdb.consensus.pipe.thrift.TPipeConsensusBatchTransferResp;
import org.apache.iotdb.db.pipe.connector.protocol.pipeconsensus.PipeConsensusAsyncConnector;
import org.apache.iotdb.db.pipe.connector.protocol.pipeconsensus.payload.builder.PipeConsensusAsyncBatchReqBuilder;
import org.apache.iotdb.db.pipe.consensus.PipeConsensusConnectorMetrics;
import org.apache.iotdb.pipe.api.event.Event;
import org.apache.iotdb.pipe.api.exception.PipeException;
import org.apache.iotdb.rpc.TSStatusCode;
import org.apache.thrift.TException;
import org.apache.thrift.async.AsyncMethodCallback;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/iotdb/db/pipe/connector/protocol/pipeconsensus/handler/PipeConsensusTabletBatchEventHandler.class */
public class PipeConsensusTabletBatchEventHandler implements AsyncMethodCallback<TPipeConsensusBatchTransferResp> {
    private static final Logger LOGGER = LoggerFactory.getLogger(PipeConsensusTabletBatchEventHandler.class);
    private final List<Long> requestCommitIds;
    private final List<Event> events;
    private final TPipeConsensusBatchTransferReq req;
    private final PipeConsensusAsyncConnector connector;
    private final PipeConsensusConnectorMetrics pipeConsensusConnectorMetrics;

    public PipeConsensusTabletBatchEventHandler(PipeConsensusAsyncBatchReqBuilder pipeConsensusAsyncBatchReqBuilder, PipeConsensusAsyncConnector pipeConsensusAsyncConnector, PipeConsensusConnectorMetrics pipeConsensusConnectorMetrics) throws IOException {
        this.requestCommitIds = pipeConsensusAsyncBatchReqBuilder.deepCopyRequestCommitIds();
        this.events = pipeConsensusAsyncBatchReqBuilder.deepCopyEvents();
        this.req = pipeConsensusAsyncBatchReqBuilder.toTPipeConsensusBatchTransferReq();
        this.pipeConsensusConnectorMetrics = pipeConsensusConnectorMetrics;
        this.connector = pipeConsensusAsyncConnector;
    }

    public void transfer(AsyncPipeConsensusServiceClient asyncPipeConsensusServiceClient) throws TException {
        asyncPipeConsensusServiceClient.pipeConsensusBatchTransfer(this.req, this);
    }

    public void onComplete(TPipeConsensusBatchTransferResp tPipeConsensusBatchTransferResp) {
        if (tPipeConsensusBatchTransferResp == null) {
            onError(new PipeException("TPipeConsensusBatchTransferResp is null"));
            return;
        }
        try {
            List list = (List) tPipeConsensusBatchTransferResp.getBatchResps().stream().map((v0) -> {
                return v0.getStatus();
            }).collect(Collectors.toList());
            if (list.stream().anyMatch(tSStatus -> {
                return tSStatus.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode();
            })) {
                list.stream().filter(tSStatus2 -> {
                    return tSStatus2.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode();
                }).forEach(tSStatus3 -> {
                    this.pipeConsensusConnectorMetrics.recordRetryCounter();
                    this.connector.statusHandler().handle(tSStatus3, tSStatus3.getMessage(), this.events.toString());
                });
                this.connector.addFailureEventsToRetryQueue(this.events);
            } else {
                this.events.forEach(event -> {
                    this.connector.removeEventFromBuffer((EnrichedEvent) event);
                });
            }
            Iterator<Event> it = this.events.iterator();
            while (it.hasNext()) {
                EnrichedEvent enrichedEvent = (Event) it.next();
                if (enrichedEvent instanceof EnrichedEvent) {
                    enrichedEvent.decreaseReferenceCount(PipeConsensusTabletBatchEventHandler.class.getName(), true);
                }
            }
        } catch (Exception e) {
            onError(e);
        }
    }

    public void onError(Exception exc) {
        LOGGER.warn("PipeConsensus: Failed to transfer TabletInsertionEvent batch. Total failed events: {}, related pipe names: {}", new Object[]{Integer.valueOf(this.events.size()), this.events.stream().map(event -> {
            return event instanceof EnrichedEvent ? ((EnrichedEvent) event).getPipeName() : "UNKNOWN";
        }).collect(Collectors.toSet()), exc});
        this.connector.addFailureEventsToRetryQueue(this.events);
    }
}
