package io.axonif.queuebacca.sqs;

import com.amazonaws.services.sqs.AmazonSQS;
import com.amazonaws.services.sqs.model.ChangeMessageVisibilityRequest;
import com.amazonaws.services.sqs.model.DeleteMessageRequest;
import com.amazonaws.services.sqs.model.ReceiveMessageRequest;
import com.amazonaws.services.sqs.model.SendMessageRequest;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import io.axonif.queuebacca.Client;
import io.axonif.queuebacca.IncomingEnvelope;
import io.axonif.queuebacca.Message;
import io.axonif.queuebacca.MessageBin;
import io.axonif.queuebacca.OutgoingEnvelope;
import io.axonif.queuebacca.util.MessageSerializer;
import java.time.Duration;
import java.time.Instant;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:io/axonif/queuebacca/sqs/SqsClient.class */
/**
 * {@link Client} implementation that sends, receives, returns and deletes messages through Amazon SQS.
 *
 * <p>Every message handed out by {@link #retrieveMessages(MessageBin, int)} is registered with an internal
 * refresher that periodically re-applies the bin's visibility timeout to the message until
 * {@link #returnMessage(MessageBin, IncomingEnvelope, int)} or
 * {@link #disposeMessage(MessageBin, IncomingEnvelope)} is called for it.
 */
public final class SqsClient implements Client {
    // NOTE(review): the thread factory does not mark the refresher thread as a daemon; confirm that is intended,
    // since the shutdown hook registered below only runs once JVM shutdown has already started.
    private static final ScheduledExecutorService REFRESH_SCHEDULER = Executors.newSingleThreadScheduledExecutor(
            new ThreadFactoryBuilder().setNameFormat("queuebacca-sqs-refresher").build());
    private static final String APPROXIMATE_RECEIVE_COUNT_ATTRIBUTE = "ApproximateReceiveCount";
    private static final String APPROXIMATE_FIRST_RECEIVE_TIMESTAMP_ATTRIBUTE = "ApproximateFirstReceiveTimestamp";
    /** Wait time, in seconds, passed to every receive request. */
    private static final int RECEIVE_WAIT_TIME_SECONDS = 20;
    public static final int DEFAULT_SQS_CONNECTIONS = 200;
    public static final String DEFAULT_SQS_CONNECTIONS_PROPERTY = "maxConnections";
    /** Upper bound applied to the number of messages requested in a single receive call. */
    public static final int MAX_READ_COUNT = 10;
    private final MessageRefresher refresher = new MessageRefresher();
    private final AmazonSQS client;
    private final MessageSerializer serializer;
    private final SqsCourierMessageBinRegistry messageBinRegistry;

    /**
     * Tracks one scheduled visibility-timeout refresh per in-flight envelope and re-arms it after each run
     * until the envelope is cancelled.
     */
    private class MessageRefresher {
        private final Map<IncomingEnvelope<?>, Future<?>> futures = new ConcurrentHashMap<>();

        private MessageRefresher() {
        }

        /**
         * Starts refreshing the given envelope. A refresh already tracked for the same envelope is cancelled
         * and replaced.
         *
         * @param incomingEnvelope the envelope whose receipt is refreshed
         * @param queueUrl the URL of the queue the envelope was received from
         * @param visibilityTimeout the visibility timeout re-applied on every refresh
         * @param logger the logger refresh activity is reported to
         */
        void scheduleRefresh(IncomingEnvelope<?> incomingEnvelope, String queueUrl, Duration visibilityTimeout, Logger logger) {
            Future<?> displaced = this.futures.put(incomingEnvelope, submit(incomingEnvelope, queueUrl, visibilityTimeout, logger));
            if (displaced != null) {
                displaced.cancel(false);
            }
        }

        /**
         * Stops refreshing the given envelope. Does nothing when no refresh is tracked for it, so the call is
         * safe to repeat and safe for envelopes that were never scheduled.
         *
         * @param incomingEnvelope the envelope to stop refreshing
         */
        void cancelRefresh(IncomingEnvelope<?> incomingEnvelope) {
            Future<?> pending = this.futures.remove(incomingEnvelope);
            if (pending != null) {
                pending.cancel(true);
            }
        }

        /** Submits a single delayed refresh run to the shared scheduler. */
        private Future<?> submit(IncomingEnvelope<?> incomingEnvelope, String queueUrl, Duration visibilityTimeout, Logger logger) {
            return SqsClient.REFRESH_SCHEDULER.schedule(
                    () -> refreshMessages(incomingEnvelope, queueUrl, visibilityTimeout, logger),
                    refreshDelay(visibilityTimeout).toMillis(),
                    TimeUnit.MILLISECONDS);
        }

        /**
         * Computes how long to wait before the next refresh: half the timeout when it is shorter than two
         * minutes, otherwise the timeout minus one minute.
         */
        private Duration refreshDelay(Duration visibilityTimeout) {
            return visibilityTimeout.toMinutes() < 2
                    ? visibilityTimeout.dividedBy(2L)
                    : visibilityTimeout.minusMinutes(1L);
        }

        /**
         * Re-applies the visibility timeout to the envelope's receipt and re-arms the next refresh.
         * If the SQS call fails, the failure is logged and no further refresh is armed.
         */
        private void refreshMessages(IncomingEnvelope<?> incomingEnvelope, String queueUrl, Duration visibilityTimeout, Logger logger) {
            logger.info("Refreshing SQS message '{}' for '{}' seconds", incomingEnvelope.getMessageId(), visibilityTimeout.getSeconds());
            try {
                SqsClient.this.client.changeMessageVisibility(new ChangeMessageVisibilityRequest()
                        .withQueueUrl(queueUrl)
                        .withReceiptHandle(incomingEnvelope.getReceipt())
                        .withVisibilityTimeout((int) visibilityTimeout.getSeconds()));
            } catch (RuntimeException e) {
                // cancelRefresh removes the entry before interrupting this task, so a missing entry means
                // the failure is just the cancellation cutting the call short and is not worth reporting.
                if (this.futures.containsKey(incomingEnvelope)) {
                    logger.error("Failed to refresh SQS message '{}'", incomingEnvelope.getMessageId(), e);
                }
                return;
            }
            // Re-arm only while the envelope is still tracked. computeIfPresent is atomic with the remove in
            // cancelRefresh, so a refresh that was already running cannot resurrect a cancelled envelope.
            this.futures.computeIfPresent(incomingEnvelope,
                    (envelope, current) -> submit(incomingEnvelope, queueUrl, visibilityTimeout, logger));
        }
    }

    /**
     * Creates a client on top of the given SQS connection.
     *
     * @param amazonSQS the SQS client all requests are issued through
     * @param messageSerializer converts messages to and from their string body
     * @param sqsCourierMessageBinRegistry resolves queue URLs and visibility timeouts for message bins
     * @throws NullPointerException if any argument is {@code null}
     */
    public SqsClient(AmazonSQS amazonSQS, MessageSerializer messageSerializer, SqsCourierMessageBinRegistry sqsCourierMessageBinRegistry) {
        this.client = Objects.requireNonNull(amazonSQS);
        this.serializer = Objects.requireNonNull(messageSerializer);
        this.messageBinRegistry = Objects.requireNonNull(sqsCourierMessageBinRegistry);
    }

    /**
     * Serializes a single message and sends it to the queue of the given bin.
     *
     * @param messageBin the bin whose queue receives the message
     * @param m the message to send
     * @param i the delivery delay, in seconds, set on the send request
     * @return an envelope holding the SQS message id, the message and its serialized body
     * @throws NullPointerException if {@code messageBin} or {@code m} is {@code null}
     */
    public <M extends Message> OutgoingEnvelope<M> sendMessage(MessageBin messageBin, M m, int i) {
        Objects.requireNonNull(messageBin);
        Objects.requireNonNull(m);
        String body = this.serializer.toString(m);
        SendMessageRequest request = new SendMessageRequest()
                .withQueueUrl(this.messageBinRegistry.getQueueUrl(messageBin))
                .withMessageBody(body)
                .withDelaySeconds(i);
        String messageId = this.client.sendMessage(request).getMessageId();
        LoggerFactory.getLogger(messageBin.getName()).info("Sent SQS message '{}'", messageId);
        return new OutgoingEnvelope<>(messageId, m, body);
    }

    /**
     * Sends a collection of messages to the queue of the given bin through a {@code SqsMessageBatchSender}.
     *
     * @param messageBin the bin whose queue receives the messages
     * @param collection the messages to send; an empty collection results in no request
     * @param i the delay value forwarded to the batch sender
     * @return the envelopes produced by the batch sender, or an empty list when nothing was sent
     * @throws NullPointerException if {@code messageBin} or {@code collection} is {@code null}
     */
    public <M extends Message> Collection<OutgoingEnvelope<M>> sendMessages(MessageBin messageBin, Collection<M> collection, int i) {
        Objects.requireNonNull(messageBin);
        Objects.requireNonNull(collection);
        if (collection.isEmpty()) {
            return Collections.emptyList();
        }
        SqsMessageBatchSender batchSender = new SqsMessageBatchSender(this.client, this.serializer, LoggerFactory.getLogger(messageBin.getName()));
        collection.forEach(batchSender::add);
        return batchSender.send(this.messageBinRegistry.getQueueUrl(messageBin), i);
    }

    /**
     * Receives up to {@code i} messages (capped at {@link #MAX_READ_COUNT}) from the queue of the given bin
     * and starts refreshing the visibility timeout of each one.
     *
     * @param messageBin the bin whose queue is read
     * @param i the maximum number of messages wanted
     * @return the received messages, possibly empty
     * @throws NullPointerException if {@code messageBin} is {@code null}
     */
    public <M extends Message> Collection<IncomingEnvelope<M>> retrieveMessages(MessageBin messageBin, int i) {
        Objects.requireNonNull(messageBin);
        String queueUrl = this.messageBinRegistry.getQueueUrl(messageBin);
        Logger logger = LoggerFactory.getLogger(messageBin.getName());
        ReceiveMessageRequest request = new ReceiveMessageRequest()
                .withQueueUrl(queueUrl)
                .withMaxNumberOfMessages(Math.min(MAX_READ_COUNT, i))
                .withWaitTimeSeconds(RECEIVE_WAIT_TIME_SECONDS)
                .withAttributeNames(APPROXIMATE_RECEIVE_COUNT_ATTRIBUTE, APPROXIMATE_FIRST_RECEIVE_TIMESTAMP_ATTRIBUTE);

        Collection<IncomingEnvelope<M>> envelopes = new ArrayList<>();
        for (com.amazonaws.services.sqs.model.Message sqsMessage : this.client.receiveMessage(request).getMessages()) {
            logger.info("Received SQS message '{}'", sqsMessage.getMessageId());
            envelopes.add(this.<M>mapSqsMessage(sqsMessage));
        }

        // Refreshes are armed only after every message has been mapped, so a mapping failure leaves
        // nothing scheduled.
        if (!envelopes.isEmpty()) {
            Duration visibilityTimeout = this.messageBinRegistry.getVisibilityTimeout(messageBin);
            for (IncomingEnvelope<M> envelope : envelopes) {
                this.refresher.scheduleRefresh(envelope, queueUrl, visibilityTimeout, logger);
            }
        }
        return envelopes;
    }

    /**
     * Hands a received message back to its queue by setting its visibility timeout to {@code i} seconds,
     * and stops refreshing it whether or not the SQS call succeeds.
     *
     * @param messageBin the bin the message was received from
     * @param incomingEnvelope the message to return
     * @param i the visibility timeout, in seconds, applied to the message
     * @throws NullPointerException if {@code messageBin}, {@code incomingEnvelope} or its receipt is {@code null}
     */
    public void returnMessage(MessageBin messageBin, IncomingEnvelope<?> incomingEnvelope, int i) {
        Objects.requireNonNull(messageBin);
        Objects.requireNonNull(incomingEnvelope);
        try {
            this.client.changeMessageVisibility(new ChangeMessageVisibilityRequest()
                    .withQueueUrl(this.messageBinRegistry.getQueueUrl(messageBin))
                    .withReceiptHandle(Objects.requireNonNull(incomingEnvelope.getReceipt()))
                    .withVisibilityTimeout(i));
        } finally {
            this.refresher.cancelRefresh(incomingEnvelope);
        }
    }

    /**
     * Deletes a received message from its queue and stops refreshing it whether or not the SQS call succeeds.
     *
     * @param messageBin the bin the message was received from
     * @param incomingEnvelope the message to delete
     * @throws NullPointerException if {@code messageBin}, {@code incomingEnvelope} or its receipt is {@code null}
     */
    public void disposeMessage(MessageBin messageBin, IncomingEnvelope<?> incomingEnvelope) {
        Objects.requireNonNull(messageBin);
        Objects.requireNonNull(incomingEnvelope);
        try {
            this.client.deleteMessage(new DeleteMessageRequest()
                    .withQueueUrl(this.messageBinRegistry.getQueueUrl(messageBin))
                    .withReceiptHandle(Objects.requireNonNull(incomingEnvelope.getReceipt())));
        } finally {
            this.refresher.cancelRefresh(incomingEnvelope);
        }
    }

    /**
     * Converts a raw SQS message into an envelope: message id, receipt handle, receive count, first-receive
     * instant (the attribute is read as epoch milliseconds), the deserialized body and the raw body.
     */
    private <M extends Message> IncomingEnvelope<M> mapSqsMessage(com.amazonaws.services.sqs.model.Message message) {
        Map<String, String> attributes = message.getAttributes();
        int receiveCount = Integer.parseInt(attributes.get(APPROXIMATE_RECEIVE_COUNT_ATTRIBUTE));
        Instant firstReceived = Instant.ofEpochMilli(Long.parseLong(attributes.get(APPROXIMATE_FIRST_RECEIVE_TIMESTAMP_ATTRIBUTE)));
        String rawBody = message.getBody();
        // The body is deserialized as the base Message type; the caller picks M, so this cast cannot be checked here.
        @SuppressWarnings("unchecked")
        M body = (M) this.serializer.fromString(rawBody, Message.class);
        return new IncomingEnvelope<>(message.getMessageId(), message.getReceiptHandle(), receiveCount, firstReceived, body, rawBody);
    }

    static {
        // Stop the shared refresher when the JVM shuts down.
        Runtime.getRuntime().addShutdownHook(new Thread(REFRESH_SCHEDULER::shutdownNow));
    }
}
