package org.apache.iotdb.db.pipe.connector.payload.evolvable.batch;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;
import org.apache.iotdb.commons.pipe.event.EnrichedEvent;
import org.apache.iotdb.db.pipe.connector.protocol.thrift.async.IoTDBDataRegionAsyncConnector;
import org.apache.iotdb.db.pipe.resource.PipeDataNodeResourceManager;
import org.apache.iotdb.db.pipe.resource.memory.PipeMemoryBlock;
import org.apache.iotdb.db.storageengine.dataregion.wal.exception.WALPipeException;
import org.apache.iotdb.pipe.api.event.dml.insertion.TabletInsertionEvent;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/iotdb/db/pipe/connector/payload/evolvable/batch/PipeTabletEventBatch.class */
/**
 * Base class for a batch that accumulates {@link TabletInsertionEvent}s which are also
 * {@link EnrichedEvent}s.
 *
 * <p>The batch tracks the accumulated buffer size ({@link #totalBufferSize}, maintained by
 * subclasses) and the wall-clock time at which the first event of the current batch was processed.
 * {@link #shouldEmit()} reports {@code true} once either the size limit (the current size of the
 * allocated memory block) or the maximum delay has been reached.
 *
 * <p>Methods that mutate the event list are {@code synchronized} on this instance.
 * {@link #shouldEmit()}, {@link #deepCopyEvents()} and {@link #isEmpty()} are not synchronized.
 */
public abstract class PipeTabletEventBatch implements AutoCloseable {
    private static final Logger LOGGER = LoggerFactory.getLogger(PipeTabletEventBatch.class);

    /** Sentinel meaning "no event has been timed for the current batch yet". */
    private static final long NO_EVENT_PROCESSED = Long.MIN_VALUE;

    private final int maxDelayInMs;
    private final PipeMemoryBlock allocatedMemoryBlock;
    protected final List<EnrichedEvent> events = new ArrayList<>();
    private long firstEventProcessingTime = NO_EVENT_PROCESSED;
    protected long totalBufferSize = 0;
    protected volatile boolean isClosed = false;

    /**
     * Creates a batch and tries to allocate a memory block of the requested size.
     *
     * <p>The block is configured to halve on shrink and to double on expand, never expanding
     * beyond {@code requestMaxBatchSizeInBytes}. If the size actually obtained differs from the
     * requested size, the adjustment is logged.
     *
     * @param maxDelayInMs maximum time in milliseconds, measured from the first processed event,
     *     after which {@link #shouldEmit()} returns {@code true}
     * @param requestMaxBatchSizeInBytes requested batch size limit in bytes
     */
    public PipeTabletEventBatch(int maxDelayInMs, long requestMaxBatchSizeInBytes) {
        this.maxDelayInMs = maxDelayInMs;
        this.allocatedMemoryBlock =
            PipeDataNodeResourceManager.memory()
                .tryAllocate(requestMaxBatchSizeInBytes)
                .setShrinkMethod(oldMemory -> Math.max(oldMemory / 2, 0L))
                .setShrinkCallback(
                    (oldMemory, newMemory) ->
                        LOGGER.info("The batch size limit has shrunk from {} to {}.", oldMemory, newMemory))
                // Treat a size of 0 as 1 so that doubling can make progress again.
                .setExpandMethod(oldMemory -> Math.min(Math.max(oldMemory, 1L) * 2, requestMaxBatchSizeInBytes))
                .setExpandCallback(
                    (oldMemory, newMemory) ->
                        LOGGER.info("The batch size limit has expanded from {} to {}.", oldMemory, newMemory));

        final long grantedMaxBatchSizeInBytes = getMaxBatchSizeInBytes();
        if (grantedMaxBatchSizeInBytes != requestMaxBatchSizeInBytes) {
            LOGGER.info(
                "PipeTabletEventBatch: the max batch size is adjusted from {} to {} due to the memory restriction",
                requestMaxBatchSizeInBytes,
                grantedMaxBatchSizeInBytes);
        }
    }

    /**
     * Offers an event to the batch.
     *
     * <p>The event is ignored (and {@code false} is returned) if the batch is closed or the event
     * is not an {@link EnrichedEvent}. An event equal to the last event already in the batch is
     * not added again. Otherwise the event's reference count is increased and
     * {@link #constructBatch(TabletInsertionEvent)} is invoked; the event is recorded in
     * {@link #events} only if that call returns {@code true}.
     *
     * @param tabletInsertionEvent the event to add
     * @return the result of {@link #shouldEmit()} after processing, or {@code false} if the event
     *     was rejected up front
     * @throws WALPipeException if thrown by {@link #constructBatch(TabletInsertionEvent)}
     * @throws IOException if thrown by {@link #constructBatch(TabletInsertionEvent)}
     */
    public synchronized boolean onEvent(TabletInsertionEvent tabletInsertionEvent) throws WALPipeException, IOException {
        if (isClosed || !(tabletInsertionEvent instanceof EnrichedEvent)) {
            return false;
        }

        final EnrichedEvent enrichedEvent = (EnrichedEvent) tabletInsertionEvent;
        final boolean sameAsLastEvent =
            !events.isEmpty() && Objects.equals(events.get(events.size() - 1), tabletInsertionEvent);

        if (!sameAsLastEvent) {
            final String holder = PipeTransferBatchReqBuilder.class.getName();
            if (enrichedEvent.increaseReferenceCount(holder)) {
                try {
                    // NOTE(review): when constructBatch returns false the event is not tracked in
                    // `events`, yet the reference taken above is kept. Confirm that subclasses
                    // returning false account for this reference themselves.
                    if (constructBatch(tabletInsertionEvent)) {
                        events.add(enrichedEvent);
                    }
                } catch (Exception e) {
                    // The event did not make it into the batch: give the reference back and rethrow.
                    enrichedEvent.decreaseReferenceCount(holder, false);
                    throw e;
                }
                if (firstEventProcessingTime == NO_EVENT_PROCESSED) {
                    firstEventProcessingTime = System.currentTimeMillis();
                }
            } else {
                LOGGER.warn("Cannot increase reference count for event: {}, ignore it in batch.", tabletInsertionEvent);
            }
        }
        return shouldEmit();
    }

    /**
     * Incorporates the given event into the subclass-specific batch representation.
     *
     * @param tabletInsertionEvent the event to incorporate
     * @return {@code true} if the event should be recorded in {@link #events}
     * @throws WALPipeException declared for implementations
     * @throws IOException declared for implementations
     */
    protected abstract boolean constructBatch(TabletInsertionEvent tabletInsertionEvent) throws WALPipeException, IOException;

    /**
     * Tells whether the batch has reached its size limit or its maximum delay.
     *
     * @return {@code true} if {@link #totalBufferSize} is at least the current size limit, or if
     *     an event has been timed and at least {@code maxDelayInMs} milliseconds have elapsed
     *     since then
     */
    public boolean shouldEmit() {
        if (totalBufferSize >= getMaxBatchSizeInBytes()) {
            return true;
        }
        // Check the sentinel explicitly rather than relying on `now - Long.MIN_VALUE`
        // wrapping around to a negative number.
        final long firstTime = firstEventProcessingTime;
        return firstTime != NO_EVENT_PROCESSED
            && System.currentTimeMillis() - firstTime >= (long) maxDelayInMs;
    }

    /** Returns the current size limit, i.e. the memory usage reported by the allocated block. */
    private long getMaxBatchSizeInBytes() {
        return allocatedMemoryBlock.getMemoryUsageInBytes();
    }

    /**
     * Resets the batch: forgets all events, zeroes the buffer size and clears the first-event
     * timestamp. Reference counts of the forgotten events are not touched here.
     */
    public synchronized void onSuccess() {
        events.clear();
        totalBufferSize = 0L;
        firstEventProcessingTime = NO_EVENT_PROCESSED;
    }

    /**
     * Marks the batch closed, clears the reference count of every held event, forgets the events
     * and closes the allocated memory block.
     */
    @Override // java.lang.AutoCloseable
    public synchronized void close() {
        isClosed = true;
        // NOTE(review): the holder name used here differs from the one used in onEvent.
        clearEventsReferenceCount(PipeTabletEventBatch.class.getName());
        events.clear();
        if (allocatedMemoryBlock != null) {
            allocatedMemoryBlock.close();
        }
    }

    /**
     * Removes every held event whose pipe name and region id match the arguments, clearing the
     * reference count of each removed event.
     *
     * @param pipeName pipe name to match; must not be {@code null}
     * @param regionId region id to match
     */
    public synchronized void discardEventsOfPipe(String pipeName, int regionId) {
        events.removeIf(event -> {
            final boolean belongsToPipe =
                pipeName.equals(event.getPipeName()) && regionId == event.getRegionId();
            if (belongsToPipe) {
                event.clearReferenceCount(IoTDBDataRegionAsyncConnector.class.getName());
            }
            return belongsToPipe;
        });
    }

    /**
     * Decreases the reference count of every held event. The events stay in the batch.
     *
     * @param holderMessage forwarded to {@code EnrichedEvent.decreaseReferenceCount}
     * @param shouldReport forwarded to {@code EnrichedEvent.decreaseReferenceCount}
     */
    public synchronized void decreaseEventsReferenceCount(String holderMessage, boolean shouldReport) {
        for (final EnrichedEvent event : events) {
            event.decreaseReferenceCount(holderMessage, shouldReport);
        }
    }

    /** Clears the reference count of every held event, passing {@code holderMessage} through. */
    private void clearEventsReferenceCount(String holderMessage) {
        for (final EnrichedEvent event : events) {
            event.clearReferenceCount(holderMessage);
        }
    }

    /**
     * Returns a new list containing the currently held events.
     *
     * <p>Despite the name, only the list is copied; the event objects themselves are shared with
     * this batch.
     *
     * @return a fresh, mutable list of the held events
     */
    public List<EnrichedEvent> deepCopyEvents() {
        return new ArrayList<>(events);
    }

    /** Returns {@code true} if no events are currently held. */
    public boolean isEmpty() {
        return events.isEmpty();
    }
}
