package org.apache.iotdb.db.subscription.broker;

import com.github.benmanes.caffeine.cache.Cache;
import com.github.benmanes.caffeine.cache.Caffeine;
import java.util.Objects;
import java.util.concurrent.TimeUnit;
import org.apache.iotdb.commons.pipe.agent.task.connection.UnboundedBlockingPendingQueue;
import org.apache.iotdb.commons.pipe.event.EnrichedEvent;
import org.apache.iotdb.commons.subscription.config.SubscriptionConfig;
import org.apache.iotdb.db.pipe.event.common.tablet.PipeRawTabletInsertionEvent;
import org.apache.iotdb.db.pipe.event.common.tsfile.PipeTsFileInsertionEvent;
import org.apache.iotdb.pipe.api.event.Event;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/iotdb/db/subscription/broker/TsFileDeduplicationBlockingPendingQueue.class */
/**
 * A {@link SubscriptionBlockingPendingQueue} that drops duplicated TsFile-related events while
 * polling.
 *
 * <p>For every {@link PipeTsFileInsertionEvent} seen by {@link #waitedPoll()} (either polled
 * directly, or found as the source event of a polled {@link PipeRawTabletInsertionEvent}), the
 * hash code of its {@code getTsFile()} result is remembered together with the event's
 * {@code isGeneratedByHistoricalExtractor()} flag. A later event with the same hash code but the
 * opposite flag is treated as a duplicate: its reference count is decreased and {@code null} is
 * returned in its place.
 *
 * <p>Remembered entries expire once they have not been accessed for the number of seconds given
 * by {@code SubscriptionConfig#getSubscriptionTsFileDeduplicationWindowSeconds()}.
 *
 * <p>NOTE(review): entries are keyed by hash code only, so two different TsFiles whose hash codes
 * collide would be treated as the same file here - confirm this is acceptable.
 */
public class TsFileDeduplicationBlockingPendingQueue extends SubscriptionBlockingPendingQueue {

    private static final Logger LOGGER =
            LoggerFactory.getLogger(TsFileDeduplicationBlockingPendingQueue.class);

    /** TsFile hash code -> {@code isGeneratedByHistoricalExtractor} flag of the first event seen. */
    private final Cache<Integer, Boolean> hashCodeToIsGeneratedByHistoricalExtractor;

    /**
     * Creates a deduplicating view over the given queue.
     *
     * @param unboundedBlockingPendingQueue the underlying queue, handed to the superclass
     */
    public TsFileDeduplicationBlockingPendingQueue(
            UnboundedBlockingPendingQueue<Event> unboundedBlockingPendingQueue) {
        super(unboundedBlockingPendingQueue);
        this.hashCodeToIsGeneratedByHistoricalExtractor =
                Caffeine.newBuilder()
                        .expireAfterAccess(
                                SubscriptionConfig.getInstance()
                                        .getSubscriptionTsFileDeduplicationWindowSeconds(),
                                TimeUnit.SECONDS)
                        .build();
    }

    /**
     * Polls the underlying queue and passes the result through the deduplication filter.
     *
     * @return the polled event, or {@code null} if nothing was polled or the polled event was
     *     dropped as a duplicate
     */
    @Override
    public Event waitedPoll() {
        final Event polled = inputPendingQueue.waitedPoll();
        return filter(polled);
    }

    /**
     * Delegates to the underlying queue's {@code peek()}; the deduplication filter is not applied.
     *
     * @return whatever the underlying queue's {@code peek()} returns
     */
    @Override
    public Event peek() {
        return inputPendingQueue.peek();
    }

    /**
     * Delegates to the underlying queue's {@code directOffer}; the deduplication filter is not
     * applied.
     *
     * @param event the event to offer
     */
    @Override
    public void directOffer(Event event) {
        inputPendingQueue.directOffer(event);
    }

    /**
     * Returns {@code event} unchanged unless it is detected as a duplicate, in which case its
     * reference count is decreased and {@code null} is returned.
     *
     * <p>Synchronized so that the lookup-then-record sequence in {@link #isDuplicated} is not
     * interleaved between threads using this instance.
     *
     * @param event the polled event, may be {@code null}
     * @return {@code event}, or {@code null} when {@code event} is {@code null} or a duplicate
     */
    private synchronized Event filter(Event event) {
        if (event == null) {
            return null;
        }

        // Holder name passed to decreaseReferenceCount for dropped events.
        final String holder = TsFileDeduplicationBlockingPendingQueue.class.getName();

        if (event instanceof PipeRawTabletInsertionEvent) {
            final PipeRawTabletInsertionEvent tabletEvent = (PipeRawTabletInsertionEvent) event;
            final EnrichedEvent origin = tabletEvent.getSourceEvent();
            // Only tablet events whose source is a TsFile event take part in deduplication.
            final boolean duplicated =
                    origin instanceof PipeTsFileInsertionEvent
                            && isDuplicated((PipeTsFileInsertionEvent) origin);
            if (duplicated) {
                tabletEvent.decreaseReferenceCount(holder, true);
                return null;
            }
        }

        if (event instanceof PipeTsFileInsertionEvent) {
            final PipeTsFileInsertionEvent tsFileEvent = (PipeTsFileInsertionEvent) event;
            if (isDuplicated(tsFileEvent)) {
                tsFileEvent.decreaseReferenceCount(holder, true);
                return null;
            }
        }

        return event;
    }

    /**
     * Decides whether the given TsFile event duplicates one seen earlier.
     *
     * <p>The first event for a given TsFile hash code records its
     * {@code isGeneratedByHistoricalExtractor()} flag and is never a duplicate. A later event is a
     * duplicate only when its flag differs from the recorded one; the recorded flag is not
     * updated in that case.
     *
     * @param pipeTsFileInsertionEvent the event to check
     * @return {@code true} if a record with the same hash code and a different flag exists
     */
    private boolean isDuplicated(PipeTsFileInsertionEvent pipeTsFileInsertionEvent) {
        final Integer tsFileHash = pipeTsFileInsertionEvent.getTsFile().hashCode();
        final boolean fromHistoricalExtractor =
                pipeTsFileInsertionEvent.isGeneratedByHistoricalExtractor();

        final Boolean recordedFlag = hashCodeToIsGeneratedByHistoricalExtractor.getIfPresent(tsFileHash);
        if (Objects.nonNull(recordedFlag)) {
            if (recordedFlag.booleanValue() != fromHistoricalExtractor) {
                LOGGER.info(
                        "Subscription: Detect duplicated PipeTsFileInsertionEvent {}, commit it directly",
                        pipeTsFileInsertionEvent.coreReportMessage());
                return true;
            }
            // Same flag as the recorded one: not considered a duplicate.
            return false;
        }

        // First sighting within the window: remember which side produced it.
        hashCodeToIsGeneratedByHistoricalExtractor.put(tsFileHash, fromHistoricalExtractor);
        return false;
    }
}
