package de.otto.synapse.endpoint.sender.kafka;

import com.google.common.collect.ImmutableList;
import de.otto.synapse.message.Key;
import de.otto.synapse.message.Message;
import de.otto.synapse.translator.Encoder;
import java.nio.charset.StandardCharsets;
import java.util.List;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.header.Header;
import org.apache.kafka.common.header.internals.RecordHeader;
import org.apache.kafka.common.utils.Utils;

/* loaded from: input_file:de/otto/synapse/endpoint/sender/kafka/KafkaEncoder.class */
public class KafkaEncoder implements Encoder<ProducerRecord<String, String>> {
    public static String COMPACTION_KEY = "_synapse_msg_compactionKey";
    public static String PARTITION_KEY = "_synapse_msg_partitionKey";
    private final String channelName;
    private final int numPartitions;

    public KafkaEncoder(String str, int i) {
        this.channelName = str;
        this.numPartitions = i;
    }

    public ProducerRecord<String, String> apply(Message<String> message) {
        Key key = message.getKey();
        return new ProducerRecord<>(this.channelName, key.isCompoundKey() ? Integer.valueOf(kafkaPartitionFrom(key.partitionKey())) : null, key.compactionKey(), (String) message.getPayload(), headersOf(message));
    }

    private List<Header> headersOf(Message<String> message) {
        ImmutableList.Builder builder = ImmutableList.builder();
        message.getHeader().getAll().forEach((str, str2) -> {
            builder.add(new RecordHeader(str, str2.getBytes(StandardCharsets.UTF_8)));
        });
        builder.add(new Header[]{new RecordHeader(PARTITION_KEY, message.getKey().partitionKey().getBytes(StandardCharsets.UTF_8)), new RecordHeader(COMPACTION_KEY, message.getKey().compactionKey().getBytes(StandardCharsets.UTF_8))});
        return builder.build();
    }

    private int kafkaPartitionFrom(String str) {
        return Utils.toPositive(Utils.murmur2(str.getBytes())) % this.numPartitions;
    }
}
