package org.awsutils.sqs.client;

import com.google.common.collect.ImmutableMap;
import java.text.MessageFormat;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import java.util.stream.Collectors;
import org.awsutils.sqs.exceptions.UtilsException;
import org.awsutils.sqs.message.SqsMessage;
import org.awsutils.sqs.util.Utils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.util.CollectionUtils;
import org.springframework.util.StringUtils;
import software.amazon.awssdk.services.sqs.SqsAsyncClient;
import software.amazon.awssdk.services.sqs.model.ChangeMessageVisibilityRequest;
import software.amazon.awssdk.services.sqs.model.ChangeMessageVisibilityResponse;
import software.amazon.awssdk.services.sqs.model.DeleteMessageRequest;
import software.amazon.awssdk.services.sqs.model.DeleteMessageResponse;
import software.amazon.awssdk.services.sqs.model.GetQueueUrlRequest;
import software.amazon.awssdk.services.sqs.model.GetQueueUrlResponse;
import software.amazon.awssdk.services.sqs.model.MessageAttributeValue;
import software.amazon.awssdk.services.sqs.model.SendMessageBatchRequest;
import software.amazon.awssdk.services.sqs.model.SendMessageBatchRequestEntry;
import software.amazon.awssdk.services.sqs.model.SendMessageBatchResponse;
import software.amazon.awssdk.services.sqs.model.SendMessageRequest;
import software.amazon.awssdk.services.sqs.model.SendMessageResponse;

/* loaded from: input_file:org/awsutils/sqs/client/SqsMessageClientImpl.class */
public class SqsMessageClientImpl implements SqsMessageClient {
    private static final Logger log = LoggerFactory.getLogger(SqsMessageClientImpl.class);
    private final SqsAsyncClient sqsAsyncClient;
    private final ConcurrentHashMap<String, String> queueUrlMap = new ConcurrentHashMap<>();

    public SqsMessageClientImpl(SqsAsyncClient sqsAsyncClient) {
        this.sqsAsyncClient = sqsAsyncClient;
    }

    @Override // org.awsutils.sqs.client.SqsMessageClient
    public <T> CompletableFuture<SendMessageResponse> sendMessage(T t, String str, String str2, String str3, Integer num, Map<String, String> map) {
        String constructJson = t instanceof String ? (String) t : Utils.constructJson(t);
        String queueUrl = getQueueUrl(str3);
        SendMessageRequest.Builder queueUrl2 = SendMessageRequest.builder().messageBody(constructJson).delaySeconds(num).queueUrl(queueUrl);
        HashMap hashMap = !CollectionUtils.isEmpty(map) ? new HashMap(map) : new HashMap();
        hashMap.put(MessageConstants.SQS_MESSAGE_WRAPPER_PRESENT, "false");
        queueUrl2.messageAttributes(getSqsMessageAttributeValues(str, str2, getSqsMessageAttributes(hashMap)));
        if (log.isInfoEnabled()) {
            log.info(MessageFormat.format("Sending message to SQS [{0}]: {1}", queueUrl, t));
        }
        return this.sqsAsyncClient.sendMessage((SendMessageRequest) queueUrl2.build()).thenApplyAsync(sendMessageResponse -> {
            return handleSqsResponse(t, str, str3, num, sendMessageResponse);
        });
    }

    @Override // org.awsutils.sqs.client.SqsMessageClient
    public <T> CompletableFuture<SendMessageResponse> sendMessage(SqsMessage<T> sqsMessage, String str, Integer num, Map<String, String> map) {
        String constructJson = Utils.constructJson(sqsMessage);
        String queueUrl = getQueueUrl(str);
        SendMessageRequest.Builder queueUrl2 = SendMessageRequest.builder().messageBody(constructJson).delaySeconds(num).queueUrl(queueUrl);
        HashMap hashMap = !CollectionUtils.isEmpty(map) ? new HashMap(map) : new HashMap();
        hashMap.put(MessageConstants.SQS_MESSAGE_WRAPPER_PRESENT, "true");
        queueUrl2.messageAttributes(getSqsMessageAttributeValues(sqsMessage, getSqsMessageAttributes(hashMap)));
        if (log.isInfoEnabled()) {
            log.info(MessageFormat.format("Sending message to SQS [{0}]: {1}", queueUrl, sqsMessage));
        }
        return this.sqsAsyncClient.sendMessage((SendMessageRequest) queueUrl2.build()).thenApplyAsync(sendMessageResponse -> {
            return handleSqsResponse(sqsMessage, str, num, sendMessageResponse);
        });
    }

    private static <T> Map<String, String> constructFinalMessageAttributeMap(SqsMessage<T> sqsMessage, Map<String, String> map) {
        HashMap hashMap = !CollectionUtils.isEmpty(map) ? new HashMap(map) : new HashMap();
        hashMap.put(MessageConstants.SQS_MESSAGE_WRAPPER_PRESENT, "true");
        return hashMap;
    }

    private static <T> Map<String, String> constructFinalMessageAttributeMap(String str, String str2, Map<String, String> map) {
        HashMap hashMap = !CollectionUtils.isEmpty(map) ? new HashMap(map) : new HashMap();
        hashMap.put(MessageConstants.SQS_MESSAGE_WRAPPER_PRESENT, "false");
        return hashMap;
    }

    @Override // org.awsutils.sqs.client.SqsMessageClient
    public <T> CompletableFuture<SendMessageBatchResponse> sendMessage(List<T> list, String str, String str2, String str3, Integer num, Map<String, String> map) {
        if (CollectionUtils.isEmpty(list) && list.size() > 10) {
            log.error(CollectionUtils.isEmpty(list) ? "At least one message needs to be sent" : "Maximum number of messages supported is 10");
            throw new IllegalArgumentException(CollectionUtils.isEmpty(list) ? "At least one message needs to be sent" : "Maximum number of messages supported is 10");
        }
        Map<String, MessageAttributeValue> sqsMessageAttributes = getSqsMessageAttributes(constructFinalMessageAttributeMap(str2, str, map));
        String queueUrl = getQueueUrl(str3);
        SendMessageBatchRequest sendMessageBatchRequest = (SendMessageBatchRequest) SendMessageBatchRequest.builder().entries((Collection) list.stream().map(obj -> {
            return (SendMessageBatchRequestEntry) SendMessageBatchRequestEntry.builder().id(StringUtils.hasLength(str2) ? str2 : UUID.randomUUID().toString()).messageBody(Utils.constructJson(obj)).delaySeconds(num).messageAttributes(getSqsMessageAttributeValues(str, str2, sqsMessageAttributes)).build();
        }).collect(Collectors.toList())).queueUrl(queueUrl).build();
        if (log.isDebugEnabled()) {
            log.debug(MessageFormat.format("Sending messages to SQS[{0}] : {1}", queueUrl, list));
        }
        return this.sqsAsyncClient.sendMessageBatch(sendMessageBatchRequest);
    }

    @Override // org.awsutils.sqs.client.SqsMessageClient
    public <T> CompletableFuture<SendMessageBatchResponse> sendMessage(List<SqsMessage<T>> list, String str, Integer num, Map<String, String> map) {
        if (CollectionUtils.isEmpty(list) && list.size() > 10) {
            log.error(CollectionUtils.isEmpty(list) ? "At least one message needs to be sent" : "Maximum number of messages supported is 10");
            throw new IllegalArgumentException(CollectionUtils.isEmpty(list) ? "At least one message needs to be sent" : "Maximum number of messages supported is 10");
        }
        Map<String, MessageAttributeValue> sqsMessageAttributes = getSqsMessageAttributes(constructFinalMessageAttributeMap(list.get(0), map));
        String queueUrl = getQueueUrl(str);
        Set set = (Set) list.stream().filter(sqsMessage -> {
            return !StringUtils.isEmpty(sqsMessage.getTransactionId());
        }).map((v0) -> {
            return v0.getTransactionId();
        }).collect(Collectors.toSet());
        boolean z = !CollectionUtils.isEmpty(set) && set.size() == list.size();
        SendMessageBatchRequest sendMessageBatchRequest = (SendMessageBatchRequest) SendMessageBatchRequest.builder().entries((Collection) list.stream().map(sqsMessage2 -> {
            return (SendMessageBatchRequestEntry) SendMessageBatchRequestEntry.builder().id(z ? sqsMessage2.getTransactionId() : UUID.randomUUID().toString()).messageBody(Utils.constructJson(sqsMessage2)).delaySeconds(num).messageAttributes(getSqsMessageAttributeValues(sqsMessage2, sqsMessageAttributes)).build();
        }).collect(Collectors.toList())).queueUrl(queueUrl).build();
        if (log.isDebugEnabled()) {
            log.debug(MessageFormat.format("Sending messages to SQS[{0}] : {1}", queueUrl, list));
        }
        return this.sqsAsyncClient.sendMessageBatch(sendMessageBatchRequest);
    }

    @Override // org.awsutils.sqs.client.SqsMessageClient
    public String getQueueUrl(String str) {
        return this.queueUrlMap.computeIfAbsent(str, str2 -> {
            return queueUrl(str);
        });
    }

    @Override // org.awsutils.sqs.client.SqsMessageClient
    public CompletableFuture<DeleteMessageResponse> deleteMessage(String str, String str2) {
        DeleteMessageRequest deleteMessageRequest = (DeleteMessageRequest) DeleteMessageRequest.builder().queueUrl(str).receiptHandle(str2).build();
        if (log.isDebugEnabled()) {
            log.debug(MessageFormat.format("Deleting message  from Queue: {0} with receiptHandle: {1}", str, str2));
        }
        return this.sqsAsyncClient.deleteMessage(deleteMessageRequest);
    }

    @Override // org.awsutils.sqs.client.SqsMessageClient
    public CompletableFuture<ChangeMessageVisibilityResponse> changeVisibility(String str, String str2, Integer num) {
        ChangeMessageVisibilityRequest changeMessageVisibilityRequest = (ChangeMessageVisibilityRequest) ChangeMessageVisibilityRequest.builder().queueUrl(str).receiptHandle(str2).visibilityTimeout(num).build();
        if (log.isDebugEnabled()) {
            log.debug(MessageFormat.format("Changing visibility of [{0}] from queue: {1}", str2, str));
        }
        return this.sqsAsyncClient.changeMessageVisibility(changeMessageVisibilityRequest);
    }

    private Map<String, MessageAttributeValue> getSqsMessageAttributes(Map<String, String> map) {
        return !CollectionUtils.isEmpty(map) ? ImmutableMap.copyOf((Map) map.entrySet().stream().collect(Collectors.toMap((v0) -> {
            return v0.getKey();
        }, entry -> {
            return (MessageAttributeValue) MessageAttributeValue.builder().dataType(MessageConstants.MESSAGE_ATTRIBUTE_TYPE).stringValue((String) entry.getValue()).build();
        }))) : Collections.emptyMap();
    }

    private <T> Map<String, MessageAttributeValue> getSqsMessageAttributeValues(SqsMessage<T> sqsMessage, Map<String, MessageAttributeValue> map) {
        ImmutableMap.Builder putAll = ImmutableMap.builder().putAll(map);
        if (!StringUtils.isEmpty(sqsMessage.getTransactionId())) {
            putAll.put(MessageConstants.TRANSACTION_ID, (MessageAttributeValue) MessageAttributeValue.builder().dataType(MessageConstants.MESSAGE_ATTRIBUTE_TYPE).stringValue(sqsMessage.getTransactionId()).build());
        }
        putAll.put(MessageConstants.MESSAGE_TYPE, (MessageAttributeValue) MessageAttributeValue.builder().dataType(MessageConstants.MESSAGE_ATTRIBUTE_TYPE).stringValue(sqsMessage.getMessageType()).build());
        return putAll.build();
    }

    private <T> Map<String, MessageAttributeValue> getSqsMessageAttributeValues(String str, String str2, Map<String, MessageAttributeValue> map) {
        ImmutableMap.Builder putAll = ImmutableMap.builder().putAll(map);
        if (!StringUtils.isEmpty(str2)) {
            putAll.put(MessageConstants.TRANSACTION_ID, (MessageAttributeValue) MessageAttributeValue.builder().dataType(MessageConstants.MESSAGE_ATTRIBUTE_TYPE).stringValue(str2).build());
        }
        putAll.put(MessageConstants.MESSAGE_TYPE, (MessageAttributeValue) MessageAttributeValue.builder().dataType(MessageConstants.MESSAGE_ATTRIBUTE_TYPE).stringValue(str).build());
        return putAll.build();
    }

    private String queueUrl(String str) {
        try {
            return ((GetQueueUrlResponse) this.sqsAsyncClient.getQueueUrl((GetQueueUrlRequest) GetQueueUrlRequest.builder().queueName(str).build()).get()).queueUrl();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new UtilsException("UNKNOWN_ERROR", e);
        } catch (ExecutionException e2) {
            log.error("Exception while getting queueUrl [ " + str + "]: " + e2, e2.getCause());
            throw new UtilsException("UNKNOWN_ERROR", MessageFormat.format("Exception while getting queueUrl [ {0}]: ", str), e2.getCause());
        }
    }

    private <T> SendMessageResponse handleSqsResponse(SqsMessage<T> sqsMessage, String str, Integer num, SendMessageResponse sendMessageResponse) {
        if (log.isDebugEnabled()) {
            log.debug(MessageFormat.format("Sent message to {0}, message type: {1} w/ delay {2}", str, sqsMessage.getMessageType(), num));
        }
        return sendMessageResponse;
    }

    private <T> SendMessageResponse handleSqsResponse(T t, String str, String str2, Integer num, SendMessageResponse sendMessageResponse) {
        if (log.isDebugEnabled()) {
            log.debug(MessageFormat.format("Sent message to {0}, message type: {1} w/ delay {2}", str2, str, num));
        }
        return sendMessageResponse;
    }
}
