package org.apache.pulsar.client.impl;

import java.io.Closeable;
import java.util.HashSet;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.common.util.collections.ConcurrentOpenHashSet;
import org.apache.pulsar.functions.runtime.shaded.io.netty.util.Timeout;
import org.apache.pulsar.functions.runtime.shaded.io.netty.util.TimerTask;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/pulsar/client/impl/UnAckedMessageTracker.class */
public class UnAckedMessageTracker implements Closeable {
    protected ConcurrentOpenHashSet<MessageId> currentSet;
    protected ConcurrentOpenHashSet<MessageId> oldOpenSet;
    private final ReentrantReadWriteLock readWriteLock;
    protected final Lock readLock;
    private final Lock writeLock;
    private Timeout timeout;
    private static final Logger log = LoggerFactory.getLogger((Class<?>) UnAckedMessageTracker.class);
    public static final UnAckedMessageTrackerDisabled UNACKED_MESSAGE_TRACKER_DISABLED = new UnAckedMessageTrackerDisabled();

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/pulsar/client/impl/UnAckedMessageTracker$UnAckedMessageTrackerDisabled.class */
    public static class UnAckedMessageTrackerDisabled extends UnAckedMessageTracker {
        private UnAckedMessageTrackerDisabled() {
        }

        @Override // org.apache.pulsar.client.impl.UnAckedMessageTracker
        public void clear() {
        }

        @Override // org.apache.pulsar.client.impl.UnAckedMessageTracker
        public boolean add(MessageId messageId) {
            return true;
        }

        @Override // org.apache.pulsar.client.impl.UnAckedMessageTracker
        public boolean remove(MessageId messageId) {
            return true;
        }

        @Override // org.apache.pulsar.client.impl.UnAckedMessageTracker
        public int removeMessagesTill(MessageId messageId) {
            return 0;
        }

        @Override // org.apache.pulsar.client.impl.UnAckedMessageTracker, java.io.Closeable, java.lang.AutoCloseable
        public void close() {
        }
    }

    public UnAckedMessageTracker() {
        this.readWriteLock = null;
        this.readLock = null;
        this.writeLock = null;
    }

    public UnAckedMessageTracker(PulsarClientImpl pulsarClientImpl, ConsumerBase<?> consumerBase, long j) {
        this.currentSet = new ConcurrentOpenHashSet<>();
        this.oldOpenSet = new ConcurrentOpenHashSet<>();
        this.readWriteLock = new ReentrantReadWriteLock();
        this.readLock = this.readWriteLock.readLock();
        this.writeLock = this.readWriteLock.writeLock();
        start(pulsarClientImpl, consumerBase, j);
    }

    public void start(final PulsarClientImpl pulsarClientImpl, final ConsumerBase<?> consumerBase, final long j) {
        stop();
        this.timeout = pulsarClientImpl.timer().newTimeout(new TimerTask() { // from class: org.apache.pulsar.client.impl.UnAckedMessageTracker.1
            @Override // org.apache.pulsar.functions.runtime.shaded.io.netty.util.TimerTask
            public void run(Timeout timeout) throws Exception {
                UnAckedMessageTracker.this.toggle();
                if (UnAckedMessageTracker.this.isAckTimeout()) {
                    UnAckedMessageTracker.log.warn("[{}] {} messages have timed-out", consumerBase, Long.valueOf(UnAckedMessageTracker.this.oldOpenSet.size()));
                    HashSet hashSet = new HashSet();
                    ConcurrentOpenHashSet<MessageId> concurrentOpenHashSet = UnAckedMessageTracker.this.oldOpenSet;
                    hashSet.getClass();
                    concurrentOpenHashSet.forEach((v1) -> {
                        r1.add(v1);
                    });
                    UnAckedMessageTracker.this.oldOpenSet.clear();
                    consumerBase.redeliverUnacknowledgedMessages(hashSet);
                }
                UnAckedMessageTracker.this.timeout = pulsarClientImpl.timer().newTimeout(this, j, TimeUnit.MILLISECONDS);
            }
        }, j, TimeUnit.MILLISECONDS);
    }

    void toggle() {
        this.writeLock.lock();
        try {
            ConcurrentOpenHashSet<MessageId> concurrentOpenHashSet = this.currentSet;
            this.currentSet = this.oldOpenSet;
            this.oldOpenSet = concurrentOpenHashSet;
        } finally {
            this.writeLock.unlock();
        }
    }

    public void clear() {
        this.readLock.lock();
        try {
            this.currentSet.clear();
            this.oldOpenSet.clear();
        } finally {
            this.readLock.unlock();
        }
    }

    public boolean add(MessageId messageId) {
        this.readLock.lock();
        try {
            this.oldOpenSet.remove(messageId);
            return this.currentSet.add(messageId);
        } finally {
            this.readLock.unlock();
        }
    }

    boolean isEmpty() {
        boolean z;
        this.readLock.lock();
        try {
            if (this.currentSet.isEmpty()) {
                if (this.oldOpenSet.isEmpty()) {
                    z = true;
                    return z;
                }
            }
            z = false;
            return z;
        } finally {
            this.readLock.unlock();
        }
    }

    public boolean remove(MessageId messageId) {
        boolean z;
        this.readLock.lock();
        try {
            if (!this.currentSet.remove(messageId)) {
                if (!this.oldOpenSet.remove(messageId)) {
                    z = false;
                    return z;
                }
            }
            z = true;
            return z;
        } finally {
            this.readLock.unlock();
        }
    }

    long size() {
        this.readLock.lock();
        try {
            return this.currentSet.size() + this.oldOpenSet.size();
        } finally {
            this.readLock.unlock();
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    public boolean isAckTimeout() {
        this.readLock.lock();
        try {
            return !this.oldOpenSet.isEmpty();
        } finally {
            this.readLock.unlock();
        }
    }

    public int removeMessagesTill(MessageId messageId) {
        this.readLock.lock();
        try {
            int removeIf = this.currentSet.removeIf(messageId2 -> {
                return messageId2.compareTo(messageId) <= 0;
            }) + this.oldOpenSet.removeIf(messageId3 -> {
                return messageId3.compareTo(messageId) <= 0;
            });
            this.readLock.unlock();
            return removeIf;
        } catch (Throwable th) {
            this.readLock.unlock();
            throw th;
        }
    }

    private void stop() {
        this.writeLock.lock();
        try {
            if (this.timeout != null && !this.timeout.isCancelled()) {
                this.timeout.cancel();
            }
            clear();
        } finally {
            this.writeLock.unlock();
        }
    }

    @Override // java.io.Closeable, java.lang.AutoCloseable
    public void close() {
        stop();
    }
}
