package de.otto.synapse.endpoint.sender.sqs;

import com.google.common.collect.ImmutableMap;
import de.otto.synapse.endpoint.MessageInterceptorRegistry;
import de.otto.synapse.endpoint.sender.AbstractMessageSenderEndpoint;
import de.otto.synapse.message.TextMessage;
import de.otto.synapse.translator.MessageTranslator;
import jakarta.annotation.Nonnull;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.BiConsumer;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import software.amazon.awssdk.services.sqs.SqsAsyncClient;
import software.amazon.awssdk.services.sqs.model.MessageAttributeValue;
import software.amazon.awssdk.services.sqs.model.SendMessageBatchRequest;
import software.amazon.awssdk.services.sqs.model.SendMessageBatchRequestEntry;
import software.amazon.awssdk.services.sqs.model.SendMessageBatchResponse;
import software.amazon.awssdk.services.sqs.model.SendMessageRequest;
import software.amazon.awssdk.services.sqs.model.SendMessageResponse;

/* loaded from: input_file:de/otto/synapse/endpoint/sender/sqs/SqsMessageSender.class */
public class SqsMessageSender extends AbstractMessageSenderEndpoint {
    private static final Logger LOG = LoggerFactory.getLogger(SqsMessageSender.class);
    public static final String MSG_KEY_ATTR = "synapse_msg_key";
    private final String queueUrl;
    private final SqsAsyncClient sqsAsyncClient;

    public SqsMessageSender(String str, String str2, MessageInterceptorRegistry messageInterceptorRegistry, MessageTranslator<TextMessage> messageTranslator, SqsAsyncClient sqsAsyncClient) {
        super(str, messageInterceptorRegistry, messageTranslator);
        this.queueUrl = str2;
        this.sqsAsyncClient = sqsAsyncClient;
    }

    protected CompletableFuture<Void> doSend(@Nonnull TextMessage textMessage) {
        return CompletableFuture.allOf(this.sqsAsyncClient.sendMessage(toSendMessageRequest(textMessage)).whenComplete((BiConsumer) logResponse(textMessage)));
    }

    protected CompletableFuture<Void> doSendBatch(@Nonnull Stream<TextMessage> stream) {
        SendMessageBatchRequest sendMessageBatchRequest = toSendMessageBatchRequest(stream);
        return !sendMessageBatchRequest.entries().isEmpty() ? CompletableFuture.allOf(this.sqsAsyncClient.sendMessageBatch(sendMessageBatchRequest).whenComplete((BiConsumer) logBatchResponse())) : CompletableFuture.completedFuture(null);
    }

    private SendMessageBatchRequest toSendMessageBatchRequest(@Nonnull Stream<TextMessage> stream) {
        AtomicInteger atomicInteger = new AtomicInteger(0);
        return (SendMessageBatchRequest) SendMessageBatchRequest.builder().queueUrl(this.queueUrl).entries((Collection) stream.map(textMessage -> {
            return (SendMessageBatchRequestEntry) SendMessageBatchRequestEntry.builder().id(String.valueOf(atomicInteger.getAndIncrement())).messageAttributes(of(textMessage)).messageBody((String) textMessage.getPayload()).build();
        }).collect(Collectors.toList())).build();
    }

    private SendMessageRequest toSendMessageRequest(@Nonnull TextMessage textMessage) {
        return (SendMessageRequest) SendMessageRequest.builder().queueUrl(this.queueUrl).messageAttributes(of(textMessage)).messageBody((String) textMessage.getPayload()).build();
    }

    private ImmutableMap<String, MessageAttributeValue> of(@Nonnull TextMessage textMessage) {
        ImmutableMap.Builder builder = ImmutableMap.builder();
        textMessage.getHeader().getAll().entrySet().forEach(entry -> {
            builder.put((String) entry.getKey(), (MessageAttributeValue) MessageAttributeValue.builder().dataType("String").stringValue((String) entry.getValue()).build());
        });
        builder.put(MSG_KEY_ATTR, (MessageAttributeValue) MessageAttributeValue.builder().dataType("String").stringValue(textMessage.getKey().partitionKey()).build());
        return builder.build();
    }

    private BiConsumer<SendMessageResponse, Throwable> logResponse(@Nonnull TextMessage textMessage) {
        return (sendMessageResponse, th) -> {
            if (th != null) {
                LOG.error(String.format("Failed to send message %s", textMessage), th);
            }
            if (sendMessageResponse != null) {
                LOG.debug("Successfully sent message {}", sendMessageResponse);
            }
        };
    }

    private BiConsumer<SendMessageBatchResponse, Throwable> logBatchResponse() {
        return (sendMessageBatchResponse, th) -> {
            if (th != null) {
                LOG.error("Failed to send batch of messages: " + th.getMessage(), th);
            }
            if (sendMessageBatchResponse != null) {
                if (!sendMessageBatchResponse.successful().isEmpty()) {
                    LOG.debug("Successfully sent {} messages in a batch", Integer.valueOf(sendMessageBatchResponse.successful().size()));
                }
                if (sendMessageBatchResponse.failed().isEmpty()) {
                    return;
                }
                LOG.error("Failed to sent {} messages in a batch: {}", Integer.valueOf(sendMessageBatchResponse.failed().size()), sendMessageBatchResponse.failed());
            }
        };
    }
}
