package org.enodeframework.queue;

import com.google.common.cache.Cache;
import com.google.common.cache.CacheBuilder;
import io.vertx.core.AbstractVerticle;
import io.vertx.core.AsyncResult;
import io.vertx.core.Handler;
import io.vertx.core.Promise;
import io.vertx.core.json.JsonObject;
import io.vertx.core.net.NetClient;
import io.vertx.core.net.NetSocket;
import io.vertx.core.net.SocketAddress;
import io.vertx.core.streams.WriteStream;
import io.vertx.ext.eventbus.bridge.tcp.impl.protocol.FrameHelper;
import io.vertx.ext.eventbus.bridge.tcp.impl.protocol.FrameParser;
import java.time.Duration;
import java.util.Arrays;
import java.util.concurrent.Callable;
import java.util.concurrent.CompletableFuture;
import kotlin.Metadata;
import kotlin.jvm.internal.DefaultConstructorMarker;
import kotlin.jvm.internal.Intrinsics;
import kotlin.jvm.internal.StringCompanionObject;
import org.enodeframework.commanding.CommandResult;
import org.enodeframework.commanding.CommandReturnType;
import org.enodeframework.common.io.Task;
import org.enodeframework.common.remoting.ReplyMessage;
import org.enodeframework.common.remoting.ReplySocketAddress;
import org.enodeframework.common.serializing.ISerializeService;
import org.enodeframework.common.utils.ReplyUtil;
import org.enodeframework.queue.domainevent.DomainEventHandledMessage;
import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* compiled from: DefaultSendReplyService.kt */
@Metadata(mv = {1, 4, 2}, bv = {1, 0, 3}, k = 1, d1 = {"��d\n\u0002\u0018\u0002\n\u0002\u0018\u0002\n\u0002\u0018\u0002\n��\n\u0002\u0018\u0002\n\u0002\b\u0002\n\u0002\u0018\u0002\n��\n\u0002\u0018\u0002\n\u0002\u0010\u000e\n��\n\u0002\u0018\u0002\n\u0002\u0018\u0002\n��\n\u0002\u0010\u000b\n\u0002\b\u0002\n\u0002\u0018\u0002\n��\n\u0002\u0018\u0002\n��\n\u0002\u0018\u0002\n\u0002\b\u0002\n\u0002\u0018\u0002\n\u0002\b\u0002\n\u0002\u0018\u0002\n\u0002\b\u0002\n\u0002\u0010\u0002\n\u0002\b\u0003\u0018�� !2\u00020\u00012\u00020\u0002:\u0001!B\r\u0012\u0006\u0010\u0003\u001a\u00020\u0004¢\u0006\u0002\u0010\u0005J\u001e\u0010\u0011\u001a\b\u0012\u0004\u0012\u00020\u000f0\u00122\u0006\u0010\u0013\u001a\u00020\u00142\u0006\u0010\u0015\u001a\u00020\u0016H\u0016J\u001e\u0010\u0017\u001a\b\u0012\u0004\u0012\u00020\u000f0\u00122\u0006\u0010\u0018\u001a\u00020\u00192\u0006\u0010\u0015\u001a\u00020\u0016H\u0016J \u0010\u001a\u001a\b\u0012\u0004\u0012\u00020\u000f0\u00122\b\u0010\u001b\u001a\u0004\u0018\u00010\u001c2\u0006\u0010\u001d\u001a\u00020\u0016H\u0002J\b\u0010\u001e\u001a\u00020\u001fH\u0016J\b\u0010 \u001a\u00020\u001fH\u0016R\u000e\u0010\u0006\u001a\u00020\u0007X\u0082.¢\u0006\u0002\n��Rf\u0010\b\u001aZ\u0012\f\u0012\n \u000b*\u0004\u0018\u00010\n0\n\u0012\u0018\u0012\u0016\u0012\u0004\u0012\u00020\r \u000b*\n\u0012\u0004\u0012\u00020\r\u0018\u00010\f0\f \u000b*,\u0012\f\u0012\n \u000b*\u0004\u0018\u00010\n0\n\u0012\u0018\u0012\u0016\u0012\u0004\u0012\u00020\r \u000b*\n\u0012\u0004\u0012\u00020\r\u0018\u00010\f0\f\u0018\u00010\t0\tX\u0082\u0004¢\u0006\u0002\n��R\u000e\u0010\u0003\u001a\u00020\u0004X\u0082\u0004¢\u0006\u0002\n��R\u000e\u0010\u000e\u001a\u00020\u000fX\u0082\u000e¢\u0006\u0002\n��R\u000e\u0010\u0010\u001a\u00020\u000fX\u0082\u000e¢\u0006\u0002\n��¨\u0006\""}, d2 = {"Lorg/enodeframework/queue/DefaultSendReplyService;", "Lio/vertx/core/AbstractVerticle;", "Lorg/enodeframework/queue/ISendReplyService;", "serializeService", "Lorg/enodeframework/common/serializing/ISerializeService;", "(Lorg/enodeframework/common/serializing/ISerializeService;)V", "netClient", "Lio/vertx/core/net/NetClient;", "netSocketCache", "Lcom/google/common/cache/Cache;", "", "kotlin.jvm.PlatformType", "Lio/vertx/core/Promise;", "Lio/vertx/core/net/NetSocket;", "started", "", "stoped", "sendCommandReply", "Ljava/util/concurrent/CompletableFuture;", "commandResult", "Lorg/enodeframework/commanding/CommandResult;", "replyAddress", "Lorg/enodeframework/common/remoting/ReplySocketAddress;", "sendEventReply", "eventHandledMessage", "Lorg/enodeframework/queue/domainevent/DomainEventHandledMessage;", "sendReply", "replyMessage", "Lorg/enodeframework/common/remoting/ReplyMessage;", "replySocketAddress", "start", "", "stop", "Companion", "enode"})
/* loaded from: input_file:org/enodeframework/queue/DefaultSendReplyService.class */
public final class DefaultSendReplyService extends AbstractVerticle implements ISendReplyService {
    private boolean started;
    private boolean stoped;
    private NetClient netClient;
    private final Cache<String, Promise<NetSocket>> netSocketCache;
    private final ISerializeService serializeService;

    @NotNull
    public static final Companion Companion = new Companion(null);
    private static final Logger logger = LoggerFactory.getLogger(DefaultSendReplyService.class);

    /* compiled from: DefaultSendReplyService.kt */
    @Metadata(mv = {1, 4, 2}, bv = {1, 0, 3}, k = 1, d1 = {"��\u0014\n\u0002\u0018\u0002\n\u0002\u0010��\n\u0002\b\u0002\n\u0002\u0018\u0002\n\u0002\b\u0002\b\u0086\u0003\u0018��2\u00020\u0001B\u0007\b\u0002¢\u0006\u0002\u0010\u0002R\u0016\u0010\u0003\u001a\n \u0005*\u0004\u0018\u00010\u00040\u0004X\u0082\u0004¢\u0006\u0002\n��¨\u0006\u0006"}, d2 = {"Lorg/enodeframework/queue/DefaultSendReplyService$Companion;", "", "()V", "logger", "Lorg/slf4j/Logger;", "kotlin.jvm.PlatformType", "enode"})
    /* loaded from: input_file:org/enodeframework/queue/DefaultSendReplyService$Companion.class */
    public static final class Companion {
        private Companion() {
        }

        public /* synthetic */ Companion(DefaultConstructorMarker defaultConstructorMarker) {
            this();
        }
    }

    public void start() {
        if (this.started) {
            return;
        }
        NetClient createNetClient = this.vertx.createNetClient();
        Intrinsics.checkNotNullExpressionValue(createNetClient, "vertx.createNetClient()");
        this.netClient = createNetClient;
        this.started = true;
    }

    public void stop() {
        if (this.stoped) {
            return;
        }
        NetClient netClient = this.netClient;
        if (netClient == null) {
            Intrinsics.throwUninitializedPropertyAccessException("netClient");
        }
        netClient.close();
        this.stoped = true;
    }

    @Override // org.enodeframework.queue.ISendReplyService
    @NotNull
    public CompletableFuture<Boolean> sendCommandReply(@NotNull CommandResult commandResult, @NotNull ReplySocketAddress replySocketAddress) {
        Intrinsics.checkNotNullParameter(commandResult, "commandResult");
        Intrinsics.checkNotNullParameter(replySocketAddress, "replyAddress");
        ReplyMessage replyMessage = new ReplyMessage();
        replyMessage.setCode(CommandReturnType.CommandExecuted.getValue());
        replyMessage.setCommandResult(commandResult);
        return sendReply(replyMessage, replySocketAddress);
    }

    @Override // org.enodeframework.queue.ISendReplyService
    @NotNull
    public CompletableFuture<Boolean> sendEventReply(@NotNull DomainEventHandledMessage domainEventHandledMessage, @NotNull ReplySocketAddress replySocketAddress) {
        Intrinsics.checkNotNullParameter(domainEventHandledMessage, "eventHandledMessage");
        Intrinsics.checkNotNullParameter(replySocketAddress, "replyAddress");
        ReplyMessage replyMessage = new ReplyMessage();
        replyMessage.setCode(CommandReturnType.EventHandled.getValue());
        replyMessage.setEventHandledMessage(domainEventHandledMessage);
        return sendReply(replyMessage, replySocketAddress);
    }

    private final CompletableFuture<Boolean> sendReply(ReplyMessage replyMessage, ReplySocketAddress replySocketAddress) {
        final SocketAddress inetSocketAddress = SocketAddress.inetSocketAddress(replySocketAddress.getPort(), replySocketAddress.getHost());
        final String serialize = this.serializeService.serialize(replyMessage);
        final String uri = ReplyUtil.toUri(replySocketAddress);
        StringCompanionObject stringCompanionObject = StringCompanionObject.INSTANCE;
        Object[] objArr = {"client", uri};
        final String format = String.format("%s.%s", Arrays.copyOf(objArr, objArr.length));
        Intrinsics.checkNotNullExpressionValue(format, "java.lang.String.format(format, *args)");
        ((Promise) this.netSocketCache.get(uri, new Callable<Promise<NetSocket>>() { // from class: org.enodeframework.queue.DefaultSendReplyService$sendReply$promise$1
            @Override // java.util.concurrent.Callable
            public final Promise<NetSocket> call() {
                Handler promise = Promise.promise();
                DefaultSendReplyService.access$getNetClient$p(DefaultSendReplyService.this).connect(inetSocketAddress, promise);
                return promise;
            }
        })).future().onFailure(new Handler<Throwable>() { // from class: org.enodeframework.queue.DefaultSendReplyService$sendReply$1
            public final void handle(@Nullable Throwable th) {
                Cache cache;
                Logger logger2;
                cache = DefaultSendReplyService.this.netSocketCache;
                cache.invalidate(uri);
                logger2 = DefaultSendReplyService.logger;
                logger2.error("connect occurs unexpected error, msg: {}", serialize, th);
            }
        }).onSuccess(new Handler<NetSocket>() { // from class: org.enodeframework.queue.DefaultSendReplyService$sendReply$2
            public final void handle(@NotNull final NetSocket netSocket) {
                Intrinsics.checkNotNullParameter(netSocket, "socket");
                netSocket.exceptionHandler(new Handler<Throwable>() { // from class: org.enodeframework.queue.DefaultSendReplyService$sendReply$2.1
                    public final void handle(@Nullable Throwable th) {
                        Cache cache;
                        Logger logger2;
                        cache = DefaultSendReplyService.this.netSocketCache;
                        cache.invalidate(uri);
                        netSocket.close();
                        logger2 = DefaultSendReplyService.logger;
                        logger2.error("socket occurs unexpected error, msg: {}", serialize, th);
                    }
                });
                netSocket.closeHandler(new Handler<Void>() { // from class: org.enodeframework.queue.DefaultSendReplyService$sendReply$2.2
                    public final void handle(Void r6) {
                        Cache cache;
                        Logger logger2;
                        cache = DefaultSendReplyService.this.netSocketCache;
                        cache.invalidate(uri);
                        logger2 = DefaultSendReplyService.logger;
                        logger2.error("socket closed, indicatedServerName: {},writeHandlerID: {}", netSocket.indicatedServerName(), netSocket.writeHandlerID());
                    }
                });
                netSocket.handler(new FrameParser(new Handler<AsyncResult<JsonObject>>() { // from class: org.enodeframework.queue.DefaultSendReplyService$sendReply$2.3
                    public final void handle(@NotNull AsyncResult<JsonObject> asyncResult) {
                        Logger logger2;
                        Intrinsics.checkNotNullParameter(asyncResult, "parse");
                        if (asyncResult.succeeded()) {
                            logger2 = DefaultSendReplyService.logger;
                            logger2.info("receive server response: {}, {}", serialize, asyncResult);
                        }
                    }
                }));
                netSocket.endHandler(new Handler<Void>() { // from class: org.enodeframework.queue.DefaultSendReplyService$sendReply$2.4
                    public final void handle(Void r4) {
                        Cache cache;
                        cache = DefaultSendReplyService.this.netSocketCache;
                        cache.invalidate(uri);
                    }
                });
                FrameHelper.sendFrame("send", uri, format, new JsonObject(serialize), (WriteStream) netSocket);
            }
        });
        return Task.completedTask;
    }

    public DefaultSendReplyService(@NotNull ISerializeService iSerializeService) {
        Intrinsics.checkNotNullParameter(iSerializeService, "serializeService");
        this.serializeService = iSerializeService;
        this.netSocketCache = CacheBuilder.newBuilder().expireAfterWrite(Duration.ofMinutes(10L)).maximumSize(10L).build();
    }

    public static final /* synthetic */ NetClient access$getNetClient$p(DefaultSendReplyService defaultSendReplyService) {
        NetClient netClient = defaultSendReplyService.netClient;
        if (netClient == null) {
            Intrinsics.throwUninitializedPropertyAccessException("netClient");
        }
        return netClient;
    }
}
