package org.apache.rocketmq.client.java.impl.consumer;

import apache.rocketmq.v2.AckMessageRequest;
import apache.rocketmq.v2.AckMessageResponse;
import apache.rocketmq.v2.ChangeInvisibleDurationRequest;
import apache.rocketmq.v2.ChangeInvisibleDurationResponse;
import apache.rocketmq.v2.Code;
import apache.rocketmq.v2.ForwardMessageToDeadLetterQueueRequest;
import apache.rocketmq.v2.ForwardMessageToDeadLetterQueueResponse;
import apache.rocketmq.v2.ReceiveMessageRequest;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.UUID;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.stream.Collectors;
import org.apache.rocketmq.client.apis.consumer.ConsumeResult;
import org.apache.rocketmq.client.apis.consumer.FilterExpression;
import org.apache.rocketmq.client.apis.message.MessageId;
import org.apache.rocketmq.client.java.exception.BadRequestException;
import org.apache.rocketmq.client.java.exception.TooManyRequestsException;
import org.apache.rocketmq.client.java.hook.MessageHookPoints;
import org.apache.rocketmq.client.java.hook.MessageHookPointsStatus;
import org.apache.rocketmq.client.java.hook.MessageInterceptorContextImpl;
import org.apache.rocketmq.client.java.message.GeneralMessageImpl;
import org.apache.rocketmq.client.java.message.MessageViewImpl;
import org.apache.rocketmq.client.java.misc.ClientId;
import org.apache.rocketmq.client.java.misc.ExcludeFromJacocoGeneratedReport;
import org.apache.rocketmq.client.java.retry.RetryPolicy;
import org.apache.rocketmq.client.java.route.Endpoints;
import org.apache.rocketmq.client.java.route.MessageQueueImpl;
import org.apache.rocketmq.client.java.rpc.RpcFuture;
import org.apache.rocketmq.shaded.com.google.common.util.concurrent.FutureCallback;
import org.apache.rocketmq.shaded.com.google.common.util.concurrent.Futures;
import org.apache.rocketmq.shaded.com.google.common.util.concurrent.ListenableFuture;
import org.apache.rocketmq.shaded.com.google.common.util.concurrent.MoreExecutors;
import org.apache.rocketmq.shaded.com.google.common.util.concurrent.SettableFuture;
import org.apache.rocketmq.shaded.com.google.errorprone.annotations.concurrent.GuardedBy;
import org.apache.rocketmq.shaded.io.grpc.Status;
import org.apache.rocketmq.shaded.io.grpc.StatusRuntimeException;
import org.apache.rocketmq.shaded.org.slf4j.Logger;
import org.apache.rocketmq.shaded.org.slf4j.LoggerFactory;

/* JADX INFO: Access modifiers changed from: package-private */
/* loaded from: input_file:org/apache/rocketmq/client/java/impl/consumer/ProcessQueueImpl.class */
public class ProcessQueueImpl implements ProcessQueue {
    static final Duration FORWARD_FIFO_MESSAGE_TO_DLQ_FAILURE_BACKOFF_DELAY = Duration.ofSeconds(1);
    static final Duration ACK_MESSAGE_FAILURE_BACKOFF_DELAY = Duration.ofSeconds(1);
    static final Duration CHANGE_INVISIBLE_DURATION_FAILURE_BACKOFF_DELAY = Duration.ofSeconds(1);
    private static final Logger log = LoggerFactory.getLogger((Class<?>) ProcessQueueImpl.class);
    private static final Duration RECEIVING_FLOW_CONTROL_BACKOFF_DELAY = Duration.ofMillis(20);
    private static final Duration RECEIVING_FAILURE_BACKOFF_DELAY = Duration.ofSeconds(1);
    private static final Duration RECEIVING_BACKOFF_DELAY_WHEN_CACHE_IS_FULL = Duration.ofSeconds(1);
    private final PushConsumerImpl consumer;
    private final MessageQueueImpl mq;
    private final FilterExpression filterExpression;
    private volatile long activityNanoTime = System.nanoTime();
    private volatile long cacheFullNanoTime = Long.MIN_VALUE;
    private volatile boolean dropped = false;

    @GuardedBy("cachedMessageLock")
    private final List<MessageViewImpl> cachedMessages = new ArrayList();
    private final ReadWriteLock cachedMessageLock = new ReentrantReadWriteLock();
    private final AtomicLong cachedMessagesBytes = new AtomicLong();
    private final AtomicLong receptionTimes = new AtomicLong(0);
    private final AtomicLong receivedMessagesQuantity = new AtomicLong(0);

    public ProcessQueueImpl(PushConsumerImpl pushConsumerImpl, MessageQueueImpl messageQueueImpl, FilterExpression filterExpression) {
        this.consumer = pushConsumerImpl;
        this.mq = messageQueueImpl;
        this.filterExpression = filterExpression;
    }

    @Override // org.apache.rocketmq.client.java.impl.consumer.ProcessQueue
    public MessageQueueImpl getMessageQueue() {
        return this.mq;
    }

    @Override // org.apache.rocketmq.client.java.impl.consumer.ProcessQueue
    public void drop() {
        this.dropped = true;
    }

    @Override // org.apache.rocketmq.client.java.impl.consumer.ProcessQueue
    public boolean expired() {
        Duration multipliedBy = this.consumer.getPushConsumerSettings().getLongPollingTimeout().plus(this.consumer.getClientConfiguration().getRequestTimeout()).multipliedBy(3L);
        Duration ofNanos = Duration.ofNanos(System.nanoTime() - this.activityNanoTime);
        if (ofNanos.compareTo(multipliedBy) < 0) {
            return false;
        }
        Duration ofNanos2 = Duration.ofNanos(System.nanoTime() - this.cacheFullNanoTime);
        if (ofNanos2.compareTo(multipliedBy) < 0) {
            return false;
        }
        log.warn("Process queue is idle, idleDuration={}, maxIdleDuration={}, afterCacheFullDuration={}, mq={}, clientId={}", ofNanos, multipliedBy, ofNanos2, this.mq, this.consumer.getClientId());
        return true;
    }

    void cacheMessages(List<MessageViewImpl> list) {
        this.cachedMessageLock.writeLock().lock();
        try {
            Iterator<MessageViewImpl> it = list.iterator();
            while (it.hasNext()) {
                this.cachedMessages.add(it.next());
                this.cachedMessagesBytes.addAndGet(r0.getBody().remaining());
            }
        } finally {
            this.cachedMessageLock.writeLock().unlock();
        }
    }

    private int getReceptionBatchSize() {
        return Math.min(Math.max(this.consumer.cacheMessageCountThresholdPerQueue() - cachedMessagesCount(), 1), this.consumer.getPushConsumerSettings().getReceiveBatchSize());
    }

    @Override // org.apache.rocketmq.client.java.impl.consumer.ProcessQueue
    public void fetchMessageImmediately() {
        receiveMessageImmediately();
    }

    public void onReceiveMessageException(Throwable th, String str) {
        receiveMessageLater(th instanceof TooManyRequestsException ? RECEIVING_FLOW_CONTROL_BACKOFF_DELAY : RECEIVING_FAILURE_BACKOFF_DELAY, str);
    }

    private void receiveMessageLater(Duration duration, String str) {
        ClientId clientId = this.consumer.getClientId();
        ScheduledExecutorService scheduler = this.consumer.getScheduler();
        try {
            log.info("Try to receive message later, mq={}, delay={}, clientId={}", this.mq, duration, clientId);
            scheduler.schedule(() -> {
                receiveMessage(str);
            }, duration.toNanos(), TimeUnit.NANOSECONDS);
        } catch (Throwable th) {
            if (scheduler.isShutdown()) {
                return;
            }
            log.error("[Bug] Failed to schedule message receiving request, mq={}, clientId={}", this.mq, clientId, th);
            onReceiveMessageException(th, str);
        }
    }

    private String generateAttemptId() {
        return UUID.randomUUID().toString();
    }

    public void receiveMessage() {
        receiveMessage(generateAttemptId());
    }

    public void receiveMessage(String str) {
        ClientId clientId = this.consumer.getClientId();
        if (this.dropped) {
            log.info("Process queue has been dropped, no longer receive message, mq={}, clientId={}", this.mq, clientId);
        } else if (!isCacheFull()) {
            receiveMessageImmediately(str);
        } else {
            log.warn("Process queue cache is full, would receive message later, mq={}, clientId={}", this.mq, clientId);
            receiveMessageLater(RECEIVING_BACKOFF_DELAY_WHEN_CACHE_IS_FULL, str);
        }
    }

    private void receiveMessageImmediately() {
        receiveMessageImmediately(generateAttemptId());
    }

    private void receiveMessageImmediately(final String str) {
        final ClientId clientId = this.consumer.getClientId();
        if (!this.consumer.isRunning()) {
            log.info("Stop to receive message because consumer is not running, mq={}, clientId={}", this.mq, clientId);
            return;
        }
        try {
            final Endpoints endpoints = this.mq.getBroker().getEndpoints();
            int receptionBatchSize = getReceptionBatchSize();
            Duration longPollingTimeout = this.consumer.getPushConsumerSettings().getLongPollingTimeout();
            final ReceiveMessageRequest wrapReceiveMessageRequest = this.consumer.wrapReceiveMessageRequest(receptionBatchSize, this.mq, this.filterExpression, longPollingTimeout, str);
            this.activityNanoTime = System.nanoTime();
            final MessageInterceptorContextImpl messageInterceptorContextImpl = new MessageInterceptorContextImpl(MessageHookPoints.RECEIVE);
            this.consumer.doBefore(messageInterceptorContextImpl, Collections.emptyList());
            Futures.addCallback(this.consumer.receiveMessage(wrapReceiveMessageRequest, this.mq, longPollingTimeout), new FutureCallback<ReceiveMessageResult>() { // from class: org.apache.rocketmq.client.java.impl.consumer.ProcessQueueImpl.1
                @Override // org.apache.rocketmq.shaded.com.google.common.util.concurrent.FutureCallback
                public void onSuccess(ReceiveMessageResult receiveMessageResult) {
                    ProcessQueueImpl.this.consumer.doAfter(new MessageInterceptorContextImpl(messageInterceptorContextImpl, MessageHookPointsStatus.OK), (List) receiveMessageResult.getMessageViewImpls().stream().map(GeneralMessageImpl::new).collect(Collectors.toList()));
                    try {
                        ProcessQueueImpl.this.onReceiveMessageResult(receiveMessageResult);
                    } catch (Throwable th) {
                        ProcessQueueImpl.log.error("[Bug] Exception raised while handling receive result, mq={}, endpoints={}, clientId={}", ProcessQueueImpl.this.mq, endpoints, clientId, th);
                        ProcessQueueImpl.this.onReceiveMessageException(th, str);
                    }
                }

                @Override // org.apache.rocketmq.shaded.com.google.common.util.concurrent.FutureCallback
                public void onFailure(Throwable th) {
                    String str2 = null;
                    if ((th instanceof StatusRuntimeException) && Status.DEADLINE_EXCEEDED.getCode() == ((StatusRuntimeException) th).getStatus().getCode()) {
                        str2 = wrapReceiveMessageRequest.getAttemptId();
                    }
                    ProcessQueueImpl.this.consumer.doAfter(new MessageInterceptorContextImpl(messageInterceptorContextImpl, MessageHookPointsStatus.ERROR), Collections.emptyList());
                    ProcessQueueImpl.log.error("Exception raised during message reception, mq={}, endpoints={}, attemptId={}, nextAttemptId={}, clientId={}", ProcessQueueImpl.this.mq, endpoints, wrapReceiveMessageRequest.getAttemptId(), str2, clientId, th);
                    ProcessQueueImpl.this.onReceiveMessageException(th, str2);
                }
            }, MoreExecutors.directExecutor());
            this.receptionTimes.getAndIncrement();
            this.consumer.getReceptionTimes().getAndIncrement();
        } catch (Throwable th) {
            log.error("Exception raised during message reception, mq={}, clientId={}", this.mq, clientId, th);
            onReceiveMessageException(th, str);
        }
    }

    public boolean isCacheFull() {
        int cacheMessageCountThresholdPerQueue = this.consumer.cacheMessageCountThresholdPerQueue();
        long cachedMessagesCount = cachedMessagesCount();
        ClientId clientId = this.consumer.getClientId();
        if (cacheMessageCountThresholdPerQueue <= cachedMessagesCount) {
            log.warn("Process queue total cached messages quantity exceeds the threshold, threshold={}, actual={}, mq={}, clientId={}", Integer.valueOf(cacheMessageCountThresholdPerQueue), Long.valueOf(cachedMessagesCount), this.mq, clientId);
            this.cacheFullNanoTime = System.nanoTime();
            return true;
        }
        int cacheMessageBytesThresholdPerQueue = this.consumer.cacheMessageBytesThresholdPerQueue();
        long cachedMessageBytes = cachedMessageBytes();
        if (cacheMessageBytesThresholdPerQueue > cachedMessageBytes) {
            return false;
        }
        log.warn("Process queue total cached messages memory exceeds the threshold, threshold={} bytes, actual={} bytes, mq={}, clientId={}", Integer.valueOf(cacheMessageBytesThresholdPerQueue), Long.valueOf(cachedMessageBytes), this.mq, clientId);
        this.cacheFullNanoTime = System.nanoTime();
        return true;
    }

    @Override // org.apache.rocketmq.client.java.impl.consumer.ProcessQueue
    public void discardMessage(MessageViewImpl messageViewImpl) {
        log.info("Discard message, mq={}, messageId={}, clientId={}", this.mq, messageViewImpl.getMessageId(), this.consumer.getClientId());
        nackMessage(messageViewImpl).addListener(() -> {
            evictCache(messageViewImpl);
        }, MoreExecutors.directExecutor());
    }

    @Override // org.apache.rocketmq.client.java.impl.consumer.ProcessQueue
    public void discardFifoMessage(MessageViewImpl messageViewImpl) {
        log.info("Discard fifo message, mq={}, messageId={}, clientId={}", this.mq, messageViewImpl.getMessageId(), this.consumer.getClientId());
        forwardToDeadLetterQueue(messageViewImpl).addListener(() -> {
            evictCache(messageViewImpl);
        }, MoreExecutors.directExecutor());
    }

    public int cachedMessagesCount() {
        this.cachedMessageLock.readLock().lock();
        try {
            return this.cachedMessages.size();
        } finally {
            this.cachedMessageLock.readLock().unlock();
        }
    }

    public long cachedMessageBytes() {
        return this.cachedMessagesBytes.get();
    }

    /* JADX INFO: Access modifiers changed from: private */
    public void onReceiveMessageResult(ReceiveMessageResult receiveMessageResult) {
        List<MessageViewImpl> messageViewImpls = receiveMessageResult.getMessageViewImpls();
        if (!messageViewImpls.isEmpty()) {
            cacheMessages(messageViewImpls);
            this.receivedMessagesQuantity.getAndAdd(messageViewImpls.size());
            this.consumer.getReceivedMessagesQuantity().getAndAdd(messageViewImpls.size());
            this.consumer.getConsumeService().consume(this, messageViewImpls);
        }
        receiveMessage();
    }

    private void evictCache(MessageViewImpl messageViewImpl) {
        this.cachedMessageLock.writeLock().lock();
        try {
            if (this.cachedMessages.remove(messageViewImpl)) {
                this.cachedMessagesBytes.addAndGet(-messageViewImpl.getBody().remaining());
            }
        } finally {
            this.cachedMessageLock.writeLock().unlock();
        }
    }

    private void statsConsumptionResult(ConsumeResult consumeResult) {
        if (ConsumeResult.SUCCESS.equals(consumeResult)) {
            this.consumer.consumptionOkQuantity.incrementAndGet();
        } else {
            this.consumer.consumptionErrorQuantity.incrementAndGet();
        }
    }

    @Override // org.apache.rocketmq.client.java.impl.consumer.ProcessQueue
    public void eraseMessage(MessageViewImpl messageViewImpl, ConsumeResult consumeResult) {
        statsConsumptionResult(consumeResult);
        (ConsumeResult.SUCCESS.equals(consumeResult) ? ackMessage(messageViewImpl) : nackMessage(messageViewImpl)).addListener(() -> {
            evictCache(messageViewImpl);
        }, MoreExecutors.directExecutor());
    }

    private ListenableFuture<Void> nackMessage(MessageViewImpl messageViewImpl) {
        Duration nextAttemptDelay = this.consumer.getRetryPolicy().getNextAttemptDelay(messageViewImpl.getDeliveryAttempt());
        SettableFuture<Void> create = SettableFuture.create();
        changeInvisibleDuration(messageViewImpl, nextAttemptDelay, 1, create);
        return create;
    }

    private void changeInvisibleDuration(final MessageViewImpl messageViewImpl, final Duration duration, final int i, final SettableFuture<Void> settableFuture) {
        final ClientId clientId = this.consumer.getClientId();
        final String consumerGroup = this.consumer.getConsumerGroup();
        final MessageId messageId = messageViewImpl.getMessageId();
        final Endpoints endpoints = messageViewImpl.getEndpoints();
        final RpcFuture<ChangeInvisibleDurationRequest, ChangeInvisibleDurationResponse> changeInvisibleDuration = this.consumer.changeInvisibleDuration(messageViewImpl, duration);
        Futures.addCallback(changeInvisibleDuration, new FutureCallback<ChangeInvisibleDurationResponse>() { // from class: org.apache.rocketmq.client.java.impl.consumer.ProcessQueueImpl.2
            @Override // org.apache.rocketmq.shaded.com.google.common.util.concurrent.FutureCallback
            public void onSuccess(ChangeInvisibleDurationResponse changeInvisibleDurationResponse) {
                String requestId = changeInvisibleDuration.getContext().getRequestId();
                apache.rocketmq.v2.Status status = changeInvisibleDurationResponse.getStatus();
                Code code = status.getCode();
                if (Code.INVALID_RECEIPT_HANDLE.equals(code)) {
                    ProcessQueueImpl.log.error("Failed to change invisible duration due to the invalid receipt handle, forgive to retry, clientId={}, consumerGroup={}, messageId={}, attempt={}, mq={}, endpoints={}, requestId={}, status message=[{}]", clientId, consumerGroup, messageId, Integer.valueOf(i), ProcessQueueImpl.this.mq, endpoints, requestId, status.getMessage());
                    settableFuture.setException(new BadRequestException(code.getNumber(), requestId, status.getMessage()));
                } else {
                    if (!Code.OK.equals(code)) {
                        ProcessQueueImpl.log.error("Failed to change invisible duration, would retry later, clientId={}, consumerGroup={}, messageId={}, attempt={}, mq={}, endpoints={}, requestId={}, status message=[{}]", clientId, consumerGroup, messageId, Integer.valueOf(i), ProcessQueueImpl.this.mq, endpoints, requestId, status.getMessage());
                        ProcessQueueImpl.this.changeInvisibleDurationLater(messageViewImpl, duration, 1 + i, settableFuture);
                        return;
                    }
                    settableFuture.setFuture(Futures.immediateVoidFuture());
                    if (1 < i) {
                        ProcessQueueImpl.log.info("Finally, change invisible duration successfully, clientId={}, consumerGroup={} messageId={}, attempt={}, mq={}, endpoints={}, requestId={}", clientId, consumerGroup, messageId, Integer.valueOf(i), ProcessQueueImpl.this.mq, endpoints, requestId);
                    } else {
                        ProcessQueueImpl.log.debug("Change invisible duration successfully, clientId={}, consumerGroup={}, messageId={}, mq={}, endpoints={}, requestId={}", clientId, consumerGroup, messageId, ProcessQueueImpl.this.mq, endpoints, requestId);
                    }
                }
            }

            @Override // org.apache.rocketmq.shaded.com.google.common.util.concurrent.FutureCallback
            public void onFailure(Throwable th) {
                ProcessQueueImpl.log.error("Exception raised while changing invisible duration, would retry later, clientId={}, consumerGroup={}, messageId={}, mq={}, endpoints={}", clientId, consumerGroup, messageId, ProcessQueueImpl.this.mq, endpoints, th);
                ProcessQueueImpl.this.changeInvisibleDurationLater(messageViewImpl, duration, 1 + i, settableFuture);
            }
        }, MoreExecutors.directExecutor());
    }

    /* JADX INFO: Access modifiers changed from: private */
    public void changeInvisibleDurationLater(MessageViewImpl messageViewImpl, Duration duration, int i, SettableFuture<Void> settableFuture) {
        MessageId messageId = messageViewImpl.getMessageId();
        ScheduledExecutorService scheduler = this.consumer.getScheduler();
        try {
            scheduler.schedule(() -> {
                changeInvisibleDuration(messageViewImpl, duration, i, settableFuture);
            }, CHANGE_INVISIBLE_DURATION_FAILURE_BACKOFF_DELAY.toNanos(), TimeUnit.NANOSECONDS);
        } catch (Throwable th) {
            if (scheduler.isShutdown()) {
                return;
            }
            log.error("[Bug] Failed to schedule message change invisible duration request, mq={}, messageId={}, clientId={}", this.mq, messageId, this.consumer.getClientId());
            changeInvisibleDurationLater(messageViewImpl, duration, 1 + i, settableFuture);
        }
    }

    @Override // org.apache.rocketmq.client.java.impl.consumer.ProcessQueue
    public ListenableFuture<Void> eraseFifoMessage(MessageViewImpl messageViewImpl, ConsumeResult consumeResult) {
        statsConsumptionResult(consumeResult);
        RetryPolicy retryPolicy = this.consumer.getRetryPolicy();
        int maxAttempts = retryPolicy.getMaxAttempts();
        int deliveryAttempt = messageViewImpl.getDeliveryAttempt();
        MessageId messageId = messageViewImpl.getMessageId();
        ConsumeService consumeService = this.consumer.getConsumeService();
        ClientId clientId = this.consumer.getClientId();
        if (ConsumeResult.FAILURE.equals(consumeResult) && deliveryAttempt < maxAttempts) {
            Duration nextAttemptDelay = retryPolicy.getNextAttemptDelay(deliveryAttempt);
            log.debug("Prepare to redeliver the fifo message because of the consumption failure, maxAttempt={}, attempt={}, mq={}, messageId={}, nextAttemptDelay={}, clientId={}", Integer.valueOf(maxAttempts), Integer.valueOf(messageViewImpl.incrementAndGetDeliveryAttempt()), this.mq, messageId, nextAttemptDelay, clientId);
            return Futures.transformAsync(consumeService.consume(messageViewImpl, nextAttemptDelay), consumeResult2 -> {
                return eraseFifoMessage(messageViewImpl, consumeResult2);
            }, MoreExecutors.directExecutor());
        }
        boolean equals = ConsumeResult.SUCCESS.equals(consumeResult);
        if (!equals) {
            log.info("Failed to consume fifo message finally, run out of attempt times, maxAttempts={}, attempt={}, mq={}, messageId={}, clientId={}", Integer.valueOf(maxAttempts), Integer.valueOf(deliveryAttempt), this.mq, messageId, clientId);
        }
        ListenableFuture<Void> ackMessage = equals ? ackMessage(messageViewImpl) : forwardToDeadLetterQueue(messageViewImpl);
        ackMessage.addListener(() -> {
            evictCache(messageViewImpl);
        }, this.consumer.getConsumptionExecutor());
        return ackMessage;
    }

    private ListenableFuture<Void> forwardToDeadLetterQueue(MessageViewImpl messageViewImpl) {
        SettableFuture<Void> create = SettableFuture.create();
        forwardToDeadLetterQueue(messageViewImpl, 1, create);
        return create;
    }

    private void forwardToDeadLetterQueue(final MessageViewImpl messageViewImpl, final int i, final SettableFuture<Void> settableFuture) {
        final RpcFuture<ForwardMessageToDeadLetterQueueRequest, ForwardMessageToDeadLetterQueueResponse> forwardMessageToDeadLetterQueue = this.consumer.forwardMessageToDeadLetterQueue(messageViewImpl);
        final ClientId clientId = this.consumer.getClientId();
        final String consumerGroup = this.consumer.getConsumerGroup();
        final MessageId messageId = messageViewImpl.getMessageId();
        final Endpoints endpoints = messageViewImpl.getEndpoints();
        Futures.addCallback(forwardMessageToDeadLetterQueue, new FutureCallback<ForwardMessageToDeadLetterQueueResponse>() { // from class: org.apache.rocketmq.client.java.impl.consumer.ProcessQueueImpl.3
            @Override // org.apache.rocketmq.shaded.com.google.common.util.concurrent.FutureCallback
            public void onSuccess(ForwardMessageToDeadLetterQueueResponse forwardMessageToDeadLetterQueueResponse) {
                String requestId = forwardMessageToDeadLetterQueue.getContext().getRequestId();
                apache.rocketmq.v2.Status status = forwardMessageToDeadLetterQueueResponse.getStatus();
                Code code = status.getCode();
                if (!Code.OK.equals(code)) {
                    ProcessQueueImpl.log.error("Failed to forward message to dead letter queue, would attempt to re-forward later, clientId={}, consumerGroup={}, messageId={}, attempt={}, mq={}, endpoints={}, requestId={}, code={}, status message={}", clientId, consumerGroup, messageId, Integer.valueOf(i), ProcessQueueImpl.this.mq, endpoints, requestId, code, status.getMessage());
                    ProcessQueueImpl.this.forwardToDeadLetterQueueLater(messageViewImpl, 1 + i, settableFuture);
                    return;
                }
                settableFuture.setFuture(Futures.immediateVoidFuture());
                if (1 < i) {
                    ProcessQueueImpl.log.info("Re-forward message to dead letter queue successfully, clientId={}, consumerGroup={}, attempt={}, messageId={}, mq={}, endpoints={}, requestId={}", clientId, consumerGroup, Integer.valueOf(i), messageId, ProcessQueueImpl.this.mq, endpoints, requestId);
                } else {
                    ProcessQueueImpl.log.info("Forward message to dead letter queue successfully, clientId={}, consumerGroup={}, messageId={}, mq={}, endpoints={}, requestId={}", clientId, consumerGroup, messageId, ProcessQueueImpl.this.mq, endpoints, requestId);
                }
            }

            @Override // org.apache.rocketmq.shaded.com.google.common.util.concurrent.FutureCallback
            public void onFailure(Throwable th) {
                ProcessQueueImpl.log.error("Exception raised while forward message to DLQ, would attempt to re-forward later, clientId={}, consumerGroup={}, attempt={}, messageId={}, mq={}", clientId, consumerGroup, Integer.valueOf(i), messageId, ProcessQueueImpl.this.mq, th);
                ProcessQueueImpl.this.forwardToDeadLetterQueueLater(messageViewImpl, 1 + i, settableFuture);
            }
        }, MoreExecutors.directExecutor());
    }

    /* JADX INFO: Access modifiers changed from: private */
    public void forwardToDeadLetterQueueLater(MessageViewImpl messageViewImpl, int i, SettableFuture<Void> settableFuture) {
        ScheduledExecutorService scheduler = this.consumer.getScheduler();
        try {
            scheduler.schedule(() -> {
                forwardToDeadLetterQueue(messageViewImpl, i, settableFuture);
            }, FORWARD_FIFO_MESSAGE_TO_DLQ_FAILURE_BACKOFF_DELAY.toNanos(), TimeUnit.NANOSECONDS);
        } catch (Throwable th) {
            if (scheduler.isShutdown()) {
                return;
            }
            log.error("[Bug] Failed to schedule DLQ message request, mq={}, messageId={}, clientId={}", this.mq, messageViewImpl.getMessageId(), this.consumer.getClientId());
            forwardToDeadLetterQueueLater(messageViewImpl, 1 + i, settableFuture);
        }
    }

    private ListenableFuture<Void> ackMessage(MessageViewImpl messageViewImpl) {
        SettableFuture<Void> create = SettableFuture.create();
        ackMessage(messageViewImpl, 1, create);
        return create;
    }

    private void ackMessage(final MessageViewImpl messageViewImpl, final int i, final SettableFuture<Void> settableFuture) {
        final ClientId clientId = this.consumer.getClientId();
        final String consumerGroup = this.consumer.getConsumerGroup();
        final MessageId messageId = messageViewImpl.getMessageId();
        final Endpoints endpoints = messageViewImpl.getEndpoints();
        final RpcFuture<AckMessageRequest, AckMessageResponse> ackMessage = this.consumer.ackMessage(messageViewImpl);
        Futures.addCallback(ackMessage, new FutureCallback<AckMessageResponse>() { // from class: org.apache.rocketmq.client.java.impl.consumer.ProcessQueueImpl.4
            @Override // org.apache.rocketmq.shaded.com.google.common.util.concurrent.FutureCallback
            public void onSuccess(AckMessageResponse ackMessageResponse) {
                String requestId = ackMessage.getContext().getRequestId();
                apache.rocketmq.v2.Status status = ackMessageResponse.getStatus();
                Code code = status.getCode();
                if (Code.INVALID_RECEIPT_HANDLE.equals(code)) {
                    ProcessQueueImpl.log.error("Failed to ack message due to the invalid receipt handle, forgive to retry, clientId={}, consumerGroup={}, messageId={}, attempt={}, mq={}, endpoints={}, requestId={}, status message=[{}]", clientId, consumerGroup, messageId, Integer.valueOf(i), ProcessQueueImpl.this.mq, endpoints, requestId, status.getMessage());
                    settableFuture.setException(new BadRequestException(code.getNumber(), requestId, status.getMessage()));
                } else {
                    if (!Code.OK.equals(code)) {
                        ProcessQueueImpl.log.error("Failed to ack message, would attempt to re-ack later, clientId={}, consumerGroup={}, attempt={}, messageId={}, mq={}, code={}, requestId={}, endpoints={}, status message=[{}]", clientId, consumerGroup, Integer.valueOf(i), messageId, ProcessQueueImpl.this.mq, code, requestId, endpoints, status.getMessage());
                        ProcessQueueImpl.this.ackMessageLater(messageViewImpl, 1 + i, settableFuture);
                        return;
                    }
                    settableFuture.setFuture(Futures.immediateVoidFuture());
                    if (1 < i) {
                        ProcessQueueImpl.log.info("Finally, ack message successfully, clientId={}, consumerGroup={}, attempt={}, messageId={}, mq={}, endpoints={}, requestId={}", clientId, consumerGroup, Integer.valueOf(i), messageId, ProcessQueueImpl.this.mq, endpoints, requestId);
                    } else {
                        ProcessQueueImpl.log.debug("Ack message successfully, clientId={}, consumerGroup={}, messageId={}, mq={}, endpoints={}, requestId={}", clientId, consumerGroup, messageId, ProcessQueueImpl.this.mq, endpoints, requestId);
                    }
                }
            }

            @Override // org.apache.rocketmq.shaded.com.google.common.util.concurrent.FutureCallback
            public void onFailure(Throwable th) {
                ProcessQueueImpl.log.error("Exception raised while acknowledging message, clientId={}, consumerGroup={}, would attempt to re-ack later, attempt={}, messageId={}, mq={}, endpoints={}", clientId, consumerGroup, Integer.valueOf(i), messageId, ProcessQueueImpl.this.mq, endpoints, th);
                ProcessQueueImpl.this.ackMessageLater(messageViewImpl, 1 + i, settableFuture);
            }
        }, MoreExecutors.directExecutor());
    }

    /* JADX INFO: Access modifiers changed from: private */
    public void ackMessageLater(MessageViewImpl messageViewImpl, int i, SettableFuture<Void> settableFuture) {
        MessageId messageId = messageViewImpl.getMessageId();
        ScheduledExecutorService scheduler = this.consumer.getScheduler();
        try {
            scheduler.schedule(() -> {
                ackMessage(messageViewImpl, i, settableFuture);
            }, ACK_MESSAGE_FAILURE_BACKOFF_DELAY.toNanos(), TimeUnit.NANOSECONDS);
        } catch (Throwable th) {
            if (scheduler.isShutdown()) {
                return;
            }
            log.error("[Bug] Failed to schedule message ack request, mq={}, messageId={}, clientId={}", this.mq, messageId, this.consumer.getClientId());
            ackMessageLater(messageViewImpl, 1 + i, settableFuture);
        }
    }

    @Override // org.apache.rocketmq.client.java.impl.consumer.ProcessQueue
    public long getCachedMessageCount() {
        this.cachedMessageLock.readLock().lock();
        try {
            return this.cachedMessages.size();
        } finally {
            this.cachedMessageLock.readLock().unlock();
        }
    }

    @Override // org.apache.rocketmq.client.java.impl.consumer.ProcessQueue
    public long getCachedMessageBytes() {
        return this.cachedMessagesBytes.get();
    }

    @Override // org.apache.rocketmq.client.java.impl.consumer.ProcessQueue
    @ExcludeFromJacocoGeneratedReport
    public void doStats() {
        log.info("Process queue stats: clientId={}, mq={}, receptionTimes={}, receivedMessageQuantity={}, cachedMessageCount={}, cachedMessageBytes={}", this.consumer.getClientId(), this.mq, Long.valueOf(this.receptionTimes.getAndSet(0L)), Long.valueOf(this.receivedMessagesQuantity.getAndSet(0L)), Long.valueOf(getCachedMessageCount()), Long.valueOf(getCachedMessageBytes()));
    }
}
