package com.github.silverlight7.common.port.adapter.messaging.rabbitmq;

import com.github.silverlight7.common.port.adapter.messaging.MessageException;
import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;
import com.rabbitmq.client.ShutdownSignalException;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.HashSet;
import java.util.Set;

/* loaded from: input_file:com/github/silverlight7/common/port/adapter/messaging/rabbitmq/MessageConsumer.class */
public class MessageConsumer {
    private boolean autoAcknowledged;
    private boolean closed;
    private Set<String> messageTypes;
    private Queue queue;
    private String tag;

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:com/github/silverlight7/common/port/adapter/messaging/rabbitmq/MessageConsumer$DispatchingConsumer.class */
    public class DispatchingConsumer extends DefaultConsumer {
        private MessageListener messageListener;

        public DispatchingConsumer(Channel channel, MessageListener messageListener) {
            super(channel);
            setMessageListener(messageListener);
        }

        public void handleDelivery(String str, Envelope envelope, AMQP.BasicProperties basicProperties, byte[] bArr) throws IOException {
            if (!MessageConsumer.this.isClosed()) {
                handle(messageListener(), envelope, basicProperties, bArr);
            }
            if (MessageConsumer.this.isClosed()) {
                MessageConsumer.this.queue().close();
            }
        }

        public void handleShutdownSignal(String str, ShutdownSignalException shutdownSignalException) {
            MessageConsumer.this.close();
        }

        private void handle(MessageListener messageListener, Envelope envelope, AMQP.BasicProperties basicProperties, byte[] bArr) {
            long deliveryTag = envelope.getDeliveryTag();
            try {
                if (!filteredMessageType(basicProperties)) {
                    if (messageListener.type().isBinaryListener()) {
                        messageListener.handleMessage(basicProperties.getType(), basicProperties.getMessageId(), basicProperties.getTimestamp(), bArr, envelope.getDeliveryTag(), envelope.isRedeliver());
                    } else if (messageListener.type().isTextListener()) {
                        messageListener.handleMessage(basicProperties.getType(), basicProperties.getMessageId(), basicProperties.getTimestamp(), new String(bArr, "utf-8"), envelope.getDeliveryTag(), envelope.isRedeliver());
                    }
                }
                ack(deliveryTag);
            } catch (MessageException e) {
                nack(deliveryTag, e.isRetry());
            } catch (Throwable th) {
                nack(deliveryTag, true);
            }
        }

        private void ack(long j) {
            try {
                if (!MessageConsumer.this.isAutoAcknowledged()) {
                    getChannel().basicAck(j, false);
                }
            } catch (IOException e) {
            }
        }

        private void nack(long j, boolean z) {
            try {
                if (!MessageConsumer.this.isAutoAcknowledged()) {
                    getChannel().basicNack(j, false, z);
                }
            } catch (IOException e) {
            }
        }

        private boolean filteredMessageType(AMQP.BasicProperties basicProperties) {
            String type;
            boolean z = false;
            Set messageTypes = MessageConsumer.this.messageTypes();
            if (!messageTypes.isEmpty() && ((type = basicProperties.getType()) == null || !messageTypes.contains(type))) {
                z = true;
            }
            return z;
        }

        private MessageListener messageListener() {
            return this.messageListener;
        }

        private void setMessageListener(MessageListener messageListener) {
            this.messageListener = messageListener;
        }
    }

    public static MessageConsumer autoAcknowledgedInstance(Queue queue) {
        return instance(queue, true);
    }

    public static MessageConsumer instance(Queue queue) {
        return new MessageConsumer(queue, false);
    }

    public static MessageConsumer instance(Queue queue, boolean z) {
        return new MessageConsumer(queue, z);
    }

    public void close() {
        setClosed(true);
        queue().close();
    }

    public boolean isClosed() {
        return this.closed;
    }

    public void equalizeMessageDistribution() {
        try {
            queue().channel().basicQos(1);
        } catch (IOException e) {
            throw new MessageException("Cannot equalize distribution.", e);
        }
    }

    public void receiveAll(MessageListener messageListener) {
        receiveFor(messageListener);
    }

    public void receiveOnly(String[] strArr, MessageListener messageListener) {
        String[] strArr2 = strArr;
        if (strArr2 == null) {
            strArr2 = new String[0];
        }
        setMessageTypes(new HashSet(Arrays.asList(strArr2)));
        receiveFor(messageListener);
    }

    public String tag() {
        return this.tag;
    }

    protected MessageConsumer(Queue queue, boolean z) {
        setMessageTypes(new HashSet(Arrays.asList(new String[0])));
        setQueue(queue);
        setAutoAcknowledged(z);
    }

    /* JADX INFO: Access modifiers changed from: private */
    public boolean isAutoAcknowledged() {
        return this.autoAcknowledged;
    }

    private void setAutoAcknowledged(boolean z) {
        this.autoAcknowledged = z;
    }

    private void setClosed(boolean z) {
        this.closed = z;
    }

    protected Queue queue() {
        return this.queue;
    }

    /* JADX INFO: Access modifiers changed from: private */
    public Set<String> messageTypes() {
        return this.messageTypes;
    }

    private void receiveFor(MessageListener messageListener) {
        Queue queue = queue();
        Channel channel = queue.channel();
        try {
            setTag(channel.basicConsume(queue.name(), isAutoAcknowledged(), new DispatchingConsumer(channel, messageListener)));
        } catch (IOException e) {
            throw new MessageException("Failed to initiate consumer.", e);
        }
    }

    private void setMessageTypes(Set<String> set) {
        this.messageTypes = set;
    }

    private void setQueue(Queue queue) {
        this.queue = queue;
    }

    private void setTag(String str) {
        this.tag = str;
    }
}
