package com.github.cafdataprocessing.worker.policy.testing.shared;

import com.google.common.base.Strings;
import com.hpe.caf.api.CodecException;
import com.hpe.caf.api.worker.TaskMessage;
import com.hpe.caf.codec.JsonCodec;
import com.hpe.caf.util.rabbitmq.ConsumerAckEvent;
import com.hpe.caf.util.rabbitmq.ConsumerRejectEvent;
import com.hpe.caf.util.rabbitmq.Delivery;
import com.hpe.caf.util.rabbitmq.Event;
import com.hpe.caf.util.rabbitmq.QueueConsumer;
import com.rabbitmq.client.Channel;
import java.io.IOException;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CountDownLatch;
import org.junit.Assert;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:com/github/cafdataprocessing/worker/policy/testing/shared/TestQueueConsumerImpl.class */
public class TestQueueConsumerImpl implements QueueConsumer {
    private static final Logger logger = LoggerFactory.getLogger(TestQueueConsumerImpl.class);
    private final Map<String, Delivery> delivery;
    private final CountDownLatch latch;
    private final BlockingQueue<Event<QueueConsumer>> eventQueue;
    private final Channel channel;
    private final String taskIdFilter;

    public TestQueueConsumerImpl(CountDownLatch countDownLatch, BlockingQueue<Event<QueueConsumer>> blockingQueue, Channel channel, String str) {
        this.delivery = new HashMap();
        this.latch = (CountDownLatch) Objects.requireNonNull(countDownLatch);
        this.eventQueue = (BlockingQueue) Objects.requireNonNull(blockingQueue);
        this.channel = (Channel) Objects.requireNonNull(channel);
        this.taskIdFilter = str;
    }

    public TestQueueConsumerImpl(CountDownLatch countDownLatch, BlockingQueue<Event<QueueConsumer>> blockingQueue, Channel channel) {
        this.delivery = new HashMap();
        this.latch = (CountDownLatch) Objects.requireNonNull(countDownLatch);
        this.eventQueue = (BlockingQueue) Objects.requireNonNull(blockingQueue);
        this.channel = (Channel) Objects.requireNonNull(channel);
        this.taskIdFilter = null;
    }

    public void processDelivery(Delivery delivery) {
        TaskMessage taskMessageFromDelivery = getTaskMessageFromDelivery(delivery);
        if (Strings.isNullOrEmpty(taskMessageFromDelivery.getTaskId())) {
            throw new RuntimeException("Invalid delivery of message without a taskID");
        }
        if (!Strings.isNullOrEmpty(this.taskIdFilter) && !taskMessageFromDelivery.getTaskId().contains(this.taskIdFilter) && (taskMessageFromDelivery.getTracking() == null || !taskMessageFromDelivery.getTracking().getJobTaskId().contains(this.taskIdFilter))) {
            logger.warn("Received message which wasn't expected by this test case -> throwing it away taskId: " + taskMessageFromDelivery.getTaskId() + " jobTracking: " + (taskMessageFromDelivery.getTracking() == null ? "null" : taskMessageFromDelivery.getTracking().getJobTaskId()));
            this.eventQueue.add(new ConsumerRejectEvent(delivery.getEnvelope().getDeliveryTag()));
        } else {
            logger.info("Recieved message: " + taskMessageFromDelivery.getTaskId() + " jobTracking: " + (taskMessageFromDelivery.getTracking() == null ? "null" : taskMessageFromDelivery.getTracking().getJobTaskId()));
            this.delivery.put(taskMessageFromDelivery.getTaskId(), delivery);
            this.eventQueue.add(new ConsumerAckEvent(delivery.getEnvelope().getDeliveryTag()));
        }
    }

    public void processAck(long j) {
        try {
            this.channel.basicAck(j, false);
            this.latch.countDown();
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    public void processReject(long j) {
        try {
            this.channel.basicReject(j, false);
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    public void processDrop(long j) {
        try {
            logger.info("processDrop: Ack and drop message: " + j);
            this.channel.basicAck(j, false);
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    public static TaskMessage getTaskMessageFromDelivery(Delivery delivery) {
        Assert.assertNotNull(delivery);
        try {
            return (TaskMessage) new JsonCodec().deserialise(delivery.getMessageData(), TaskMessage.class);
        } catch (CodecException e) {
            throw new RuntimeException("Failed to turn message delivery into a TaskMessage: ", e);
        }
    }

    public Map<String, Delivery> getDelivery() {
        return this.delivery;
    }

    public Delivery getDelivery(TaskMessage taskMessage) {
        return getDelivery(taskMessage.getTaskId());
    }

    public Delivery getDelivery(String str) {
        return findDeliveredMessage(str);
    }

    private Delivery findDeliveredMessage(String str) {
        if (this.delivery.isEmpty()) {
            return null;
        }
        logger.debug("trying to find task: " + str);
        Delivery delivery = this.delivery.get(str);
        if (delivery != null) {
            this.delivery.remove(str);
            return delivery;
        }
        logger.debug("Failed to find a direct taskId match on the queue.");
        Iterator<Map.Entry<String, Delivery>> it = this.delivery.entrySet().iterator();
        while (it.hasNext()) {
            Delivery value = it.next().getValue();
            TaskMessage taskMessageFromDelivery = getTaskMessageFromDelivery(value);
            Logger logger2 = logger;
            Object[] objArr = new Object[2];
            objArr[0] = taskMessageFromDelivery.getTaskId();
            objArr[1] = taskMessageFromDelivery.getTracking() == null ? "null" : taskMessageFromDelivery.getTracking().getJobTaskId();
            logger2.debug(String.format("Queue contains other items: {%s} TrackingInfo jobTaskId: {%s}", objArr));
            if (taskMessageFromDelivery.getTracking() != null && taskMessageFromDelivery.getTracking().getJobTaskId().contains(str)) {
                logger.debug("Queue contains match in tracking info: " + taskMessageFromDelivery.getTracking().getJobTaskId() + " with actual taskId:" + taskMessageFromDelivery.getTaskId());
                this.delivery.remove(taskMessageFromDelivery.getTaskId());
                return value;
            }
        }
        return null;
    }
}
