package org.apache.iotdb.db.subscription.event.batch;

import java.util.ArrayList;
import java.util.List;
import java.util.Objects;
import java.util.function.Consumer;
import org.apache.iotdb.commons.pipe.event.EnrichedEvent;
import org.apache.iotdb.db.subscription.broker.SubscriptionPrefetchingQueue;
import org.apache.iotdb.db.subscription.event.SubscriptionEvent;
import org.apache.iotdb.pipe.api.event.dml.insertion.TabletInsertionEvent;
import org.apache.iotdb.pipe.api.event.dml.insertion.TsFileInsertionEvent;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/iotdb/db/subscription/event/batch/SubscriptionPipeEventBatch.class */
public abstract class SubscriptionPipeEventBatch {
    private static final Logger LOGGER = LoggerFactory.getLogger(SubscriptionPipeEventBatch.class);
    private final int regionId;
    protected final SubscriptionPrefetchingQueue prefetchingQueue;
    protected final int maxDelayInMs;
    protected final long maxBatchSizeInBytes;
    protected volatile List<SubscriptionEvent> events = null;
    protected final List<EnrichedEvent> enrichedEvents = new ArrayList();

    /* JADX INFO: Access modifiers changed from: protected */
    public SubscriptionPipeEventBatch(int i, SubscriptionPrefetchingQueue subscriptionPrefetchingQueue, int i2, long j) {
        this.regionId = i;
        this.prefetchingQueue = subscriptionPrefetchingQueue;
        this.maxDelayInMs = i2;
        this.maxBatchSizeInBytes = j;
    }

    public abstract void ack();

    public abstract void cleanUp(boolean z);

    /* JADX INFO: Access modifiers changed from: protected */
    public synchronized boolean onEvent(Consumer<SubscriptionEvent> consumer) throws Exception {
        if (!shouldEmit() || this.enrichedEvents.isEmpty()) {
            return false;
        }
        if (Objects.isNull(this.events)) {
            this.events = generateSubscriptionEvents();
        }
        if (!Objects.nonNull(this.events)) {
            return false;
        }
        this.events.forEach(consumer);
        return true;
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public synchronized boolean onEvent(EnrichedEvent enrichedEvent, Consumer<SubscriptionEvent> consumer) throws Exception {
        if (enrichedEvent instanceof TabletInsertionEvent) {
            onTabletInsertionEvent((TabletInsertionEvent) enrichedEvent);
            this.enrichedEvents.add(enrichedEvent);
        } else if (enrichedEvent instanceof TsFileInsertionEvent) {
            onTsFileInsertionEvent((TsFileInsertionEvent) enrichedEvent);
            this.enrichedEvents.add(enrichedEvent);
        } else {
            LOGGER.warn("SubscriptionPipeEventBatch {} ignore EnrichedEvent {} when batching.", this, enrichedEvent);
        }
        return onEvent(consumer);
    }

    protected abstract void onTabletInsertionEvent(TabletInsertionEvent tabletInsertionEvent);

    protected abstract void onTsFileInsertionEvent(TsFileInsertionEvent tsFileInsertionEvent);

    protected abstract boolean shouldEmit();

    protected abstract List<SubscriptionEvent> generateSubscriptionEvents() throws Exception;

    public int getPipeEventCount() {
        return this.enrichedEvents.size();
    }
}
