package io.axonif.queuebacca.sqs;

import com.amazonaws.services.sqs.AmazonSQS;
import com.amazonaws.services.sqs.model.SendMessageBatchRequest;
import com.amazonaws.services.sqs.model.SendMessageBatchRequestEntry;
import com.amazonaws.services.sqs.model.SendMessageBatchResult;
import com.amazonaws.services.sqs.model.SendMessageRequest;
import com.amazonaws.services.sqs.model.SendMessageResult;
import com.google.common.collect.Iterables;
import io.axonif.queuebacca.Message;
import io.axonif.queuebacca.OutgoingEnvelope;
import io.axonif.queuebacca.util.MessageSerializer;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Map;
import java.util.Objects;
import java.util.UUID;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.StreamSupport;
import org.slf4j.Logger;

/* loaded from: input_file:io/axonif/queuebacca/sqs/SqsMessageBatchSender.class */
final class SqsMessageBatchSender<M extends Message> {
    public static final int MAX_BATCH_ENTRY_COUNT = 10;
    private final Collection<BatchEntry<M>> batchEntries = new ArrayList();
    private final AmazonSQS client;
    private final MessageSerializer serializer;
    private final Logger logger;

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:io/axonif/queuebacca/sqs/SqsMessageBatchSender$Batch.class */
    public static class Batch<M extends Message> {
        private final Map<String, BatchEntry<M>> entries;

        private Batch(Map<String, BatchEntry<M>> map) {
            this.entries = (Map) Objects.requireNonNull(map);
        }

        public static <M extends Message> Batch<M> from(Collection<BatchEntry<M>> collection) {
            String replace = UUID.randomUUID().toString().replace("-", "");
            AtomicInteger atomicInteger = new AtomicInteger(1);
            return new Batch<>((Map) collection.stream().collect(Collectors.toMap(batchEntry -> {
                return replace + "_" + atomicInteger.getAndIncrement();
            }, Function.identity())));
        }

        /* JADX INFO: Access modifiers changed from: private */
        public M getMessage(String str) {
            return (M) this.entries.get(str).getMessage();
        }

        /* JADX INFO: Access modifiers changed from: private */
        public String getMessageBody(String str) {
            return this.entries.get(str).getMessageBody();
        }

        /* JADX INFO: Access modifiers changed from: private */
        public SendMessageBatchRequest toBatchRequest(int i) {
            return new SendMessageBatchRequest().withEntries((Collection) this.entries.entrySet().stream().map(entry -> {
                return new SendMessageBatchRequestEntry((String) entry.getKey(), ((BatchEntry) entry.getValue()).getMessageBody()).withDelaySeconds(Integer.valueOf(i));
            }).collect(Collectors.toList()));
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:io/axonif/queuebacca/sqs/SqsMessageBatchSender$BatchEntry.class */
    public static class BatchEntry<M extends Message> {
        private final M message;
        private final String messageBody;

        private BatchEntry(M m, String str) {
            this.message = m;
            this.messageBody = str;
        }

        /* JADX INFO: Access modifiers changed from: private */
        public M getMessage() {
            return this.message;
        }

        /* JADX INFO: Access modifiers changed from: private */
        public String getMessageBody() {
            return this.messageBody;
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public SqsMessageBatchSender(AmazonSQS amazonSQS, MessageSerializer messageSerializer, Logger logger) {
        this.client = (AmazonSQS) Objects.requireNonNull(amazonSQS);
        this.serializer = (MessageSerializer) Objects.requireNonNull(messageSerializer);
        this.logger = (Logger) Objects.requireNonNull(logger);
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public void add(M m) {
        Objects.requireNonNull(m);
        this.batchEntries.add(new BatchEntry<>(m, this.serializer.toString(m)));
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public Collection<OutgoingEnvelope<M>> send(String str, int i) {
        return (Collection) StreamSupport.stream(Iterables.partition(this.batchEntries, 10).spliterator(), false).map((v0) -> {
            return Batch.from(v0);
        }).map(batch -> {
            return sendBatch(batch, str, i);
        }).flatMap((v0) -> {
            return v0.stream();
        }).collect(Collectors.toList());
    }

    private Collection<OutgoingEnvelope<M>> sendBatch(Batch<M> batch, String str, int i) {
        SendMessageBatchRequest withQueueUrl = batch.toBatchRequest(i).withQueueUrl(str);
        SendMessageBatchResult sendMessageBatch = this.client.sendMessageBatch(withQueueUrl);
        Collection<OutgoingEnvelope<M>> collection = (Collection) sendMessageBatch.getSuccessful().stream().peek(sendMessageBatchResultEntry -> {
            this.logger.info("Sent SQS message '{}'", sendMessageBatchResultEntry.getMessageId());
        }).map(sendMessageBatchResultEntry2 -> {
            return new OutgoingEnvelope(sendMessageBatchResultEntry2.getMessageId(), batch.getMessage(sendMessageBatchResultEntry2.getId()), batch.getMessageBody(sendMessageBatchResultEntry2.getId()));
        }).collect(Collectors.toList());
        sendMessageBatch.getFailed().forEach(batchResultErrorEntry -> {
            this.logger.warn("Batch entry '{}' failed: [{}] {}; retrying", new Object[]{batchResultErrorEntry.getId(), batchResultErrorEntry.getCode(), batchResultErrorEntry.getMessage()});
            withQueueUrl.getEntries().stream().filter(sendMessageBatchRequestEntry -> {
                return Objects.equals(sendMessageBatchRequestEntry.getId(), batchResultErrorEntry.getId());
            }).findFirst().ifPresent(sendMessageBatchRequestEntry2 -> {
                SendMessageResult sendMessage = this.client.sendMessage(new SendMessageRequest(str, sendMessageBatchRequestEntry2.getMessageBody()));
                this.logger.info("Sent SQS message '{}'", sendMessage.getMessageId());
                collection.add(new OutgoingEnvelope(sendMessage.getMessageId(), batch.getMessage(sendMessageBatchRequestEntry2.getId()), batch.getMessageBody(sendMessageBatchRequestEntry2.getId())));
            });
        });
        return collection;
    }
}
