package org.apache.iotdb.db.subscription.event.batch;

import java.util.ArrayList;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.function.Consumer;
import java.util.stream.Collectors;
import org.apache.iotdb.commons.pipe.event.EnrichedEvent;
import org.apache.iotdb.commons.subscription.config.SubscriptionConfig;
import org.apache.iotdb.db.pipe.event.common.tablet.PipeInsertNodeTabletInsertionEvent;
import org.apache.iotdb.db.pipe.event.common.tablet.PipeRawTabletInsertionEvent;
import org.apache.iotdb.db.pipe.resource.memory.PipeMemoryWeightUtil;
import org.apache.iotdb.db.subscription.broker.SubscriptionPrefetchingTabletQueue;
import org.apache.iotdb.db.subscription.event.SubscriptionEvent;
import org.apache.iotdb.db.subscription.event.pipe.SubscriptionPipeTabletBatchEvents;
import org.apache.iotdb.pipe.api.event.dml.insertion.TabletInsertionEvent;
import org.apache.iotdb.rpc.subscription.payload.poll.SubscriptionCommitContext;
import org.apache.iotdb.rpc.subscription.payload.poll.SubscriptionPollResponse;
import org.apache.iotdb.rpc.subscription.payload.poll.SubscriptionPollResponseType;
import org.apache.iotdb.rpc.subscription.payload.poll.TabletsPayload;
import org.apache.tsfile.write.record.Tablet;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/iotdb/db/subscription/event/batch/SubscriptionPipeTabletEventBatch.class */
/**
 * Event batch that buffers tablet insertion events together with the {@link Tablet}s converted
 * from them, and packs the buffered tablets into a single {@link SubscriptionEvent} once the
 * emit condition (accumulated size or elapsed time) is met.
 *
 * <p>The state-mutating entry points ({@code onEvent}, {@code cleanUp}, {@code ack}) are
 * {@code synchronized} on this instance; the reporting methods are not.
 */
public class SubscriptionPipeTabletEventBatch extends SubscriptionPipeEventBatch {
    private static final Logger LOGGER = LoggerFactory.getLogger(SubscriptionPipeTabletEventBatch.class);

    /** Size limit in bytes for the tablets packed into one poll response, read once from the config. */
    private static final long READ_TABLET_BUFFER_SIZE = SubscriptionConfig.getInstance().getSubscriptionReadTabletBufferSize();

    /** Sentinel stored in {@link #firstEventProcessingTime} until the first event is buffered. */
    private static final long NO_EVENT_PROCESSED = Long.MIN_VALUE;

    /** Maximum number of buffered events spelled out in {@link #coreReportMessage()}. */
    private static final int MAX_REPORTED_EVENTS = 4;

    /** Every tablet insertion event handed to this batch, in arrival order. */
    private final List<EnrichedEvent> enrichedEvents = new ArrayList<>();

    /** Tablets converted from the buffered events, in arrival order. */
    private final List<Tablet> tablets = new ArrayList<>();

    /** Wall-clock time (ms) at which the first event was buffered, or {@link #NO_EVENT_PROCESSED}. */
    private long firstEventProcessingTime = NO_EVENT_PROCESSED;

    /** Sum of the calculated sizes (bytes) of all buffered tablets. */
    private long totalBufferSize = 0L;

    /**
     * Creates an empty batch. All arguments are forwarded unchanged to the superclass constructor.
     *
     * <p>NOTE(review): judging by how {@code maxDelayInMs} and {@code maxBatchSizeInBytes} are used
     * in this class, {@code i2} and {@code j} are presumably those two limits; the meaning of
     * {@code i} is not visible here. Confirm against the superclass.
     */
    public SubscriptionPipeTabletEventBatch(int i, SubscriptionPrefetchingTabletQueue subscriptionPrefetchingTabletQueue, int i2, long j) {
        super(i, subscriptionPrefetchingTabletQueue, i2, j);
    }

    /**
     * Emits the batch to {@code consumer} if it is ready.
     *
     * <p>The subscription events are generated lazily on the first successful call and cached in
     * {@code events}; later calls hand the same cached events to the consumer again.
     *
     * @param consumer receives every generated subscription event
     * @return {@code true} if events were passed to {@code consumer}; {@code false} if the emit
     *     condition is not met, nothing is buffered, or no tablets could be generated
     */
    @Override // org.apache.iotdb.db.subscription.event.batch.SubscriptionPipeEventBatch
    public synchronized boolean onEvent(Consumer<SubscriptionEvent> consumer) {
        if (!shouldEmit() || this.enrichedEvents.isEmpty()) {
            return false;
        }
        if (Objects.isNull(this.events)) {
            this.events = generateSubscriptionEvents();
        }
        if (Objects.isNull(this.events)) {
            // Events were buffered but none of them produced a tablet.
            return false;
        }
        this.events.forEach(consumer);
        return true;
    }

    /**
     * Buffers {@code enrichedEvent} when it is a {@link TabletInsertionEvent} (other events are
     * ignored) and then tries to emit the batch.
     *
     * @param enrichedEvent the incoming event
     * @param consumer receives every generated subscription event
     * @return the result of {@link #onEvent(Consumer)}
     */
    @Override // org.apache.iotdb.db.subscription.event.batch.SubscriptionPipeEventBatch
    public synchronized boolean onEvent(EnrichedEvent enrichedEvent, Consumer<SubscriptionEvent> consumer) {
        if (enrichedEvent instanceof TabletInsertionEvent) {
            onEventInternal((TabletInsertionEvent) enrichedEvent);
        }
        return onEvent(consumer);
    }

    /**
     * Calls {@code clearReferenceCount} on every buffered event, passing this class's name, and
     * then drops all buffered events and tablets.
     */
    @Override // org.apache.iotdb.db.subscription.event.batch.SubscriptionPipeEventBatch
    public synchronized void cleanUp() {
        final String holder = getClass().getName();
        for (EnrichedEvent event : this.enrichedEvents) {
            event.clearReferenceCount(holder);
        }
        this.enrichedEvents.clear();
        this.tablets.clear();
    }

    /**
     * Calls {@code decreaseReferenceCount(holder, true)} on every buffered event, where the holder
     * is this class's name. The buffered events and tablets themselves are left in place.
     */
    public synchronized void ack() {
        final String holder = getClass().getName();
        for (EnrichedEvent event : this.enrichedEvents) {
            event.decreaseReferenceCount(holder, true);
        }
    }

    /**
     * Packs the buffered tablets into poll responses and wraps them in one subscription event.
     *
     * <p>Tablets are grouped greedily in arrival order so that each group's calculated size stays
     * within {@link #READ_TABLET_BUFFER_SIZE}. A tablet that alone exceeds the limit is sent in a
     * response of its own as soon as it is encountered. The last response always carries the
     * remaining (possibly empty) group.
     *
     * <p>NOTE(review): the second {@code TabletsPayload} argument is {@code responses.size() + 1}
     * for intermediate responses and the negated total tablet count for the last one; presumably
     * an offset / end-of-batch marker. Confirm against {@code TabletsPayload}.
     *
     * @return a singleton list holding the generated event, or {@code null} when no tablets are
     *     buffered
     */
    private List<SubscriptionEvent> generateSubscriptionEvents() {
        if (this.tablets.isEmpty()) {
            return null;
        }
        final SubscriptionCommitContext commitContext = this.prefetchingQueue.generateSubscriptionCommitContext();
        final List<SubscriptionPollResponse> responses = new ArrayList<>();
        final List<Tablet> pendingTablets = new ArrayList<>();
        long pendingSize = 0L;
        for (Tablet tablet : this.tablets) {
            final long tabletSize = PipeMemoryWeightUtil.calculateTabletSizeInBytes(tablet);
            if (tabletSize > READ_TABLET_BUFFER_SIZE) {
                // An oversized tablet can never share a response; ship it on its own right away.
                LOGGER.warn("Detect large tablet with {} byte(s).", tabletSize);
                responses.add(buildTabletsResponse(Collections.singletonList(tablet), responses.size() + 1, commitContext));
                continue;
            }
            if (pendingSize + tabletSize > READ_TABLET_BUFFER_SIZE) {
                // The pending group is full: flush a snapshot of it and start a new group.
                responses.add(buildTabletsResponse(new ArrayList<>(pendingTablets), responses.size() + 1, commitContext));
                pendingTablets.clear();
                pendingSize = 0L;
            }
            pendingTablets.add(tablet);
            pendingSize += tabletSize;
        }
        responses.add(buildTabletsResponse(new ArrayList<>(pendingTablets), -this.tablets.size(), commitContext));
        return Collections.singletonList(new SubscriptionEvent(new SubscriptionPipeTabletBatchEvents(this), responses));
    }

    /** Builds one {@code TABLETS} poll response around the given tablets and offset value. */
    private static SubscriptionPollResponse buildTabletsResponse(List<Tablet> payloadTablets, int offset, SubscriptionCommitContext commitContext) {
        return new SubscriptionPollResponse(SubscriptionPollResponseType.TABLETS.getType(), new TabletsPayload(payloadTablets, offset), commitContext);
    }

    /**
     * Converts the event into tablets, buffers both the tablets and the event itself, and records
     * the time of the first buffered event.
     */
    private void onEventInternal(TabletInsertionEvent tabletInsertionEvent) {
        constructBatch(tabletInsertionEvent);
        this.enrichedEvents.add((EnrichedEvent) tabletInsertionEvent);
        if (this.firstEventProcessingTime == NO_EVENT_PROCESSED) {
            this.firstEventProcessingTime = System.currentTimeMillis();
        }
    }

    /** Appends the tablets converted from the event and adds their calculated sizes to the total. */
    private void constructBatch(TabletInsertionEvent tabletInsertionEvent) {
        final List<Tablet> converted = convertToTablets(tabletInsertionEvent);
        if (converted.isEmpty()) {
            return;
        }
        this.tablets.addAll(converted);
        this.totalBufferSize += converted.stream().mapToLong(PipeMemoryWeightUtil::calculateTabletSizeInBytes).sum();
    }

    /**
     * Tells whether the batch should be emitted: either the buffered size reached
     * {@code maxBatchSizeInBytes}, or at least {@code maxDelayInMs} milliseconds have passed since
     * the first event was buffered.
     */
    private boolean shouldEmit() {
        if (this.totalBufferSize >= this.maxBatchSizeInBytes) {
            return true;
        }
        if (this.firstEventProcessingTime == NO_EVENT_PROCESSED) {
            // No event buffered yet, so no delay has elapsed. Checked explicitly rather than
            // relying on the subtraction below overflowing to a negative value.
            return false;
        }
        return System.currentTimeMillis() - this.firstEventProcessingTime >= (long) this.maxDelayInMs;
    }

    /**
     * Converts a supported insertion event into tablets.
     *
     * @return the converted tablets, or an empty list (after logging a warning) for any event type
     *     other than {@link PipeInsertNodeTabletInsertionEvent} or {@link PipeRawTabletInsertionEvent}
     */
    private List<Tablet> convertToTablets(TabletInsertionEvent tabletInsertionEvent) {
        if (tabletInsertionEvent instanceof PipeInsertNodeTabletInsertionEvent) {
            return ((PipeInsertNodeTabletInsertionEvent) tabletInsertionEvent).convertToTablets();
        }
        if (tabletInsertionEvent instanceof PipeRawTabletInsertionEvent) {
            return Collections.singletonList(((PipeRawTabletInsertionEvent) tabletInsertionEvent).convertToTablet());
        }
        LOGGER.warn("SubscriptionPipeTabletEventBatch {} only support convert PipeInsertNodeTabletInsertionEvent or PipeRawTabletInsertionEvent to tablet. Ignore {}.", this, tabletInsertionEvent);
        return Collections.emptyList();
    }

    /** Returns the class name followed by the report map from {@link #coreReportMessage()}. */
    @Override
    public String toString() {
        return "SubscriptionPipeTabletEventBatch" + coreReportMessage();
    }

    /**
     * Extends the superclass report with this batch's state: a bounded listing of the buffered
     * events, the tablet count, the first-event timestamp and the accumulated buffer size.
     *
     * @return the map returned by the superclass, with the extra entries added to it
     */
    @Override // org.apache.iotdb.db.subscription.event.batch.SubscriptionPipeEventBatch
    public Map<String, String> coreReportMessage() {
        final Map<String, String> report = super.coreReportMessage();
        report.put("enrichedEvents", formatEnrichedEvents(this.enrichedEvents, MAX_REPORTED_EVENTS));
        report.put("size of tablets", String.valueOf(this.tablets.size()));
        report.put("firstEventProcessingTime", String.valueOf(this.firstEventProcessingTime));
        report.put("totalBufferSize", String.valueOf(this.totalBufferSize));
        return report;
    }

    /**
     * Renders at most {@code i} events through their {@code coreReportMessage()}, followed by a
     * marker stating how many further events were left out.
     *
     * @param list the events to render
     * @param i the maximum number of events to spell out
     * @return the {@code List#toString()} form of the rendered entries
     */
    private static String formatEnrichedEvents(List<EnrichedEvent> list, int i) {
        final List<String> messages = list.stream()
                .limit(i)
                // String.valueOf leaves the rendered text unchanged while keeping the list typed.
                .map(event -> String.valueOf(event.coreReportMessage()))
                .collect(Collectors.toCollection(ArrayList::new));
        // Compare against the source list: the truncated list can never be longer than the limit.
        final int omitted = list.size() - i;
        if (omitted > 0) {
            messages.add(String.format("omit the remaining %s event(s)...", omitted));
        }
        return messages.toString();
    }

    /** Returns the number of events currently buffered in this batch. */
    public int getPipeEventCount() {
        return this.enrichedEvents.size();
    }
}
