package org.apache.iotdb.db.pipe.agent.task.subtask.connector;

import java.util.Objects;
import java.util.concurrent.BlockingDeque;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Consumer;
import org.apache.iotdb.commons.pipe.agent.task.connection.BlockingPendingQueue;
import org.apache.iotdb.commons.pipe.agent.task.connection.UnboundedBlockingPendingQueue;
import org.apache.iotdb.commons.pipe.config.PipeConfig;
import org.apache.iotdb.commons.pipe.event.EnrichedEvent;
import org.apache.iotdb.db.pipe.agent.task.connection.PipeEventCollector;
import org.apache.iotdb.db.pipe.event.common.heartbeat.PipeHeartbeatEvent;
import org.apache.iotdb.db.pipe.metric.source.PipeDataRegionEventCounter;
import org.apache.iotdb.pipe.api.event.Event;
import org.apache.iotdb.pipe.api.event.dml.insertion.TsFileInsertionEvent;

/* loaded from: input_file:org/apache/iotdb/db/pipe/agent/task/subtask/connector/PipeRealtimePriorityBlockingQueue.class */
/**
 * A pending queue that stores {@link TsFileInsertionEvent}s in a dedicated deque, separate from
 * every other kind of {@link Event} (which stay in the queue inherited from
 * {@link UnboundedBlockingPendingQueue}).
 *
 * <p>Polling normally serves the inherited queue first and falls back to the TsFile deque only when
 * the inherited queue yields nothing. After {@link #POLL_TSFILE_THRESHOLD} counted polls, the next
 * poll tries the TsFile deque first. When the TsFile deque is polled, every
 * {@link #POLL_HISTORICAL_TSFILE_THRESHOLD}-th attempt takes the oldest entry (head of the deque)
 * and all other attempts take the newest entry (tail of the deque).
 */
public class PipeRealtimePriorityBlockingQueue extends UnboundedBlockingPendingQueue<Event> {

    /**
     * Number of counted polls after which the next poll tries the TsFile deque before the inherited
     * queue. Read once from {@link PipeConfig} when this class is initialized.
     */
    private static final int POLL_TSFILE_THRESHOLD = PipeConfig.getInstance().getPipeRealTimeQueuePollTsFileThreshold();

    /**
     * Every N-th TsFile poll attempt takes the oldest queued TsFile event instead of the newest.
     * Clamped to at least 1 because the value is used as a modulo divisor.
     */
    private static final int POLL_HISTORICAL_TSFILE_THRESHOLD = Math.max(PipeConfig.getInstance().getPipeRealTimeQueuePollHistoricalTsFileThreshold(), 1);

    /** TsFile insertion events; new events are appended at the tail. */
    private final BlockingDeque<TsFileInsertionEvent> tsfileInsertEventDeque = new LinkedBlockingDeque<>();

    /** Counts successful polls since the TsFile deque was last given priority. */
    private final AtomicInteger pollTsFileCounter = new AtomicInteger(0);

    /** Counts TsFile poll attempts; decides whether the head or the tail of the deque is taken. */
    private final AtomicLong pollHistoricalTsFileCounter = new AtomicLong(0L);

    /** Creates an empty queue whose event counter is a {@link PipeDataRegionEventCounter}. */
    public PipeRealtimePriorityBlockingQueue() {
        super(new PipeDataRegionEventCounter());
    }

    /**
     * Offers an event without waiting.
     *
     * <ul>
     *   <li>A {@link TsFileInsertionEvent} is appended to the TsFile deque.
     *   <li>A {@link PipeHeartbeatEvent} is dropped when the last element of the inherited queue is
     *       already a heartbeat; its reference count is decreased before it is dropped.
     *   <li>Any other event is handed to the inherited {@code directOffer}.
     * </ul>
     *
     * @param event the event to enqueue
     * @return {@code true} if a TsFile event was stored, {@code false} if a consecutive heartbeat was
     *     dropped, otherwise the result of the inherited {@code directOffer}
     */
    public boolean directOffer(Event event) {
        checkBeforeOffer(event);

        if (event instanceof TsFileInsertionEvent) {
            this.tsfileInsertEventDeque.add((TsFileInsertionEvent) event);
            return true;
        }

        final boolean isConsecutiveHeartbeat =
            event instanceof PipeHeartbeatEvent && super.peekLast() instanceof PipeHeartbeatEvent;
        if (isConsecutiveHeartbeat) {
            // Do not queue back-to-back heartbeats; release the reference held for this one instead.
            ((EnrichedEvent) event).decreaseReferenceCount(PipeEventCollector.class.getName(), false);
            return false;
        }
        return super.directOffer(event);
    }

    /**
     * Same as {@link #directOffer(Event)}; this implementation never waits.
     *
     * @param event the event to enqueue
     * @return the result of {@link #directOffer(Event)}
     */
    public boolean waitedOffer(Event event) {
        return directOffer(event);
    }

    /**
     * Offers the event through {@link #directOffer(Event)} and ignores its result.
     *
     * @param event the event to enqueue
     * @return always {@code true}, even when {@link #directOffer(Event)} dropped the event
     */
    public boolean put(Event event) {
        directOffer(event);
        return true;
    }

    /**
     * Polls one event without waiting.
     *
     * <p>If the poll counter has reached {@link #POLL_TSFILE_THRESHOLD}, the TsFile deque is tried
     * first and the counter is reset. Otherwise (or if that yielded nothing) the inherited queue is
     * polled, then the TsFile deque as a fallback; a successful poll on this path increments the
     * poll counter.
     *
     * @return the polled event, or {@code null} if nothing was available
     */
    public Event directPoll() {
        Event polled = pollTsFileInsertionEventIfDue();
        if (Objects.nonNull(polled)) {
            return polled;
        }

        polled = super.directPoll();
        if (Objects.isNull(polled)) {
            polled = pollTsFileInsertionEvent();
        }
        countSuccessfulPoll(polled);
        return polled;
    }

    /**
     * Polls one event, first without waiting and then through the inherited {@code waitedPoll}.
     *
     * <p>The non-waiting phase mirrors {@link #directPoll()}, except that the TsFile fallback is
     * skipped (and the historical counter left untouched) while the TsFile deque is empty. If that
     * phase yields nothing, the inherited {@code waitedPoll} is called, followed by one more TsFile
     * fallback attempt.
     *
     * @return the polled event, or {@code null} if nothing became available
     */
    public Event waitedPoll() {
        Event polled = pollTsFileInsertionEventIfDue();
        if (polled != null) {
            return polled;
        }

        polled = super.directPoll();
        if (polled == null && !this.tsfileInsertEventDeque.isEmpty()) {
            polled = pollTsFileInsertionEvent();
        }
        countSuccessfulPoll(polled);
        if (polled != null) {
            return polled;
        }

        // Nothing available right now: defer to the inherited waiting poll
        // (presumably blocks or waits with a timeout; confirm against the superclass).
        polled = super.waitedPoll();
        if (Objects.isNull(polled)) {
            polled = pollTsFileInsertionEvent();
        }
        countSuccessfulPoll(polled);
        return polled;
    }

    /**
     * Tries the TsFile deque when the poll counter has reached {@link #POLL_TSFILE_THRESHOLD}, then
     * resets the poll counter (whether or not an event was obtained).
     *
     * @return the polled TsFile event, or {@code null} if the threshold has not been reached or the
     *     deque returned nothing
     */
    private Event pollTsFileInsertionEventIfDue() {
        if (this.pollTsFileCounter.get() < POLL_TSFILE_THRESHOLD) {
            return null;
        }
        final Event tsFileEvent = pollTsFileInsertionEvent();
        this.pollTsFileCounter.set(0);
        return tsFileEvent;
    }

    /**
     * Polls the TsFile deque: every {@link #POLL_HISTORICAL_TSFILE_THRESHOLD}-th call takes the
     * oldest event (head), every other call takes the newest (tail). The attempt counter is
     * incremented on every call, including calls made while the deque is empty.
     *
     * @return the polled TsFile event, or {@code null} if the deque is empty
     */
    private Event pollTsFileInsertionEvent() {
        final boolean takeOldest =
            this.pollHistoricalTsFileCounter.incrementAndGet() % POLL_HISTORICAL_TSFILE_THRESHOLD == 0;
        return takeOldest ? this.tsfileInsertEventDeque.pollFirst() : this.tsfileInsertEventDeque.pollLast();
    }

    /** Increments the poll counter when {@code polled} is an actual event. */
    private void countSuccessfulPoll(Event polled) {
        if (polled != null) {
            this.pollTsFileCounter.incrementAndGet();
        }
    }

    /**
     * Returns, without removing it, the head of the inherited queue, or the head of the TsFile deque
     * when the inherited queue has no element.
     *
     * @return the peeked event, or {@code null} if both are empty
     */
    public Event peek() {
        final Event head = (Event) this.pendingQueue.peek();
        if (Objects.nonNull(head)) {
            return head;
        }
        return this.tsfileInsertEventDeque.peek();
    }

    /** Clears the inherited queue and the TsFile deque. */
    public void clear() {
        super.clear();
        this.tsfileInsertEventDeque.clear();
    }

    /**
     * Applies {@code consumer} to the events handled by the inherited {@code forEach}, then to every
     * event in the TsFile deque.
     *
     * @param consumer the action to apply to each event
     */
    public void forEach(Consumer<? super Event> consumer) {
        super.forEach(consumer);
        this.tsfileInsertEventDeque.forEach(consumer);
    }

    /**
     * Discards all events: delegates to the inherited implementation, then removes every TsFile
     * event, clearing the reference count of those that are {@link EnrichedEvent}s, and finally
     * resets the event counter.
     */
    public void discardAllEvents() {
        super.discardAllEvents();
        this.tsfileInsertEventDeque.removeIf(tsFileEvent -> {
            // NOTE(review): the holder name here is BlockingPendingQueue, whereas
            // discardEventsOfPipe passes PipeRealtimePriorityBlockingQueue; confirm this is intended.
            if (tsFileEvent instanceof EnrichedEvent
                && ((EnrichedEvent) tsFileEvent).clearReferenceCount(BlockingPendingQueue.class.getName())) {
                this.eventCounter.decreaseEventCount(tsFileEvent);
            }
            // Every TsFile event is removed, regardless of its type or the result above.
            return true;
        });
        this.eventCounter.reset();
    }

    /**
     * Discards the events that belong to the given pipe and region: delegates to the inherited
     * implementation, then removes matching {@link EnrichedEvent}s from the TsFile deque after
     * clearing their reference count.
     *
     * @param pipeNameToDrop name of the pipe whose events are discarded; compared with
     *     {@code equals}, so it must not be {@code null}
     * @param regionId id of the region whose events are discarded
     */
    public void discardEventsOfPipe(String pipeNameToDrop, int regionId) {
        super.discardEventsOfPipe(pipeNameToDrop, regionId);
        this.tsfileInsertEventDeque.removeIf(tsFileEvent -> {
            final boolean belongsToPipe =
                tsFileEvent instanceof EnrichedEvent
                    && pipeNameToDrop.equals(((EnrichedEvent) tsFileEvent).getPipeName())
                    && regionId == ((EnrichedEvent) tsFileEvent).getRegionId();
            if (!belongsToPipe) {
                return false;
            }
            if (((EnrichedEvent) tsFileEvent).clearReferenceCount(PipeRealtimePriorityBlockingQueue.class.getName())) {
                this.eventCounter.decreaseEventCount(tsFileEvent);
            }
            return true;
        });
    }

    /**
     * @return {@code true} only if both the inherited queue and the TsFile deque are empty
     */
    public boolean isEmpty() {
        return super.isEmpty() && this.tsfileInsertEventDeque.isEmpty();
    }

    /**
     * @return the inherited size plus the number of events in the TsFile deque
     */
    public int size() {
        return super.size() + this.tsfileInsertEventDeque.size();
    }

    /**
     * @return the number of events currently held in the TsFile deque
     */
    public int getTsFileInsertionEventCount() {
        return this.tsfileInsertEventDeque.size();
    }
}
