package org.apache.iotdb.db.subscription.event.batch;

import com.codahale.metrics.Clock;
import com.codahale.metrics.Meter;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.iotdb.commons.pipe.event.EnrichedEvent;
import org.apache.iotdb.commons.subscription.config.SubscriptionConfig;
import org.apache.iotdb.db.pipe.event.common.tablet.PipeInsertNodeTabletInsertionEvent;
import org.apache.iotdb.db.pipe.event.common.tablet.PipeRawTabletInsertionEvent;
import org.apache.iotdb.db.pipe.event.common.tsfile.PipeTsFileInsertionEvent;
import org.apache.iotdb.db.pipe.resource.memory.PipeMemoryWeightUtil;
import org.apache.iotdb.db.subscription.agent.SubscriptionAgent;
import org.apache.iotdb.db.subscription.broker.SubscriptionPrefetchingTabletQueue;
import org.apache.iotdb.db.subscription.event.SubscriptionEvent;
import org.apache.iotdb.metrics.core.utils.IoTDBMovingAverage;
import org.apache.iotdb.pipe.api.event.dml.insertion.TabletInsertionEvent;
import org.apache.iotdb.pipe.api.event.dml.insertion.TsFileInsertionEvent;
import org.apache.tsfile.utils.Pair;
import org.apache.tsfile.write.record.Tablet;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/iotdb/db/subscription/event/batch/SubscriptionPipeTabletEventBatch.class */
public class SubscriptionPipeTabletEventBatch extends SubscriptionPipeEventBatch implements Iterator<Pair<String, List<Tablet>>> {
    private static final Logger LOGGER = LoggerFactory.getLogger(SubscriptionPipeTabletEventBatch.class);
    private long firstEventProcessingTime;
    private long totalBufferSize;
    private volatile Iterator<EnrichedEvent> currentEnrichedEventsIterator;
    private volatile Iterator<TabletInsertionEvent> currentTabletInsertionEventsIterator;
    private volatile TsFileInsertionEvent currentTsFileInsertionEvent;
    private final Meter insertNodeTabletInsertionEventSizeEstimator;
    private final Meter rawTabletInsertionEventSizeEstimator;
    private volatile SubscriptionPipeTabletIterationSnapshot iterationSnapshot;
    private final AtomicInteger referenceCount;
    private static final long ITERATED_COUNT_REPORT_FREQ = 30000;
    private final AtomicLong iteratedCount;

    public SubscriptionPipeTabletEventBatch(int i, SubscriptionPrefetchingTabletQueue subscriptionPrefetchingTabletQueue, int i2, long j) {
        super(i, subscriptionPrefetchingTabletQueue, i2, j);
        this.firstEventProcessingTime = Long.MIN_VALUE;
        this.totalBufferSize = 0L;
        this.referenceCount = new AtomicInteger();
        this.iteratedCount = new AtomicLong();
        this.insertNodeTabletInsertionEventSizeEstimator = new Meter(new IoTDBMovingAverage(), Clock.defaultClock());
        this.rawTabletInsertionEventSizeEstimator = new Meter(new IoTDBMovingAverage(), Clock.defaultClock());
        resetForIteration();
    }

    @Override // org.apache.iotdb.db.subscription.event.batch.SubscriptionPipeEventBatch
    public synchronized void ack() {
        this.referenceCount.decrementAndGet();
        if (hasNext() || this.referenceCount.get() != 0) {
            return;
        }
        Iterator<EnrichedEvent> it = this.enrichedEvents.iterator();
        while (it.hasNext()) {
            it.next().decreaseReferenceCount(getClass().getName(), true);
        }
    }

    @Override // org.apache.iotdb.db.subscription.event.batch.SubscriptionPipeEventBatch
    public synchronized void cleanUp(boolean z) {
        if (z || (!hasNext() && this.referenceCount.get() == 0)) {
            Iterator<EnrichedEvent> it = this.enrichedEvents.iterator();
            while (it.hasNext()) {
                it.next().clearReferenceCount(getClass().getName());
            }
            this.enrichedEvents.clear();
            resetForIteration();
        }
    }

    @Override // org.apache.iotdb.db.subscription.event.batch.SubscriptionPipeEventBatch
    protected void onTabletInsertionEvent(TabletInsertionEvent tabletInsertionEvent) {
        if (this.firstEventProcessingTime == Long.MIN_VALUE) {
            this.firstEventProcessingTime = System.currentTimeMillis();
        }
        if (tabletInsertionEvent instanceof PipeInsertNodeTabletInsertionEvent) {
            this.totalBufferSize += getEstimatedInsertNodeTabletInsertionEventSize();
        } else if (tabletInsertionEvent instanceof PipeRawTabletInsertionEvent) {
            this.totalBufferSize += getEstimatedRawTabletInsertionEventSize();
        }
    }

    @Override // org.apache.iotdb.db.subscription.event.batch.SubscriptionPipeEventBatch
    protected void onTsFileInsertionEvent(TsFileInsertionEvent tsFileInsertionEvent) {
        if (this.firstEventProcessingTime == Long.MIN_VALUE) {
            this.firstEventProcessingTime = System.currentTimeMillis();
        }
        this.totalBufferSize += ((PipeTsFileInsertionEvent) tsFileInsertionEvent).getTsFile().length();
    }

    @Override // org.apache.iotdb.db.subscription.event.batch.SubscriptionPipeEventBatch
    protected List<SubscriptionEvent> generateSubscriptionEvents() {
        resetForIteration();
        return Collections.singletonList(new SubscriptionEvent(this, this.prefetchingQueue));
    }

    @Override // org.apache.iotdb.db.subscription.event.batch.SubscriptionPipeEventBatch
    protected boolean shouldEmit() {
        return this.totalBufferSize >= this.maxBatchSizeInBytes || System.currentTimeMillis() - this.firstEventProcessingTime >= ((long) this.maxDelayInMs) || ((long) this.enrichedEvents.size()) >= SubscriptionConfig.getInstance().getSubscriptionMaxAllowedEventCountInTabletBatch();
    }

    private Pair<String, List<Tablet>> convertToTablets(TabletInsertionEvent tabletInsertionEvent) {
        if (tabletInsertionEvent instanceof PipeInsertNodeTabletInsertionEvent) {
            List<Tablet> convertToTablets = ((PipeInsertNodeTabletInsertionEvent) tabletInsertionEvent).convertToTablets();
            updateEstimatedInsertNodeTabletInsertionEventSize(((Long) convertToTablets.stream().map(PipeMemoryWeightUtil::calculateTabletSizeInBytes).reduce((v0, v1) -> {
                return Long.sum(v0, v1);
            }).orElse(0L)).longValue());
            return new Pair<>(((PipeInsertNodeTabletInsertionEvent) tabletInsertionEvent).isTableModelEvent() ? ((PipeInsertNodeTabletInsertionEvent) tabletInsertionEvent).getTableModelDatabaseName() : null, convertToTablets);
        }
        if (!(tabletInsertionEvent instanceof PipeRawTabletInsertionEvent)) {
            LOGGER.warn("SubscriptionPipeTabletEventBatch {} only support convert PipeInsertNodeTabletInsertionEvent or PipeRawTabletInsertionEvent to tablet. Ignore {}.", this, tabletInsertionEvent);
            return null;
        }
        Tablet convertToTablet = ((PipeRawTabletInsertionEvent) tabletInsertionEvent).convertToTablet();
        updateEstimatedRawTabletInsertionEventSize(PipeMemoryWeightUtil.calculateTabletSizeInBytes(convertToTablet));
        return new Pair<>(((PipeRawTabletInsertionEvent) tabletInsertionEvent).isTableModelEvent() ? ((PipeRawTabletInsertionEvent) tabletInsertionEvent).getTableModelDatabaseName() : null, Collections.singletonList(convertToTablet));
    }

    private long getEstimatedInsertNodeTabletInsertionEventSize() {
        return Math.max(SubscriptionConfig.getInstance().getSubscriptionEstimatedInsertNodeTabletInsertionEventSize(), (long) this.insertNodeTabletInsertionEventSizeEstimator.getOneMinuteRate());
    }

    private void updateEstimatedInsertNodeTabletInsertionEventSize(long j) {
        this.insertNodeTabletInsertionEventSizeEstimator.mark(j);
    }

    private long getEstimatedRawTabletInsertionEventSize() {
        return Math.max(SubscriptionConfig.getInstance().getSubscriptionEstimatedRawTabletInsertionEventSize(), (long) this.rawTabletInsertionEventSizeEstimator.getOneMinuteRate());
    }

    private void updateEstimatedRawTabletInsertionEventSize(long j) {
        this.rawTabletInsertionEventSizeEstimator.mark(j);
    }

    public synchronized SubscriptionPipeTabletIterationSnapshot sendIterationSnapshot() {
        SubscriptionPipeTabletIterationSnapshot subscriptionPipeTabletIterationSnapshot = this.iterationSnapshot;
        this.iterationSnapshot = new SubscriptionPipeTabletIterationSnapshot();
        this.referenceCount.incrementAndGet();
        return subscriptionPipeTabletIterationSnapshot;
    }

    public synchronized void resetForIteration() {
        this.currentEnrichedEventsIterator = this.enrichedEvents.iterator();
        this.currentTabletInsertionEventsIterator = null;
        if (Objects.nonNull(this.currentTsFileInsertionEvent) && (this.currentTsFileInsertionEvent instanceof PipeTsFileInsertionEvent)) {
            ((PipeTsFileInsertionEvent) this.currentTsFileInsertionEvent).close();
        }
        this.currentTsFileInsertionEvent = null;
        if (Objects.nonNull(this.iterationSnapshot)) {
            this.iterationSnapshot.cleanUp();
        }
        this.iterationSnapshot = new SubscriptionPipeTabletIterationSnapshot();
        this.referenceCount.set(0);
        this.iteratedCount.set(0L);
    }

    @Override // java.util.Iterator
    public synchronized boolean hasNext() {
        if (Objects.nonNull(this.currentTabletInsertionEventsIterator)) {
            if (this.currentTabletInsertionEventsIterator.hasNext()) {
                return true;
            }
            this.currentTabletInsertionEventsIterator = null;
            this.currentTsFileInsertionEvent = null;
            return hasNext();
        }
        if (Objects.isNull(this.currentEnrichedEventsIterator)) {
            return false;
        }
        if (this.currentEnrichedEventsIterator.hasNext()) {
            return true;
        }
        this.currentEnrichedEventsIterator = null;
        return false;
    }

    /* JADX WARN: Can't rename method to resolve collision */
    @Override // java.util.Iterator
    public synchronized Pair<String, List<Tablet>> next() {
        Pair<String, List<Tablet>> nextInternal = nextInternal();
        if (Objects.isNull(nextInternal)) {
            return null;
        }
        if (this.iteratedCount.incrementAndGet() % 30000 == 0) {
            Logger logger = LOGGER;
            Object[] objArr = new Object[3];
            objArr[0] = this;
            objArr[1] = this.iteratedCount;
            objArr[2] = Objects.isNull(this.currentTsFileInsertionEvent) ? "<unknown>" : this.currentTsFileInsertionEvent.coreReportMessage();
            logger.info("{} has been iterated {} times, current TsFileInsertionEvent {}", objArr);
        }
        return nextInternal;
    }

    private Pair<String, List<Tablet>> nextInternal() {
        if (Objects.nonNull(this.currentTabletInsertionEventsIterator)) {
            if (this.currentTabletInsertionEventsIterator.hasNext()) {
                TabletInsertionEvent next = this.currentTabletInsertionEventsIterator.next();
                if (!(next instanceof PipeRawTabletInsertionEvent)) {
                    LOGGER.warn("SubscriptionPipeTabletEventBatch: Unexpected tablet insertion event {}, skipping it.", next);
                } else if (((PipeRawTabletInsertionEvent) next).increaseReferenceCount(getClass().getName())) {
                    this.iterationSnapshot.addParsedEnrichedEvent((PipeRawTabletInsertionEvent) next);
                } else {
                    LOGGER.warn("SubscriptionPipeTabletEventBatch: Failed to increase the reference count of event {}, skipping it.", ((PipeRawTabletInsertionEvent) next).coreReportMessage());
                }
                if (!this.currentTabletInsertionEventsIterator.hasNext()) {
                    this.iterationSnapshot.addIteratedEnrichedEvent((EnrichedEvent) this.currentTsFileInsertionEvent);
                }
                return convertToTablets(next);
            }
            this.currentTabletInsertionEventsIterator = null;
            this.currentTsFileInsertionEvent = null;
        }
        if (Objects.isNull(this.currentEnrichedEventsIterator) || !this.currentEnrichedEventsIterator.hasNext()) {
            return null;
        }
        EnrichedEvent next2 = this.currentEnrichedEventsIterator.next();
        if (!(next2 instanceof TsFileInsertionEvent)) {
            if (next2 instanceof TabletInsertionEvent) {
                this.iterationSnapshot.addIteratedEnrichedEvent(next2);
                return convertToTablets((TabletInsertionEvent) next2);
            }
            LOGGER.warn("SubscriptionPipeTabletEventBatch {} ignore EnrichedEvent {} when iterating (broken invariant).", this, next2);
            return null;
        }
        if (Objects.nonNull(this.currentTabletInsertionEventsIterator)) {
            LOGGER.warn("SubscriptionPipeTabletEventBatch {} override non-null currentTabletInsertionEventsIterator when iterating (broken invariant).", this);
        }
        PipeTsFileInsertionEvent pipeTsFileInsertionEvent = (PipeTsFileInsertionEvent) next2;
        this.currentTsFileInsertionEvent = pipeTsFileInsertionEvent;
        this.currentTabletInsertionEventsIterator = pipeTsFileInsertionEvent.toTabletInsertionEvents((long) ((1.0d + Math.random()) * SubscriptionAgent.receiver().remainingMs())).iterator();
        return next();
    }
}
