package org.apache.pulsar.jetcd.shaded.io.vertx.core.eventbus.impl;

import io.kubernetes.client.openapi.models.V1NodeAddress;
import io.kubernetes.client.openapi.models.V1RuntimeClass;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.Objects;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
import java.util.function.Function;
import org.apache.pulsar.jetcd.shaded.io.vertx.core.AsyncResult;
import org.apache.pulsar.jetcd.shaded.io.vertx.core.Future;
import org.apache.pulsar.jetcd.shaded.io.vertx.core.Handler;
import org.apache.pulsar.jetcd.shaded.io.vertx.core.MultiMap;
import org.apache.pulsar.jetcd.shaded.io.vertx.core.Promise;
import org.apache.pulsar.jetcd.shaded.io.vertx.core.eventbus.DeliveryContext;
import org.apache.pulsar.jetcd.shaded.io.vertx.core.eventbus.DeliveryOptions;
import org.apache.pulsar.jetcd.shaded.io.vertx.core.eventbus.EventBus;
import org.apache.pulsar.jetcd.shaded.io.vertx.core.eventbus.Message;
import org.apache.pulsar.jetcd.shaded.io.vertx.core.eventbus.MessageCodec;
import org.apache.pulsar.jetcd.shaded.io.vertx.core.eventbus.MessageConsumer;
import org.apache.pulsar.jetcd.shaded.io.vertx.core.eventbus.MessageProducer;
import org.apache.pulsar.jetcd.shaded.io.vertx.core.eventbus.ReplyException;
import org.apache.pulsar.jetcd.shaded.io.vertx.core.eventbus.ReplyFailure;
import org.apache.pulsar.jetcd.shaded.io.vertx.core.impl.ContextInternal;
import org.apache.pulsar.jetcd.shaded.io.vertx.core.impl.VertxInternal;
import org.apache.pulsar.jetcd.shaded.io.vertx.core.impl.utils.ConcurrentCyclicSequence;
import org.apache.pulsar.jetcd.shaded.io.vertx.core.spi.metrics.EventBusMetrics;
import org.apache.pulsar.jetcd.shaded.io.vertx.core.spi.metrics.MetricsProvider;
import org.apache.pulsar.jetcd.shaded.io.vertx.core.spi.metrics.VertxMetrics;

/* loaded from: input_file:META-INF/bundled-dependencies/jetcd-core-shaded-3.3.2.4.jar:org/apache/pulsar/jetcd/shaded/io/vertx/core/eventbus/impl/EventBusImpl.class */
/**
 * In-process event bus implementation.
 *
 * <p>Handlers are kept per address in {@link #handlerMap}; {@link #sendOrPub} delivers every
 * outbound message to the handlers registered in this instance. Several members are
 * {@code protected} hooks ({@link #sendOrPub}, {@link #isMessageLocal}, {@link #nextHandler},
 * {@link #onLocalRegistration}, {@link #onLocalUnregistration}) whose default behaviour here is
 * purely local; presumably a clustered subclass overrides them.
 *
 * <p>The interceptor arrays are {@code volatile} and are only replaced through a
 * compare-and-set loop, so readers always observe a complete array.
 */
public class EventBusImpl implements EventBusInternal, MetricsProvider {
    // The updaters resolve the fields reflectively by name: do not rename the two interceptor fields.
    private static final AtomicReferenceFieldUpdater<EventBusImpl, Handler[]> OUTBOUND_INTERCEPTORS_UPDATER = AtomicReferenceFieldUpdater.newUpdater(EventBusImpl.class, Handler[].class, "outboundInterceptors");
    private static final AtomicReferenceFieldUpdater<EventBusImpl, Handler[]> INBOUND_INTERCEPTORS_UPDATER = AtomicReferenceFieldUpdater.newUpdater(EventBusImpl.class, Handler[].class, "inboundInterceptors");
    protected final VertxInternal vertx;
    /** Metrics sink, or {@code null} when the Vert.x instance exposes no metrics SPI. */
    protected final EventBusMetrics metrics;
    protected volatile boolean started;
    private volatile Handler<DeliveryContext>[] outboundInterceptors = new Handler[0];
    private volatile Handler<DeliveryContext>[] inboundInterceptors = new Handler[0];
    /** Counter used to build unique reply addresses. */
    private final AtomicLong replySequence = new AtomicLong(0);
    /** Registered handlers, keyed by address. An address with no handlers has no entry. */
    protected final ConcurrentMap<String, ConcurrentCyclicSequence<HandlerHolder>> handlerMap = new ConcurrentHashMap<>();
    protected final CodecManager codecManager = new CodecManager();

    /**
     * Creates an event bus bound to the given Vert.x instance.
     *
     * @param vertxInternal the owning Vert.x instance; its metrics SPI, when present, is used to
     *                      create the event bus metrics
     */
    public EventBusImpl(VertxInternal vertxInternal) {
        VertxMetrics metricsSPI = vertxInternal.metricsSPI();
        this.vertx = vertxInternal;
        this.metrics = metricsSPI != null ? metricsSPI.createEventBusMetrics() : null;
    }

    /**
     * Appends an interceptor to the outbound interceptor array.
     *
     * @throws NullPointerException if {@code handler} is null
     */
    @Override // org.apache.pulsar.jetcd.shaded.io.vertx.core.eventbus.EventBus
    public <T> EventBus addOutboundInterceptor(Handler<DeliveryContext<T>> handler) {
        addInterceptor(OUTBOUND_INTERCEPTORS_UPDATER, (Handler) Objects.requireNonNull(handler));
        return this;
    }

    /**
     * Appends an interceptor to the inbound interceptor array.
     *
     * @throws NullPointerException if {@code handler} is null
     */
    @Override // org.apache.pulsar.jetcd.shaded.io.vertx.core.eventbus.EventBus
    public <T> EventBus addInboundInterceptor(Handler<DeliveryContext<T>> handler) {
        addInterceptor(INBOUND_INTERCEPTORS_UPDATER, (Handler) Objects.requireNonNull(handler));
        return this;
    }

    /**
     * Removes the first outbound interceptor equal to {@code handler}; no-op when absent.
     *
     * @throws NullPointerException if {@code handler} is null
     */
    @Override // org.apache.pulsar.jetcd.shaded.io.vertx.core.eventbus.EventBus
    public <T> EventBus removeOutboundInterceptor(Handler<DeliveryContext<T>> handler) {
        removeInterceptor(OUTBOUND_INTERCEPTORS_UPDATER, (Handler) Objects.requireNonNull(handler));
        return this;
    }

    /**
     * Removes the first inbound interceptor equal to {@code handler}; no-op when absent.
     *
     * @throws NullPointerException if {@code handler} is null
     */
    @Override // org.apache.pulsar.jetcd.shaded.io.vertx.core.eventbus.EventBus
    public <T> EventBus removeInboundInterceptor(Handler<DeliveryContext<T>> handler) {
        removeInterceptor(INBOUND_INTERCEPTORS_UPDATER, (Handler) Objects.requireNonNull(handler));
        return this;
    }

    /**
     * Returns the current inbound interceptor array (the internal array, not a copy).
     * Decompiler note: originally package-private.
     */
    public Handler<DeliveryContext>[] inboundInterceptors() {
        return this.inboundInterceptors;
    }

    /**
     * Returns the current outbound interceptor array (the internal array, not a copy).
     * Decompiler note: originally package-private.
     */
    public Handler<DeliveryContext>[] outboundInterceptors() {
        return this.outboundInterceptors;
    }

    /** Forwards the cluster-serializable class check to the codec manager. */
    @Override // org.apache.pulsar.jetcd.shaded.io.vertx.core.eventbus.EventBus
    public EventBus clusterSerializableChecker(Function<String, Boolean> function) {
        this.codecManager.clusterSerializableCheck(function);
        return this;
    }

    /** Forwards the serializable class check to the codec manager. */
    @Override // org.apache.pulsar.jetcd.shaded.io.vertx.core.eventbus.EventBus
    public EventBus serializableChecker(Function<String, Boolean> function) {
        this.codecManager.serializableCheck(function);
        return this;
    }

    /**
     * Marks the bus as started and completes {@code promise}.
     *
     * @throws IllegalStateException if the bus was already started
     */
    @Override // org.apache.pulsar.jetcd.shaded.io.vertx.core.eventbus.impl.EventBusInternal
    public synchronized void start(Promise<Void> promise) {
        if (this.started) {
            throw new IllegalStateException("Already started");
        }
        this.started = true;
        promise.complete();
    }

    /** Sends {@code obj} to {@code str} using default delivery options. */
    @Override // org.apache.pulsar.jetcd.shaded.io.vertx.core.eventbus.EventBus
    public EventBus send(String str, Object obj) {
        return send(str, obj, new DeliveryOptions());
    }

    /**
     * Sends {@code obj} to the address {@code str} without expecting a reply.
     *
     * @throws NullPointerException  if {@code str} or {@code deliveryOptions} is null
     * @throws IllegalStateException if the bus is not started
     */
    @Override // org.apache.pulsar.jetcd.shaded.io.vertx.core.eventbus.EventBus
    public EventBus send(String str, Object obj, DeliveryOptions deliveryOptions) {
        // Fail fast with a named message instead of an anonymous NPE during argument evaluation.
        Objects.requireNonNull(deliveryOptions, "options");
        sendOrPubInternal(createMessage(true, isLocalOnly(deliveryOptions), str, deliveryOptions.getHeaders(), obj, deliveryOptions.getCodecName()), deliveryOptions, null, null);
        return this;
    }

    /**
     * Sends {@code obj} to the address {@code str} and returns the future produced by the reply
     * handler registered for this request.
     *
     * @throws NullPointerException  if {@code str} or {@code deliveryOptions} is null
     * @throws IllegalStateException if the bus is not started
     */
    @Override // org.apache.pulsar.jetcd.shaded.io.vertx.core.eventbus.EventBus
    public <T> Future<Message<T>> request(String str, Object obj, DeliveryOptions deliveryOptions) {
        Objects.requireNonNull(deliveryOptions, "options");
        MessageImpl message = createMessage(true, isLocalOnly(deliveryOptions), str, deliveryOptions.getHeaders(), obj, deliveryOptions.getCodecName());
        ReplyHandler<T> replyHandler = createReplyHandler(message, true, deliveryOptions);
        sendOrPubInternal(message, deliveryOptions, replyHandler, null);
        return replyHandler.result();
    }

    /**
     * Creates a producer in send mode for {@code str} with default delivery options.
     *
     * @throws NullPointerException if {@code str} is null
     */
    @Override // org.apache.pulsar.jetcd.shaded.io.vertx.core.eventbus.EventBus
    public <T> MessageProducer<T> sender(String str) {
        Objects.requireNonNull(str, "address");
        return new MessageProducerImpl(this.vertx, str, true, new DeliveryOptions());
    }

    /**
     * Creates a producer in send mode for {@code str}.
     *
     * @throws NullPointerException if {@code str} or {@code deliveryOptions} is null
     */
    @Override // org.apache.pulsar.jetcd.shaded.io.vertx.core.eventbus.EventBus
    public <T> MessageProducer<T> sender(String str, DeliveryOptions deliveryOptions) {
        Objects.requireNonNull(str, "address");
        Objects.requireNonNull(deliveryOptions, "options");
        return new MessageProducerImpl(this.vertx, str, true, deliveryOptions);
    }

    /**
     * Creates a producer in publish mode for {@code str} with default delivery options.
     *
     * @throws NullPointerException if {@code str} is null
     */
    @Override // org.apache.pulsar.jetcd.shaded.io.vertx.core.eventbus.EventBus
    public <T> MessageProducer<T> publisher(String str) {
        Objects.requireNonNull(str, "address");
        return new MessageProducerImpl(this.vertx, str, false, new DeliveryOptions());
    }

    /**
     * Creates a producer in publish mode for {@code str}.
     *
     * @throws NullPointerException if {@code str} or {@code deliveryOptions} is null
     */
    @Override // org.apache.pulsar.jetcd.shaded.io.vertx.core.eventbus.EventBus
    public <T> MessageProducer<T> publisher(String str, DeliveryOptions deliveryOptions) {
        Objects.requireNonNull(str, "address");
        Objects.requireNonNull(deliveryOptions, "options");
        return new MessageProducerImpl(this.vertx, str, false, deliveryOptions);
    }

    /** Publishes {@code obj} to {@code str} using default delivery options. */
    @Override // org.apache.pulsar.jetcd.shaded.io.vertx.core.eventbus.EventBus
    public EventBus publish(String str, Object obj) {
        return publish(str, obj, new DeliveryOptions());
    }

    /**
     * Publishes {@code obj} to every handler registered at the address {@code str}.
     *
     * @throws NullPointerException  if {@code str} or {@code deliveryOptions} is null
     * @throws IllegalStateException if the bus is not started
     */
    @Override // org.apache.pulsar.jetcd.shaded.io.vertx.core.eventbus.EventBus
    public EventBus publish(String str, Object obj, DeliveryOptions deliveryOptions) {
        Objects.requireNonNull(deliveryOptions, "options");
        sendOrPubInternal(createMessage(false, isLocalOnly(deliveryOptions), str, deliveryOptions.getHeaders(), obj, deliveryOptions.getCodecName()), deliveryOptions, null, null);
        return this;
    }

    /**
     * Creates a consumer for {@code str} bound to the current (or a newly created) context.
     * No handler is attached yet.
     *
     * @throws IllegalStateException if the bus is not started
     * @throws NullPointerException  if {@code str} is null
     */
    @Override // org.apache.pulsar.jetcd.shaded.io.vertx.core.eventbus.EventBus
    public <T> MessageConsumer<T> consumer(String str) {
        checkStarted();
        Objects.requireNonNull(str, "address");
        return new MessageConsumerImpl(this.vertx, this.vertx.getOrCreateContext(), this, str, false);
    }

    /**
     * Creates a consumer for {@code str} and attaches {@code handler} to it.
     *
     * @throws NullPointerException  if {@code handler} or {@code str} is null
     * @throws IllegalStateException if the bus is not started
     */
    @Override // org.apache.pulsar.jetcd.shaded.io.vertx.core.eventbus.EventBus
    public <T> MessageConsumer<T> consumer(String str, Handler<Message<T>> handler) {
        Objects.requireNonNull(handler, "handler");
        MessageConsumer<T> consumer = consumer(str);
        consumer.handler2((Handler) handler);
        return consumer;
    }

    /**
     * Same as {@link #consumer(String)} but the consumer is created with its local-only flag set.
     *
     * @throws IllegalStateException if the bus is not started
     * @throws NullPointerException  if {@code str} is null
     */
    @Override // org.apache.pulsar.jetcd.shaded.io.vertx.core.eventbus.EventBus
    public <T> MessageConsumer<T> localConsumer(String str) {
        checkStarted();
        Objects.requireNonNull(str, "address");
        return new MessageConsumerImpl(this.vertx, this.vertx.getOrCreateContext(), this, str, true);
    }

    /**
     * Creates a local-only consumer for {@code str} and attaches {@code handler} to it.
     *
     * @throws NullPointerException  if {@code handler} or {@code str} is null
     * @throws IllegalStateException if the bus is not started
     */
    @Override // org.apache.pulsar.jetcd.shaded.io.vertx.core.eventbus.EventBus
    public <T> MessageConsumer<T> localConsumer(String str, Handler<Message<T>> handler) {
        Objects.requireNonNull(handler, "handler");
        MessageConsumer<T> localConsumer = localConsumer(str);
        localConsumer.handler2((Handler) handler);
        return localConsumer;
    }

    /** Registers a named codec with the codec manager. */
    @Override // org.apache.pulsar.jetcd.shaded.io.vertx.core.eventbus.EventBus
    public EventBus registerCodec(MessageCodec messageCodec) {
        this.codecManager.registerCodec(messageCodec);
        return this;
    }

    /** Unregisters the codec named {@code str} from the codec manager. */
    @Override // org.apache.pulsar.jetcd.shaded.io.vertx.core.eventbus.EventBus
    public EventBus unregisterCodec(String str) {
        this.codecManager.unregisterCodec(str);
        return this;
    }

    /** Registers {@code messageCodec} as the default codec for {@code cls}. */
    @Override // org.apache.pulsar.jetcd.shaded.io.vertx.core.eventbus.EventBus
    public <T> EventBus registerDefaultCodec(Class<T> cls, MessageCodec<T, ?> messageCodec) {
        this.codecManager.registerDefaultCodec(cls, messageCodec);
        return this;
    }

    /** Unregisters the default codec for {@code cls}. */
    @Override // org.apache.pulsar.jetcd.shaded.io.vertx.core.eventbus.EventBus
    public EventBus unregisterDefaultCodec(Class cls) {
        this.codecManager.unregisterDefaultCodec(cls);
        return this;
    }

    /** Installs a codec selector function on the codec manager. */
    @Override // org.apache.pulsar.jetcd.shaded.io.vertx.core.eventbus.EventBus
    public EventBus codecSelector(Function<Object, String> function) {
        this.codecManager.codecSelector(function);
        return this;
    }

    /**
     * Closes the bus. When it was never started the promise completes immediately; otherwise
     * every registered handler is unregistered, the metrics (if any) are closed, and the
     * outcome of the unregistration is handed to {@code promise}.
     */
    @Override // org.apache.pulsar.jetcd.shaded.io.vertx.core.eventbus.impl.EventBusInternal
    public void close(Promise<Void> promise) {
        if (!this.started) {
            promise.complete();
            return;
        }
        unregisterAll().onComplete2(outcome -> {
            if (this.metrics != null) {
                this.metrics.close();
            }
            promise.handle(outcome);
        });
    }

    /** @return {@code true} when a metrics object was created for this bus */
    @Override // org.apache.pulsar.jetcd.shaded.io.vertx.core.metrics.Measured
    public boolean isMetricsEnabled() {
        return this.metrics != null;
    }

    /** @return the metrics object, or {@code null} when metrics are disabled */
    @Override // org.apache.pulsar.jetcd.shaded.io.vertx.core.spi.metrics.MetricsProvider
    public EventBusMetrics<?> getMetrics() {
        return this.metrics;
    }

    /**
     * Builds a message for this bus.
     *
     * @param z        send flag: callers in this class pass {@code true} for send/request and
     *                 {@code false} for publish
     * @param z2       local-only flag handed to the codec lookup
     * @param str      destination address
     * @param multiMap message headers
     * @param obj      message body
     * @param str2     explicit codec name handed to the codec lookup
     * @throws NullPointerException if {@code str} is null
     */
    public MessageImpl createMessage(boolean z, boolean z2, String str, MultiMap multiMap, Object obj, String str2) {
        Objects.requireNonNull(str, "no null address accepted");
        return new MessageImpl(str, multiMap, obj, this.codecManager.lookupCodec(obj, str2, z2), z, this);
    }

    /**
     * Registers a handler locally, then invokes {@link #onLocalRegistration}.
     * The two boolean flags are passed through unchanged to the created {@link HandlerHolder}.
     * Decompiler note: originally protected.
     *
     * @return the holder created for the registration
     */
    public <T> HandlerHolder<T> addRegistration(String str, HandlerRegistration<T> handlerRegistration, boolean z, boolean z2, Promise<Void> promise) {
        HandlerHolder<T> holder = addLocalRegistration(str, handlerRegistration, z, z2);
        onLocalRegistration(holder, promise);
        return holder;
    }

    /** Hook called after a local registration; completes {@code promise} when it is non-null. */
    protected <T> void onLocalRegistration(HandlerHolder<T> handlerHolder, Promise<Void> promise) {
        if (promise != null) {
            promise.complete();
        }
    }

    /**
     * Adds a holder for the registration to the sequence stored at {@code str}, creating the
     * sequence when the address has none, and registers the handler as a close hook when its
     * context belongs to a deployment.
     */
    private <T> HandlerHolder<T> addLocalRegistration(String str, HandlerRegistration<T> handlerRegistration, boolean z, boolean z2) {
        Objects.requireNonNull(str, "address");
        ContextInternal context = handlerRegistration.context;
        HandlerHolder<T> holder = createHandlerHolder(handlerRegistration, z, z2, context);
        // The single-element sequence is either stored as-is or its only element is appended to the existing one.
        this.handlerMap.merge(str, new ConcurrentCyclicSequence<HandlerHolder>().add(holder), (existing, added) -> existing.add(added.first()));
        if (context.isDeployment()) {
            context.addCloseHook(handlerRegistration);
        }
        return holder;
    }

    /** Factory hook for the holder wrapping a registration. */
    protected <T> HandlerHolder<T> createHandlerHolder(HandlerRegistration<T> handlerRegistration, boolean z, boolean z2, ContextInternal contextInternal) {
        return new HandlerHolder<>(handlerRegistration, z, z2, contextInternal);
    }

    /**
     * Removes a holder locally, then invokes {@link #onLocalUnregistration}.
     * Decompiler note: originally protected.
     */
    public <T> void removeRegistration(HandlerHolder<T> handlerHolder, Promise<Void> promise) {
        removeLocalRegistration(handlerHolder);
        onLocalUnregistration(handlerHolder, promise);
    }

    /** Hook called after a local unregistration; completes {@code promise} when it is non-null. */
    protected <T> void onLocalUnregistration(HandlerHolder<T> handlerHolder, Promise<Void> promise) {
        // Null-tolerant, mirroring onLocalRegistration.
        if (promise != null) {
            promise.complete();
        }
    }

    /**
     * Removes the holder from the sequence at its address (dropping the map entry once the
     * sequence is empty) and, the first time the holder is flagged as removed, detaches the
     * close hook when the context has a deployment id.
     */
    private <T> void removeLocalRegistration(HandlerHolder<T> handlerHolder) {
        this.handlerMap.compute(handlerHolder.getHandler().address, (address, handlers) -> {
            if (handlers == null) {
                return null;
            }
            ConcurrentCyclicSequence<HandlerHolder> remaining = handlers.remove(handlerHolder);
            // Returning null removes the mapping.
            return remaining.size() == 0 ? null : remaining;
        });
        if (handlerHolder.setRemoved() && handlerHolder.getContext().deploymentID() != null) {
            handlerHolder.getContext().removeCloseHook(handlerHolder.getHandler());
        }
    }

    /**
     * Sends a reply message through the outbound pipeline.
     * Decompiler note: originally protected.
     *
     * @throws IllegalStateException if the message has no address, or the bus is not started
     */
    public <T> void sendReply(MessageImpl messageImpl, DeliveryOptions deliveryOptions, ReplyHandler<T> replyHandler) {
        if (messageImpl.address() == null) {
            throw new IllegalStateException("address not specified");
        }
        sendOrPubInternal(new OutboundDeliveryContext<>(this.vertx.getOrCreateContext(), messageImpl, deliveryOptions, replyHandler, null));
    }

    /**
     * Delivery hook; this implementation always delivers locally.
     * Decompiler note: originally protected.
     */
    public <T> void sendOrPub(OutboundDeliveryContext<T> outboundDeliveryContext) {
        sendLocally(outboundDeliveryContext);
    }

    /** Schedules a successful completion of {@code handler} on a Vert.x context; no-op for null. */
    protected void callCompletionHandlerAsync(Handler<AsyncResult<Void>> handler) {
        if (handler != null) {
            this.vertx.runOnContext(ignored -> handler.handle(Future.succeededFuture()));
        }
    }

    /** Delivers the message locally and reports the outcome ({@code null} meaning no failure) to the context. */
    private <T> void sendLocally(OutboundDeliveryContext<T> outboundDeliveryContext) {
        ReplyException failure = deliverMessageLocally(outboundDeliveryContext.message);
        outboundDeliveryContext.written(failure);
    }

    /** Hook telling whether a message is considered local; always {@code true} here. */
    protected boolean isMessageLocal(MessageImpl messageImpl) {
        return true;
    }

    /**
     * Delivers a message to the handlers registered at its address.
     *
     * <ul>
     *   <li>No entry for the address: returns a {@code NO_HANDLERS} failure.</li>
     *   <li>Send: delivers a copy to the handler chosen by {@link #nextHandler}, if any.</li>
     *   <li>Publish: delivers a copy to every holder, skipping local-only holders when the
     *       message is not local.</li>
     * </ul>
     * When metrics are enabled, the reception is recorded with the number of targeted handlers.
     * Decompiler note: originally protected.
     *
     * @return a failure when no handlers are registered at the address, otherwise {@code null}
     */
    public ReplyException deliverMessageLocally(MessageImpl messageImpl) {
        ConcurrentCyclicSequence<HandlerHolder> handlers = this.handlerMap.get(messageImpl.address());
        boolean local = isMessageLocal(messageImpl);
        if (handlers == null) {
            if (this.metrics != null) {
                this.metrics.messageReceived(messageImpl.address(), !messageImpl.isSend(), local, 0);
            }
            return new ReplyException(ReplyFailure.NO_HANDLERS, "No handlers for address " + messageImpl.address);
        }
        if (messageImpl.isSend()) {
            HandlerHolder target = nextHandler(handlers, local);
            if (this.metrics != null) {
                this.metrics.messageReceived(messageImpl.address(), !messageImpl.isSend(), local, target != null ? 1 : 0);
            }
            if (target != null) {
                target.handler.receive(messageImpl.copyBeforeReceive());
            }
            return null;
        }
        if (this.metrics != null) {
            this.metrics.messageReceived(messageImpl.address(), !messageImpl.isSend(), local, handlers.size());
        }
        for (HandlerHolder holder : handlers) {
            if (local || !holder.isLocalOnly()) {
                holder.handler.receive(messageImpl.copyBeforeReceive());
            }
        }
        return null;
    }

    /** Hook selecting the handler for a sent message; here simply the sequence's next element. */
    protected HandlerHolder nextHandler(ConcurrentCyclicSequence<HandlerHolder> concurrentCyclicSequence, boolean z) {
        return concurrentCyclicSequence.next();
    }

    /** @throws IllegalStateException if the bus is not started */
    protected void checkStarted() {
        if (!this.started) {
            throw new IllegalStateException("Event Bus is not started");
        }
    }

    /** Returns a new reply address built from a fixed prefix and the next sequence number. */
    protected String generateReplyAddress() {
        return "__vertx.reply." + this.replySequence.incrementAndGet();
    }

    /**
     * Assigns a fresh reply address to the message, then creates and registers the reply handler
     * listening on it, using the send timeout from {@code deliveryOptions}.
     * The flag {@code z} is passed through unchanged to the {@link ReplyHandler} constructor.
     * Decompiler note: originally package-private.
     */
    public <T> ReplyHandler<T> createReplyHandler(MessageImpl messageImpl, boolean z, DeliveryOptions deliveryOptions) {
        long timeout = deliveryOptions.getSendTimeout();
        String replyAddress = generateReplyAddress();
        messageImpl.setReplyAddress(replyAddress);
        ReplyHandler<T> replyHandler = new ReplyHandler<>(this, this.vertx.getOrCreateContext(), replyAddress, messageImpl.address, z, timeout);
        replyHandler.register();
        return replyHandler;
    }

    /** Creates an outbound delivery context on the current (or a newly created) Vert.x context. */
    public <T> OutboundDeliveryContext<T> newSendContext(MessageImpl messageImpl, DeliveryOptions deliveryOptions, ReplyHandler<T> replyHandler, Promise<Void> promise) {
        return new OutboundDeliveryContext<>(this.vertx.getOrCreateContext(), messageImpl, deliveryOptions, replyHandler, promise);
    }

    /**
     * Binds the context to this bus and its metrics, then starts processing it.
     *
     * @throws IllegalStateException if the bus is not started
     */
    public <T> void sendOrPubInternal(OutboundDeliveryContext<T> outboundDeliveryContext) {
        checkStarted();
        outboundDeliveryContext.bus = this;
        outboundDeliveryContext.metrics = this.metrics;
        outboundDeliveryContext.next();
    }

    /**
     * Sends or publishes a message, optionally reporting the write outcome to {@code promise}.
     *
     * <p>When {@code promise} is non-null and the message is a publish, failures other than
     * {@link ReplyException} are converted to success before being reported.
     *
     * @throws IllegalStateException if the bus is not started
     */
    public <T> void sendOrPubInternal(MessageImpl messageImpl, DeliveryOptions deliveryOptions, ReplyHandler<T> replyHandler, Promise<Void> promise) {
        checkStarted();
        if (promise == null) {
            sendOrPubInternal(newSendContext(messageImpl, deliveryOptions, replyHandler, null));
            return;
        }
        // An intermediate promise lets the outcome be post-processed before reaching the caller.
        Promise<Void> writePromise = Promise.promise();
        sendOrPubInternal(newSendContext(messageImpl, deliveryOptions, replyHandler, writePromise));
        Future<Void> outcome = writePromise.future();
        if (!messageImpl.send) {
            outcome = outcome.recover(failure -> failure instanceof ReplyException ? Future.failedFuture(failure) : Future.succeededFuture());
        }
        outcome.onComplete2(promise);
    }

    /** Unregisters every handler currently in {@link #handlerMap} and joins the resulting futures. */
    private Future<Void> unregisterAll() {
        // Kept raw on purpose: the element type expected by Future.join is not visible from this file.
        ArrayList pending = new ArrayList();
        for (ConcurrentCyclicSequence<HandlerHolder> handlers : this.handlerMap.values()) {
            for (HandlerHolder holder : handlers) {
                pending.add(holder.getHandler().unregister());
            }
        }
        return Future.join(pending).mapEmpty();
    }

    /** Appends {@code handler} to the array behind the updater, retrying until the CAS succeeds. */
    private void addInterceptor(AtomicReferenceFieldUpdater<EventBusImpl, Handler[]> atomicReferenceFieldUpdater, Handler handler) {
        Handler[] current;
        Handler[] updated;
        do {
            current = atomicReferenceFieldUpdater.get(this);
            updated = Arrays.copyOf(current, current.length + 1);
            updated[current.length] = handler;
        } while (!atomicReferenceFieldUpdater.compareAndSet(this, current, updated));
    }

    /**
     * Removes the first element equal to {@code handler} from the array behind the updater,
     * retrying until the CAS succeeds; returns without changes when no element matches.
     */
    private void removeInterceptor(AtomicReferenceFieldUpdater<EventBusImpl, Handler[]> atomicReferenceFieldUpdater, Handler handler) {
        Handler[] current;
        Handler[] updated;
        do {
            current = atomicReferenceFieldUpdater.get(this);
            int index = -1;
            for (int i = 0; i < current.length; i++) {
                if (current[i].equals(handler)) {
                    index = i;
                    break;
                }
            }
            if (index == -1) {
                return;
            }
            updated = new Handler[current.length - 1];
            // Copy the elements before and after the removed slot.
            System.arraycopy(current, 0, updated, 0, index);
            System.arraycopy(current, index + 1, updated, index, updated.length - index);
        } while (!atomicReferenceFieldUpdater.compareAndSet(this, current, updated));
    }

    /** A non-clustered Vert.x is always local-only; otherwise the option decides. */
    private boolean isLocalOnly(DeliveryOptions deliveryOptions) {
        return !this.vertx.isClustered() || deliveryOptions.isLocalOnly();
    }
}
