package org.apache.iotdb.db.pipe.agent.task.subtask.connector;

import org.apache.iotdb.commons.exception.pipe.PipeRuntimeException;
import org.apache.iotdb.commons.pipe.agent.task.connection.UnboundedBlockingPendingQueue;
import org.apache.iotdb.commons.pipe.agent.task.subtask.PipeAbstractConnectorSubtask;
import org.apache.iotdb.commons.pipe.config.PipeConfig;
import org.apache.iotdb.commons.pipe.connector.protocol.IoTDBConnector;
import org.apache.iotdb.commons.pipe.event.EnrichedEvent;
import org.apache.iotdb.db.pipe.agent.PipeDataNodeAgent;
import org.apache.iotdb.db.pipe.connector.protocol.thrift.async.IoTDBDataRegionAsyncConnector;
import org.apache.iotdb.db.pipe.event.UserDefinedEnrichedEvent;
import org.apache.iotdb.db.pipe.event.common.heartbeat.PipeHeartbeatEvent;
import org.apache.iotdb.db.pipe.event.common.schema.PipeSchemaRegionWritePlanEvent;
import org.apache.iotdb.db.pipe.metric.schema.PipeSchemaRegionConnectorMetrics;
import org.apache.iotdb.db.pipe.metric.sink.PipeDataRegionConnectorMetrics;
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNodeType;
import org.apache.iotdb.db.utils.ErrorHandlingUtils;
import org.apache.iotdb.pipe.api.PipeConnector;
import org.apache.iotdb.pipe.api.event.Event;
import org.apache.iotdb.pipe.api.event.dml.insertion.TabletInsertionEvent;
import org.apache.iotdb.pipe.api.event.dml.insertion.TsFileInsertionEvent;
import org.apache.iotdb.pipe.api.exception.PipeConnectionException;
import org.apache.iotdb.pipe.api.exception.PipeException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/iotdb/db/pipe/agent/task/subtask/connector/PipeConnectorSubtask.class */
/**
 * Connector-side pipe subtask: each {@link #executeOnce()} call takes one event (either the
 * retained {@code lastEvent} or a freshly polled one from the input pending queue) and hands it
 * to the output {@link PipeConnector}, dispatching on the event's runtime type.
 *
 * <p>When no event is available, a shared cron heartbeat event is transferred once the configured
 * interval has elapsed since the last heartbeat transfer.
 */
public class PipeConnectorSubtask extends PipeAbstractConnectorSubtask {
    private static final Logger LOGGER = LoggerFactory.getLogger(PipeConnectorSubtask.class);

    /** Shared heartbeat event transferred by {@link #executeOnce()} when the input queue yields nothing. */
    public static final PipeHeartbeatEvent CRON_HEARTBEAT_EVENT = new PipeHeartbeatEvent("cron", false);

    // Multiply by a long literal so the seconds -> milliseconds conversion is always done in long arithmetic.
    private static final long CRON_HEARTBEAT_EVENT_INJECT_INTERVAL_MILLISECONDS = PipeConfig.getInstance().getPipeSubtaskExecutorCronHeartbeatEventIntervalSeconds() * 1000L;

    // Attribute strings starting with this prefix are registered with the schema-region metrics
    // instead of the data-region metrics.
    private static final String SCHEMA_ATTRIBUTE_PREFIX = "schema_";

    /** Queue this subtask polls its input events from. */
    protected final UnboundedBlockingPendingQueue<Event> inputPendingQueue;
    private final String attributeSortedString;
    private final int connectorIndex;
    /** Wall-clock time (ms) of construction or of the last successful heartbeat transfer. */
    private long lastHeartbeatEventInjectTime;

    /**
     * Creates the subtask and registers it with the schema-region or data-region connector
     * metrics, depending on whether {@code attributeSortedString} starts with {@code "schema_"}.
     *
     * @param taskID forwarded to the superclass constructor (presumably the subtask identifier;
     *     confirm against {@code PipeAbstractConnectorSubtask})
     * @param creationTime forwarded to the superclass constructor (presumably the creation
     *     timestamp; confirm against {@code PipeAbstractConnectorSubtask})
     * @param attributeSortedString attribute string exposed by {@link #getAttributeSortedString()};
     *     must not be {@code null}
     * @param connectorIndex index exposed by {@link #getConnectorIndex()}
     * @param inputPendingQueue queue from which events are polled
     * @param outputPipeConnector connector forwarded to the superclass constructor
     */
    public PipeConnectorSubtask(String taskID, long creationTime, String attributeSortedString, int connectorIndex, UnboundedBlockingPendingQueue<Event> inputPendingQueue, PipeConnector outputPipeConnector) {
        super(taskID, creationTime, outputPipeConnector);
        this.lastHeartbeatEventInjectTime = System.currentTimeMillis();
        this.attributeSortedString = attributeSortedString;
        this.connectorIndex = connectorIndex;
        this.inputPendingQueue = inputPendingQueue;
        if (attributeSortedString.startsWith(SCHEMA_ATTRIBUTE_PREFIX)) {
            PipeSchemaRegionConnectorMetrics.getInstance().register(this);
        } else {
            PipeDataRegionConnectorMetrics.getInstance().register(this);
        }
    }

    /**
     * Transfers at most one event to the output connector.
     *
     * @return {@code false} if the subtask is closed or no event was available (a cron heartbeat
     *     may still have been sent); {@code true} if an event was transferred, was already
     *     released, or failed while the subtask was closed
     * @throws PipeException if the transfer fails while the subtask is still open; a
     *     {@code PipeException} raised by the transfer is rethrown unchanged, any other exception
     *     is wrapped
     */
    protected boolean executeOnce() {
        if (this.isClosed.get()) {
            return false;
        }

        // Prefer the retained event; otherwise poll a new one from the input queue.
        final Event event = this.lastEvent != null ? this.lastEvent : UserDefinedEnrichedEvent.maybeOf(this.inputPendingQueue.waitedPoll());
        setLastEvent(event);

        // An already released enriched event is dropped without being transferred.
        if (event instanceof EnrichedEvent && ((EnrichedEvent) event).isReleased()) {
            this.lastEvent = null;
            return true;
        }

        try {
            if (event == null) {
                // Nothing to transfer: send the cron heartbeat once the interval has elapsed.
                if (System.currentTimeMillis() - this.lastHeartbeatEventInjectTime > CRON_HEARTBEAT_EVENT_INJECT_INTERVAL_MILLISECONDS) {
                    transferHeartbeatEvent(CRON_HEARTBEAT_EVENT);
                }
                return false;
            }

            if (event instanceof TabletInsertionEvent) {
                this.outputPipeConnector.transfer((TabletInsertionEvent) event);
                PipeDataRegionConnectorMetrics.getInstance().markTabletEvent(this.taskID);
            } else if (event instanceof TsFileInsertionEvent) {
                this.outputPipeConnector.transfer((TsFileInsertionEvent) event);
                PipeDataRegionConnectorMetrics.getInstance().markTsFileEvent(this.taskID);
            } else if (event instanceof PipeSchemaRegionWritePlanEvent) {
                this.outputPipeConnector.transfer(event);
                // DELETE_DATA plan nodes are not counted as schema events.
                if (((PipeSchemaRegionWritePlanEvent) event).getPlanNode().getType() != PlanNodeType.DELETE_DATA) {
                    PipeSchemaRegionConnectorMetrics.getInstance().markSchemaEvent(this.taskID);
                }
            } else if (event instanceof PipeHeartbeatEvent) {
                transferHeartbeatEvent((PipeHeartbeatEvent) event);
            } else {
                // Unwrap user-defined events so the connector receives the user's own event object.
                this.outputPipeConnector.transfer(event instanceof UserDefinedEnrichedEvent ? ((UserDefinedEnrichedEvent) event).getUserDefinedEvent() : event);
            }

            decreaseReferenceCountAndReleaseLastEvent(event, true);
            return true;
        } catch (PipeException e) {
            // Must be caught before the generic Exception handler: PipeException is the more
            // specific type, and it is propagated as-is rather than wrapped a second time.
            if (!this.isClosed.get()) {
                setLastExceptionEvent(event);
                throw e;
            }
            LOGGER.info("{} in pipe transfer, ignored because the connector subtask is dropped.", e.getClass().getSimpleName(), e);
            clearReferenceCountAndReleaseLastEvent(event);
            return true;
        } catch (Exception e) {
            if (this.isClosed.get()) {
                LOGGER.info("Exception in pipe transfer, ignored because the connector subtask is dropped.", e);
                clearReferenceCountAndReleaseLastEvent(event);
                return true;
            }
            setLastExceptionEvent(event);
            final Object eventDescription = event instanceof EnrichedEvent ? ((EnrichedEvent) event).coreReportMessage() : event;
            throw new PipeException(String.format("Exception in pipe transfer, subtask: %s, last event: %s, root cause: %s", this.taskID, eventDescription, ErrorHandlingUtils.getRootCause(e).getMessage()), e);
        }
    }

    /**
     * Sends a connector heartbeat followed by the given heartbeat event, then records the
     * transfer time and marks the heartbeat metric.
     *
     * @param pipeHeartbeatEvent the heartbeat event to transfer
     * @throws PipeConnectionException if any step fails; the original exception is kept as cause
     */
    private void transferHeartbeatEvent(PipeHeartbeatEvent pipeHeartbeatEvent) {
        try {
            this.outputPipeConnector.heartbeat();
            this.outputPipeConnector.transfer(pipeHeartbeatEvent);
            this.lastHeartbeatEventInjectTime = System.currentTimeMillis();
            pipeHeartbeatEvent.onTransferred();
            PipeDataRegionConnectorMetrics.getInstance().markPipeHeartbeatEvent(this.taskID);
        } catch (Exception e) {
            throw new PipeConnectionException("PipeConnector: " + this.outputPipeConnector.getClass().getName() + "(id: " + this.taskID + ") heartbeat failed, or encountered failure when transferring generic event. Failure: " + e.getMessage(), e);
        }
    }

    /**
     * Deregisters this subtask from its metrics, marks it closed and closes the output connector.
     * A failure while closing the connector is logged, not rethrown. The input queue is always
     * emptied and the superclass {@code close()} always invoked afterwards.
     */
    public void close() {
        if (this.attributeSortedString.startsWith(SCHEMA_ATTRIBUTE_PREFIX)) {
            PipeSchemaRegionConnectorMetrics.getInstance().deregister(this.taskID);
        } else {
            PipeDataRegionConnectorMetrics.getInstance().deregister(this.taskID);
        }

        this.isClosed.set(true);
        try {
            final long startTime = System.currentTimeMillis();
            this.outputPipeConnector.close();
            LOGGER.info("Pipe: connector subtask {} ({}) was closed within {} ms", this.taskID, this.outputPipeConnector, System.currentTimeMillis() - startTime);
        } catch (Exception e) {
            // The trailing exception argument is logged by SLF4J as the throwable (stack trace).
            LOGGER.info("Exception occurred when closing pipe connector subtask {}, root cause: {}", this.taskID, ErrorHandlingUtils.getRootCause(e).getMessage(), e);
        } finally {
            this.inputPendingQueue.discardAllEvents();
            super.close();
        }
    }

    /**
     * Discards everything this subtask holds for the given pipe and region: queued events, the
     * retained last event, the last exception event, and (for {@link IoTDBConnector}s) events
     * held inside the connector.
     *
     * @param pipeNameToDrop pipe name to match against each event's pipe name; must not be
     *     {@code null}
     * @param regionId region id to match against each event's region id
     */
    public void discardEventsOfPipe(String pipeNameToDrop, int regionId) {
        this.inputPendingQueue.discardEventsOfPipe(pipeNameToDrop, regionId);

        synchronized (this) {
            if (this.lastEvent instanceof EnrichedEvent
                    && pipeNameToDrop.equals(((EnrichedEvent) this.lastEvent).getPipeName())
                    && regionId == ((EnrichedEvent) this.lastEvent).getRegionId()) {
                // Forget the retained event and resubmit this subtask.
                // NOTE(review): presumably the resubmission keeps the subtask running after its
                // pending retry target disappears - confirm against the superclass scheduling.
                this.lastEvent = null;
                submitSelf();
            }
            if (this.lastExceptionEvent instanceof EnrichedEvent
                    && pipeNameToDrop.equals(((EnrichedEvent) this.lastExceptionEvent).getPipeName())
                    && regionId == ((EnrichedEvent) this.lastExceptionEvent).getRegionId()) {
                clearReferenceCountAndReleaseLastExceptionEvent();
            }
        }

        if (this.outputPipeConnector instanceof IoTDBConnector) {
            ((IoTDBConnector) this.outputPipeConnector).discardEventsOfPipe(pipeNameToDrop, regionId);
        }
    }

    /** @return the attribute string supplied at construction */
    public String getAttributeSortedString() {
        return this.attributeSortedString;
    }

    /** @return the connector index supplied at construction */
    public int getConnectorIndex() {
        return this.connectorIndex;
    }

    /** @return queued TsFile insertion events, plus one if the retained last event is one */
    public int getTsFileInsertionEventCount() {
        return this.inputPendingQueue.getTsFileInsertionEventCount() + (this.lastEvent instanceof TsFileInsertionEvent ? 1 : 0);
    }

    /** @return queued tablet insertion events, plus one if the retained last event is one */
    public int getTabletInsertionEventCount() {
        return this.inputPendingQueue.getTabletInsertionEventCount() + (this.lastEvent instanceof TabletInsertionEvent ? 1 : 0);
    }

    /** @return queued heartbeat events, plus one if the retained last event is one */
    public int getPipeHeartbeatEventCount() {
        return this.inputPendingQueue.getPipeHeartbeatEventCount() + (this.lastEvent instanceof PipeHeartbeatEvent ? 1 : 0);
    }

    /**
     * @return the retry event queue size of the output connector when it is an
     *     {@link IoTDBDataRegionAsyncConnector}, otherwise {@code 0}
     */
    public int getAsyncConnectorRetryEventQueueSize() {
        if (this.outputPipeConnector instanceof IoTDBDataRegionAsyncConnector) {
            return ((IoTDBDataRegionAsyncConnector) this.outputPipeConnector).getRetryEventQueueSize();
        }
        return 0;
    }

    /**
     * @param th the throwable to inspect
     * @return the message of the root cause as resolved by {@link ErrorHandlingUtils#getRootCause}
     */
    protected String getRootCause(Throwable th) {
        return ErrorHandlingUtils.getRootCause(th).getMessage();
    }

    /**
     * Forwards the event and exception to the data node's pipe runtime agent.
     *
     * @param enrichedEvent the event associated with the failure
     * @param pipeRuntimeException the exception to report
     */
    protected void report(EnrichedEvent enrichedEvent, PipeRuntimeException pipeRuntimeException) {
        PipeDataNodeAgent.runtime().report(enrichedEvent, pipeRuntimeException);
    }
}
