package org.enodeframework.vertx.message;

import io.vertx.core.eventbus.EventBusOptions;
import io.vertx.core.json.JsonObject;
import io.vertx.core.net.NetSocket;
import io.vertx.core.net.SocketAddress;
import io.vertx.core.net.impl.ConnectionBase;
import io.vertx.ext.eventbus.bridge.tcp.impl.protocol.FrameHelper;
import java.net.URI;
import java.util.ArrayDeque;
import java.util.Iterator;
import java.util.Queue;
import org.enodeframework.common.exception.ReplyAddressInvalidException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* JADX INFO: Access modifiers changed from: package-private */
/* compiled from: PointToPointEventBus.java */
/* loaded from: input_file:org/enodeframework/vertx/message/ConnectionHolder.class */
public class ConnectionHolder {
    private final PointToPointEventBus eventBus;
    private final String remoteNodeAddress;
    private Queue<OutboundDeliveryContext> pending;
    private NetSocket socket;
    private boolean connected;
    private final Logger log = LoggerFactory.getLogger(ConnectionHolder.class);
    private long timeoutID = -1;
    private long pingTimeoutID = -1;

    public ConnectionHolder(PointToPointEventBus pointToPointEventBus, String str) {
        this.eventBus = pointToPointEventBus;
        this.remoteNodeAddress = str;
    }

    SocketAddress toURI(String str) {
        try {
            URI uri = new URI(str);
            return SocketAddress.inetSocketAddress(uri.getPort(), uri.getHost());
        } catch (Exception e) {
            this.log.error("parse address error. uri: {}", str, e);
            throw new ReplyAddressInvalidException(str, e);
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public void connect() {
        this.eventBus.client().connect(toURI(this.remoteNodeAddress)).onComplete(asyncResult -> {
            if (asyncResult.succeeded()) {
                connected((NetSocket) asyncResult.result());
            } else {
                this.log.warn("Connecting to server " + this.remoteNodeAddress + " failed", asyncResult.cause());
                close(asyncResult.cause());
            }
        });
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public synchronized void writeMessage(OutboundDeliveryContext outboundDeliveryContext) {
        if (this.connected) {
            FrameHelper.sendFrame("send", this.remoteNodeAddress, outboundDeliveryContext.message, this.socket);
            return;
        }
        if (this.pending == null) {
            if (this.log.isDebugEnabled()) {
                this.log.debug("Not connected to server " + this.remoteNodeAddress + " - starting queuing");
            }
            this.pending = new ArrayDeque();
        }
        this.pending.add(outboundDeliveryContext);
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public void close() {
        close(ConnectionBase.CLOSED_EXCEPTION);
    }

    private void close(Throwable th) {
        if (this.timeoutID != -1) {
            this.eventBus.vertx().cancelTimer(this.timeoutID);
        }
        if (this.pingTimeoutID != -1) {
            this.eventBus.vertx().cancelTimer(this.pingTimeoutID);
        }
        synchronized (this) {
            if (this.pending != null) {
                while (true) {
                    OutboundDeliveryContext poll = this.pending.poll();
                    if (poll == null) {
                        break;
                    }
                    poll.written(th);
                    this.log.error("connection closed, queue msg: {}", poll, th);
                }
            }
        }
        if (this.eventBus.connections().remove(this.remoteNodeAddress, this) && this.log.isDebugEnabled()) {
            this.log.debug("Point to point connection closed for server " + this.remoteNodeAddress);
        }
    }

    private void schedulePing() {
        EventBusOptions options = this.eventBus.options();
        this.pingTimeoutID = this.eventBus.vertx().setTimer(options.getClusterPingInterval(), l -> {
            this.timeoutID = this.eventBus.vertx().setTimer(options.getClusterPingReplyInterval(), l -> {
                this.log.warn("No pong from server " + this.remoteNodeAddress + " - will consider it dead");
                close();
            });
            FrameHelper.sendFrame("ping", this.remoteNodeAddress, new JsonObject(), this.socket);
        });
    }

    private synchronized void connected(NetSocket netSocket) {
        this.socket = netSocket;
        this.connected = true;
        netSocket.exceptionHandler(th -> {
            close(th);
        });
        netSocket.closeHandler(r3 -> {
            close();
        });
        netSocket.handler(buffer -> {
            this.eventBus.vertx().cancelTimer(this.timeoutID);
            schedulePing();
        });
        schedulePing();
        if (this.pending != null) {
            if (this.log.isDebugEnabled()) {
                this.log.debug("Draining the queue for server " + this.remoteNodeAddress);
            }
            Iterator<OutboundDeliveryContext> it = this.pending.iterator();
            while (it.hasNext()) {
                FrameHelper.sendFrame("send", this.remoteNodeAddress, it.next().message, netSocket);
            }
        }
        this.pending = null;
    }
}
