package org.apache.pulsar.jetcd.shaded.io.vertx.core.eventbus.impl.clustered;

import java.util.ArrayDeque;
import java.util.Queue;
import org.apache.pulsar.jetcd.shaded.io.vertx.core.AsyncResult;
import org.apache.pulsar.jetcd.shaded.io.vertx.core.Handler;
import org.apache.pulsar.jetcd.shaded.io.vertx.core.Promise;
import org.apache.pulsar.jetcd.shaded.io.vertx.core.buffer.Buffer;
import org.apache.pulsar.jetcd.shaded.io.vertx.core.eventbus.EventBusOptions;
import org.apache.pulsar.jetcd.shaded.io.vertx.core.eventbus.impl.OutboundDeliveryContext;
import org.apache.pulsar.jetcd.shaded.io.vertx.core.eventbus.impl.codecs.PingMessageCodec;
import org.apache.pulsar.jetcd.shaded.io.vertx.core.impl.VertxInternal;
import org.apache.pulsar.jetcd.shaded.io.vertx.core.impl.logging.Logger;
import org.apache.pulsar.jetcd.shaded.io.vertx.core.impl.logging.LoggerFactory;
import org.apache.pulsar.jetcd.shaded.io.vertx.core.net.NetSocket;
import org.apache.pulsar.jetcd.shaded.io.vertx.core.net.impl.ConnectionBase;
import org.apache.pulsar.jetcd.shaded.io.vertx.core.spi.cluster.NodeInfo;
import org.apache.pulsar.jetcd.shaded.io.vertx.core.spi.metrics.EventBusMetrics;

/* JADX INFO: Access modifiers changed from: package-private */
/* loaded from: input_file:org/apache/pulsar/jetcd/shaded/io/vertx/core/eventbus/impl/clustered/ConnectionHolder.class */
/**
 * Holds the outbound connection from this clustered event bus to one remote node.
 *
 * <p>Messages handed to {@link #writeMessage(OutboundDeliveryContext)} before the socket is
 * connected are queued and written once {@code connected(NetSocket)} runs. After the connection
 * is established the holder periodically writes a ping message and closes itself if no data is
 * received from the remote node before the ping-reply timer fires.
 *
 * <p>Access to {@code pending}, {@code socket} and {@code connected} is guarded by this object's
 * monitor.
 */
public class ConnectionHolder {
    private static final Logger log = LoggerFactory.getLogger(ConnectionHolder.class);
    private static final String PING_ADDRESS = "__vertx_ping";
    private final ClusteredEventBus eventBus;
    private final String remoteNodeId;
    private final VertxInternal vertx;
    private final EventBusMetrics metrics;
    /** Messages queued while not connected; {@code null} when nothing has been queued. */
    private Queue<OutboundDeliveryContext<?>> pending;
    private NetSocket socket;
    private boolean connected;
    /** Id of the ping-reply timer, or -1 if none has been scheduled yet. */
    private long timeoutID = -1;
    /** Id of the ping-interval timer, or -1 if none has been scheduled yet. */
    private long pingTimeoutID = -1;

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Creates a holder for the given remote node. No connection is attempted until
     * {@link #connect()} is called.
     *
     * @param clusteredEventBus the owning event bus; also supplies the Vert.x instance and metrics
     * @param str the id of the remote node
     */
    public ConnectionHolder(ClusteredEventBus clusteredEventBus, String str) {
        this.eventBus = clusteredEventBus;
        this.remoteNodeId = str;
        this.vertx = clusteredEventBus.vertx();
        this.metrics = clusteredEventBus.getMetrics();
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Looks up the remote node's {@link NodeInfo} through the cluster manager and connects the
     * event bus client to its host and port. On success the socket is installed through
     * {@code connected(NetSocket)}; on failure a warning is logged and the holder is closed with
     * the failure cause.
     */
    public void connect() {
        Promise<NodeInfo> promise = Promise.promise();
        this.eventBus.vertx().getClusterManager().getNodeInfo(this.remoteNodeId, promise);
        promise.future().flatMap(nodeInfo -> {
            return this.eventBus.client().connect(nodeInfo.port(), nodeInfo.host());
        }).onComplete2(asyncResult -> {
            if (asyncResult.succeeded()) {
                connected((NetSocket) asyncResult.result());
            } else {
                log.warn("Connecting to server " + this.remoteNodeId + " failed", asyncResult.cause());
                close(asyncResult.cause());
            }
        });
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Writes the message to the socket if connected, otherwise appends it to the pending queue
     * (creating the queue on first use).
     *
     * @param outboundDeliveryContext the delivery context carrying the message; it is also passed
     *     to the socket as the write-completion handler
     */
    public synchronized void writeMessage(OutboundDeliveryContext<?> outboundDeliveryContext) {
        if (this.connected) {
            Buffer encodeToWire = ((ClusteredMessage) outboundDeliveryContext.message).encodeToWire();
            if (this.metrics != null) {
                this.metrics.messageWritten(outboundDeliveryContext.message.address(), encodeToWire.length());
            }
            this.socket.write(encodeToWire, (Handler<AsyncResult<Void>>) outboundDeliveryContext);
            return;
        }
        if (this.pending == null) {
            if (log.isDebugEnabled()) {
                log.debug("Not connected to server " + this.remoteNodeId + " - starting queuing");
            }
            this.pending = new ArrayDeque<>();
        }
        this.pending.add(outboundDeliveryContext);
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /** Closes this holder, reporting {@link ConnectionBase#CLOSED_EXCEPTION} to queued messages. */
    public void close() {
        close(ConnectionBase.CLOSED_EXCEPTION);
    }

    /**
     * Cancels any scheduled ping timers, reports {@code th} to every still-queued message, and
     * removes this holder from the event bus connection map.
     *
     * @param th the cause passed to {@code written} on each pending delivery context
     */
    private void close(Throwable th) {
        if (this.timeoutID != -1) {
            this.vertx.cancelTimer(this.timeoutID);
        }
        if (this.pingTimeoutID != -1) {
            this.vertx.cancelTimer(this.pingTimeoutID);
        }
        synchronized (this) {
            if (this.pending != null) {
                OutboundDeliveryContext<?> queued;
                while ((queued = this.pending.poll()) != null) {
                    queued.written(th);
                }
            }
        }
        // Two-argument remove: only unregisters if this holder is still the one mapped for the node.
        if (this.eventBus.connections().remove(this.remoteNodeId, this) && log.isDebugEnabled()) {
            log.debug("Cluster connection closed for server " + this.remoteNodeId);
        }
    }

    /**
     * Schedules the next ping. After the configured ping interval a reply timer is armed and a
     * ping message is written to the socket; if the reply timer fires, the holder is closed.
     */
    private void schedulePing() {
        EventBusOptions options = this.eventBus.options();
        // The two lambda parameters need distinct names: a nested lambda may not redeclare
        // a parameter of its enclosing lambda.
        this.pingTimeoutID = this.vertx.setTimer(options.getClusterPingInterval(), pingId -> {
            this.timeoutID = this.vertx.setTimer(options.getClusterPingReplyInterval(), replyId -> {
                log.warn("No pong from server " + this.remoteNodeId + " - will consider it dead");
                close();
            });
            Buffer ping = new ClusteredMessage<>(this.remoteNodeId, PING_ADDRESS, null, null, new PingMessageCodec(), true, this.eventBus).encodeToWire();
            this.socket.write(ping);
        });
    }

    /**
     * Installs the connected socket: registers exception, close and data handlers, starts the
     * ping cycle, and writes out every message queued while the connection was being established.
     *
     * @param netSocket the newly connected socket to the remote node
     */
    private synchronized void connected(NetSocket netSocket) {
        this.socket = netSocket;
        this.connected = true;
        netSocket.exceptionHandler(th -> {
            close(th);
        });
        netSocket.mo5183closeHandler(v -> {
            close();
        });
        // Any incoming data cancels the reply timer and schedules the next ping.
        netSocket.handler2(buffer -> {
            this.vertx.cancelTimer(this.timeoutID);
            schedulePing();
        });
        schedulePing();
        if (this.pending != null) {
            if (log.isDebugEnabled()) {
                log.debug("Draining the queue for server " + this.remoteNodeId);
            }
            for (OutboundDeliveryContext<?> outboundDeliveryContext : this.pending) {
                Buffer encodeToWire = ((ClusteredMessage) outboundDeliveryContext.message).encodeToWire();
                if (this.metrics != null) {
                    this.metrics.messageWritten(outboundDeliveryContext.message.address(), encodeToWire.length());
                }
                netSocket.write(encodeToWire, (Handler<AsyncResult<Void>>) outboundDeliveryContext);
            }
        }
        this.pending = null;
    }
}
