package de.otto.synapse.endpoint.sender.kinesis;

import com.google.common.collect.Lists;
import de.otto.synapse.endpoint.MessageInterceptorRegistry;
import de.otto.synapse.endpoint.sender.AbstractMessageSenderEndpoint;
import de.otto.synapse.message.Message;
import de.otto.synapse.translator.MessageCodec;
import de.otto.synapse.translator.MessageFormat;
import de.otto.synapse.translator.MessageTranslator;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import javax.annotation.Nonnull;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import software.amazon.awssdk.core.SdkBytes;
import software.amazon.awssdk.services.kinesis.KinesisAsyncClient;
import software.amazon.awssdk.services.kinesis.model.PutRecordsRequest;
import software.amazon.awssdk.services.kinesis.model.PutRecordsRequestEntry;

/* loaded from: input_file:de/otto/synapse/endpoint/sender/kinesis/KinesisMessageSender.class */
/**
 * Message sender endpoint that publishes messages to a Kinesis stream through the
 * asynchronous AWS SDK client.
 *
 * <p>Every message is encoded by {@link MessageCodec} using the configured
 * {@link MessageFormat} and sent with the {@code PutRecords} operation. Batches are
 * split into chunks of at most {@value #PUT_RECORDS_BATCH_SIZE} entries, which is the
 * documented per-request record limit of {@code PutRecords}.
 *
 * <p>{@code PutRecords} is not atomic: a call may succeed as a whole while individual
 * records are rejected. The returned futures therefore complete exceptionally whenever
 * the response reports a positive failed-record count, instead of silently dropping
 * those messages.
 */
public class KinesisMessageSender extends AbstractMessageSenderEndpoint {
    private static final Logger LOG = LoggerFactory.getLogger(KinesisMessageSender.class);

    /** Maximum number of entries placed into a single {@code PutRecords} request. */
    private static final int PUT_RECORDS_BATCH_SIZE = 500;

    private final KinesisAsyncClient kinesisAsyncClient;
    private final MessageFormat messageFormat;

    /**
     * Creates a sender that encodes messages using {@link MessageFormat#V1}.
     *
     * @param channelName                handed to the superclass; presumably the name later
     *                                   returned by {@code getChannelName()}, which is used as
     *                                   the Kinesis stream name
     * @param messageInterceptorRegistry interceptor registry handed to the superclass
     * @param messageTranslator          translator handed to the superclass
     * @param kinesisAsyncClient         client used to issue the {@code PutRecords} calls
     */
    public KinesisMessageSender(String channelName, MessageInterceptorRegistry messageInterceptorRegistry, MessageTranslator<String> messageTranslator, KinesisAsyncClient kinesisAsyncClient) {
        this(channelName, messageInterceptorRegistry, messageTranslator, kinesisAsyncClient, MessageFormat.V1);
    }

    /**
     * Creates a sender that encodes messages using the given format.
     *
     * @param channelName                handed to the superclass; presumably the name later
     *                                   returned by {@code getChannelName()}, which is used as
     *                                   the Kinesis stream name
     * @param messageInterceptorRegistry interceptor registry handed to the superclass
     * @param messageTranslator          translator handed to the superclass
     * @param kinesisAsyncClient         client used to issue the {@code PutRecords} calls
     * @param messageFormat              format passed to {@link MessageCodec} when encoding
     */
    public KinesisMessageSender(String channelName, MessageInterceptorRegistry messageInterceptorRegistry, MessageTranslator<String> messageTranslator, KinesisAsyncClient kinesisAsyncClient, MessageFormat messageFormat) {
        super(channelName, messageInterceptorRegistry, messageTranslator);
        this.kinesisAsyncClient = kinesisAsyncClient;
        this.messageFormat = messageFormat;
    }

    /**
     * Sends a single message as a one-entry {@code PutRecords} request.
     *
     * @param message the message to send
     * @return a future that completes once the request has been answered; it completes
     *         exceptionally if the call fails or the record is reported as failed
     */
    protected CompletableFuture<Void> doSend(@Nonnull Message<String> message) {
        return putRecords(createPutRecordRequest(message));
    }

    /**
     * Sends all messages of the stream, split into requests of at most
     * {@value #PUT_RECORDS_BATCH_SIZE} entries each.
     *
     * @param stream the messages to send; consumed completely before any request is issued
     * @return a future that completes when every request has been answered; it completes
     *         exceptionally if any call fails or reports failed records. An empty stream
     *         yields an already completed future.
     */
    protected CompletableFuture<Void> doSendBatch(@Nonnull Stream<Message<String>> stream) {
        final List<PutRecordsRequestEntry> entries = createPutRecordRequestEntries(stream);
        final CompletableFuture<?>[] pending = Lists.partition(entries, PUT_RECORDS_BATCH_SIZE)
                .stream()
                .map(this::createPutRecordsRequest)
                .map(this::putRecords)
                .toArray(CompletableFuture[]::new);
        return CompletableFuture.allOf(pending);
    }

    /**
     * Issues the request and turns a partially failed response into a failed future.
     *
     * @param request the request to send
     * @return a future failing with an {@link IllegalStateException} (wrapped by the
     *         future machinery) when the response reports rejected records
     */
    private CompletableFuture<Void> putRecords(PutRecordsRequest request) {
        final int total = request.records().size();
        return kinesisAsyncClient.putRecords(request).thenAccept(response -> {
            // A successful PutRecords call can still contain rejected records; the only
            // indication is the failed-record count of the response.
            final Integer failed = response == null ? null : response.failedRecordCount();
            if (failed != null && failed > 0) {
                final String reason = String.format(
                        "Failed to put %d of %d records to Kinesis stream '%s'",
                        failed, total, getChannelName());
                LOG.error(reason);
                throw new IllegalStateException(reason);
            }
        });
    }

    /** Builds a request for the current channel containing the given entries. */
    private PutRecordsRequest createPutRecordsRequest(List<PutRecordsRequestEntry> list) {
        return PutRecordsRequest.builder()
                .streamName(getChannelName())
                .records(list)
                .build();
    }

    /** Converts every message of the stream into a request entry, preserving order. */
    private List<PutRecordsRequestEntry> createPutRecordRequestEntries(@Nonnull Stream<Message<String>> stream) {
        return stream
                .map(this::requestEntryFor)
                .collect(Collectors.toCollection(ArrayList::new));
    }

    /** Builds a request for the current channel containing only the given message. */
    private PutRecordsRequest createPutRecordRequest(@Nonnull Message<String> message) {
        return PutRecordsRequest.builder()
                .streamName(getChannelName())
                .records(requestEntryFor(message))
                .build();
    }

    /**
     * Creates the request entry for a message: the message key becomes the partition
     * key and the encoded message becomes the UTF-8 payload. A {@code null} encoding
     * result is sent as an empty payload.
     */
    private PutRecordsRequestEntry requestEntryFor(Message<String> message) {
        final String encoded = MessageCodec.encode(message, this.messageFormat);
        final SdkBytes payload = encoded != null
                ? SdkBytes.fromString(encoded, StandardCharsets.UTF_8)
                : SdkBytes.fromByteArray(new byte[0]);
        return PutRecordsRequestEntry.builder()
                .partitionKey(message.getKey())
                .data(payload)
                .build();
    }
}
