package org.apache.pulsar.reactive.client.internal.api;

import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import org.jctools.queues.MpmcArrayQueue;
import org.reactivestreams.Publisher;
import org.reactivestreams.Subscription;
import reactor.core.CoreSubscriber;
import reactor.core.publisher.BaseSubscriber;
import reactor.core.publisher.Flux;
import reactor.core.publisher.FluxOperator;
import reactor.core.publisher.Mono;
import reactor.core.publisher.MonoOperator;
import reactor.core.scheduler.Scheduler;
import reactor.core.scheduler.Schedulers;
import reactor.util.context.Context;

/* loaded from: input_file:org/apache/pulsar/reactive/client/internal/api/InflightLimiter.class */
public class InflightLimiter implements PublisherTransformer {
    /** Capacity of the pending-subscriber queue used by the single-argument constructor. */
    public static final int DEFAULT_MAX_PENDING_SUBSCRIPTIONS = 1024;
    /** Subscribers that were parked by {@code maybeAddToPending()} and are polled by the trigger task. */
    private final MpmcArrayQueue<InflightLimiterSubscriber<?>> pendingSubscriptions;
    /** Number of slots currently accounted as in flight, summed over all subscribers of this limiter. */
    private final AtomicInteger inflight;
    /** Incremented in {@code handleSubscribe}; decremented when a subscriber completes, errors or is cancelled. */
    private final AtomicInteger activeSubscriptions;
    /** Upper bound that {@link #inflight} is compared against before more elements are requested. */
    private final int maxInflight;
    /** Lower bound for the divisor used when splitting {@link #maxInflight} between subscribers. */
    private final int expectedSubscriptionsInflight;
    /** Worker on which pending subscribers are polled and {@code requestMore()} is invoked. */
    private final Scheduler.Worker triggerNextWorker;
    /** Set when a trigger task has been scheduled; cleared again as the first action of that task. */
    private final AtomicBoolean triggerNextTriggered;

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/pulsar/reactive/client/internal/api/InflightLimiter$InflightLimiterSubscriber.class */
    public class InflightLimiterSubscriber<I> extends BaseSubscriber<I> {
        private final CoreSubscriber<? super I> actual;
        private final Publisher<I> source;
        private final AtomicLong requestedDemand = new AtomicLong();
        private final AtomicReference<InflightLimiterSubscriberState> state = new AtomicReference<>(InflightLimiterSubscriberState.INITIAL);
        private final AtomicInteger inflightForSubscription = new AtomicInteger();
        private final Subscription subscription = new Subscription() { // from class: org.apache.pulsar.reactive.client.internal.api.InflightLimiter.InflightLimiterSubscriber.1
            public void request(long j) {
                if (j == Long.MAX_VALUE) {
                    InflightLimiterSubscriber.this.requestedDemand.set(j);
                } else if (InflightLimiterSubscriber.this.requestedDemand.get() != Long.MAX_VALUE) {
                    InflightLimiterSubscriber.this.requestedDemand.addAndGet(j);
                }
                InflightLimiterSubscriber.this.maybeAddToPending();
                InflightLimiter.this.maybeTriggerNext();
            }

            public void cancel() {
                InflightLimiterSubscriber.this.cancel();
            }
        };

        InflightLimiterSubscriber(CoreSubscriber<? super I> coreSubscriber, Publisher<I> publisher) {
            this.actual = coreSubscriber;
            this.source = publisher;
        }

        public Context currentContext() {
            return this.actual.currentContext();
        }

        protected void hookOnSubscribe(Subscription subscription) {
            if (this.state.compareAndSet(InflightLimiterSubscriberState.SUBSCRIBING, InflightLimiterSubscriberState.SUBSCRIBED)) {
                InflightLimiter.this.scheduleSubscribed(this);
            }
        }

        protected void hookOnNext(I i) {
            this.actual.onNext(i);
            InflightLimiter.this.inflight.decrementAndGet();
            this.inflightForSubscription.decrementAndGet();
            maybeAddToPending();
            InflightLimiter.this.maybeTriggerNext();
        }

        protected void hookOnComplete() {
            InflightLimiter.this.activeSubscriptions.decrementAndGet();
            this.actual.onComplete();
            clearInflight();
            InflightLimiter.this.maybeTriggerNext();
        }

        private void clearInflight() {
            InflightLimiter.this.inflight.addAndGet(-this.inflightForSubscription.getAndSet(0));
        }

        protected void hookOnError(Throwable th) {
            InflightLimiter.this.activeSubscriptions.decrementAndGet();
            this.actual.onError(th);
            clearInflight();
            InflightLimiter.this.maybeTriggerNext();
        }

        protected void hookOnCancel() {
            InflightLimiter.this.activeSubscriptions.decrementAndGet();
            clearInflight();
            this.requestedDemand.set(0L);
            InflightLimiter.this.maybeTriggerNext();
        }

        Subscription getSubscription() {
            return this.subscription;
        }

        void requestMore() {
            int max = Math.max(InflightLimiter.this.maxInflight / Math.max(InflightLimiter.this.activeSubscriptions.get(), InflightLimiter.this.expectedSubscriptionsInflight), 1);
            if (this.requestedDemand.get() <= 0 || (this.state.get() != InflightLimiterSubscriberState.SUBSCRIBED && (this.inflightForSubscription.get() >= max || InflightLimiter.this.inflight.get() >= InflightLimiter.this.maxInflight))) {
                maybeAddToPending();
                return;
            }
            if (this.state.compareAndSet(InflightLimiterSubscriberState.INITIAL, InflightLimiterSubscriberState.SUBSCRIBING)) {
                InflightLimiter.this.inflight.incrementAndGet();
                this.inflightForSubscription.incrementAndGet();
                this.source.subscribe(this);
            } else if (this.state.get() == InflightLimiterSubscriberState.REQUESTING || this.state.get() == InflightLimiterSubscriberState.SUBSCRIBED) {
                if (this.state.compareAndSet(InflightLimiterSubscriberState.SUBSCRIBED, InflightLimiterSubscriberState.REQUESTING)) {
                    InflightLimiter.this.inflight.decrementAndGet();
                    this.inflightForSubscription.decrementAndGet();
                }
                long max2 = Math.max(Math.min(this.requestedDemand.get(), max - this.inflightForSubscription.get()), 1L);
                InflightLimiter.this.inflight.addAndGet((int) max2);
                this.requestedDemand.addAndGet(-max2);
                this.inflightForSubscription.addAndGet((int) max2);
                request(max2);
            }
        }

        void maybeAddToPending() {
            if (this.requestedDemand.get() <= 0 || isDisposed() || this.inflightForSubscription.get() != 0) {
                return;
            }
            InflightLimiter.this.pendingSubscriptions.add(this);
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/pulsar/reactive/client/internal/api/InflightLimiter$InflightLimiterSubscriberState.class */
    /** Lifecycle states of an {@link InflightLimiterSubscriber}. */
    public enum InflightLimiterSubscriberState {
        /** State a subscriber is created with; the source has not been subscribed yet. */
        INITIAL,
        /** Set by {@code requestMore()} immediately before it subscribes to the source. */
        SUBSCRIBING,
        /** Set by {@code hookOnSubscribe} once the source has handed over its subscription. */
        SUBSCRIBED,
        /** Set by {@code requestMore()} when it leaves SUBSCRIBED to request elements. */
        REQUESTING
    }

    /**
     * Creates a limiter with an expected subscription count of 0, a worker obtained from
     * {@link Schedulers#single()} and a pending queue of
     * {@link #DEFAULT_MAX_PENDING_SUBSCRIPTIONS} entries.
     *
     * @param i maximum number of in-flight elements; must be at least 1
     * @throws IllegalArgumentException if {@code i} is smaller than 1
     */
    public InflightLimiter(int i) {
        this(i, 0, Schedulers.single(), InflightLimiter.DEFAULT_MAX_PENDING_SUBSCRIPTIONS);
    }

    /**
     * Creates a limiter.
     *
     * @param i maximum number of in-flight elements across all subscriptions; must be at least 1
     * @param i2 expected number of concurrent subscriptions, used as a lower bound when the
     * in-flight budget is split per subscription; must not exceed {@code i}
     * @param scheduler scheduler that provides the worker on which pending subscribers are served
     * @param i3 capacity of the queue holding pending subscribers
     * @throws IllegalArgumentException if {@code i} is smaller than 1 or {@code i2} is greater
     * than {@code i}
     */
    public InflightLimiter(int i, int i2, Scheduler scheduler, int i3) {
        this.inflight = new AtomicInteger();
        this.activeSubscriptions = new AtomicInteger();
        this.triggerNextTriggered = new AtomicBoolean(false);
        // All validation happens before the worker is created, so a rejected argument cannot
        // leave behind a worker that nobody disposes.
        if (i < 1) {
            throw new IllegalArgumentException("maxInflight must be greater than 0");
        }
        if (i2 > i) {
            throw new IllegalArgumentException("maxSubscriptionInflight must be equal or less than maxInflight.");
        }
        this.maxInflight = i;
        this.expectedSubscriptionsInflight = i2;
        // The queue is allocated before the worker for the same reason: if its constructor
        // rejects the capacity, no worker has been acquired yet.
        this.pendingSubscriptions = new MpmcArrayQueue<>(i3);
        this.triggerNextWorker = scheduler.createWorker();
    }

    /**
     * Wraps the given publisher so that its subscriptions are governed by this limiter. A
     * {@link Mono} stays a {@link Mono}; any other publisher is adapted with
     * {@link Flux#from(Publisher)}.
     *
     * @param publisher publisher to wrap
     * @param <T> element type
     * @return the limited publisher
     */
    @Override // org.apache.pulsar.reactive.client.internal.api.PublisherTransformer
    public <T> Publisher<T> transform(Publisher<T> publisher) {
        if (publisher instanceof Mono) {
            return createOperator((Mono<T>) publisher);
        }
        return createOperator(Flux.from(publisher));
    }

    /**
     * Builds a {@link Flux} operator that routes every subscription through
     * {@link #handleSubscribe(Publisher, CoreSubscriber)}.
     *
     * @param flux source to limit
     * @param <I> element type
     * @return the limited flux
     */
    <I> Flux<I> createOperator(Flux<I> flux) {
        final InflightLimiter limiter = this;
        return new FluxOperator<I, I>(flux) {
            @Override
            public void subscribe(CoreSubscriber<? super I> coreSubscriber) {
                limiter.handleSubscribe(this.source, coreSubscriber);
            }
        };
    }

    /**
     * Builds a {@link Mono} operator that routes every subscription through
     * {@link #handleSubscribe(Publisher, CoreSubscriber)}.
     *
     * @param mono source to limit
     * @param <I> element type
     * @return the limited mono
     */
    <I> Mono<I> createOperator(Mono<I> mono) {
        final InflightLimiter limiter = this;
        return new MonoOperator<I, I>(mono) {
            @Override
            public void subscribe(CoreSubscriber<? super I> coreSubscriber) {
                limiter.handleSubscribe(this.source, coreSubscriber);
            }
        };
    }

    /**
     * Registers a new subscription with the limiter: the active-subscription counter is
     * incremented and the downstream subscriber receives the limiter's own subscription. The
     * source itself is not subscribed here.
     *
     * @param publisher source publisher
     * @param coreSubscriber downstream subscriber
     * @param <I> element type
     */
    <I> void handleSubscribe(Publisher<I> publisher, CoreSubscriber<? super I> coreSubscriber) {
        this.activeSubscriptions.incrementAndGet();
        InflightLimiterSubscriber<I> limitedSubscriber = new InflightLimiterSubscriber<>(coreSubscriber, publisher);
        Subscription downstreamSubscription = limitedSubscriber.getSubscription();
        coreSubscriber.onSubscribe(downstreamSubscription);
    }

    /**
     * Schedules a task on the trigger worker that serves pending subscribers, provided the
     * worker is still usable, in-flight capacity is left, somebody is waiting and no such task
     * is already scheduled.
     */
    void maybeTriggerNext() {
        boolean workAvailable = !triggerNextWorker.isDisposed()
                && inflight.get() < maxInflight
                && !pendingSubscriptions.isEmpty();
        // The flag is only flipped when there is something to do, and only one caller wins.
        if (workAvailable && triggerNextTriggered.compareAndSet(false, true)) {
            triggerNextWorker.schedule(() -> {
                // Re-arm first so that triggers arriving while the loop runs are not lost.
                triggerNextTriggered.set(false);
                // Visit at most as many entries as were queued when the task started.
                int remaining = pendingSubscriptions.size();
                while (inflight.get() < maxInflight && remaining-- > 0) {
                    InflightLimiterSubscriber<?> next = pendingSubscriptions.poll();
                    if (next == null) {
                        break;
                    }
                    if (!next.isDisposed()) {
                        next.requestMore();
                    }
                }
            });
        }
    }

    /**
     * Runs {@code requestMore()} for the given subscriber on the trigger worker, unless the
     * worker has been disposed or the subscriber is disposed by the time the task runs.
     *
     * @param inflightLimiterSubscriber subscriber whose source subscription was just established
     */
    void scheduleSubscribed(InflightLimiterSubscriber<?> inflightLimiterSubscriber) {
        if (!triggerNextWorker.isDisposed()) {
            triggerNextWorker.schedule(() -> {
                if (!inflightLimiterSubscriber.isDisposed()) {
                    inflightLimiterSubscriber.requestMore();
                }
            });
        }
    }

    /** Disposes the trigger worker and cancels every subscriber still held in the pending queue. */
    public void dispose() {
        triggerNextWorker.dispose();
        pendingSubscriptions.drain(InflightLimiterSubscriber::cancel);
    }

    /** @return {@code true} once the trigger worker has been disposed */
    public boolean isDisposed() {
        final Scheduler.Worker worker = triggerNextWorker;
        return worker.isDisposed();
    }
}
