package de.otto.synapse.endpoint.receiver.kinesis;

import de.otto.synapse.channel.ShardPosition;
import de.otto.synapse.message.DefaultHeaderAttr;
import de.otto.synapse.message.Header;
import de.otto.synapse.message.Key;
import de.otto.synapse.message.TextMessage;
import de.otto.synapse.translator.AbstractTextDecoder;
import java.nio.charset.StandardCharsets;
import java.util.function.Function;
import software.amazon.awssdk.core.SdkBytes;
import software.amazon.awssdk.services.kinesis.model.Record;

/* loaded from: input_file:de/otto/synapse/endpoint/receiver/kinesis/KinesisDecoder.class */
public class KinesisDecoder extends AbstractTextDecoder<RecordWithShard> {
    private static final SdkBytes EMPTY_SDK_BYTES_BUFFER = SdkBytes.fromByteArray(new byte[0]);
    private static final Function<SdkBytes, String> SDK_BYTES_STRING = sdkBytes -> {
        if (sdkBytes == null || sdkBytes.equals(EMPTY_SDK_BYTES_BUFFER)) {
            return null;
        }
        return sdkBytes.asString(StandardCharsets.UTF_8);
    };

    public TextMessage apply(RecordWithShard recordWithShard) {
        Record record = recordWithShard.getRecord();
        return decode(Key.of(record.partitionKey()), Header.builder().withAttribute(DefaultHeaderAttr.MSG_ARRIVAL_TS, record.approximateArrivalTimestamp()).withShardPosition(ShardPosition.fromPosition(recordWithShard.getShardName(), record.sequenceNumber())).build(), SDK_BYTES_STRING.apply(record.data()));
    }
}
