package org.apache.iotdb.db.subscription.broker;

import com.google.common.collect.ImmutableSet;
import com.google.common.collect.UnmodifiableIterator;
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.PriorityBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import org.apache.iotdb.commons.pipe.event.EnrichedEvent;
import org.apache.iotdb.commons.subscription.config.SubscriptionConfig;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.db.pipe.agent.PipeDataNodeAgent;
import org.apache.iotdb.db.pipe.agent.task.execution.PipeSubtaskExecutorManager;
import org.apache.iotdb.db.pipe.event.UserDefinedEnrichedEvent;
import org.apache.iotdb.db.pipe.event.common.terminate.PipeTerminateEvent;
import org.apache.iotdb.db.subscription.agent.SubscriptionAgent;
import org.apache.iotdb.db.subscription.event.SubscriptionEvent;
import org.apache.iotdb.db.subscription.event.batch.SubscriptionPipeEventBatches;
import org.apache.iotdb.db.subscription.resource.SubscriptionDataNodeResourceManager;
import org.apache.iotdb.db.subscription.task.subtask.SubscriptionReceiverSubtask;
import org.apache.iotdb.pipe.api.event.dml.insertion.TabletInsertionEvent;
import org.apache.iotdb.pipe.api.event.dml.insertion.TsFileInsertionEvent;
import org.apache.iotdb.rpc.subscription.payload.poll.ErrorPayload;
import org.apache.iotdb.rpc.subscription.payload.poll.SubscriptionCommitContext;
import org.apache.iotdb.rpc.subscription.payload.poll.SubscriptionPollPayload;
import org.apache.iotdb.rpc.subscription.payload.poll.SubscriptionPollResponseType;
import org.apache.tsfile.utils.Pair;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/iotdb/db/subscription/broker/SubscriptionPrefetchingQueue.class */
public abstract class SubscriptionPrefetchingQueue {
    /** Logger shared by all prefetching queue instances. */
    private static final Logger LOGGER = LoggerFactory.getLogger(SubscriptionPrefetchingQueue.class);
    /** Broker id; compared with the consumer group id carried by incoming commit contexts. */
    private final String brokerId;
    /** Name of the topic this queue serves. */
    private final String topicName;
    /** Upstream queue that {@code tryPrefetch} drains. */
    private final SubscriptionBlockingPendingQueue inputPendingQueue;
    /** Source of the commit ids placed into generated commit contexts. */
    private final AtomicLong commitIdGenerator;
    /** Batching helper that receives tablet events and emits subscription events via a callback. */
    protected final SubscriptionPipeEventBatches batches;
    /** Fair read/write lock: clean-up takes the write lock, poll/ack/nack/prefetch take the read lock. */
    private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock(true);
    /** Set by {@link #markCompleted()}, which is hooked onto the commit of a terminate event. */
    private volatile boolean isCompleted = false;
    /** Set by {@link #markClosed()}; once set, poll/ack/nack/prefetch become no-ops. */
    private volatile boolean isClosed = false;
    /**
     * Remapping step that removes already-committed events from the in-flight map.
     * A committed event is cleaned up with {@code cleanUp(false)} and mapped to {@code null}
     * (entry removed); any other event is returned unchanged.
     */
    private final RemappingFunction<SubscriptionEvent> committedCleaner = event -> {
        if (event.isCommitted()) {
            event.cleanUp(false);
            return null;
        }
        return event;
    };
    /**
     * Remapping step that recycles in-flight events which can be polled again.
     * Such an event is nacked, re-enqueued into the prefetching queue and removed from the
     * in-flight map (mapped to {@code null}). A warning is logged only when the event is
     * pollable without being eagerly pollable. Events that are neither are kept as they are.
     */
    private final RemappingFunction<SubscriptionEvent> pollableNacker = event -> {
        final boolean eagerly = event.eagerlyPollable();
        // pollable() is only consulted when the event is not eagerly pollable.
        if (!eagerly && !event.pollable()) {
            return event;
        }
        event.nack();
        prefetchEvent(event);
        if (!eagerly) {
            LOGGER.warn("Subscription: SubscriptionPrefetchingQueue {} recycle event {} from in flight events, nack and enqueue it to prefetching queue", this, event);
        }
        return null;
    };
    /**
     * Remapping step that asks an in-flight event to prefetch its remaining responses.
     * The step is best-effort: a failure never removes the event from the in-flight map,
     * but it is reported at debug level instead of being silently discarded.
     */
    private final RemappingFunction<SubscriptionEvent> responsePrefetcher = subscriptionEvent -> {
        try {
            subscriptionEvent.prefetchRemainingResponses();
        } catch (Exception e) {
            // Deliberately non-fatal: the event stays in flight whether or not prefetching worked.
            LOGGER.debug("Subscription: SubscriptionPrefetchingQueue {} failed to prefetch remaining responses of event {}, keep it in flight", this, subscriptionEvent, e);
        }
        return subscriptionEvent;
    };
    /**
     * Remapping step that tries to serialize the current and the remaining responses of an
     * in-flight event ahead of time. The step is best-effort: a failure never removes the
     * event from the in-flight map, but it is reported at debug level instead of being
     * silently discarded.
     */
    private final RemappingFunction<SubscriptionEvent> responseSerializer = subscriptionEvent -> {
        try {
            subscriptionEvent.trySerializeCurrentResponse();
            subscriptionEvent.trySerializeRemainingResponses();
        } catch (Exception e) {
            // Deliberately non-fatal: the event stays in flight whether or not serialization worked.
            LOGGER.debug("Subscription: SubscriptionPrefetchingQueue {} failed to serialize responses of event {} ahead of time, keep it in flight", this, subscriptionEvent, e);
        }
        return subscriptionEvent;
    };
    /** Events ready to be handed to consumers, ordered by {@code SubscriptionEvent}'s natural ordering. */
    protected final PriorityBlockingQueue<SubscriptionEvent> prefetchingQueue = new PriorityBlockingQueue<>();
    /** Events handed to a consumer and not yet removed, keyed by (consumer id, commit context). */
    protected final Map<Pair<String, SubscriptionCommitContext>, SubscriptionEvent> inFlightEvents = new ConcurrentHashMap<>();
    /** Poll/prefetch bookkeeping consulted by {@code executePrefetch} and updated on poll. */
    private final SubscriptionPrefetchingQueueStates states = new SubscriptionPrefetchingQueueStates(this);

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * Single-argument mapping applied to values of the in-flight map.
     * Within this class a {@code null} result means "remove the entry".
     *
     * @param <V> type of the value being remapped
     */
    @FunctionalInterface
    /* loaded from: input_file:org/apache/iotdb/db/subscription/broker/SubscriptionPrefetchingQueue$RemappingFunction.class */
    public interface RemappingFunction<V> {
        /**
         * @param v current value
         * @return the value to keep, or {@code null}
         */
        V remap(V v);
    }

    /**
     * Creates a prefetching queue for one broker/topic pair.
     *
     * @param str broker id
     * @param str2 topic name
     * @param subscriptionBlockingPendingQueue upstream pending queue to prefetch from
     * @param atomicLong generator of commit ids
     * @param i forwarded to {@link SubscriptionPipeEventBatches} (meaning not visible here — TODO confirm)
     * @param j forwarded to {@link SubscriptionPipeEventBatches} (meaning not visible here — TODO confirm)
     */
    public SubscriptionPrefetchingQueue(String str, String str2, SubscriptionBlockingPendingQueue subscriptionBlockingPendingQueue, AtomicLong atomicLong, int i, long j) {
        this.brokerId = str;
        this.topicName = str2;
        this.inputPendingQueue = subscriptionBlockingPendingQueue;
        this.commitIdGenerator = atomicLong;
        // NOTE(review): `this` escapes to the batches helper before construction has finished.
        this.batches = new SubscriptionPipeEventBatches(this, i, j);
    }

    /** Runs {@link #cleanUpInternal()} while holding the write lock. */
    public void cleanUp() {
        acquireWriteLock();
        try {
            cleanUpInternal();
        } finally {
            // Always release, even if clean-up throws.
            releaseWriteLock();
        }
    }

    /**
     * Releases everything held by this queue: cleans up the batches, then calls
     * {@code cleanUp(true)} on every prefetched and every in-flight event and empties both
     * containers. Does not take any lock itself.
     */
    protected void cleanUpInternal() {
        this.batches.cleanUp();

        // Prefetched but not yet polled events.
        this.prefetchingQueue.forEach(prefetched -> prefetched.cleanUp(true));
        this.prefetchingQueue.clear();

        // Events polled by consumers and still tracked.
        this.inFlightEvents.values().forEach(inFlight -> inFlight.cleanUp(true));
        this.inFlightEvents.clear();
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /** Blocks until the read lock of this queue is acquired. */
    public void acquireReadLock() {
        this.lock.readLock().lock();
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /** Releases the read lock previously taken with {@link #acquireReadLock()}. */
    public void releaseReadLock() {
        this.lock.readLock().unlock();
    }

    /** Blocks until the write lock of this queue is acquired. */
    protected void acquireWriteLock() {
        this.lock.writeLock().lock();
    }

    /** Releases the write lock previously taken with {@link #acquireWriteLock()}. */
    protected void releaseWriteLock() {
        this.lock.writeLock().unlock();
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /**
     * Hands the given subtask to the subscription executor obtained from
     * {@link PipeSubtaskExecutorManager}.
     *
     * @param subscriptionReceiverSubtask work to execute
     * @param j passed through to the executor; callers in this class pass the receiver's remaining milliseconds
     * @throws Exception whatever the executor propagates
     */
    public void executeReceiverSubtask(SubscriptionReceiverSubtask subscriptionReceiverSubtask, long j) throws Exception {
        PipeSubtaskExecutorManager.getInstance().getSubscriptionExecutor().executeReceiverSubtask(subscriptionReceiverSubtask, j);
    }

    /**
     * Polls one event for the given consumer while holding the read lock.
     *
     * @param str consumer id
     * @return the polled event, or {@code null} if the queue is closed or nothing could be polled
     */
    public SubscriptionEvent poll(String str) {
        acquireReadLock();
        try {
            if (isClosed()) {
                return null;
            }
            return pollInternal(str);
        } finally {
            releaseReadLock();
        }
    }

    /**
     * Polls one event for the given consumer without taking the queue lock.
     *
     * <p>If nothing is prefetched, a prefetch is attempted through the receiver subtask executor
     * and, if the queue is still empty, the batches are asked to emit. Then at most as many
     * events as the queue held at that moment are examined: committed events are dropped,
     * non-pollable events are nacked and dropped, and the first pollable event is recorded as
     * in flight for the consumer and returned.
     *
     * @param str consumer id
     * @return the polled event, or {@code null} if none was available, the blocking poll timed
     *     out, or the thread was interrupted
     */
    public SubscriptionEvent pollInternal(String str) {
        this.states.markPollRequest();

        if (this.prefetchingQueue.isEmpty()) {
            this.states.markMissingPrefetch();
            try {
                executeReceiverSubtask(() -> {
                    tryPrefetch();
                    return null;
                }, SubscriptionAgent.receiver().remainingMs());
            } catch (Exception e) {
                // Arguments follow placeholder order: the exception first, then this queue;
                // the trailing throwable makes SLF4J print the stack trace.
                LOGGER.warn("Exception {} occurred when {} execute receiver subtask", e, this, e);
            }
        }
        if (this.prefetchingQueue.isEmpty()) {
            onEvent();
        }

        // Bound the scan by the size observed now so dropped events cannot make it spin forever.
        final long size = this.prefetchingQueue.size();
        try {
            for (long examined = 0; examined < size; examined++) {
                final SubscriptionEvent event = this.prefetchingQueue.poll(SubscriptionConfig.getInstance().getSubscriptionPollMaxBlockingTimeMs(), TimeUnit.MILLISECONDS);
                if (event == null) {
                    return null;
                }
                if (event.isCommitted()) {
                    LOGGER.warn("Subscription: SubscriptionPrefetchingQueue {} poll committed event {} from prefetching queue (broken invariant), remove it", this, event);
                    continue;
                }
                if (!event.pollable()) {
                    LOGGER.warn("Subscription: SubscriptionPrefetchingQueue {} poll non-pollable event {} from prefetching queue (broken invariant), nack and remove it", this, event);
                    event.nack();
                    continue;
                }
                event.recordLastPolledTimestamp();
                this.inFlightEvents.put(new Pair<>(str, event.getCommitContext()), event);
                event.recordLastPolledConsumerId(str);
                return event;
            }
            return null;
        } catch (InterruptedException e) {
            // Restore the interrupt flag for callers further up the stack.
            Thread.currentThread().interrupt();
            LOGGER.warn("Subscription: SubscriptionPrefetchingQueue {} interrupted while polling events.", this, e);
            return null;
        }
    }

    /**
     * Runs one prefetch round while holding the read lock.
     *
     * <p>In-flight events are always swept for committed and re-pollable entries. When the
     * states allow prefetching, new events are prefetched first and the sweep additionally
     * prefetches and serializes responses.
     *
     * @return {@code true} if a prefetch was performed, {@code false} if the queue is closed or
     *     the states did not allow prefetching
     */
    public boolean executePrefetch() {
        acquireReadLock();
        try {
            if (isClosed()) {
                return false;
            }
            reportStateIfNeeded();
            if (this.states.shouldPrefetch()) {
                tryPrefetch();
                remapInFlightEventsSnapshot(this.committedCleaner, this.pollableNacker, this.responsePrefetcher, this.responseSerializer);
                return true;
            }
            remapInFlightEventsSnapshot(this.committedCleaner, this.pollableNacker);
            return false;
        } finally {
            releaseReadLock();
        }
    }

    /**
     * Logs the state of this queue at info level, but only when the resource manager's log
     * scheduler yields a logger for this broker/topic.
     */
    private void reportStateIfNeeded() {
        SubscriptionDataNodeResourceManager.log()
                .schedule(SubscriptionPrefetchingQueue.class, this.brokerId, this.topicName)
                .ifPresent(scheduled -> scheduled.info("Subscription: SubscriptionPrefetchingQueue state {}", this));
    }

    /**
     * Applies the given remapping functions, in order, to every entry of a snapshot of the
     * in-flight map's keys. Each entry is updated through {@code compute}, so an entry whose
     * remapping yields {@code null} is removed.
     *
     * @param remappingFunctionArr functions to combine and apply
     */
    @SafeVarargs
    private final void remapInFlightEventsSnapshot(RemappingFunction<SubscriptionEvent>... remappingFunctionArr) {
        // The combined function does not depend on the key, so build it once.
        final RemappingFunction<SubscriptionEvent> combined = COMBINER(remappingFunctionArr);
        // Iterate over a copy of the key set so this pass only visits the keys present when it started.
        for (final Pair<String, SubscriptionCommitContext> key : ImmutableSet.copyOf(this.inFlightEvents.keySet())) {
            this.inFlightEvents.compute(key, (ignoredKey, event) -> combined.remap(event));
        }
    }

    /**
     * Adds an event to the prefetching queue. If the event orders before the current head of
     * the queue, the states are told that a disorder cause occurred.
     *
     * @param subscriptionEvent event to enqueue
     */
    public void prefetchEvent(SubscriptionEvent subscriptionEvent) {
        final SubscriptionEvent head = this.prefetchingQueue.peek();
        final boolean ordersBeforeHead = head != null && subscriptionEvent.compareTo(head) < 0;
        if (ordersBeforeHead) {
            this.states.markDisorderCause();
        }
        this.prefetchingQueue.add(subscriptionEvent);
    }

    /**
     * Drains the input pending queue until it is empty or one of the {@code onEvent} handlers
     * reports {@code true}.
     *
     * <ul>
     *   <li>{@code null} events are skipped; non-{@link EnrichedEvent}s are logged and skipped.</li>
     *   <li>A {@link PipeTerminateEvent} gets a commit hook that marks this queue completed and
     *       has its reference count decreased.</li>
     *   <li>Tablet and tsfile insertion events go to the matching {@code onEvent} overload.</li>
     *   <li>Any other enriched event is logged, has its reference count decreased, and the
     *       no-argument {@code onEvent()} is invoked.</li>
     * </ul>
     */
    private void tryPrefetch() {
        while (!this.inputPendingQueue.isEmpty()) {
            // Declared with the widest type on purpose: a PipeTerminateEvent-typed local makes
            // the first instanceof check always succeed and every later branch unreachable.
            final Object event = UserDefinedEnrichedEvent.maybeOf(this.inputPendingQueue.waitedPoll());
            if (event == null) {
                continue;
            }
            if (!(event instanceof EnrichedEvent)) {
                LOGGER.warn("Subscription: SubscriptionPrefetchingQueue {} only support prefetch EnrichedEvent. Ignore {}.", this, event);
                continue;
            }
            if (event instanceof PipeTerminateEvent) {
                final PipeTerminateEvent terminateEvent = (PipeTerminateEvent) event;
                terminateEvent.addOnCommittedHook(() -> {
                    markCompleted();
                    return null;
                });
                terminateEvent.decreaseReferenceCount(SubscriptionPrefetchingQueue.class.getName(), true);
                LOGGER.info("Subscription: SubscriptionPrefetchingQueue {} commit PipeTerminateEvent {}", this, terminateEvent);
                continue;
            }
            if (event instanceof TabletInsertionEvent) {
                if (onEvent((TabletInsertionEvent) event)) {
                    return;
                }
                continue;
            }
            if (event instanceof TsFileInsertionEvent) {
                if (onEvent((TsFileInsertionEvent) event)) {
                    return;
                }
                continue;
            }
            // Remaining enriched events are not delivered to consumers: release and move on.
            LOGGER.info("Subscription: SubscriptionPrefetchingQueue {} ignore EnrichedEvent {} when prefetching.", this, event);
            ((EnrichedEvent) event).decreaseReferenceCount(SubscriptionPrefetchingQueue.class.getName(), false);
            if (onEvent()) {
                return;
            }
        }
    }

    /**
     * Handles a tsfile insertion event taken from the input pending queue.
     *
     * @param tsFileInsertionEvent event to handle
     * @return {@code true} makes {@code tryPrefetch} stop draining the input queue
     */
    protected abstract boolean onEvent(TsFileInsertionEvent tsFileInsertionEvent);

    /**
     * Forwards a tablet insertion event to the batches, with {@link #prefetchEvent} as the
     * callback for events the batches emit.
     *
     * @param tabletInsertionEvent event to batch; cast to {@link EnrichedEvent} before forwarding
     * @return the result reported by the batches; {@code true} makes {@code tryPrefetch} stop
     */
    protected boolean onEvent(TabletInsertionEvent tabletInsertionEvent) {
        return this.batches.onEvent((EnrichedEvent) tabletInsertionEvent, this::prefetchEvent);
    }

    /**
     * Invokes the batches without a new input event, with {@link #prefetchEvent} as the
     * callback for events the batches emit.
     *
     * @return the result reported by the batches; {@code true} makes {@code tryPrefetch} stop
     */
    protected boolean onEvent() {
        return this.batches.onEvent(this::prefetchEvent);
    }

    /**
     * Acknowledges an in-flight event while holding the read lock.
     *
     * @param str consumer id
     * @param subscriptionCommitContext commit context identifying the event
     * @return {@code true} only if the queue is open and the event was acked
     */
    public boolean ack(String str, SubscriptionCommitContext subscriptionCommitContext) {
        acquireReadLock();
        try {
            return !isClosed() && ackInternal(str, subscriptionCommitContext);
        } finally {
            releaseReadLock();
        }
    }

    /**
     * Acknowledges the in-flight event stored under (consumer id, commit context).
     *
     * <p>The update happens inside {@code compute} on the in-flight map: a missing entry is only
     * logged; an already-committed event is cleaned up and removed; a non-committable event is
     * left in place; otherwise the event is acked, its commit timestamp recorded, cleaned up
     * and removed. A consumer group mismatch is logged but does not prevent the ack.
     *
     * @param str consumer id
     * @param subscriptionCommitContext commit context identifying the event
     * @return {@code true} if the event was acked by this call
     */
    private boolean ackInternal(String str, SubscriptionCommitContext subscriptionCommitContext) {
        final AtomicBoolean acked = new AtomicBoolean(false);
        final Pair<String, SubscriptionCommitContext> key = new Pair<>(str, subscriptionCommitContext);
        this.inFlightEvents.compute(key, (ignoredKey, event) -> {
            if (event == null) {
                LOGGER.warn("Subscription: subscription commit context {} does not exist, it may have been committed or something unexpected happened, prefetching queue: {}", subscriptionCommitContext, this);
                return null;
            }
            if (event.isCommitted()) {
                LOGGER.warn("Subscription: subscription event {} is committed, subscription commit context {}, prefetching queue: {}", event, subscriptionCommitContext, this);
                event.cleanUp(false);
                return null;
            }
            if (!event.isCommittable()) {
                LOGGER.warn("Subscription: subscription event {} is not committable, subscription commit context {}, prefetching queue: {}", event, subscriptionCommitContext, this);
                return event;
            }
            final String incomingGroupId = subscriptionCommitContext.getConsumerGroupId();
            if (!Objects.equals(incomingGroupId, this.brokerId)) {
                LOGGER.warn("inconsistent consumer group when acking event, current: {}, incoming: {}, consumer id: {}, event commit context: {}, prefetching queue: {}, commit it anyway...", this.brokerId, incomingGroupId, str, subscriptionCommitContext, this);
            }
            event.ack();
            event.recordCommittedTimestamp();
            acked.set(true);
            event.cleanUp(false);
            return null;
        });
        return acked.get();
    }

    /**
     * Negatively acknowledges an in-flight event while holding the read lock.
     *
     * @param str consumer id
     * @param subscriptionCommitContext commit context identifying the event
     * @return {@code true} only if the queue is open and the event was nacked
     */
    public boolean nack(String str, SubscriptionCommitContext subscriptionCommitContext) {
        acquireReadLock();
        try {
            return !isClosed() && nackInternal(str, subscriptionCommitContext);
        } finally {
            releaseReadLock();
        }
    }

    /**
     * Negatively acknowledges the in-flight event stored under (consumer id, commit context).
     *
     * <p>The update happens inside {@code compute} on the in-flight map. A missing entry is only
     * logged. Otherwise the event is nacked and kept in the map. A consumer group mismatch is
     * logged but does not prevent the nack.
     *
     * @param str consumer id
     * @param subscriptionCommitContext commit context identifying the event
     * @return {@code true} if the event was nacked by this call
     */
    public boolean nackInternal(String str, SubscriptionCommitContext subscriptionCommitContext) {
        final AtomicBoolean nacked = new AtomicBoolean(false);
        this.inFlightEvents.compute(new Pair<>(str, subscriptionCommitContext), (ignoredKey, event) -> {
            if (event == null) {
                LOGGER.warn("Subscription: subscription commit context [{}] does not exist, it may have been committed or something unexpected happened, prefetching queue: {}", subscriptionCommitContext, this);
                return null;
            }
            final String incomingGroupId = subscriptionCommitContext.getConsumerGroupId();
            if (!Objects.equals(incomingGroupId, this.brokerId)) {
                // The message names the operation actually performed on this path (nack, not commit).
                LOGGER.warn("inconsistent consumer group when nacking event, current: {}, incoming: {}, consumer id: {}, event commit context: {}, prefetching queue: {}, nack it anyway...", this.brokerId, incomingGroupId, str, subscriptionCommitContext, this);
            }
            event.nack();
            nacked.set(true);
            // Keep the entry; only the event's own state changes here.
            return event;
        });
        return nacked.get();
    }

    /**
     * Builds a commit context from the configured data node id, the runtime's reboot times,
     * this queue's topic name and broker id, and the next commit id.
     * Each call advances the commit id generator by one.
     *
     * @return a new commit context
     */
    public SubscriptionCommitContext generateSubscriptionCommitContext() {
        return new SubscriptionCommitContext(IoTDBDescriptor.getInstance().getConfig().getDataNodeId(), PipeDataNodeAgent.runtime().getRebootTimes(), this.topicName, this.brokerId, this.commitIdGenerator.getAndIncrement());
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /**
     * Builds an ERROR-type subscription event carrying the given message.
     * The commit context uses the fixed commit id {@code -1} and does not advance the
     * commit id generator.
     *
     * @param str error message placed into the {@link ErrorPayload}
     * @return the error event
     */
    public SubscriptionEvent generateSubscriptionPollErrorResponse(String str) {
        // NOTE(review): the meaning of ErrorPayload's second argument (false) is not visible here — confirm.
        return new SubscriptionEvent(SubscriptionPollResponseType.ERROR.getType(), (SubscriptionPollPayload) new ErrorPayload(str, false), new SubscriptionCommitContext(IoTDBDescriptor.getInstance().getConfig().getDataNodeId(), PipeDataNodeAgent.runtime().getRebootTimes(), this.topicName, this.brokerId, -1L));
    }

    /**
     * @return the id of this queue, derived from its broker id and topic name via
     *     {@link #generatePrefetchingQueueId(String, String)}
     */
    public String getPrefetchingQueueId() {
        return generatePrefetchingQueueId(this.brokerId, this.topicName);
    }

    /**
     * Builds a prefetching queue id by joining the two parts with an underscore.
     * A {@code null} part is rendered as the text {@code "null"}.
     *
     * @param str first part (the broker id when called from this class)
     * @param str2 second part (the topic name when called from this class)
     * @return {@code str + "_" + str2}
     */
    public static String generatePrefetchingQueueId(String str, String str2) {
        return String.join("_", str, str2);
    }

    /** @return the current number of entries in the in-flight map */
    public long getSubscriptionUncommittedEventCount() {
        return this.inFlightEvents.size();
    }

    /** @return the current value of the commit id generator, without advancing it */
    public long getCurrentCommitId() {
        return this.commitIdGenerator.get();
    }

    /**
     * Counts the pipe events held by this queue: the size of the input pending queue plus the
     * pipe event counts of all prefetched and all in-flight subscription events.
     *
     * @return the total pipe event count
     */
    public int getPipeEventCount() {
        final int pending = this.inputPendingQueue.size();
        final int prefetched = this.prefetchingQueue.stream()
                .mapToInt(event -> event.getPipeEventCount())
                .sum();
        final int inFlight = this.inFlightEvents.values().stream()
                .mapToInt(event -> event.getPipeEventCount())
                .sum();
        return pending + prefetched + inFlight;
    }

    /** @return the current number of events in the prefetching queue */
    public int getPrefetchedEventCount() {
        return this.prefetchingQueue.size();
    }

    /** @return whether {@link #markClosed()} has been called */
    public boolean isClosed() {
        return this.isClosed;
    }

    /** Marks this queue as closed; it only sets the flag and releases no resources. */
    public void markClosed() {
        this.isClosed = true;
    }

    /** @return whether {@link #markCompleted()} has been called */
    public boolean isCompleted() {
        return this.isCompleted;
    }

    /** Marks this queue as completed; within this class it is hooked onto the commit of a terminate event. */
    public void markCompleted() {
        this.isCompleted = true;
    }

    /**
     * Builds a compact description of this queue: identifiers, container sizes, the commit id
     * generator, the states and the completed/closed flags.
     *
     * @return a new mutable map from label to textual value
     */
    public Map<String, String> coreReportMessage() {
        final Map<String, String> report = new HashMap<>();
        report.put("brokerId", this.brokerId);
        report.put("topicName", this.topicName);
        report.put("size of inputPendingQueue", String.valueOf(this.inputPendingQueue.size()));
        report.put("size of prefetchingQueue", String.valueOf(this.prefetchingQueue.size()));
        report.put("size of inFlightEvents", String.valueOf(this.inFlightEvents.size()));
        report.put("commitIdGenerator", this.commitIdGenerator.toString());
        report.put("states", this.states.toString());
        report.put("isCompleted", String.valueOf(this.isCompleted));
        report.put("isClosed", String.valueOf(this.isClosed));
        return report;
    }

    /**
     * Builds a detailed description of this queue. Unlike {@link #coreReportMessage()} it
     * includes the full textual content of the prefetching queue and the in-flight map rather
     * than only their sizes.
     *
     * @return a new mutable map from label to textual value
     */
    public Map<String, String> allReportMessage() {
        final Map<String, String> report = new HashMap<>();
        report.put("brokerId", this.brokerId);
        report.put("topicName", this.topicName);
        report.put("size of inputPendingQueue", String.valueOf(this.inputPendingQueue.size()));
        report.put("prefetchingQueue", this.prefetchingQueue.toString());
        report.put("inFlightEvents", this.inFlightEvents.toString());
        report.put("commitIdGenerator", this.commitIdGenerator.toString());
        report.put("states", this.states.toString());
        report.put("isCompleted", String.valueOf(this.isCompleted));
        report.put("isClosed", String.valueOf(this.isClosed));
        return report;
    }

    /**
     * Combines several remapping functions into one.
     *
     * <p>The combined function returns {@code null} for a {@code null} input. Otherwise it
     * applies each function, in order, to the same input event and returns {@code null} as soon
     * as one of them yields {@code null}. If none does, the input event itself is returned;
     * non-null results of the individual functions are not chained.
     *
     * @param remappingFunctionArr functions to apply in order
     * @return the combined function
     */
    @SafeVarargs
    private static RemappingFunction<SubscriptionEvent> COMBINER(RemappingFunction<SubscriptionEvent>... remappingFunctionArr) {
        return subscriptionEvent -> {
            if (subscriptionEvent == null) {
                return null;
            }
            for (final RemappingFunction<SubscriptionEvent> function : remappingFunctionArr) {
                if (function.remap(subscriptionEvent) == null) {
                    return null;
                }
            }
            return subscriptionEvent;
        };
    }
}
