package org.apache.iotdb.db.pipe.agent.task.connection;

import java.util.Iterator;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.iotdb.commons.pipe.agent.task.connection.UnboundedBlockingPendingQueue;
import org.apache.iotdb.commons.pipe.agent.task.progress.PipeEventCommitManager;
import org.apache.iotdb.commons.pipe.event.EnrichedEvent;
import org.apache.iotdb.commons.pipe.event.ProgressReportEvent;
import org.apache.iotdb.db.pipe.agent.PipeDataNodeAgent;
import org.apache.iotdb.db.pipe.event.common.heartbeat.PipeHeartbeatEvent;
import org.apache.iotdb.db.pipe.event.common.schema.PipeSchemaRegionWritePlanEvent;
import org.apache.iotdb.db.pipe.event.common.tablet.PipeInsertNodeTabletInsertionEvent;
import org.apache.iotdb.db.pipe.event.common.tablet.PipeRawTabletInsertionEvent;
import org.apache.iotdb.db.pipe.event.common.tsfile.PipeTsFileInsertionEvent;
import org.apache.iotdb.db.pipe.extractor.schemaregion.IoTDBSchemaRegionExtractor;
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNodeType;
import org.apache.iotdb.pipe.api.collector.EventCollector;
import org.apache.iotdb.pipe.api.event.Event;
import org.apache.iotdb.pipe.api.event.dml.insertion.TabletInsertionEvent;
import org.apache.iotdb.pipe.api.exception.PipeException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/iotdb/db/pipe/agent/task/connection/PipeEventCollector.class */
public class PipeEventCollector implements EventCollector {
    private static final Logger LOGGER = LoggerFactory.getLogger(PipeEventCollector.class);
    private final UnboundedBlockingPendingQueue<Event> pendingQueue;
    private final long creationTime;
    private final int regionId;
    private final boolean forceTabletFormat;
    private final AtomicInteger collectInvocationCount = new AtomicInteger(0);
    private boolean hasNoGeneratedEvent = true;
    private boolean isFailedToIncreaseReferenceCount = false;

    public PipeEventCollector(UnboundedBlockingPendingQueue<Event> unboundedBlockingPendingQueue, long j, int i, boolean z) {
        this.pendingQueue = unboundedBlockingPendingQueue;
        this.creationTime = j;
        this.regionId = i;
        this.forceTabletFormat = z;
    }

    public void collect(Event event) {
        try {
            if (event instanceof PipeInsertNodeTabletInsertionEvent) {
                parseAndCollectEvent((PipeInsertNodeTabletInsertionEvent) event);
            } else if (event instanceof PipeRawTabletInsertionEvent) {
                parseAndCollectEvent((PipeRawTabletInsertionEvent) event);
            } else if (event instanceof PipeTsFileInsertionEvent) {
                parseAndCollectEvent((PipeTsFileInsertionEvent) event);
            } else if ((event instanceof PipeSchemaRegionWritePlanEvent) && ((PipeSchemaRegionWritePlanEvent) event).getPlanNode().getType() == PlanNodeType.DELETE_DATA) {
                parseAndCollectEvent((PipeSchemaRegionWritePlanEvent) event);
            } else if (!(event instanceof ProgressReportEvent)) {
                collectEvent(event);
            }
        } catch (Exception e) {
            throw new PipeException("Error occurred when collecting events from processor.", e);
        } catch (PipeException e2) {
            throw e2;
        }
    }

    private void parseAndCollectEvent(PipeInsertNodeTabletInsertionEvent pipeInsertNodeTabletInsertionEvent) {
        if (!pipeInsertNodeTabletInsertionEvent.shouldParseTimeOrPattern()) {
            collectEvent(pipeInsertNodeTabletInsertionEvent);
            return;
        }
        Iterator<PipeRawTabletInsertionEvent> it = pipeInsertNodeTabletInsertionEvent.toRawTabletInsertionEvents().iterator();
        while (it.hasNext()) {
            collectParsedRawTableEvent(it.next());
        }
    }

    private void parseAndCollectEvent(PipeRawTabletInsertionEvent pipeRawTabletInsertionEvent) {
        collectParsedRawTableEvent(pipeRawTabletInsertionEvent.shouldParseTimeOrPattern() ? pipeRawTabletInsertionEvent.parseEventWithPatternOrTime() : pipeRawTabletInsertionEvent);
    }

    private void parseAndCollectEvent(PipeTsFileInsertionEvent pipeTsFileInsertionEvent) throws Exception {
        if (!pipeTsFileInsertionEvent.waitForTsFileClose()) {
            LOGGER.warn("Pipe skipping temporary TsFile which shouldn't be transferred: {}", pipeTsFileInsertionEvent.getTsFile());
            return;
        }
        if (!this.forceTabletFormat && !pipeTsFileInsertionEvent.shouldParseTimeOrPattern()) {
            collectEvent(pipeTsFileInsertionEvent);
            return;
        }
        try {
            Iterator<TabletInsertionEvent> it = pipeTsFileInsertionEvent.toTabletInsertionEvents().iterator();
            while (it.hasNext()) {
                collectParsedRawTableEvent((PipeRawTabletInsertionEvent) it.next());
            }
        } finally {
            pipeTsFileInsertionEvent.close();
        }
    }

    private void collectParsedRawTableEvent(PipeRawTabletInsertionEvent pipeRawTabletInsertionEvent) {
        if (pipeRawTabletInsertionEvent.hasNoNeedParsingAndIsEmpty()) {
            return;
        }
        this.hasNoGeneratedEvent = false;
        collectEvent(pipeRawTabletInsertionEvent);
    }

    private void parseAndCollectEvent(PipeSchemaRegionWritePlanEvent pipeSchemaRegionWritePlanEvent) {
        IoTDBSchemaRegionExtractor.PATTERN_PARSE_VISITOR.process(pipeSchemaRegionWritePlanEvent.getPlanNode(), pipeSchemaRegionWritePlanEvent.getPipePattern()).map(planNode -> {
            return new PipeSchemaRegionWritePlanEvent(planNode, pipeSchemaRegionWritePlanEvent.getPipeName(), pipeSchemaRegionWritePlanEvent.getCreationTime(), pipeSchemaRegionWritePlanEvent.getPipeTaskMeta(), pipeSchemaRegionWritePlanEvent.getPipePattern(), pipeSchemaRegionWritePlanEvent.isGeneratedByPipe());
        }).ifPresent(pipeSchemaRegionWritePlanEvent2 -> {
            this.hasNoGeneratedEvent = false;
            collectEvent(pipeSchemaRegionWritePlanEvent2);
        });
    }

    private void collectEvent(Event event) {
        if (event instanceof EnrichedEvent) {
            if (!((EnrichedEvent) event).increaseReferenceCount(PipeEventCollector.class.getName())) {
                LOGGER.warn("PipeEventCollector: The event {} is already released, skipping it.", event);
                this.isFailedToIncreaseReferenceCount = true;
                return;
            } else {
                PipeEventCommitManager.getInstance().enrichWithCommitterKeyAndCommitId((EnrichedEvent) event, this.creationTime, this.regionId);
                ((EnrichedEvent) event).setRebootTimes(PipeDataNodeAgent.runtime().getRebootTimes());
            }
        }
        if (event instanceof PipeHeartbeatEvent) {
            ((PipeHeartbeatEvent) event).recordConnectorQueueSize(this.pendingQueue);
        }
        this.pendingQueue.directOffer(event);
        this.collectInvocationCount.incrementAndGet();
    }

    public void resetFlags() {
        this.collectInvocationCount.set(0);
        this.hasNoGeneratedEvent = true;
        this.isFailedToIncreaseReferenceCount = false;
    }

    public long getCollectInvocationCount() {
        return this.collectInvocationCount.get();
    }

    public boolean hasNoCollectInvocationAfterReset() {
        return this.collectInvocationCount.get() == 0;
    }

    public boolean hasNoGeneratedEvent() {
        return this.hasNoGeneratedEvent;
    }

    public boolean isFailedToIncreaseReferenceCount() {
        return this.isFailedToIncreaseReferenceCount;
    }
}
