package org.apache.pulsar.client.impl;

import com.google.common.annotations.VisibleForTesting;
import io.netty.util.Timeout;
import io.netty.util.Timer;
import it.unimi.dsi.fastutil.longs.Long2ObjectAVLTreeMap;
import it.unimi.dsi.fastutil.longs.Long2ObjectMap;
import it.unimi.dsi.fastutil.longs.Long2ObjectOpenHashMap;
import it.unimi.dsi.fastutil.longs.Long2ObjectSortedMap;
import it.unimi.dsi.fastutil.longs.LongBidirectionalIterator;
import it.unimi.dsi.fastutil.objects.ObjectIterator;
import java.io.Closeable;
import java.util.HashSet;
import java.util.concurrent.TimeUnit;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.MessageIdAdv;
import org.apache.pulsar.client.api.RedeliveryBackoff;
import org.apache.pulsar.client.impl.conf.ConsumerConfigurationData;
import org.roaringbitmap.longlong.Roaring64Bitmap;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* JADX INFO: Access modifiers changed from: package-private */
/* loaded from: input_file:META-INF/bundled-dependencies/pulsar-client-original-4.0.3.jar:org/apache/pulsar/client/impl/NegativeAcksTracker.class */
/**
 * Tracks negatively acknowledged messages and asks the owning consumer to redeliver them once
 * their redelivery delay has elapsed.
 *
 * <p>Pending nacks are stored as {@code redelivery timestamp -> ledger id -> bitmap of entry ids}.
 * Batch index and partition index are not stored; message ids handed back to the consumer are
 * rebuilt with {@link #DUMMY_PARTITION_INDEX}.
 *
 * <p>All mutable state ({@code nackedMessages}, {@code timeout}) is accessed while holding this
 * object's monitor.
 */
public class NegativeAcksTracker implements Closeable {
    private static final Logger log = LoggerFactory.getLogger(NegativeAcksTracker.class);

    /** Lower bound applied to the configured nack redelivery delay, in milliseconds. */
    private static final long MIN_NACK_DELAY_MS = 100;
    /** Partition index placed on message ids rebuilt from (ledgerId, entryId) pairs. */
    private static final int DUMMY_PARTITION_INDEX = -2;

    /**
     * Redelivery timestamp (ms) -> ledger id -> entry ids. Created lazily on the first nack and
     * reset to {@code null} by {@link #close()}.
     */
    private Long2ObjectSortedMap<Long2ObjectMap<Roaring64Bitmap>> nackedMessages = null;
    private final ConsumerBase<?> consumer;
    private final Timer timer;
    private final long nackDelayMs;
    private final RedeliveryBackoff negativeAckRedeliveryBackoff;
    private final int negativeAckPrecisionBitCnt;
    /** Currently scheduled redelivery task, or {@code null} when none is pending. */
    private Timeout timeout;

    /**
     * Creates a tracker bound to the given consumer.
     *
     * @param consumerBase consumer whose client timer is used and which receives redelivery requests
     * @param consumerConfigurationData source of the nack delay, optional backoff and timestamp precision
     */
    public NegativeAcksTracker(ConsumerBase<?> consumerBase, ConsumerConfigurationData<?> consumerConfigurationData) {
        this.consumer = consumerBase;
        this.timer = consumerBase.getClient().timer();
        long configuredDelayMs =
                TimeUnit.MICROSECONDS.toMillis(consumerConfigurationData.getNegativeAckRedeliveryDelayMicros());
        this.nackDelayMs = Math.max(configuredDelayMs, MIN_NACK_DELAY_MS);
        this.negativeAckRedeliveryBackoff = consumerConfigurationData.getNegativeAckRedeliveryBackoff();
        this.negativeAckPrecisionBitCnt = consumerConfigurationData.getNegativeAckPrecisionBitCnt();
    }

    /**
     * Timer callback: collects every nacked message whose timestamp is due, removes those buckets,
     * schedules the next run if anything is left, and hands the due messages to the consumer.
     *
     * @param t the timeout that fired (unused)
     */
    private void triggerRedelivery(Timeout t) {
        HashSet<MessageId> messagesToRedeliver = new HashSet<>();
        synchronized (this) {
            // nackedMessages is null if close() ran before this already-dispatched task got the lock.
            if (nackedMessages == null || nackedMessages.isEmpty()) {
                this.timeout = null;
                return;
            }

            long now = System.currentTimeMillis();

            // Keys are sorted, so iteration can stop at the first timestamp that is not yet due.
            LongBidirectionalIterator dueTimestamps = nackedMessages.keySet().iterator();
            while (dueTimestamps.hasNext()) {
                long timestamp = dueTimestamps.nextLong();
                if (timestamp > now) {
                    break;
                }
                Long2ObjectMap<Roaring64Bitmap> ledgerMap = nackedMessages.get(timestamp);
                for (Long2ObjectMap.Entry<Roaring64Bitmap> ledgerEntry : ledgerMap.long2ObjectEntrySet()) {
                    long ledgerId = ledgerEntry.getLongKey();
                    ledgerEntry.getValue().forEach(entryId -> {
                        MessageIdImpl msgId = new MessageIdImpl(ledgerId, entryId, DUMMY_PARTITION_INDEX);
                        UnAckedMessageTracker.addChunkedMessageIdsAndRemoveFromSequenceMap(
                                msgId, messagesToRedeliver, consumer);
                        messagesToRedeliver.add(msgId);
                    });
                }
            }

            // Drop every bucket that has just been collected.
            LongBidirectionalIterator expired = nackedMessages.keySet().iterator();
            while (expired.hasNext() && expired.nextLong() <= now) {
                expired.remove();
            }

            if (nackedMessages.isEmpty()) {
                this.timeout = null;
            } else {
                // Re-arm for the earliest remaining bucket; never schedule with a negative delay.
                long delayMs = Math.max(nackedMessages.firstLongKey() - now, 0L);
                this.timeout = timer.newTimeout(this::triggerRedelivery, delayMs, TimeUnit.MILLISECONDS);
            }
        }

        // Call into the consumer only after releasing this tracker's monitor: holding it while
        // invoking external code (which may take its own locks) risks a lock-ordering deadlock.
        if (!messagesToRedeliver.isEmpty()) {
            consumer.onNegativeAcksSend(messagesToRedeliver);
            log.info("[{}] {} messages will be re-delivered", consumer, messagesToRedeliver.size());
            consumer.redeliverUnacknowledgedMessages(messagesToRedeliver);
        }
    }

    /**
     * Records a negative acknowledgement for the given message id, using a redelivery count of 0.
     *
     * @param messageId id of the nacked message; must be a {@link MessageIdAdv}
     */
    public synchronized void add(MessageId messageId) {
        add(messageId, 0);
    }

    /**
     * Records a negative acknowledgement for the given message, using its redelivery count
     * as the input to the backoff (when one is configured).
     *
     * @param message the nacked message
     */
    public synchronized void add(Message<?> message) {
        add(message.getMessageId(), message.getRedeliveryCount());
    }

    /**
     * Clears the lowest {@code i} bits of {@code j}.
     *
     * @param j value to trim
     * @param i number of low-order bits to zero
     * @return {@code j} with its {@code i} lowest bits set to 0
     */
    static long trimLowerBit(long j, int i) {
        return j & (-1L << i);
    }

    /**
     * Stores the nack under its (precision-trimmed) redelivery timestamp and schedules the
     * redelivery task if none is currently pending.
     *
     * @param messageId id of the nacked message; must be a {@link MessageIdAdv}
     * @param redeliveryCount value passed to the configured backoff, if any
     */
    private synchronized void add(MessageId messageId, int redeliveryCount) {
        if (nackedMessages == null) {
            nackedMessages = new Long2ObjectAVLTreeMap<>();
        }

        long backoffMs = negativeAckRedeliveryBackoff != null
                ? negativeAckRedeliveryBackoff.next(redeliveryCount)
                : nackDelayMs;

        MessageIdAdv messageIdAdv = (MessageIdAdv) messageId;
        // Trimming low bits groups nearby deadlines into the same bucket.
        long timestamp = trimLowerBit(System.currentTimeMillis() + backoffMs, negativeAckPrecisionBitCnt);
        nackedMessages.computeIfAbsent(timestamp, k -> new Long2ObjectOpenHashMap<>())
                .computeIfAbsent(messageIdAdv.getLedgerId(), k -> new Roaring64Bitmap())
                .add(messageIdAdv.getEntryId());

        if (this.timeout == null) {
            this.timeout = timer.newTimeout(this::triggerRedelivery, backoffMs, TimeUnit.MILLISECONDS);
        }
    }

    /**
     * Returns a message id carrying only ledger id and entry id, with the partition index set to
     * {@link #DUMMY_PARTITION_INDEX}. {@link ChunkMessageIdImpl} instances are returned unchanged.
     *
     * @param messageId id to normalize; must be a {@link MessageIdAdv}
     * @return the normalized id
     */
    public static MessageIdAdv discardBatchAndPartitionIndex(MessageId messageId) {
        if (messageId instanceof ChunkMessageIdImpl) {
            return (MessageIdAdv) messageId;
        }
        MessageIdAdv messageIdAdv = (MessageIdAdv) messageId;
        return new MessageIdImpl(messageIdAdv.getLedgerId(), messageIdAdv.getEntryId(), DUMMY_PARTITION_INDEX);
    }

    /**
     * Counts the entries currently tracked across all timestamps and ledgers.
     *
     * @return number of tracked entry ids, or 0 if nothing has been tracked yet
     */
    @VisibleForTesting
    synchronized long getNackedMessagesCount() {
        if (nackedMessages == null) {
            return 0L;
        }
        return nackedMessages.values().stream()
                .mapToLong(ledgerMap -> ledgerMap.values().stream()
                        .mapToLong(Roaring64Bitmap::getLongCardinality)
                        .sum())
                .sum();
    }

    /**
     * Cancels the pending redelivery task, if any, and discards all tracked nacks.
     */
    @Override
    public synchronized void close() {
        if (timeout != null && !timeout.isCancelled()) {
            timeout.cancel();
            timeout = null;
        }
        if (nackedMessages != null) {
            nackedMessages.clear();
            nackedMessages = null;
        }
    }
}
