package org.apache.iotdb.db.subscription.broker;

import java.util.Objects;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.iotdb.commons.subscription.config.SubscriptionConfig;
import org.apache.iotdb.db.pipe.event.common.tsfile.PipeTsFileInsertionEvent;
import org.apache.iotdb.db.subscription.event.SubscriptionEvent;
import org.apache.iotdb.db.subscription.event.pipe.SubscriptionPipeTsFilePlainEvent;
import org.apache.iotdb.pipe.api.event.dml.insertion.TsFileInsertionEvent;
import org.apache.iotdb.rpc.subscription.payload.poll.FileInitPayload;
import org.apache.iotdb.rpc.subscription.payload.poll.FilePiecePayload;
import org.apache.iotdb.rpc.subscription.payload.poll.SubscriptionCommitContext;
import org.apache.iotdb.rpc.subscription.payload.poll.SubscriptionPollResponse;
import org.apache.iotdb.rpc.subscription.payload.poll.SubscriptionPollResponseType;
import org.apache.tsfile.utils.Pair;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/iotdb/db/subscription/broker/SubscriptionPrefetchingTsFileQueue.class */
public class SubscriptionPrefetchingTsFileQueue extends SubscriptionPrefetchingQueue {
    private static final Logger LOGGER = LoggerFactory.getLogger(SubscriptionPrefetchingTsFileQueue.class);

    /* renamed from: org.apache.iotdb.db.subscription.broker.SubscriptionPrefetchingTsFileQueue$1, reason: invalid class name */
    /* loaded from: input_file:org/apache/iotdb/db/subscription/broker/SubscriptionPrefetchingTsFileQueue$1.class */
    static /* synthetic */ class AnonymousClass1 {
        static final /* synthetic */ int[] $SwitchMap$org$apache$iotdb$rpc$subscription$payload$poll$SubscriptionPollResponseType = new int[SubscriptionPollResponseType.values().length];

        static {
            try {
                $SwitchMap$org$apache$iotdb$rpc$subscription$payload$poll$SubscriptionPollResponseType[SubscriptionPollResponseType.FILE_INIT.ordinal()] = 1;
            } catch (NoSuchFieldError e) {
            }
            try {
                $SwitchMap$org$apache$iotdb$rpc$subscription$payload$poll$SubscriptionPollResponseType[SubscriptionPollResponseType.FILE_PIECE.ordinal()] = 2;
            } catch (NoSuchFieldError e2) {
            }
            try {
                $SwitchMap$org$apache$iotdb$rpc$subscription$payload$poll$SubscriptionPollResponseType[SubscriptionPollResponseType.FILE_SEAL.ordinal()] = 3;
            } catch (NoSuchFieldError e3) {
            }
        }
    }

    public SubscriptionPrefetchingTsFileQueue(String str, String str2, SubscriptionBlockingPendingQueue subscriptionBlockingPendingQueue, AtomicLong atomicLong) {
        super(str, str2, subscriptionBlockingPendingQueue, atomicLong, SubscriptionConfig.getInstance().getSubscriptionPrefetchTsFileBatchMaxDelayInMs(), SubscriptionConfig.getInstance().getSubscriptionPrefetchTsFileBatchMaxSizeInBytes());
    }

    public SubscriptionEvent pollTsFile(String str, SubscriptionCommitContext subscriptionCommitContext, long j) {
        acquireReadLock();
        try {
            return isClosed() ? null : pollTsFileInternal(str, subscriptionCommitContext, j);
        } finally {
            releaseReadLock();
        }
    }

    public SubscriptionEvent pollTsFileInternal(String str, SubscriptionCommitContext subscriptionCommitContext, long j) {
        AtomicReference atomicReference = new AtomicReference();
        this.inFlightEvents.compute(new Pair<>(str, subscriptionCommitContext), (pair, subscriptionEvent) -> {
            if (Objects.isNull(subscriptionEvent)) {
                String format = String.format("SubscriptionPrefetchingTsFileQueue %s is currently not transferring any file to consumer %s, commit context: %s, writing offset: %s", this, str, subscriptionCommitContext, Long.valueOf(j));
                LOGGER.warn(format);
                atomicReference.set(generateSubscriptionPollErrorResponse(format));
                return null;
            }
            if (subscriptionEvent.isCommitted()) {
                subscriptionEvent.cleanUp();
                String format2 = String.format("outdated poll request after commit, consumer id: %s, commit context: %s, writing offset: %s, prefetching queue: %s", str, subscriptionCommitContext, Long.valueOf(j), this);
                LOGGER.warn(format2);
                atomicReference.set(generateSubscriptionPollErrorResponse(format2));
                return null;
            }
            if (!Objects.equals(subscriptionEvent.getLastPolledConsumerId(), str)) {
                String format3 = String.format("inconsistent polled consumer id, current: %s, incoming: %s, commit context: %s, writing offset: %s, prefetching queue: %s", subscriptionEvent.getLastPolledConsumerId(), str, subscriptionCommitContext, Long.valueOf(j), this);
                LOGGER.warn(format3);
                atomicReference.set(generateSubscriptionPollErrorResponse(format3));
                return subscriptionEvent;
            }
            SubscriptionPollResponse currentResponse = subscriptionEvent.getCurrentResponse();
            FileInitPayload payload = currentResponse.getPayload();
            short responseType = currentResponse.getResponseType();
            if (!SubscriptionPollResponseType.isValidatedResponseType(responseType)) {
                String format4 = String.format("unexpected response type: %s", Short.valueOf(responseType));
                LOGGER.warn(format4);
                atomicReference.set(generateSubscriptionPollErrorResponse(format4));
                return subscriptionEvent;
            }
            String fileName = subscriptionEvent.getFileName();
            switch (AnonymousClass1.$SwitchMap$org$apache$iotdb$rpc$subscription$payload$poll$SubscriptionPollResponseType[SubscriptionPollResponseType.valueOf(responseType).ordinal()]) {
                case 1:
                    if (!Objects.equals(fileName, payload.getFileName())) {
                        String format5 = String.format("inconsistent file name, current: %s, incoming: %s, consumer: %s, writing offset: %s, prefetching queue: %s", payload.getFileName(), fileName, str, Long.valueOf(j), this);
                        LOGGER.warn(format5);
                        atomicReference.set(generateSubscriptionPollErrorResponse(format5));
                        return subscriptionEvent;
                    }
                    if (j != 0) {
                        String format6 = String.format("inconsistent offset, current: %s, incoming: %s, consumer: %s, file name: %s, prefetching queue: %s", 0, Long.valueOf(j), str, fileName, this);
                        LOGGER.warn(format6);
                        atomicReference.set(generateSubscriptionPollErrorResponse(format6));
                        return subscriptionEvent;
                    }
                    break;
                case 2:
                    if (!Objects.equals(fileName, ((FilePiecePayload) payload).getFileName())) {
                        String format7 = String.format("inconsistent file name, current: %s, incoming: %s, consumer: %s, writing offset: %s, prefetching queue: %s", ((FilePiecePayload) payload).getFileName(), fileName, str, Long.valueOf(j), this);
                        LOGGER.warn(format7);
                        atomicReference.set(generateSubscriptionPollErrorResponse(format7));
                        return subscriptionEvent;
                    }
                    if (j != ((FilePiecePayload) payload).getNextWritingOffset()) {
                        String format8 = String.format("inconsistent offset, current: %s, incoming: %s, consumer: %s, file name: %s, prefetching queue: %s", Long.valueOf(((FilePiecePayload) payload).getNextWritingOffset()), Long.valueOf(j), str, fileName, this);
                        LOGGER.warn(format8);
                        atomicReference.set(generateSubscriptionPollErrorResponse(format8));
                        return subscriptionEvent;
                    }
                    break;
                case 3:
                    String format9 = String.format("poll after sealing, consumer: %s, file name: %s, writing offset: %s, prefetching queue: %s", str, fileName, Long.valueOf(j), this);
                    LOGGER.warn(format9);
                    atomicReference.set(generateSubscriptionPollErrorResponse(format9));
                    return subscriptionEvent;
                default:
                    String format10 = String.format("unexpected response type: %s", Short.valueOf(responseType));
                    LOGGER.warn(format10);
                    atomicReference.set(generateSubscriptionPollErrorResponse(format10));
                    return subscriptionEvent;
            }
            try {
                subscriptionEvent.fetchNextResponse();
                subscriptionEvent.recordLastPolledTimestamp();
                atomicReference.set(subscriptionEvent);
                return subscriptionEvent;
            } catch (Exception e) {
                LOGGER.warn("Exception occurred when SubscriptionPrefetchingTsFileQueue {} transferring file (with event {}) to consumer {}", new Object[]{this, subscriptionEvent, str, e});
                String format11 = String.format("Exception occurred when SubscriptionPrefetchingTsFileQueue %s transferring file (with event %s) to consumer %s: %s", this, subscriptionEvent, str, e);
                LOGGER.warn(format11);
                atomicReference.set(generateSubscriptionPollErrorResponse(format11));
                return subscriptionEvent;
            }
        });
        return (SubscriptionEvent) atomicReference.get();
    }

    @Override // org.apache.iotdb.db.subscription.broker.SubscriptionPrefetchingQueue
    protected boolean onEvent(TsFileInsertionEvent tsFileInsertionEvent) {
        super.enqueueEventToPrefetchingQueue(new SubscriptionEvent(new SubscriptionPipeTsFilePlainEvent((PipeTsFileInsertionEvent) tsFileInsertionEvent), new SubscriptionPollResponse(SubscriptionPollResponseType.FILE_INIT.getType(), new FileInitPayload(((PipeTsFileInsertionEvent) tsFileInsertionEvent).getTsFile().getName()), generateSubscriptionCommitContext())));
        return true;
    }

    public String toString() {
        return "SubscriptionPrefetchingTsFileQueue" + coreReportMessage();
    }
}
