package org.enodeframework.queue;

import io.vertx.core.Vertx;
import io.vertx.core.VertxOptions;
import io.vertx.core.net.NetClient;
import io.vertx.core.net.NetClientOptions;
import io.vertx.core.net.NetSocket;
import io.vertx.core.net.SocketAddress;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import org.enodeframework.commanding.CommandResult;
import org.enodeframework.commanding.CommandReturnType;
import org.enodeframework.common.SysProperties;
import org.enodeframework.common.serializing.JsonTool;
import org.enodeframework.common.utilities.Address;
import org.enodeframework.common.utilities.RemoteReply;
import org.enodeframework.common.utilities.RemotingUtil;
import org.enodeframework.queue.domainevent.DomainEventHandledMessage;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/enodeframework/queue/SendReplyService.class */
public class SendReplyService {
    private static final Logger logger = LoggerFactory.getLogger(SendReplyService.class);
    private final ConcurrentHashMap<String, CompletableFuture<NetSocket>> socketMap = new ConcurrentHashMap<>();
    private boolean started;
    private boolean stoped;
    private NetClient netClient;

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:org/enodeframework/queue/SendReplyService$SendReplyContext.class */
    public static class SendReplyContext {
        private final short replyType;
        private final Object replyData;
        private final String replyAddress;

        public SendReplyContext(short s, Object obj, String str) {
            this.replyType = s;
            this.replyData = obj;
            this.replyAddress = str;
        }

        public short getReplyType() {
            return this.replyType;
        }

        public Object getReplyData() {
            return this.replyData;
        }

        public String getReplyAddress() {
            return this.replyAddress;
        }
    }

    public void start() {
        if (this.started) {
            return;
        }
        this.netClient = Vertx.vertx(new VertxOptions()).createNetClient(new NetClientOptions());
        this.started = true;
    }

    public void stop() {
        if (this.stoped) {
            return;
        }
        this.netClient.close();
        this.stoped = true;
    }

    public CompletableFuture<Void> sendReply(short s, Object obj, String str) {
        SendReplyContext sendReplyContext = new SendReplyContext(s, obj, str);
        RemoteReply remoteReply = new RemoteReply();
        remoteReply.setCode(sendReplyContext.getReplyType());
        if (sendReplyContext.getReplyType() == CommandReturnType.CommandExecuted.getValue()) {
            remoteReply.setCommandResult((CommandResult) sendReplyContext.getReplyData());
        } else if (sendReplyContext.getReplyType() == CommandReturnType.EventHandled.getValue()) {
            remoteReply.setEventHandledMessage((DomainEventHandledMessage) sendReplyContext.getReplyData());
        }
        String str2 = JsonTool.serialize(remoteReply) + SysProperties.DELIMITED;
        Address string2Address = RemotingUtil.string2Address(str);
        SocketAddress inetSocketAddress = SocketAddress.inetSocketAddress(string2Address.getPort(), string2Address.getHost());
        CompletableFuture<NetSocket> completableFuture = new CompletableFuture<>();
        if (this.socketMap.putIfAbsent(str, completableFuture) == null) {
            this.netClient.connect(inetSocketAddress, asyncResult -> {
                if (!asyncResult.succeeded()) {
                    completableFuture.completeExceptionally(asyncResult.cause());
                    logger.error("Failed to connect NetServer", asyncResult.cause());
                } else {
                    NetSocket netSocket = (NetSocket) asyncResult.result();
                    netSocket.endHandler(r3 -> {
                        netSocket.close();
                    }).exceptionHandler(th -> {
                        this.socketMap.remove(str);
                        logger.error("NetSocket occurs unexpected error", th);
                        netSocket.close();
                    }).handler(buffer -> {
                        logger.info("NetClient receiving: {}", buffer.toString("UTF-8"));
                    }).closeHandler(r6 -> {
                        this.socketMap.remove(str);
                        logger.info("NetClient socket closed: {}", str);
                    });
                    completableFuture.complete(netSocket);
                }
            });
        }
        return this.socketMap.get(str).thenAccept(netSocket -> {
            netSocket.write(str2);
        }).exceptionally(th -> {
            logger.error("Send command reply has exception, replyAddress: {}", sendReplyContext.getReplyAddress(), th);
            return null;
        });
    }
}
