package org.apache.iotdb.db.subscription.broker;

import java.util.Objects;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.iotdb.commons.pipe.event.EnrichedEvent;
import org.apache.iotdb.commons.subscription.config.SubscriptionConfig;
import org.apache.iotdb.db.subscription.agent.SubscriptionAgent;
import org.apache.iotdb.db.subscription.event.SubscriptionEvent;
import org.apache.iotdb.pipe.api.event.dml.insertion.TsFileInsertionEvent;
import org.apache.iotdb.rpc.subscription.payload.poll.SubscriptionCommitContext;
import org.apache.iotdb.rpc.subscription.payload.poll.SubscriptionPollResponse;
import org.apache.iotdb.rpc.subscription.payload.poll.SubscriptionPollResponseType;
import org.apache.iotdb.rpc.subscription.payload.poll.TabletsPayload;
import org.apache.tsfile.utils.Pair;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/iotdb/db/subscription/broker/SubscriptionPrefetchingTabletQueue.class */
public class SubscriptionPrefetchingTabletQueue extends SubscriptionPrefetchingQueue {
    private static final Logger LOGGER = LoggerFactory.getLogger(SubscriptionPrefetchingTabletQueue.class);

    /* renamed from: org.apache.iotdb.db.subscription.broker.SubscriptionPrefetchingTabletQueue$1, reason: invalid class name */
    /* loaded from: input_file:org/apache/iotdb/db/subscription/broker/SubscriptionPrefetchingTabletQueue$1.class */
    static /* synthetic */ class AnonymousClass1 {
        static final /* synthetic */ int[] $SwitchMap$org$apache$iotdb$rpc$subscription$payload$poll$SubscriptionPollResponseType = new int[SubscriptionPollResponseType.values().length];

        static {
            try {
                $SwitchMap$org$apache$iotdb$rpc$subscription$payload$poll$SubscriptionPollResponseType[SubscriptionPollResponseType.TABLETS.ordinal()] = 1;
            } catch (NoSuchFieldError e) {
            }
        }
    }

    public SubscriptionPrefetchingTabletQueue(String str, String str2, SubscriptionBlockingPendingQueue subscriptionBlockingPendingQueue, AtomicLong atomicLong) {
        super(str, str2, subscriptionBlockingPendingQueue, atomicLong, SubscriptionConfig.getInstance().getSubscriptionPrefetchTabletBatchMaxDelayInMs(), SubscriptionConfig.getInstance().getSubscriptionPrefetchTabletBatchMaxSizeInBytes());
    }

    public SubscriptionEvent pollTablets(String str, SubscriptionCommitContext subscriptionCommitContext, int i) {
        acquireReadLock();
        try {
            return isClosed() ? null : pollTabletsInternal(str, subscriptionCommitContext, i);
        } finally {
            releaseReadLock();
        }
    }

    private SubscriptionEvent pollTabletsInternal(String str, SubscriptionCommitContext subscriptionCommitContext, int i) {
        AtomicReference atomicReference = new AtomicReference();
        this.inFlightEvents.compute(new Pair<>(str, subscriptionCommitContext), (pair, subscriptionEvent) -> {
            if (Objects.isNull(subscriptionEvent)) {
                String format = String.format("SubscriptionPrefetchingTabletQueue %s is currently not transferring any tablet to consumer %s, commit context: %s, offset: %s", this, str, subscriptionCommitContext, Integer.valueOf(i));
                LOGGER.warn(format);
                atomicReference.set(generateSubscriptionPollErrorResponse(format));
                return null;
            }
            if (subscriptionEvent.isCommitted()) {
                subscriptionEvent.cleanUp(false);
                String format2 = String.format("outdated poll request after commit, consumer id: %s, commit context: %s, offset: %s, prefetching queue: %s", str, subscriptionCommitContext, Integer.valueOf(i), this);
                LOGGER.warn(format2);
                atomicReference.set(generateSubscriptionPollErrorResponse(format2));
                return null;
            }
            if (!Objects.equals(subscriptionEvent.getLastPolledConsumerId(), str)) {
                String format3 = String.format("inconsistent polled consumer id, current: %s, incoming: %s, commit context: %s, offset: %s, prefetching queue: %s", subscriptionEvent.getLastPolledConsumerId(), str, subscriptionCommitContext, Integer.valueOf(i), this);
                LOGGER.warn(format3);
                atomicReference.set(generateSubscriptionPollErrorResponse(format3));
                return subscriptionEvent;
            }
            SubscriptionPollResponse currentResponse = subscriptionEvent.getCurrentResponse();
            if (Objects.isNull(currentResponse)) {
                String format4 = String.format("current response is null when fetching next response, consumer id: %s commit context: %s, offset: %s, prefetching queue: %s", str, subscriptionCommitContext, Integer.valueOf(i), this);
                LOGGER.warn(format4);
                atomicReference.set(generateSubscriptionPollErrorResponse(format4));
                return subscriptionEvent;
            }
            TabletsPayload payload = currentResponse.getPayload();
            short responseType = currentResponse.getResponseType();
            if (!SubscriptionPollResponseType.isValidatedResponseType(responseType)) {
                String format5 = String.format("unexpected response type: %s", Short.valueOf(responseType));
                LOGGER.warn(format5);
                atomicReference.set(generateSubscriptionPollErrorResponse(format5));
                return subscriptionEvent;
            }
            switch (AnonymousClass1.$SwitchMap$org$apache$iotdb$rpc$subscription$payload$poll$SubscriptionPollResponseType[SubscriptionPollResponseType.valueOf(responseType).ordinal()]) {
                case 1:
                    if (!Objects.equals(Integer.valueOf(i), Integer.valueOf(payload.getNextOffset()))) {
                        String format6 = String.format("inconsistent offset, current: %s, incoming: %s, consumer: %s, prefetching queue: %s", Integer.valueOf(payload.getNextOffset()), Integer.valueOf(i), str, this);
                        LOGGER.warn(format6);
                        atomicReference.set(generateSubscriptionPollErrorResponse(format6));
                        return subscriptionEvent;
                    }
                    try {
                        executeReceiverSubtask(() -> {
                            subscriptionEvent.fetchNextResponse(i);
                            return null;
                        }, SubscriptionAgent.receiver().remainingMs());
                        subscriptionEvent.recordLastPolledTimestamp();
                        atomicReference.set(subscriptionEvent);
                    } catch (Exception e) {
                        String format7 = String.format("exception occurred when fetching next response: %s, consumer id: %s, commit context: %s, offset: %s, prefetching queue: %s", e, str, subscriptionCommitContext, Integer.valueOf(i), this);
                        LOGGER.warn(format7, e);
                        atomicReference.set(generateSubscriptionPollErrorResponse(format7));
                    }
                    return subscriptionEvent;
                default:
                    String format8 = String.format("unexpected response type: %s", Short.valueOf(responseType));
                    LOGGER.warn(format8);
                    atomicReference.set(generateSubscriptionPollErrorResponse(format8));
                    return subscriptionEvent;
            }
        });
        return (SubscriptionEvent) atomicReference.get();
    }

    @Override // org.apache.iotdb.db.subscription.broker.SubscriptionPrefetchingQueue
    protected boolean onEvent(TsFileInsertionEvent tsFileInsertionEvent) {
        return this.batches.onEvent((EnrichedEvent) tsFileInsertionEvent, this::prefetchEvent);
    }

    public String toString() {
        return "SubscriptionPrefetchingTabletQueue" + coreReportMessage();
    }
}
