package io.interact.sqsdw;

import com.amazonaws.services.sqs.AmazonSQS;
import com.amazonaws.services.sqs.model.DeleteMessageRequest;
import com.amazonaws.services.sqs.model.Message;
import com.amazonaws.services.sqs.model.ReceiveMessageRequest;
import com.google.inject.Inject;
import com.google.inject.name.Named;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.atomic.AtomicBoolean;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:io/interact/sqsdw/SqsListenerImpl.class */
/**
 * Polls a single SQS queue on a dedicated background thread and dispatches every received
 * message to the registered {@link MessageHandler}s.
 *
 * <p>Each message is offered to every handler; a handler whose {@code canHandle} returns
 * {@code true} is asked to {@code handle} it. After dispatch the message is deleted from the
 * queue, even when a handler threw: the failure is logged together with the message content.
 *
 * <p>When talking to the queue fails (receive or delete), the listener is marked unhealthy and
 * the polling thread waits a fixed delay before retrying. The first poll cycle that completes
 * without an error marks the listener healthy again.
 */
public class SqsListenerImpl implements SqsListener {
    /** Back-off, in milliseconds, applied after a queue error before polling again. */
    private static final int SLEEP_ON_ERROR = 5000;
    private static final Logger LOG = LoggerFactory.getLogger(SqsListenerImpl.class);
    /** Attribute-name filter value passed to the receive request. */
    private static final String ATTR_ALL = "All";
    /** {@code false} between a queue error and the next poll cycle that completes without error. */
    private final AtomicBoolean healthy = new AtomicBoolean(true);
    private final AmazonSQS sqs;
    private final String sqsListenQueueUrl;
    private final Set<MessageHandler> handlers;
    /** Message logged when polling stops because the thread was interrupted. */
    private final String interruptedMsg;
    /** The polling thread; {@code null} until {@link #start()} has been called. */
    private Thread pollingThread;

    /**
     * Creates a listener for one queue.
     *
     * @param amazonSQS client used to receive and delete messages
     * @param str URL of the queue to listen to
     * @param set handlers that every received message is offered to
     */
    @Inject
    public SqsListenerImpl(AmazonSQS amazonSQS, @Named("sqsListenQueueUrl") String str, Set<MessageHandler> set) {
        this.sqs = amazonSQS;
        this.sqsListenQueueUrl = str;
        this.handlers = set;
        this.interruptedMsg = "Stop listening to queue: " + str;
    }

    /**
     * Starts the background thread that polls the queue until it is interrupted.
     *
     * @throws Exception declared for interface compatibility; not thrown by this implementation
     */
    public void start() throws Exception {
        this.pollingThread = new Thread() {
            @Override
            public void run() {
                LOG.info("Start listening to queue: {}", sqsListenQueueUrl);
                while (!isInterrupted()) {
                    try {
                        pollOnce();
                    } catch (Exception e) {
                        handleQueueError(e);
                    }
                }
                LOG.info(interruptedMsg);
            }
        };
        this.pollingThread.start();
    }

    /**
     * Receives one batch of messages, dispatches and deletes each of them, and marks the
     * listener healthy again if it was previously unhealthy.
     */
    private void pollOnce() {
        ReceiveMessageRequest request =
                new ReceiveMessageRequest(sqsListenQueueUrl).withMessageAttributeNames(ATTR_ALL);
        List<Message> messages = sqs.receiveMessage(request).getMessages();
        int total = messages.size();
        int position = 0;
        for (Message message : messages) {
            position++;
            LOG.debug("Processing message {} of {}...", position, total);
            dispatch(message);
            // Deleted regardless of handler outcome; handler failures were already logged.
            sqs.deleteMessage(new DeleteMessageRequest(sqsListenQueueUrl, message.getReceiptHandle()));
            LOG.debug("Message {} of {} is processed and deleted from queue '{}'",
                    position, total, sqsListenQueueUrl);
        }
        // Only reached when receive and every delete succeeded.
        if (healthy.compareAndSet(false, true)) {
            LOG.info("Queue '{}' recovered from error condition", sqsListenQueueUrl);
        }
    }

    /**
     * Offers the message to every handler. An exception from any handler aborts the remaining
     * handlers for this message and is logged; it is not propagated.
     */
    private void dispatch(Message message) {
        try {
            for (MessageHandler handler : handlers) {
                LOG.debug("Calling message handler: {}", handler);
                if (handler.canHandle(message)) {
                    LOG.debug("Message accepted.");
                    handler.handle(message);
                } else {
                    LOG.debug("Message refused.");
                }
            }
        } catch (Exception e) {
            logProcessingError(message, e);
        }
    }

    /**
     * Logs a handler failure at error level together with the message's id, receipt handle,
     * body MD5, body and attributes.
     *
     * @param message the message that was being processed
     * @param exc the exception raised while processing it
     */
    public void logProcessingError(Message message, Exception exc) {
        StringBuilder details = new StringBuilder("An error occurred while processing the following message:")
                .append("\n\tMessageId:     ").append(message.getMessageId())
                .append("\n\tReceiptHandle: ").append(message.getReceiptHandle())
                .append("\n\tMD5OfBody:     ").append(message.getMD5OfBody())
                .append("\n\tBody:          ").append(message.getBody());
        Map<String, ?> attributes = message.getMessageAttributes();
        // Guard so that error reporting itself can never fail and prevent the delete.
        if (attributes != null) {
            for (Map.Entry<String, ?> attribute : attributes.entrySet()) {
                details.append("\n\tAttribute\n\t\tName:  ").append(attribute.getKey())
                        .append("\n\t\tValue: ").append(attribute.getValue());
            }
        }
        LOG.error(details.toString(), exc);
    }

    /**
     * Marks the listener unhealthy, logs the queue error and sleeps for the back-off delay.
     *
     * <p>If the sleep is interrupted, the calling thread's interrupt flag is restored so that the
     * polling loop observes it and terminates.
     *
     * @param exc the exception raised while receiving from or deleting on the queue
     */
    public void handleQueueError(Exception exc) {
        String template = healthy.compareAndSet(true, false)
                ? "An error occurred while listening to '%s', waiting '%s' ms before retrying..."
                : "Retry failed while listening to '%s', waiting '%s' ms before retrying...";
        LOG.error(String.format(template, sqsListenQueueUrl, SLEEP_ON_ERROR), exc);
        try {
            Thread.sleep(SLEEP_ON_ERROR);
        } catch (InterruptedException e) {
            // Thread.sleep cleared the flag; put it back, otherwise stop() is silently lost.
            Thread.currentThread().interrupt();
            LOG.debug("Interrupted while waiting to retry queue '{}'", sqsListenQueueUrl);
        }
    }

    /**
     * Requests the polling thread to stop by interrupting it. Does nothing if the listener was
     * never started.
     *
     * @throws Exception declared for interface compatibility; not thrown by this implementation
     */
    public void stop() throws Exception {
        Thread thread = this.pollingThread;
        if (thread != null) {
            thread.interrupt();
        }
    }

    /**
     * @return {@code false} if the most recent interaction with the queue failed and no poll
     *     cycle has completed successfully since; {@code true} otherwise
     */
    @Override // io.interact.sqsdw.SqsListener
    public boolean isHealthy() {
        return this.healthy.get();
    }

    /** @return the URL of the queue this listener polls */
    @Override // io.interact.sqsdw.SqsListener
    public String getQueueUrl() {
        return this.sqsListenQueueUrl;
    }
}
