package org.apache.iotdb.db.subscription.event.response;

import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.iotdb.commons.exception.pipe.PipeRuntimeOutOfMemoryCriticalException;
import org.apache.iotdb.commons.subscription.config.SubscriptionConfig;
import org.apache.iotdb.db.pipe.resource.PipeDataNodeResourceManager;
import org.apache.iotdb.db.pipe.resource.memory.PipeMemoryManager;
import org.apache.iotdb.db.pipe.resource.memory.PipeMemoryWeightUtil;
import org.apache.iotdb.db.subscription.broker.SubscriptionPrefetchingQueue;
import org.apache.iotdb.db.subscription.event.SubscriptionEvent;
import org.apache.iotdb.db.subscription.event.batch.SubscriptionPipeTabletEventBatch;
import org.apache.iotdb.db.subscription.event.cache.CachedSubscriptionPollResponse;
import org.apache.iotdb.db.subscription.event.pipe.SubscriptionPipeTabletBatchEvents;
import org.apache.iotdb.pipe.api.exception.PipeException;
import org.apache.iotdb.rpc.subscription.payload.poll.SubscriptionCommitContext;
import org.apache.iotdb.rpc.subscription.payload.poll.SubscriptionPollResponseType;
import org.apache.iotdb.rpc.subscription.payload.poll.TabletsPayload;
import org.apache.tsfile.write.record.Tablet;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/iotdb/db/subscription/event/response/SubscriptionEventTabletResponse.class */
public class SubscriptionEventTabletResponse extends SubscriptionEventExtendableResponse {
    private static final Logger LOGGER = LoggerFactory.getLogger(SubscriptionEventTabletResponse.class);
    private static final long READ_TABLET_BUFFER_SIZE = SubscriptionConfig.getInstance().getSubscriptionReadTabletBufferSize();
    private static final long PREFETCH_TABLET_BUFFER_SIZE = SubscriptionConfig.getInstance().getSubscriptionPrefetchTabletBatchMaxSizeInBytes();
    private final SubscriptionPipeTabletEventBatch batch;
    private final SubscriptionPrefetchingQueue queue;
    private final SubscriptionPipeTabletBatchEvents events;
    private final SubscriptionCommitContext commitContext;
    private final SubscriptionCommitContext rootCommitContext;
    private volatile int totalTablets;
    private volatile long totalBufferSize;
    private final AtomicInteger nextOffset = new AtomicInteger(0);
    private volatile boolean availableForNext = false;

    public SubscriptionEventTabletResponse(SubscriptionPipeTabletEventBatch subscriptionPipeTabletEventBatch, SubscriptionPrefetchingQueue subscriptionPrefetchingQueue, SubscriptionPipeTabletBatchEvents subscriptionPipeTabletBatchEvents, SubscriptionCommitContext subscriptionCommitContext, SubscriptionCommitContext subscriptionCommitContext2) {
        this.batch = subscriptionPipeTabletEventBatch;
        this.queue = subscriptionPrefetchingQueue;
        this.events = subscriptionPipeTabletBatchEvents;
        this.commitContext = subscriptionCommitContext;
        this.rootCommitContext = subscriptionCommitContext2;
        init();
    }

    @Override // org.apache.iotdb.db.subscription.event.response.SubscriptionEventResponse
    public void prefetchRemainingResponses() {
    }

    @Override // org.apache.iotdb.db.subscription.event.response.SubscriptionEventResponse
    public void fetchNextResponse(long j) throws Exception {
        offer(generateNextTabletResponse());
        CachedSubscriptionPollResponse poll = poll();
        if (Objects.isNull(poll)) {
            LOGGER.warn("SubscriptionEventTabletResponse {} is empty when fetching next response (broken invariant)", this);
        } else {
            poll.closeMemoryBlock();
        }
    }

    @Override // org.apache.iotdb.db.subscription.event.response.SubscriptionEventResponse
    public synchronized void nack() {
        cleanUp();
        this.batch.resetForIteration();
        init();
    }

    @Override // org.apache.iotdb.db.subscription.event.response.SubscriptionEventExtendableResponse, org.apache.iotdb.db.subscription.event.response.SubscriptionEventResponse
    public synchronized void cleanUp() {
        super.cleanUp();
        this.totalTablets = 0;
        this.nextOffset.set(0);
        this.totalBufferSize = 0L;
        this.availableForNext = false;
    }

    @Override // org.apache.iotdb.db.subscription.event.response.SubscriptionEventExtendableResponse, org.apache.iotdb.db.subscription.event.response.SubscriptionEventResponse
    public boolean isCommittable() {
        return (this.availableForNext || this.hasNoMore) && size() == 1;
    }

    private void init() {
        if (isEmpty()) {
            offer(generateEmptyTabletResponse());
        } else {
            LOGGER.warn("SubscriptionEventTabletResponse {} is not empty when initializing (broken invariant)", this);
        }
    }

    private synchronized CachedSubscriptionPollResponse generateEmptyTabletResponse() {
        return new CachedSubscriptionPollResponse(SubscriptionPollResponseType.TABLETS.getType(), new TabletsPayload(Collections.emptyList(), this.nextOffset.incrementAndGet()), this.commitContext);
    }

    private synchronized CachedSubscriptionPollResponse generateNextTabletResponse() throws InterruptedException, PipeRuntimeOutOfMemoryCriticalException {
        if (this.availableForNext) {
            this.queue.prefetchEvent(new SubscriptionEvent(this.batch, this.queue, this.rootCommitContext));
            transportIterationSnapshot();
            return new CachedSubscriptionPollResponse(SubscriptionPollResponseType.TABLETS.getType(), new TabletsPayload(Collections.emptyList(), -this.totalTablets), this.commitContext);
        }
        CachedSubscriptionPollResponse cachedSubscriptionPollResponse = null;
        ArrayList arrayList = new ArrayList();
        long j = 0;
        while (true) {
            if (!this.batch.hasNext()) {
                break;
            }
            List<Tablet> next = this.batch.next();
            if (!Objects.isNull(next)) {
                arrayList.addAll(next);
                long longValue = ((Long) next.stream().map(PipeMemoryWeightUtil::calculateTabletSizeInBytes).reduce((v0, v1) -> {
                    return Long.sum(v0, v1);
                }).orElse(0L)).longValue();
                this.totalTablets += next.size();
                this.totalBufferSize += longValue;
                j += longValue;
                if (longValue <= READ_TABLET_BUFFER_SIZE) {
                    if (j <= READ_TABLET_BUFFER_SIZE) {
                        if (this.totalBufferSize > PREFETCH_TABLET_BUFFER_SIZE && this.batch.hasNext()) {
                            this.availableForNext = true;
                            break;
                        }
                    } else {
                        cachedSubscriptionPollResponse = new CachedSubscriptionPollResponse(SubscriptionPollResponseType.TABLETS.getType(), new TabletsPayload(new ArrayList(arrayList), this.nextOffset.incrementAndGet()), this.commitContext);
                        break;
                    }
                } else {
                    LOGGER.warn("Detect large tablets with {} byte(s), current tablets size {} byte(s)", Long.valueOf(longValue), arrayList);
                    cachedSubscriptionPollResponse = new CachedSubscriptionPollResponse(SubscriptionPollResponseType.TABLETS.getType(), new TabletsPayload(new ArrayList(arrayList), this.nextOffset.incrementAndGet()), this.commitContext);
                    break;
                }
            }
        }
        if (Objects.isNull(cachedSubscriptionPollResponse)) {
            if (arrayList.isEmpty()) {
                transportIterationSnapshot();
                cachedSubscriptionPollResponse = new CachedSubscriptionPollResponse(SubscriptionPollResponseType.TABLETS.getType(), new TabletsPayload(Collections.emptyList(), -this.totalTablets), this.commitContext);
                this.hasNoMore = true;
            } else {
                cachedSubscriptionPollResponse = new CachedSubscriptionPollResponse(SubscriptionPollResponseType.TABLETS.getType(), new TabletsPayload(new ArrayList(arrayList), this.nextOffset.incrementAndGet()), this.commitContext);
            }
        }
        List tablets = cachedSubscriptionPollResponse.getPayload().getTablets();
        if (Objects.nonNull(tablets) && !tablets.isEmpty()) {
            cachedSubscriptionPollResponse.setMemoryBlock(PipeDataNodeResourceManager.memory().forceAllocateForTabletWithRetry(j));
        }
        return cachedSubscriptionPollResponse;
    }

    private void waitForResourceEnough4Parsing(long j) throws InterruptedException {
        PipeMemoryManager memory = PipeDataNodeResourceManager.memory();
        if (memory.isEnough4TabletParsing()) {
            return;
        }
        long currentTimeMillis = System.currentTimeMillis();
        long subscriptionCheckMemoryEnoughIntervalMs = SubscriptionConfig.getInstance().getSubscriptionCheckMemoryEnoughIntervalMs();
        while (!memory.isEnough4TabletParsing()) {
            Thread.sleep(subscriptionCheckMemoryEnoughIntervalMs);
            long currentTimeMillis2 = System.currentTimeMillis();
            double d = (currentTimeMillis2 - currentTimeMillis) / 1000.0d;
            double d2 = (currentTimeMillis2 - r0) / 1000.0d;
            if (d > 10.0d) {
                LOGGER.info("SubscriptionEventTabletResponse {} wait for resource enough for parsing tablets {} seconds.", this.commitContext, Double.valueOf(d2));
                currentTimeMillis = currentTimeMillis2;
            } else if (LOGGER.isDebugEnabled()) {
                LOGGER.debug("SubscriptionEventTabletResponse {} wait for resource enough for parsing tablets {} seconds.", this.commitContext, Double.valueOf(d2));
            }
            if (d2 * 1000.0d > j) {
                throw new PipeException(String.format("TimeoutException: Waited %s seconds", Double.valueOf(d2)));
            }
        }
        LOGGER.info("SubscriptionEventTabletResponse {} wait for resource enough for parsing tablets {} seconds.", this.commitContext, Double.valueOf((System.currentTimeMillis() - r0) / 1000.0d));
    }

    private void transportIterationSnapshot() {
        this.events.receiveIterationSnapshot(this.batch.sendIterationSnapshot());
    }

    @Override // org.apache.iotdb.db.subscription.event.response.SubscriptionEventExtendableResponse
    public String toString() {
        return "SubscriptionEventTabletResponse" + coreReportMessage();
    }

    /* JADX INFO: Access modifiers changed from: protected */
    @Override // org.apache.iotdb.db.subscription.event.response.SubscriptionEventExtendableResponse
    public Map<String, String> coreReportMessage() {
        Map<String, String> coreReportMessage = super.coreReportMessage();
        coreReportMessage.put("totalTablets", String.valueOf(this.totalTablets));
        coreReportMessage.put("nextOffset", String.valueOf(this.nextOffset));
        coreReportMessage.put("totalBufferSize", String.valueOf(this.totalBufferSize));
        coreReportMessage.put("availableForNext", String.valueOf(this.availableForNext));
        return coreReportMessage;
    }
}
