package org.enodeframework.kafka;

import java.util.concurrent.CompletableFuture;
import org.enodeframework.commanding.CommandOptions;
import org.enodeframework.common.serializing.SerializeService;
import org.enodeframework.messaging.ReplyMessage;
import org.enodeframework.queue.MessageTypeCode;
import org.enodeframework.queue.QueueMessage;
import org.enodeframework.queue.SendMessageResult;
import org.enodeframework.queue.SendReplyService;
import org.enodeframework.queue.reply.GenericReplyMessage;
import org.jetbrains.annotations.NotNull;

/* loaded from: input_file:org/enodeframework/kafka/KafkaSendReplyService.class */
/**
 * {@link SendReplyService} implementation that converts a {@link ReplyMessage} into a
 * {@link QueueMessage} and hands it to a {@link KafkaProducerHolder}.
 */
public class KafkaSendReplyService implements SendReplyService {
    private final CommandOptions commandOptions;
    private final SerializeService serializeService;
    private final KafkaProducerHolder kafkaProducerHolder;

    /**
     * Creates the service from its collaborators.
     *
     * @param kafkaProducerHolder receives the queue message built for each reply
     * @param commandOptions      resolves the reply topic from the reply address
     * @param serializeService    serializes the reply payload into the message body
     */
    public KafkaSendReplyService(KafkaProducerHolder kafkaProducerHolder, CommandOptions commandOptions, SerializeService serializeService) {
        this.serializeService = serializeService;
        this.commandOptions = commandOptions;
        this.kafkaProducerHolder = kafkaProducerHolder;
    }

    /**
     * Builds the queue message for the given reply and passes it to the producer holder.
     *
     * @param replyMessage the reply to send
     * @return an already-completed future holding a {@link SendMessageResult} constructed
     *         with an empty string
     */
    @NotNull
    public CompletableFuture<SendMessageResult> send(@NotNull ReplyMessage replyMessage) {
        QueueMessage outgoing = buildQueueMessage(replyMessage);
        // NOTE(review): the outcome of kafkaProducerHolder.send(...) is not inspected here;
        // the future returned below is completed up front regardless. Confirm this
        // fire-and-forget behavior is intended.
        kafkaProducerHolder.send(outgoing);
        SendMessageResult result = new SendMessageResult("");
        return CompletableFuture.completedFuture(result);
    }

    /**
     * Populates the queue message derived from the reply.
     *
     * <ul>
     *   <li>topic: {@code commandOptions.replyWith(address)}</li>
     *   <li>tag: the reply address</li>
     *   <li>body: the serialized {@link GenericReplyMessage} view of the reply</li>
     *   <li>type: the value of {@link MessageTypeCode#ReplyMessage}</li>
     * </ul>
     *
     * @param replyMessage the reply to convert
     * @return the queue message obtained from {@code replyMessage.asPartQueueMessage()},
     *         with the fields above set
     */
    private QueueMessage buildQueueMessage(ReplyMessage replyMessage) {
        GenericReplyMessage payload = replyMessage.asGenericReplyMessage();
        QueueMessage queueMessage = replyMessage.asPartQueueMessage();

        queueMessage.setTopic(commandOptions.replyWith(replyMessage.getAddress()));
        queueMessage.setTag(replyMessage.getAddress());
        queueMessage.setBody(serializeService.serializeBytes(payload));
        queueMessage.setType(MessageTypeCode.ReplyMessage.getValue());

        return queueMessage;
    }
}
