package de.otto.synapse.endpoint.receiver.sqs;

import com.google.common.collect.ImmutableMap;
import de.otto.synapse.channel.ShardPosition;
import de.otto.synapse.endpoint.MessageInterceptorRegistry;
import de.otto.synapse.endpoint.receiver.AbstractMessageReceiverEndpoint;
import de.otto.synapse.endpoint.receiver.MessageQueueReceiverEndpoint;
import de.otto.synapse.message.Header;
import de.otto.synapse.message.Key;
import de.otto.synapse.message.TextMessage;
import java.time.Duration;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.context.ApplicationEventPublisher;
import software.amazon.awssdk.awscore.AwsRequestOverrideConfiguration;
import software.amazon.awssdk.services.sqs.SqsAsyncClient;
import software.amazon.awssdk.services.sqs.model.DeleteMessageRequest;
import software.amazon.awssdk.services.sqs.model.GetQueueUrlRequest;
import software.amazon.awssdk.services.sqs.model.GetQueueUrlResponse;
import software.amazon.awssdk.services.sqs.model.Message;
import software.amazon.awssdk.services.sqs.model.MessageAttributeValue;
import software.amazon.awssdk.services.sqs.model.ReceiveMessageRequest;
import software.amazon.awssdk.services.sqs.model.ReceiveMessageResponse;

/* loaded from: input_file:de/otto/synapse/endpoint/receiver/sqs/SqsMessageQueueReceiverEndpoint.class */
public class SqsMessageQueueReceiverEndpoint extends AbstractMessageReceiverEndpoint implements MessageQueueReceiverEndpoint {
    private static final int VISIBILITY_TIMEOUT = 5;
    private static final int WAIT_TIME_SECONDS = 2;
    private static final int STOP_TIMEOUT_SECONDS = 3;
    private static final String MSG_KEY_ATTR = "synapse_msg_key";

    @Nonnull
    private final SqsAsyncClient sqsAsyncClient;

    @Nonnull
    private final ExecutorService executorService;
    private final String queueUrl;
    private final AtomicBoolean stopSignal;
    private final CompletableFuture<Void> stopped;
    private static final Logger LOG = LoggerFactory.getLogger(SqsMessageQueueReceiverEndpoint.class);
    private static final MessageAttributeValue EMPTY_STRING_ATTR = (MessageAttributeValue) MessageAttributeValue.builder().dataType("String").stringValue("").build();

    public SqsMessageQueueReceiverEndpoint(@Nonnull String str, @Nonnull MessageInterceptorRegistry messageInterceptorRegistry, @Nonnull SqsAsyncClient sqsAsyncClient, @Nonnull ExecutorService executorService, @Nullable ApplicationEventPublisher applicationEventPublisher) {
        super(str, messageInterceptorRegistry, applicationEventPublisher);
        this.stopSignal = new AtomicBoolean(false);
        this.stopped = new CompletableFuture<>();
        this.sqsAsyncClient = sqsAsyncClient;
        this.executorService = executorService;
        try {
            this.queueUrl = ((GetQueueUrlResponse) sqsAsyncClient.getQueueUrl((GetQueueUrlRequest) GetQueueUrlRequest.builder().queueName(str).overrideConfiguration(AwsRequestOverrideConfiguration.builder().apiCallAttemptTimeout(Duration.ofMillis(2000L)).build()).build()).get()).queueUrl();
        } catch (Exception e) {
            this.stopped.complete(null);
            throw new RuntimeException(e.getMessage(), e);
        }
    }

    public CompletableFuture<Void> consume() {
        return CompletableFuture.runAsync(() -> {
            do {
                try {
                    receiveAndProcess();
                } finally {
                    this.stopped.complete(null);
                }
            } while (!this.stopSignal.get());
        }, this.executorService);
    }

    private void receiveAndProcess() {
        try {
            LOG.debug("Sending receiveMessage request...");
            this.sqsAsyncClient.receiveMessage((ReceiveMessageRequest) ReceiveMessageRequest.builder().queueUrl(this.queueUrl).visibilityTimeout(Integer.valueOf(VISIBILITY_TIMEOUT)).messageAttributeNames(new String[]{".*"}).waitTimeSeconds(Integer.valueOf(WAIT_TIME_SECONDS)).build()).thenAccept(this::processResponse).join();
        } catch (Exception e) {
            LOG.error(e.getMessage(), e);
            throw new RuntimeException(e);
        }
    }

    private void processResponse(ReceiveMessageResponse receiveMessageResponse) {
        if (receiveMessageResponse.messages() != null) {
            LOG.debug("Received {} messages from SQS.", Integer.valueOf(receiveMessageResponse.messages().size()));
            receiveMessageResponse.messages().forEach(this::processMessage);
        }
    }

    private void processMessage(Message message) {
        LOG.debug("Processing message from channel={}: messageId={} receiptHandle={}, messageAttributes={}", new Object[]{getChannelName(), message.messageId(), message.receiptHandle(), message.messageAttributes()});
        try {
            TextMessage intercept = intercept(TextMessage.of(messageKeyOf(message), Header.of((ShardPosition) null, messageAttributesOf(message)), message.body()));
            if (intercept != null) {
                LOG.debug("Dispatching message {} ", intercept);
                getMessageDispatcher().accept(intercept);
            }
            deleteMessage(message);
        } catch (Exception e) {
            LOG.error("Failed to process SQS message " + message, e);
        }
    }

    private Key messageKeyOf(Message message) {
        return message.messageAttributes() != null ? Key.of(((MessageAttributeValue) message.messageAttributes().getOrDefault("synapse_msg_key", EMPTY_STRING_ATTR)).stringValue()) : Key.NO_KEY;
    }

    private ImmutableMap<String, String> messageAttributesOf(Message message) {
        if (message.messageAttributes() == null) {
            return ImmutableMap.of();
        }
        ImmutableMap.Builder builder = ImmutableMap.builder();
        message.messageAttributes().entrySet().forEach(entry -> {
            String dataType = ((MessageAttributeValue) entry.getValue()).dataType();
            boolean z = -1;
            switch (dataType.hashCode()) {
                case -1808118735:
                    if (dataType.equals("String")) {
                        z = false;
                        break;
                    }
                    break;
            }
            switch (z) {
                case false:
                    builder.put(entry.getKey(), ((MessageAttributeValue) entry.getValue()).stringValue());
                    return;
                default:
                    LOG.warn("Ignoring messageAttribute {} with dataType {}: Not yet implemented this type.", entry.getKey(), ((MessageAttributeValue) entry.getValue()).dataType());
                    return;
            }
        });
        return builder.build();
    }

    private void deleteMessage(Message message) {
        try {
            LOG.debug("Deleting message with receiptHandle={}", message.receiptHandle());
            this.sqsAsyncClient.deleteMessage((DeleteMessageRequest) DeleteMessageRequest.builder().queueUrl(this.queueUrl).receiptHandle(message.receiptHandle()).build()).handle((deleteMessageResponse, th) -> {
                if (deleteMessageResponse != null) {
                    LOG.debug("Received DeleteMessageResponse={}", deleteMessageResponse);
                    return deleteMessageResponse;
                }
                LOG.info("Received exception while deleting message: " + th.getMessage());
                throw new RuntimeException(th);
            });
        } catch (RuntimeException e) {
            LOG.error("Error deleting message: " + e.getMessage(), e);
            throw e;
        }
    }

    public void stop() {
        LOG.info("Channel {} received stop signal.", getChannelName());
        this.stopSignal.set(true);
        try {
            this.stopped.thenAccept(r5 -> {
                LOG.info("SQS channel {} has been stopped", getChannelName());
            }).get(3L, TimeUnit.SECONDS);
        } catch (Exception e) {
            throw new RuntimeException(e.getMessage(), e);
        }
    }
}
