package org.apache.pulsar.client.impl;

import java.util.ArrayList;
import java.util.Collections;
import java.util.Map;
import java.util.concurrent.ConcurrentSkipListSet;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.impl.conf.ConsumerConfigurationData;
import org.apache.pulsar.common.api.Commands;
import org.apache.pulsar.common.api.proto.PulsarApi;
import org.apache.pulsar.functions.runtime.shaded.io.netty.channel.EventLoopGroup;
import org.apache.pulsar.functions.runtime.shaded.org.apache.commons.lang3.tuple.Pair;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/pulsar/client/impl/PersistentAcknowledgmentsGroupingTracker.class */
/**
 * Groups message acknowledgments for a single consumer and sends them to the broker in batches.
 *
 * <p>Cumulative acks are collapsed into the highest position seen so far; individual acks are
 * collected in a sorted set. Both are written out by {@link #flush()}, which is invoked
 * periodically when the configured group time is positive, and also as soon as the number of
 * pending individual acks reaches {@link #MAX_ACK_GROUP_SIZE}.
 */
public class PersistentAcknowledgmentsGroupingTracker implements AcknowledgmentsGroupingTracker {
    /** Number of pending individual acks that triggers a flush from {@link #addAcknowledgment}. */
    private static final int MAX_ACK_GROUP_SIZE = 1000;
    private static final Logger log = LoggerFactory.getLogger(PersistentAcknowledgmentsGroupingTracker.class);
    private static final AtomicReferenceFieldUpdater<PersistentAcknowledgmentsGroupingTracker, MessageIdImpl> LAST_CUMULATIVE_ACK_UPDATER = AtomicReferenceFieldUpdater.newUpdater(PersistentAcknowledgmentsGroupingTracker.class, MessageIdImpl.class, "lastCumulativeAck");

    private final ConsumerImpl<?> consumer;
    /** Grouping interval taken from the consumer configuration; a value of 0 disables grouping. */
    private final long acknowledgementGroupTimeMicros;
    /** Highest cumulative position recorded so far; updated only through {@link #LAST_CUMULATIVE_ACK_UPDATER}. */
    private volatile MessageIdImpl lastCumulativeAck = (MessageIdImpl) MessageId.earliest;
    /** Set when {@link #lastCumulativeAck} has advanced and still has to be sent by {@link #flush()}. */
    private volatile boolean cumulativeAckFlushRequired = false;
    private final ConcurrentSkipListSet<MessageIdImpl> pendingIndividualAcks = new ConcurrentSkipListSet<>();
    /** Periodic flush task, or {@code null} when the group time is not positive. */
    private final ScheduledFuture<?> scheduledTask;

    /**
     * Creates the tracker and, when the configured group time is positive, schedules
     * {@link #flush()} with a fixed delay of that many microseconds on one of the event loops.
     *
     * @param consumerImpl consumer whose connection and consumer id are used to send acks
     * @param consumerConfigurationData configuration providing the acknowledgment group time
     * @param eventLoopGroup event loop group used to schedule the periodic flush
     */
    public PersistentAcknowledgmentsGroupingTracker(ConsumerImpl<?> consumerImpl, ConsumerConfigurationData<?> consumerConfigurationData, EventLoopGroup eventLoopGroup) {
        this.consumer = consumerImpl;
        this.acknowledgementGroupTimeMicros = consumerConfigurationData.getAcknowledgementsGroupTimeMicros();
        if (this.acknowledgementGroupTimeMicros > 0) {
            this.scheduledTask = eventLoopGroup.next().scheduleWithFixedDelay(this::flush, this.acknowledgementGroupTimeMicros, this.acknowledgementGroupTimeMicros, TimeUnit.MICROSECONDS);
        } else {
            this.scheduledTask = null;
        }
    }

    /**
     * Tells whether the given message is already covered by an acknowledgment held in this tracker.
     *
     * @param messageId message id to check
     * @return {@code true} if the id is at or before the last cumulative ack, or is currently
     *         among the pending individual acks
     */
    @Override // org.apache.pulsar.client.impl.AcknowledgmentsGroupingTracker
    public boolean isDuplicate(MessageId messageId) {
        if (messageId.compareTo(this.lastCumulativeAck) <= 0) {
            return true;
        }
        return this.pendingIndividualAcks.contains(messageId);
    }

    /**
     * Records an acknowledgment, sending it right away when grouping does not apply.
     *
     * <p>The ack is sent immediately when the group time is 0 or when {@code map} is not empty.
     * Otherwise a cumulative ack only advances the tracked cumulative position, and an individual
     * ack is queued; reaching {@link #MAX_ACK_GROUP_SIZE} queued acks triggers a flush.
     *
     * @param messageIdImpl position being acknowledged
     * @param ackType cumulative or individual
     * @param map properties to attach to the ack command; must not be {@code null}
     */
    @Override // org.apache.pulsar.client.impl.AcknowledgmentsGroupingTracker
    public void addAcknowledgment(MessageIdImpl messageIdImpl, PulsarApi.CommandAck.AckType ackType, Map<String, Long> map) {
        if (this.acknowledgementGroupTimeMicros == 0 || !map.isEmpty()) {
            doImmediateAck(messageIdImpl, ackType, map);
            return;
        }
        if (ackType == PulsarApi.CommandAck.AckType.Cumulative) {
            doCumulativeAck(messageIdImpl);
            return;
        }
        this.pendingIndividualAcks.add(messageIdImpl);
        // Note: ConcurrentSkipListSet.size() traverses the set; it is not a constant-time call.
        if (this.pendingIndividualAcks.size() >= MAX_ACK_GROUP_SIZE) {
            flush();
        }
    }

    /**
     * Advances {@link #lastCumulativeAck} to {@code messageIdImpl} if it is strictly greater than
     * the current value, retrying the compare-and-set until it succeeds or the position is no
     * longer ahead. Marks a cumulative flush as required only when the position was advanced.
     */
    private void doCumulativeAck(MessageIdImpl messageIdImpl) {
        MessageIdImpl current;
        do {
            current = this.lastCumulativeAck;
            if (messageIdImpl.compareTo((MessageId) current) <= 0) {
                // Already covered by the recorded cumulative position.
                return;
            }
        } while (!LAST_CUMULATIVE_ACK_UPDATER.compareAndSet(this, current, messageIdImpl));
        this.cumulativeAckFlushRequired = true;
    }

    /**
     * Writes and flushes a single ack command on the consumer's current connection.
     *
     * @return {@code false} if the consumer has no connection (nothing is sent), {@code true} otherwise
     */
    private boolean doImmediateAck(MessageIdImpl messageIdImpl, PulsarApi.CommandAck.AckType ackType, Map<String, Long> map) {
        ClientCnx clientCnx = this.consumer.getClientCnx();
        if (clientCnx == null) {
            return false;
        }
        clientCnx.ctx().writeAndFlush(Commands.newAck(this.consumer.consumerId, messageIdImpl.getLedgerId(), messageIdImpl.getEntryId(), ackType, null, map), clientCnx.ctx().voidPromise());
        return true;
    }

    /**
     * Sends the pending cumulative ack (if one is required) and drains all pending individual
     * acks to the broker. Does nothing when the consumer has no connection; pending acks are
     * kept in that case.
     */
    @Override // org.apache.pulsar.client.impl.AcknowledgmentsGroupingTracker
    public void flush() {
        if (log.isDebugEnabled()) {
            log.debug("[{}] Flushing pending acks to broker: last-cumulative-ack: {} -- individual-acks: {}", this.consumer, this.lastCumulativeAck, this.pendingIndividualAcks);
        }
        ClientCnx clientCnx = this.consumer.getClientCnx();
        if (clientCnx == null) {
            if (log.isDebugEnabled()) {
                log.debug("[{}] Cannot flush pending acks since we're not connected to broker", this.consumer);
            }
            return;
        }

        boolean shouldFlush = false;
        if (this.cumulativeAckFlushRequired) {
            // Clear the flag BEFORE snapshotting the position: a concurrent doCumulativeAck that
            // advances the position afterwards re-arms the flag, so its ack cannot be lost. At
            // worst the same position is sent twice.
            this.cumulativeAckFlushRequired = false;
            // Read the volatile once so ledger id and entry id come from the same message id.
            MessageIdImpl cumulativeAck = this.lastCumulativeAck;
            clientCnx.ctx().write(Commands.newAck(this.consumer.consumerId, cumulativeAck.getLedgerId(), cumulativeAck.getEntryId(), PulsarApi.CommandAck.AckType.Cumulative, null, Collections.emptyMap()), clientCnx.ctx().voidPromise());
            shouldFlush = true;
        }

        if (!this.pendingIndividualAcks.isEmpty()) {
            if (Commands.peerSupportsMultiMessageAcknowledgment(clientCnx.getRemoteEndpointProtocolVersion())) {
                // Peer accepts one command carrying many (ledgerId, entryId) pairs.
                ArrayList<Pair<Long, Long>> entries = new ArrayList<>(this.pendingIndividualAcks.size());
                MessageIdImpl pending;
                while ((pending = this.pendingIndividualAcks.pollFirst()) != null) {
                    entries.add(Pair.of(pending.getLedgerId(), pending.getEntryId()));
                }
                clientCnx.ctx().write(Commands.newMultiMessageAck(this.consumer.consumerId, entries), clientCnx.ctx().voidPromise());
            } else {
                // Fall back to one ack command per message.
                MessageIdImpl pending;
                while ((pending = this.pendingIndividualAcks.pollFirst()) != null) {
                    clientCnx.ctx().write(Commands.newAck(this.consumer.consumerId, pending.getLedgerId(), pending.getEntryId(), PulsarApi.CommandAck.AckType.Individual, null, Collections.emptyMap()), clientCnx.ctx().voidPromise());
                }
            }
            shouldFlush = true;
        }

        // Only flush the channel when this call actually wrote something.
        if (shouldFlush) {
            clientCnx.ctx().flush();
        }
    }

    /**
     * Flushes any pending acks and cancels the periodic flush task, if one was scheduled. The
     * task is cancelled even when the final flush throws.
     */
    @Override // org.apache.pulsar.client.impl.AcknowledgmentsGroupingTracker, java.lang.AutoCloseable
    public void close() {
        try {
            flush();
        } finally {
            if (this.scheduledTask != null) {
                this.scheduledTask.cancel(true);
            }
        }
    }
}
