package org.apache.pulsar.jetcd.shaded.io.vertx.core.eventbus.impl.clustered;

import java.util.Iterator;
import java.util.Objects;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.pulsar.jetcd.shaded.io.vertx.core.Handler;
import org.apache.pulsar.jetcd.shaded.io.vertx.core.MultiMap;
import org.apache.pulsar.jetcd.shaded.io.vertx.core.Promise;
import org.apache.pulsar.jetcd.shaded.io.vertx.core.VertxOptions;
import org.apache.pulsar.jetcd.shaded.io.vertx.core.buffer.Buffer;
import org.apache.pulsar.jetcd.shaded.io.vertx.core.eventbus.AddressHelper;
import org.apache.pulsar.jetcd.shaded.io.vertx.core.eventbus.EventBusOptions;
import org.apache.pulsar.jetcd.shaded.io.vertx.core.eventbus.impl.CodecManager;
import org.apache.pulsar.jetcd.shaded.io.vertx.core.eventbus.impl.EventBusImpl;
import org.apache.pulsar.jetcd.shaded.io.vertx.core.eventbus.impl.HandlerHolder;
import org.apache.pulsar.jetcd.shaded.io.vertx.core.eventbus.impl.HandlerRegistration;
import org.apache.pulsar.jetcd.shaded.io.vertx.core.eventbus.impl.MessageImpl;
import org.apache.pulsar.jetcd.shaded.io.vertx.core.eventbus.impl.OutboundDeliveryContext;
import org.apache.pulsar.jetcd.shaded.io.vertx.core.impl.CloseFuture;
import org.apache.pulsar.jetcd.shaded.io.vertx.core.impl.ContextInternal;
import org.apache.pulsar.jetcd.shaded.io.vertx.core.impl.VertxInternal;
import org.apache.pulsar.jetcd.shaded.io.vertx.core.impl.future.PromiseInternal;
import org.apache.pulsar.jetcd.shaded.io.vertx.core.impl.logging.Logger;
import org.apache.pulsar.jetcd.shaded.io.vertx.core.impl.logging.LoggerFactory;
import org.apache.pulsar.jetcd.shaded.io.vertx.core.impl.utils.ConcurrentCyclicSequence;
import org.apache.pulsar.jetcd.shaded.io.vertx.core.net.NetClient;
import org.apache.pulsar.jetcd.shaded.io.vertx.core.net.NetClientOptions;
import org.apache.pulsar.jetcd.shaded.io.vertx.core.net.NetServer;
import org.apache.pulsar.jetcd.shaded.io.vertx.core.net.NetServerOptions;
import org.apache.pulsar.jetcd.shaded.io.vertx.core.net.NetSocket;
import org.apache.pulsar.jetcd.shaded.io.vertx.core.net.impl.NetClientBuilder;
import org.apache.pulsar.jetcd.shaded.io.vertx.core.parsetools.RecordParser;
import org.apache.pulsar.jetcd.shaded.io.vertx.core.spi.cluster.ClusterManager;
import org.apache.pulsar.jetcd.shaded.io.vertx.core.spi.cluster.NodeInfo;
import org.apache.pulsar.jetcd.shaded.io.vertx.core.spi.cluster.NodeSelector;
import org.apache.pulsar.jetcd.shaded.io.vertx.core.spi.cluster.RegistrationInfo;
import org.apache.pulsar.jetcd.shaded.io.vertx.core.spi.metrics.VertxMetrics;

/* loaded from: input_file:META-INF/bundled-dependencies/jetcd-core-shaded-3.3.3.2.jar:org/apache/pulsar/jetcd/shaded/io/vertx/core/eventbus/impl/clustered/ClusteredEventBus.class */
/**
 * Event bus that extends {@link EventBusImpl} with delivery to other nodes of a cluster.
 *
 * <p>On {@link #start(Promise)} it opens a {@link NetServer} that receives messages from other
 * nodes and publishes this node's {@link NodeInfo} through the {@link ClusterManager}. Outgoing
 * messages are routed with the {@link NodeSelector}: a message whose target is this node (or for
 * which no node is selected) is handed to the superclass for local delivery, anything else is
 * written through a per-node {@link ConnectionHolder}.
 *
 * <p>Wire framing used by the server side: a 4-byte length prefix followed by that many bytes of
 * encoded message.
 */
public class ClusteredEventBus extends EventBusImpl {
    private static final Logger log = LoggerFactory.getLogger(ClusteredEventBus.class);

    /** Single-byte payload written back on the socket when a ping message is received. */
    private static final Buffer PONG = Buffer.buffer(new byte[]{1});

    /** Size in bytes of the length prefix that precedes every message on the wire. */
    private static final int LENGTH_PREFIX_BYTES = 4;

    private final EventBusOptions options;
    private final ClusterManager clusterManager;
    private final NodeSelector nodeSelector;
    /** Source of the sequence numbers given to newly created handler holders. */
    private final AtomicLong handlerSequence = new AtomicLong(0L);
    private final NetClient client;
    /** Outbound connection holders, keyed by remote node id. */
    private final ConcurrentMap<String, ConnectionHolder> connections = new ConcurrentHashMap<>();
    private final CloseFuture closeFuture;
    private final ContextInternal ebContext;
    private NodeInfo nodeInfo;
    private String nodeId;
    private NetServer server;

    /**
     * Creates the event bus together with its dedicated event-loop context and the net client
     * used for outbound connections. Nothing is bound or registered until {@link #start(Promise)}.
     *
     * @param vertxInternal  owning Vert.x instance
     * @param vertxOptions   options whose {@link EventBusOptions} configure this bus
     * @param clusterManager cluster manager used for node info and handler registrations
     * @param nodeSelector   strategy used to pick target nodes for send and publish
     */
    public ClusteredEventBus(VertxInternal vertxInternal, VertxOptions vertxOptions, ClusterManager clusterManager, NodeSelector nodeSelector) {
        super(vertxInternal);
        this.options = vertxOptions.getEventBusOptions();
        this.clusterManager = clusterManager;
        this.nodeSelector = nodeSelector;
        this.closeFuture = new CloseFuture(log);
        this.ebContext = vertxInternal.createEventLoopContext(null, this.closeFuture, null, Thread.currentThread().getContextClassLoader());
        // The client options are derived from the event bus options; the hostname verification
        // algorithm is explicitly set to the empty string.
        NetClientOptions clientOptions = new NetClientOptions(this.options.toJson()).setHostnameVerificationAlgorithm("");
        this.client = createNetClient(vertxInternal, clientOptions, this.closeFuture);
    }

    /**
     * Builds the net client, attaching client metrics when a metrics SPI is installed and tying
     * the client's lifecycle to the given close future.
     */
    private NetClient createNetClient(VertxInternal vertxInternal, NetClientOptions netClientOptions, CloseFuture closeFuture) {
        NetClientBuilder builder = new NetClientBuilder(vertxInternal, netClientOptions);
        VertxMetrics metricsSpi = vertxInternal.metricsSPI();
        if (metricsSpi != null) {
            builder.metrics(metricsSpi.createNetClientMetrics(netClientOptions));
        }
        builder.closeFuture(closeFuture);
        return builder.build();
    }

    /** @return the net client used for outbound connections to other nodes */
    /* JADX INFO: Access modifiers changed from: package-private */
    public NetClient client() {
        return this.client;
    }

    /** Builds the server options from the JSON form of the event bus options. */
    private NetServerOptions getServerOptions() {
        return new NetServerOptions(this.options.toJson());
    }

    /**
     * Starts the cluster server and announces this node to the cluster.
     *
     * <p>The listen call is issued on the event bus context. Once the server is listening, the
     * node info (public host, public port, node metadata) is stored in the cluster manager; only
     * if that succeeds is the bus marked as started and the node selector notified.
     *
     * @param promise completed with the outcome of listening and publishing the node info
     */
    @Override // org.apache.pulsar.jetcd.shaded.io.vertx.core.eventbus.impl.EventBusImpl, org.apache.pulsar.jetcd.shaded.io.vertx.core.eventbus.impl.EventBusInternal
    public void start(Promise<Void> promise) {
        this.server = this.vertx.createNetServer(getServerOptions());
        this.server.connectHandler(getServerHandler());
        int clusterPort = getClusterPort();
        String clusterHost = getClusterHost();
        this.ebContext.runOnContext(ignored -> {
            this.server.listen(clusterPort, clusterHost).flatMap(listening -> {
                String publicHost = getClusterPublicHost(clusterHost);
                // The actual port is read after listening; it is used unless a public port is configured.
                int publicPort = getClusterPublicPort(this.server.actualPort());
                this.nodeInfo = new NodeInfo(publicHost, publicPort, this.options.getClusterNodeMetadata());
                this.nodeId = this.clusterManager.getNodeId();
                Promise<Void> nodeInfoStored = Promise.promise();
                this.clusterManager.setNodeInfo(this.nodeInfo, nodeInfoStored);
                return nodeInfoStored.future();
            }).andThen(outcome -> {
                if (outcome.succeeded()) {
                    this.started = true;
                    this.nodeSelector.eventBusStarted();
                }
            }).onComplete2(promise);
        });
    }

    /**
     * Closes the bus: first the superclass, then the close future (which the net client and the
     * event bus context were created with), and finally every outbound connection holder.
     *
     * @param promise completed with the outcome of the close future
     */
    @Override // org.apache.pulsar.jetcd.shaded.io.vertx.core.eventbus.impl.EventBusImpl, org.apache.pulsar.jetcd.shaded.io.vertx.core.eventbus.impl.EventBusInternal
    public void close(Promise<Void> promise) {
        Promise<Void> parentClosed = Promise.promise();
        super.close(parentClosed);
        parentClosed.future()
            // The close future is closed whether or not the superclass close succeeded.
            .transform(ignored -> this.closeFuture.close())
            .andThen(ignored -> {
                // Connection holders are only closed if start() got as far as creating the server.
                if (this.server != null) {
                    for (ConnectionHolder holder : this.connections.values()) {
                        holder.close();
                    }
                }
            })
            .onComplete2(promise);
    }

    /**
     * Creates a {@link ClusteredMessage} originating from this node.
     *
     * @param z        forwarded to the {@link ClusteredMessage} constructor
     *                 (NOTE(review): presumably the send-vs-publish flag - confirm)
     * @param z2       forwarded to the codec lookup
     *                 (NOTE(review): presumably the local-only flag - confirm)
     * @param str      target address; must not be {@code null}
     * @param multiMap message headers
     * @param obj      message body
     * @param str2     forwarded to the codec lookup
     *                 (NOTE(review): presumably an explicit codec name - confirm)
     * @return the new message
     * @throws NullPointerException if {@code str} is {@code null}
     */
    @Override // org.apache.pulsar.jetcd.shaded.io.vertx.core.eventbus.impl.EventBusImpl
    public MessageImpl createMessage(boolean z, boolean z2, String str, MultiMap multiMap, Object obj, String str2) {
        Objects.requireNonNull(str, "no null address accepted");
        return new ClusteredMessage(this.nodeId, str, multiMap, obj, this.codecManager.lookupCodec(obj, str2, z2), z, this);
    }

    /**
     * Publishes a newly registered local handler to the cluster manager. Reply handlers are not
     * published; for those the promise, if any, is completed immediately.
     *
     * @param handlerHolder the holder that was registered locally
     * @param promise       completion callback; required (non-null) for non-reply handlers
     */
    @Override // org.apache.pulsar.jetcd.shaded.io.vertx.core.eventbus.impl.EventBusImpl
    protected <T> void onLocalRegistration(HandlerHolder<T> handlerHolder, Promise<Void> promise) {
        if (!handlerHolder.isReplyHandler()) {
            RegistrationInfo registration = new RegistrationInfo(this.nodeId, handlerHolder.getSeq(), handlerHolder.isLocalOnly());
            this.clusterManager.addRegistration(handlerHolder.getHandler().address, registration, Objects.requireNonNull(promise));
        } else if (promise != null) {
            promise.complete();
        }
    }

    /** Creates a clustered handler holder carrying the next value of the handler sequence. */
    @Override // org.apache.pulsar.jetcd.shaded.io.vertx.core.eventbus.impl.EventBusImpl
    protected <T> HandlerHolder<T> createHandlerHolder(HandlerRegistration<T> handlerRegistration, boolean z, boolean z2, ContextInternal contextInternal) {
        return new ClusteredHandlerHolder(handlerRegistration, z, z2, contextInternal, this.handlerSequence.getAndIncrement());
    }

    /**
     * Removes a locally unregistered handler from the cluster manager. Reply handlers were never
     * published, so for those the promise is completed immediately.
     *
     * @param handlerHolder the holder that was unregistered locally
     * @param promise       completed with the outcome of the removal
     */
    @Override // org.apache.pulsar.jetcd.shaded.io.vertx.core.eventbus.impl.EventBusImpl
    protected <T> void onLocalUnregistration(HandlerHolder<T> handlerHolder, Promise<Void> promise) {
        if (handlerHolder.isReplyHandler()) {
            promise.complete();
            return;
        }
        RegistrationInfo registration = new RegistrationInfo(this.nodeId, handlerHolder.getSeq(), handlerHolder.isLocalOnly());
        Promise<Void> removed = Promise.promise();
        this.clusterManager.removeRegistration(handlerHolder.getHandler().address, registration, removed);
        removed.future().onComplete2(promise);
    }

    /**
     * Routes an outbound message.
     *
     * <ul>
     *   <li>A reply is sent straight to the node recorded as {@code repliedTo}.</li>
     *   <li>A message whose delivery options are local-only is delegated to the superclass.</li>
     *   <li>Otherwise the node selector is queried through the context's {@link Serializer}: one
     *       node id for a send, a collection of node ids for a publish. A failed selection is
     *       reported through {@link #sendOrPublishFailed}.</li>
     * </ul>
     *
     * @param outboundDeliveryContext the delivery being performed
     */
    /* JADX INFO: Access modifiers changed from: protected */
    @Override // org.apache.pulsar.jetcd.shaded.io.vertx.core.eventbus.impl.EventBusImpl
    public <T> void sendOrPub(OutboundDeliveryContext<T> outboundDeliveryContext) {
        String repliedTo = ((ClusteredMessage) outboundDeliveryContext.message).getRepliedTo();
        if (repliedTo != null) {
            clusteredSendReply(repliedTo, outboundDeliveryContext);
            return;
        }
        if (outboundDeliveryContext.options.isLocalOnly()) {
            super.sendOrPub(outboundDeliveryContext);
            return;
        }
        Serializer serializer = Serializer.get(outboundDeliveryContext.ctx);
        if (outboundDeliveryContext.message.isSend()) {
            // A send resolves to a single node id.
            PromiseInternal<String> selection = outboundDeliveryContext.ctx.promise();
            serializer.queue(outboundDeliveryContext.message, this.nodeSelector::selectForSend, selection);
            selection.future().onComplete2(selected -> {
                if (selected.succeeded()) {
                    sendToNode(outboundDeliveryContext, selected.result());
                } else {
                    sendOrPublishFailed(outboundDeliveryContext, selected.cause());
                }
            });
            return;
        }
        // A publish resolves to every node id that should receive the message.
        PromiseInternal<Iterable<String>> selection = outboundDeliveryContext.ctx.promise();
        serializer.queue(outboundDeliveryContext.message, this.nodeSelector::selectForPublish, selection);
        selection.future().onComplete2(selected -> {
            if (selected.succeeded()) {
                sendToNodes(outboundDeliveryContext, selected.result());
            } else {
                sendOrPublishFailed(outboundDeliveryContext, selected.cause());
            }
        });
    }

    /**
     * Reports a failed node selection on the delivery context.
     *
     * <p>The failure is always passed to {@code written(th)}. It is additionally logged at error
     * level, but only when debug logging is enabled; the guard and the level do not match in the
     * original code and this behavior is preserved as is.
     */
    private void sendOrPublishFailed(OutboundDeliveryContext<?> outboundDeliveryContext, Throwable th) {
        if (log.isDebugEnabled()) {
            log.error("Failed to send message", th);
        }
        outboundDeliveryContext.written(th);
    }

    /** @return a reply address made of the {@code __vertx.reply.} prefix and a random UUID */
    @Override // org.apache.pulsar.jetcd.shaded.io.vertx.core.eventbus.impl.EventBusImpl
    protected String generateReplyAddress() {
        return "__vertx.reply." + UUID.randomUUID().toString();
    }

    /** @return {@code true} unless the message was read from the wire */
    @Override // org.apache.pulsar.jetcd.shaded.io.vertx.core.eventbus.impl.EventBusImpl
    protected boolean isMessageLocal(MessageImpl messageImpl) {
        return !((ClusteredMessage) messageImpl).isFromWire();
    }

    /**
     * Picks the handler that should receive a message.
     *
     * @param concurrentCyclicSequence handlers registered for the address
     * @param z                        when {@code true} the sequence's next handler is used as is;
     *                                 when {@code false} the first handler that is a reply handler
     *                                 or is not local-only is used
     *                                 (NOTE(review): presumably "message is local" - confirm against caller)
     * @return the chosen handler, or {@code null} when {@code z} is {@code false} and no handler qualifies
     */
    @Override // org.apache.pulsar.jetcd.shaded.io.vertx.core.eventbus.impl.EventBusImpl
    protected HandlerHolder nextHandler(ConcurrentCyclicSequence<HandlerHolder> concurrentCyclicSequence, boolean z) {
        if (z) {
            return concurrentCyclicSequence.next();
        }
        Iterator<HandlerHolder> candidates = concurrentCyclicSequence.iterator(false);
        while (candidates.hasNext()) {
            HandlerHolder candidate = candidates.next();
            if (candidate.isReplyHandler() || !candidate.isLocalOnly()) {
                return candidate;
            }
        }
        return null;
    }

    /** @return the port configured in the event bus options */
    private int getClusterPort() {
        return this.options.getPort();
    }

    /**
     * Resolves the host to bind to: the configured host, else the cluster manager's host, else
     * the default address.
     */
    private String getClusterHost() {
        String configuredHost = this.options.getHost();
        if (configuredHost != null) {
            return configuredHost;
        }
        String managerHost = this.clusterManager.clusterHost();
        return managerHost != null ? managerHost : AddressHelper.defaultAddress();
    }

    /** @return the configured public port when it is positive, otherwise {@code actualPort} */
    private int getClusterPublicPort(int actualPort) {
        int configuredPublicPort = this.options.getClusterPublicPort();
        return configuredPublicPort > 0 ? configuredPublicPort : actualPort;
    }

    /**
     * Resolves the host advertised to other nodes: the configured public host, else the configured
     * host, else the cluster manager's public host, else {@code fallbackHost}.
     */
    private String getClusterPublicHost(String fallbackHost) {
        String configuredPublicHost = this.options.getClusterPublicHost();
        if (configuredPublicHost != null) {
            return configuredPublicHost;
        }
        String configuredHost = this.options.getHost();
        if (configuredHost != null) {
            return configuredHost;
        }
        String managerPublicHost = this.clusterManager.clusterPublicHost();
        return managerPublicHost != null ? managerPublicHost : fallbackHost;
    }

    /**
     * Builds the handler installed on every inbound socket.
     *
     * <p>Each socket gets its own {@link RecordParser} that alternates between two states: reading
     * the 4-byte length prefix, then reading exactly that many bytes as one message. A decoded
     * message is either failed internally (when it carries a failure), answered with
     * {@link #PONG} (when it is a ping), or delivered to the local handlers.
     */
    private Handler<NetSocket> getServerHandler() {
        return netSocket -> {
            final RecordParser parser = RecordParser.newFixed(LENGTH_PREFIX_BYTES);
            parser.setOutput(new Handler<Buffer>() {
                /** Length of the message body being awaited, or -1 while awaiting a length prefix. */
                int size = -1;

                @Override // org.apache.pulsar.jetcd.shaded.io.vertx.core.Handler
                public void handle(Buffer buffer) {
                    if (this.size == -1) {
                        // This record is the length prefix: switch the parser to the body size.
                        this.size = buffer.getInt(0);
                        parser.fixedSizeMode(this.size);
                        return;
                    }
                    ClusteredMessage received = new ClusteredMessage(ClusteredEventBus.this);
                    received.readFromWire(buffer, ClusteredEventBus.this.codecManager);
                    if (ClusteredEventBus.this.metrics != null) {
                        ClusteredEventBus.this.metrics.messageRead(received.address(), buffer.length());
                    }
                    // Go back to expecting a length prefix before dispatching the message.
                    parser.fixedSizeMode(LENGTH_PREFIX_BYTES);
                    this.size = -1;
                    if (received.hasFailure()) {
                        received.internalError();
                    } else if (received.codec() == CodecManager.PING_MESSAGE_CODEC) {
                        // The pong payload is a Buffer and is written as such on the socket.
                        netSocket.write(ClusteredEventBus.PONG);
                    } else {
                        ClusteredEventBus.this.deliverMessageLocally(received);
                    }
                }
            });
            netSocket.handler2((Handler<Buffer>) parser);
        };
    }

    /**
     * Delivers to a single node: locally when no node was selected or the node is this one,
     * remotely otherwise.
     */
    private <T> void sendToNode(OutboundDeliveryContext<T> outboundDeliveryContext, String targetNodeId) {
        if (targetNodeId == null || targetNodeId.equals(this.nodeId)) {
            super.sendOrPub(outboundDeliveryContext);
        } else {
            sendRemote(outboundDeliveryContext, targetNodeId);
        }
    }

    /**
     * Delivers to every given node. When the collection is {@code null} or empty the message is
     * handed to the superclass for local delivery instead.
     */
    private <T> void sendToNodes(OutboundDeliveryContext<T> outboundDeliveryContext, Iterable<String> targetNodeIds) {
        boolean dispatched = false;
        if (targetNodeIds != null) {
            for (String targetNodeId : targetNodeIds) {
                dispatched = true;
                sendToNode(outboundDeliveryContext, targetNodeId);
            }
        }
        if (!dispatched) {
            super.sendOrPub(outboundDeliveryContext);
        }
    }

    /** Sends a reply to the node it replies to, locally when that node is this one. */
    private <T> void clusteredSendReply(String replyNodeId, OutboundDeliveryContext<T> outboundDeliveryContext) {
        if (replyNodeId.equals(this.nodeId)) {
            super.sendOrPub(outboundDeliveryContext);
        } else {
            sendRemote(outboundDeliveryContext, replyNodeId);
        }
    }

    /**
     * Writes the delivery through the connection holder of the given remote node, creating and
     * connecting the holder on first use.
     */
    private void sendRemote(OutboundDeliveryContext<?> outboundDeliveryContext, String remoteNodeId) {
        ConnectionHolder holder = this.connections.get(remoteNodeId);
        if (holder == null) {
            holder = new ConnectionHolder(this, remoteNodeId);
            ConnectionHolder existing = this.connections.putIfAbsent(remoteNodeId, holder);
            if (existing != null) {
                // Another caller registered a holder first: use theirs, ours is never connected.
                holder = existing;
            } else {
                holder.connect();
            }
        }
        holder.writeMessage(outboundDeliveryContext);
    }

    /** @return the live map of outbound connection holders, keyed by remote node id */
    /* JADX INFO: Access modifiers changed from: package-private */
    public ConcurrentMap<String, ConnectionHolder> connections() {
        return this.connections;
    }

    /** @return the owning Vert.x instance */
    /* JADX INFO: Access modifiers changed from: package-private */
    public VertxInternal vertx() {
        return this.vertx;
    }

    /** @return the event bus options this bus was configured with */
    /* JADX INFO: Access modifiers changed from: package-private */
    public EventBusOptions options() {
        return this.options;
    }
}
