package com.hpe.caf.worker.queue.rabbit;

import com.hpe.caf.api.worker.InvalidTaskException;
import com.hpe.caf.api.worker.TaskCallback;
import com.hpe.caf.api.worker.TaskRejectedException;
import com.hpe.caf.util.rabbitmq.ConsumerAckEvent;
import com.hpe.caf.util.rabbitmq.ConsumerDropEvent;
import com.hpe.caf.util.rabbitmq.ConsumerRejectEvent;
import com.hpe.caf.util.rabbitmq.Delivery;
import com.hpe.caf.util.rabbitmq.Event;
import com.hpe.caf.util.rabbitmq.QueueConsumer;
import com.rabbitmq.client.Channel;
import java.io.IOException;
import java.util.Collections;
import java.util.HashMap;
import java.util.Objects;
import java.util.concurrent.BlockingQueue;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:com/hpe/caf/worker/queue/rabbit/WorkerQueueConsumerImpl.class */
public class WorkerQueueConsumerImpl implements QueueConsumer {
    public static final String REJECTED_REASON_TASKMESSAGE = "TASKMESSAGE_INVALID";
    private final TaskCallback callback;
    private final RabbitMetricsReporter metrics;
    private final BlockingQueue<Event<QueueConsumer>> consumerEventQueue;
    private final BlockingQueue<Event<WorkerPublisher>> publisherEventQueue;
    private final Channel channel;
    private final String retryRoutingKey;
    private final int retryLimit;
    private static final Logger LOG = LoggerFactory.getLogger(WorkerQueueConsumerImpl.class);

    public WorkerQueueConsumerImpl(TaskCallback taskCallback, RabbitMetricsReporter rabbitMetricsReporter, BlockingQueue<Event<QueueConsumer>> blockingQueue, Channel channel, BlockingQueue<Event<WorkerPublisher>> blockingQueue2, String str, int i) {
        this.callback = (TaskCallback) Objects.requireNonNull(taskCallback);
        this.metrics = (RabbitMetricsReporter) Objects.requireNonNull(rabbitMetricsReporter);
        this.consumerEventQueue = (BlockingQueue) Objects.requireNonNull(blockingQueue);
        this.channel = (Channel) Objects.requireNonNull(channel);
        this.publisherEventQueue = (BlockingQueue) Objects.requireNonNull(blockingQueue2);
        this.retryRoutingKey = (String) Objects.requireNonNull(str);
        this.retryLimit = i;
    }

    public void processDelivery(Delivery delivery) {
        boolean z;
        int parseInt = delivery.getHeaders().containsKey("x-delivery-count") ? Integer.parseInt(String.valueOf(delivery.getHeaders().getOrDefault("x-delivery-count", "0"))) : Integer.parseInt(String.valueOf(delivery.getHeaders().getOrDefault("x-caf-worker-retry", "0")));
        this.metrics.incrementReceived();
        if (!delivery.getEnvelope().isRedeliver()) {
            z = false;
        } else if (delivery.getHeaders().containsKey("x-delivery-count")) {
            z = parseInt > this.retryLimit;
        } else {
            if (parseInt < this.retryLimit) {
                republishClassicRedelivery(delivery, parseInt);
                return;
            }
            z = true;
        }
        RabbitTaskInformation rabbitTaskInformation = new RabbitTaskInformation(String.valueOf(delivery.getEnvelope().getDeliveryTag()), z);
        try {
            LOG.debug("Registering new message {}", rabbitTaskInformation.getInboundMessageId());
            this.callback.registerNewTask(rabbitTaskInformation, delivery.getMessageData(), delivery.getHeaders());
        } catch (TaskRejectedException e) {
            LOG.warn("Message {} rejected as a task at this time, returning to queue", rabbitTaskInformation.getInboundMessageId(), e);
            rabbitTaskInformation.incrementResponseCount(true);
            this.publisherEventQueue.add(new WorkerPublishQueueEvent(delivery.getMessageData(), delivery.getEnvelope().getRoutingKey(), rabbitTaskInformation));
        } catch (InvalidTaskException e2) {
            LOG.error("Cannot register new message, rejecting {}", rabbitTaskInformation.getInboundMessageId(), e2);
            rabbitTaskInformation.incrementResponseCount(true);
            this.publisherEventQueue.add(new WorkerPublishQueueEvent(delivery.getMessageData(), this.retryRoutingKey, rabbitTaskInformation, Collections.singletonMap("x-caf-worker-rejected", REJECTED_REASON_TASKMESSAGE)));
        }
    }

    public void processAck(long j) {
        if (j == -1) {
            return;
        }
        try {
            LOG.debug("Acknowledging message {}", Long.valueOf(j));
            this.channel.basicAck(j, false);
        } catch (IOException e) {
            LOG.warn("Couldn't ack message {}, will retry", Long.valueOf(j), e);
            this.metrics.incremementErrors();
            this.consumerEventQueue.add(new ConsumerAckEvent(j));
        }
    }

    public void processReject(long j) {
        processReject(j, true);
    }

    public void processDrop(long j) {
        processReject(j, false);
    }

    private void processReject(long j, boolean z) {
        if (j == -1) {
            LOG.error("Non-final response has not been acknowledged. This message has been lost!");
            return;
        }
        try {
            this.channel.basicReject(j, z);
            if (z) {
                LOG.debug("Rejecting message {}", Long.valueOf(j));
                this.metrics.incrementRejected();
            } else {
                LOG.warn("Dropping message {}", Long.valueOf(j));
                this.metrics.incrementDropped();
            }
        } catch (IOException e) {
            LOG.warn("Couldn't reject message {}, will retry", Long.valueOf(j), e);
            this.metrics.incremementErrors();
            this.consumerEventQueue.add(z ? new ConsumerRejectEvent(j) : new ConsumerDropEvent(j));
        }
    }

    private void republishClassicRedelivery(Delivery delivery, int i) {
        RabbitTaskInformation rabbitTaskInformation = new RabbitTaskInformation(String.valueOf(delivery.getEnvelope().getDeliveryTag()));
        LOG.debug("Received redelivered message with id {}, retry count {}, retry limit {}, republishing to retry queue", new Object[]{Long.valueOf(delivery.getEnvelope().getDeliveryTag()), Integer.valueOf(this.retryLimit), Integer.valueOf(i + 1)});
        HashMap hashMap = new HashMap();
        hashMap.put("x-caf-worker-retry", String.valueOf(i + 1));
        rabbitTaskInformation.incrementResponseCount(true);
        this.publisherEventQueue.add(new WorkerPublishQueueEvent(delivery.getMessageData(), this.retryRoutingKey, rabbitTaskInformation, hashMap));
    }
}
