package com.hpe.caf.worker.queue.rabbit;

import com.hpe.caf.api.HealthResult;
import com.hpe.caf.api.HealthStatus;
import com.hpe.caf.api.worker.ManagedWorkerQueue;
import com.hpe.caf.api.worker.QueueException;
import com.hpe.caf.api.worker.TaskCallback;
import com.hpe.caf.api.worker.WorkerQueueMetricsReporter;
import com.hpe.caf.configs.RabbitConfiguration;
import com.hpe.caf.util.rabbitmq.ConsumerAckEvent;
import com.hpe.caf.util.rabbitmq.ConsumerDropEvent;
import com.hpe.caf.util.rabbitmq.ConsumerRejectEvent;
import com.hpe.caf.util.rabbitmq.DefaultRabbitConsumer;
import com.hpe.caf.util.rabbitmq.Event;
import com.hpe.caf.util.rabbitmq.EventPoller;
import com.hpe.caf.util.rabbitmq.QueueConsumer;
import com.hpe.caf.util.rabbitmq.RabbitUtil;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import java.io.IOException;
import java.util.HashSet;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeoutException;
import net.jodah.lyra.ConnectionOptions;
import net.jodah.lyra.config.Config;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:com/hpe/caf/worker/queue/rabbit/RabbitWorkerQueue.class */
/**
 * {@link ManagedWorkerQueue} implementation backed by RabbitMQ.
 *
 * <p>A single connection is opened in {@link #start(TaskCallback)} and two channels are created on it:
 * one for consuming from the configured input queue and one for publishing. Consumer-side actions
 * (acknowledge / reject / drop) and publish requests are not executed by the calling thread; they are
 * placed on in-memory queues that are drained by a dedicated consumer thread and publisher thread.
 */
public final class RabbitWorkerQueue implements ManagedWorkerQueue {
    private static final Logger LOG = LoggerFactory.getLogger(RabbitWorkerQueue.class);

    private DefaultRabbitConsumer consumer;
    private EventPoller<WorkerPublisher> publisher;
    private Connection conn;
    private Channel incomingChannel;
    private Channel outgoingChannel;
    private Thread publisherThread;
    private Thread consumerThread;
    private final List<String> consumerTags = new LinkedList<>();
    /** Names of queues already declared by this instance; guarded by its own monitor. */
    private final Set<String> declaredQueues = new HashSet<>();
    private final BlockingQueue<Event<QueueConsumer>> consumerQueue = new LinkedBlockingQueue<>();
    private final BlockingQueue<Event<WorkerPublisher>> publisherQueue = new LinkedBlockingQueue<>();
    private final RabbitMetricsReporter metrics = new RabbitMetricsReporter();
    private final RabbitWorkerQueueConfiguration config;
    private final int maxTasks;

    /**
     * Creates an unstarted queue. No connection is made until {@link #start(TaskCallback)} is called.
     *
     * @param rabbitWorkerQueueConfiguration queue configuration; must not be null
     * @param i maximum number of tasks; added to the configured prefetch buffer to size the
     *          consumer prefetch limit in {@link #start(TaskCallback)}
     * @throws NullPointerException if the configuration is null
     */
    public RabbitWorkerQueue(RabbitWorkerQueueConfiguration rabbitWorkerQueueConfiguration, int i) {
        this.config = Objects.requireNonNull(rabbitWorkerQueueConfiguration);
        this.maxTasks = i;
        LOG.debug("Initialised");
    }

    /**
     * Opens the RabbitMQ connection and channels, declares the input and retry queues, registers the
     * consumer on the input queue and starts the consumer and publisher threads.
     *
     * @param taskCallback callback handed to the connection listener and to the queue consumer
     * @throws IllegalStateException if a connection has already been created by an earlier call
     * @throws QueueException if the connection, channels or queues cannot be established
     */
    @Override // com.hpe.caf.api.worker.ManagedWorkerQueue
    public void start(TaskCallback taskCallback) throws QueueException {
        if (this.conn != null) {
            throw new IllegalStateException("Already started");
        }
        try {
            WorkerConfirmListener workerConfirmListener = new WorkerConfirmListener(this.consumerQueue);
            createConnection(taskCallback, workerConfirmListener);
            this.outgoingChannel = this.conn.createChannel();
            this.incomingChannel = this.conn.createChannel();
            // Prefetch limit is never allowed to drop below 1.
            this.incomingChannel.basicQos(Math.max(1, this.maxTasks + this.config.getPrefetchBuffer()));
            this.consumer = new DefaultRabbitConsumer(this.consumerQueue, new WorkerQueueConsumerImpl(taskCallback, this.metrics, this.consumerQueue, this.incomingChannel, this.publisherQueue, this.config.getRetryQueue(), this.config.getRetryLimit()));
            this.publisher = new EventPoller<>(2, this.publisherQueue, new WorkerPublisherImpl(this.outgoingChannel, this.metrics, this.consumerQueue, workerConfirmListener));
            declareWorkerQueue(this.incomingChannel, this.config.getInputQueue(), this.config.getMaxPriority());
            declareWorkerQueue(this.outgoingChannel, this.config.getRetryQueue(), this.config.getMaxPriority());
            this.consumerTags.add(this.incomingChannel.basicConsume(this.config.getInputQueue(), this.consumer));
            this.publisherThread = new Thread(this.publisher);
            this.consumerThread = new Thread(this.consumer);
            this.publisherThread.start();
            this.consumerThread.start();
        } catch (IOException | TimeoutException e) {
            throw new QueueException("Failed to establish queues", e);
        }
    }

    /**
     * Ensures the target queue is declared, then queues a publish event for the publisher thread.
     * The message is not sent by the calling thread.
     *
     * @param str task identifier; must be parseable as a {@code long}
     * @param bArr message payload, passed through to the publish event
     * @param str2 name of the queue to publish to
     * @param map passed through to the publish event (presumably message headers - confirm against
     *            {@code WorkerQueue})
     * @param i passed through to the publish event (presumably message priority - confirm against
     *          {@code WorkerQueue})
     * @throws QueueException if declaring the target queue fails
     * @throws NumberFormatException if {@code str} is not a valid {@code long}
     */
    @Override // com.hpe.caf.api.worker.WorkerQueue
    public void publish(String str, byte[] bArr, String str2, Map<String, Object> map, int i) throws QueueException {
        try {
            declareWorkerQueue(this.outgoingChannel, str2, this.config.getMaxPriority());
            this.publisherQueue.add(new WorkerPublishQueueEvent(bArr, str2, Long.parseLong(str), map, i));
        } catch (IOException e) {
            throw new QueueException("Failed to submit task", e);
        }
    }

    /**
     * Equivalent to {@link #publish(String, byte[], String, Map, int)} with a final argument of 0.
     */
    @Override // com.hpe.caf.api.worker.WorkerQueue
    public void publish(String str, byte[] bArr, String str2, Map<String, Object> map) throws QueueException {
        publish(str, bArr, str2, map, 0);
    }

    /**
     * Queues a reject event for the given task; the event is handled by the consumer thread.
     *
     * @param str task identifier; must be non-null and parseable as a {@code long}
     */
    @Override // com.hpe.caf.api.worker.WorkerQueue
    public void rejectTask(String str) {
        Objects.requireNonNull(str);
        LOG.debug("Generating reject event for task {}", str);
        this.consumerQueue.add(new ConsumerRejectEvent(Long.parseLong(str)));
    }

    /**
     * Queues a drop event for the given task; the event is handled by the consumer thread.
     *
     * @param str task identifier; must be non-null and parseable as a {@code long}
     */
    @Override // com.hpe.caf.api.worker.WorkerQueue
    public void discardTask(String str) {
        Objects.requireNonNull(str);
        LOG.debug("Generating drop event for task {}", str);
        this.consumerQueue.add(new ConsumerDropEvent(Long.parseLong(str)));
    }

    /**
     * Queues an acknowledge event for the given task; the event is handled by the consumer thread.
     *
     * @param str task identifier; must be non-null and parseable as a {@code long}
     */
    @Override // com.hpe.caf.api.worker.WorkerQueue
    public void acknowledgeTask(String str) {
        Objects.requireNonNull(str);
        LOG.debug("Generating acknowledge event for task {}", str);
        this.consumerQueue.add(new ConsumerAckEvent(Long.parseLong(str)));
    }

    /**
     * @return the name of the input queue taken from the configuration
     */
    @Override // com.hpe.caf.api.worker.WorkerQueue
    public String getInputQueue() {
        return this.config.getInputQueue();
    }

    /**
     * Cancels every consumer registered by {@link #start(TaskCallback)}. A failure to cancel one
     * consumer is counted and logged, and the remaining consumers are still cancelled.
     */
    @Override // com.hpe.caf.api.worker.ManagedWorkerQueue
    public void shutdownIncoming() {
        LOG.debug("Closing incoming queues");
        for (String str : this.consumerTags) {
            try {
                this.incomingChannel.basicCancel(str);
            } catch (IOException e) {
                this.metrics.incremementErrors();
                LOG.warn("Failed to cancel consumer {}", str, e);
            }
        }
    }

    /**
     * Stops the consumer and publisher, then closes both channels and the connection.
     *
     * <p>Each resource is closed independently, so a failure closing one of them is counted and logged
     * without preventing the others from being closed. Safe to call when {@link #start(TaskCallback)}
     * was never called or failed part-way through.
     */
    @Override // com.hpe.caf.api.worker.ManagedWorkerQueue
    public void shutdown() {
        LOG.debug("Shutting down");
        if (this.consumer != null) {
            this.consumer.shutdown();
        }
        if (this.publisher != null) {
            this.publisher.shutdown();
        }
        closeChannel(this.incomingChannel);
        closeChannel(this.outgoingChannel);
        closeConnection();
    }

    /**
     * @return the metrics reporter for this queue
     */
    @Override // com.hpe.caf.api.worker.ManagedWorkerQueue
    public WorkerQueueMetricsReporter getMetrics() {
        return this.metrics;
    }

    /**
     * Reports the state of the connection, both channels and both worker threads, in that order.
     * The first failing check determines the message. A queue that has not been started (or whose
     * start failed before a resource was created) is reported as unhealthy rather than throwing.
     *
     * @return {@link HealthResult#RESULT_HEALTHY} if every check passes, otherwise an
     *         {@link HealthStatus#UNHEALTHY} result naming the failed component
     */
    @Override // com.hpe.caf.api.HealthReporter
    public HealthResult healthCheck() {
        if (this.conn == null || !this.conn.isOpen()) {
            return new HealthResult(HealthStatus.UNHEALTHY, "Rabbit connection failed");
        }
        if (this.incomingChannel == null || !this.incomingChannel.isOpen()) {
            return new HealthResult(HealthStatus.UNHEALTHY, "Incoming channel failed");
        }
        if (this.outgoingChannel == null || !this.outgoingChannel.isOpen()) {
            return new HealthResult(HealthStatus.UNHEALTHY, "Outgoing channel failed");
        }
        if (this.consumerThread == null || !this.consumerThread.isAlive()) {
            return new HealthResult(HealthStatus.UNHEALTHY, "RabbitMQ listening thread not running");
        }
        if (this.publisherThread == null || !this.publisherThread.isAlive()) {
            return new HealthResult(HealthStatus.UNHEALTHY, "RabbitMQ publishing thread not running");
        }
        return HealthResult.RESULT_HEALTHY;
    }

    /**
     * Builds the Lyra connection options and config from the Rabbit configuration, attaches a
     * {@link WorkerConnectionListener}, and stores the resulting connection in {@code conn}.
     */
    private void createConnection(TaskCallback taskCallback, WorkerConfirmListener workerConfirmListener) throws IOException, TimeoutException {
        RabbitConfiguration rabbitConfiguration = this.config.getRabbitConfiguration();
        ConnectionOptions createLyraConnectionOptions = RabbitUtil.createLyraConnectionOptions(rabbitConfiguration.getRabbitHost(), rabbitConfiguration.getRabbitPort(), rabbitConfiguration.getRabbitUser(), rabbitConfiguration.getRabbitPassword());
        Config createLyraConfig = RabbitUtil.createLyraConfig(rabbitConfiguration.getBackoffInterval(), rabbitConfiguration.getMaxBackoffInterval(), -1);
        createLyraConfig.withConnectionListeners(new WorkerConnectionListener(taskCallback, workerConfirmListener));
        this.conn = RabbitUtil.createRabbitConnection(createLyraConnectionOptions, createLyraConfig);
    }

    /**
     * Declares the named worker queue on the given channel unless this instance has already
     * declared it.
     *
     * <p>The name is recorded only after the declaration succeeds, so a failed declaration is retried
     * on the next call. The check, declaration and record are performed under a lock on
     * {@code declaredQueues} because this method is reachable from {@code publish}, which callers may
     * invoke from more than one thread.
     *
     * @param channel channel to declare the queue on
     * @param str queue name
     * @param i maximum priority passed to {@code RabbitUtil.declareWorkerQueue}
     * @throws IOException if the declaration fails
     */
    private void declareWorkerQueue(Channel channel, String str, int i) throws IOException {
        synchronized (this.declaredQueues) {
            if (this.declaredQueues.contains(str)) {
                return;
            }
            RabbitUtil.declareWorkerQueue(channel, str, i);
            this.declaredQueues.add(str);
        }
    }

    /** Closes the channel if it was created, counting and logging (not propagating) any failure. */
    private void closeChannel(Channel channel) {
        if (channel == null) {
            return;
        }
        try {
            channel.close();
        } catch (IOException | TimeoutException e) {
            this.metrics.incremementErrors();
            LOG.warn("Failed to close rabbit connections", e);
        }
    }

    /** Closes the connection if it was created, counting and logging (not propagating) any failure. */
    private void closeConnection() {
        if (this.conn == null) {
            return;
        }
        try {
            this.conn.close();
        } catch (IOException e) {
            this.metrics.incremementErrors();
            LOG.warn("Failed to close rabbit connections", e);
        }
    }
}
