package hu.akarnokd.reactive4javaflow.processors;

import hu.akarnokd.reactive4javaflow.FolyamPlugins;
import hu.akarnokd.reactive4javaflow.FolyamSubscriber;
import hu.akarnokd.reactive4javaflow.functionals.AutoDisposable;
import hu.akarnokd.reactive4javaflow.fused.FusedQueue;
import hu.akarnokd.reactive4javaflow.fused.FusedSubscription;
import hu.akarnokd.reactive4javaflow.impl.BooleanSubscription;
import hu.akarnokd.reactive4javaflow.impl.ExceptionHelper;
import hu.akarnokd.reactive4javaflow.impl.SubscriptionHelper;
import hu.akarnokd.reactive4javaflow.impl.VH;
import hu.akarnokd.reactive4javaflow.impl.util.SpscArrayQueue;
import hu.akarnokd.reactive4javaflow.impl.util.SpscOneQueue;
import java.lang.invoke.MethodHandles;
import java.lang.invoke.VarHandle;
import java.util.Objects;
import java.util.concurrent.CancellationException;
import java.util.concurrent.Flow;
import java.util.concurrent.atomic.AtomicLong;

/* loaded from: input_file:hu/akarnokd/reactive4javaflow/processors/MulticastProcessor.class */
public final class MulticastProcessor<T> extends FolyamProcessor<T> implements AutoDisposable {
    final int prefetch;
    MulticastSubscription<T>[] subscribers;
    int wip;
    Throwable error;
    Flow.Subscription upstream;
    FusedQueue<T> queue;
    int consumed;
    int sourceFused;
    static final VarHandle SUBSCRIBERS = VH.find(MethodHandles.lookup(), MulticastProcessor.class, "subscribers", MulticastSubscription[].class);
    static final MulticastSubscription[] EMPTY = new MulticastSubscription[0];
    static final MulticastSubscription[] TERMINATED = new MulticastSubscription[0];
    static final VarHandle WIP = VH.find(MethodHandles.lookup(), MulticastProcessor.class, "wip", Integer.TYPE);
    static final VarHandle ERROR = VH.find(MethodHandles.lookup(), MulticastProcessor.class, "error", Throwable.class);
    static final VarHandle UPSTREAM = VH.find(MethodHandles.lookup(), MulticastProcessor.class, "upstream", Flow.Subscription.class);
    static final VarHandle QUEUE = VH.find(MethodHandles.lookup(), MulticastProcessor.class, "queue", FusedQueue.class);

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:hu/akarnokd/reactive4javaflow/processors/MulticastProcessor$MulticastSubscription.class */
    public static final class MulticastSubscription<T> extends AtomicLong implements Flow.Subscription {
        final FolyamSubscriber<? super T> actual;
        final MulticastProcessor<T> parent;
        long emitted;

        MulticastSubscription(FolyamSubscriber<? super T> folyamSubscriber, MulticastProcessor<T> multicastProcessor) {
            this.actual = folyamSubscriber;
            this.parent = multicastProcessor;
        }

        @Override // java.util.concurrent.Flow.Subscription
        public void request(long j) {
            SubscriptionHelper.addRequestedCancellable(this, j);
            this.parent.drain();
        }

        @Override // java.util.concurrent.Flow.Subscription
        public void cancel() {
            if (getAndSet(Long.MIN_VALUE) != Long.MIN_VALUE) {
                this.parent.remove(this);
                this.parent.drain();
            }
        }

        boolean isCancelled() {
            return getAcquire() == Long.MIN_VALUE;
        }

        void onNext(T t) {
            if (getAcquire() != Long.MIN_VALUE) {
                this.emitted++;
                this.actual.onNext(t);
            }
        }

        void onError(Throwable th) {
            if (getAcquire() != Long.MIN_VALUE) {
                setRelease(Long.MIN_VALUE);
                this.actual.onError(th);
            }
        }

        void onComplete() {
            if (getAcquire() != Long.MIN_VALUE) {
                setRelease(Long.MIN_VALUE);
                this.actual.onComplete();
            }
        }
    }

    public MulticastProcessor() {
        this(FolyamPlugins.defaultBufferSize());
    }

    public MulticastProcessor(int i) {
        this.prefetch = i;
        SUBSCRIBERS.setRelease(this, EMPTY);
    }

    @Override // hu.akarnokd.reactive4javaflow.processors.FlowProcessorSupport
    public boolean hasThrowable() {
        Throwable acquire = ERROR.getAcquire(this);
        return (acquire == null || acquire == ExceptionHelper.TERMINATED) ? false : true;
    }

    @Override // hu.akarnokd.reactive4javaflow.processors.FlowProcessorSupport
    public Throwable getThrowable() {
        Throwable acquire = ERROR.getAcquire(this);
        if (acquire != ExceptionHelper.TERMINATED) {
            return acquire;
        }
        return null;
    }

    @Override // hu.akarnokd.reactive4javaflow.processors.FlowProcessorSupport
    public boolean hasComplete() {
        return ERROR.getAcquire(this) == ExceptionHelper.TERMINATED;
    }

    @Override // hu.akarnokd.reactive4javaflow.processors.FlowProcessorSupport
    public boolean hasSubscribers() {
        return SUBSCRIBERS.getAcquire(this).length != 0;
    }

    @Override // hu.akarnokd.reactive4javaflow.Folyam
    protected void subscribeActual(FolyamSubscriber<? super T> folyamSubscriber) {
        MulticastSubscription<T> multicastSubscription = new MulticastSubscription<>(folyamSubscriber, this);
        folyamSubscriber.onSubscribe(multicastSubscription);
        if (add(multicastSubscription)) {
            if (multicastSubscription.isCancelled()) {
                remove(multicastSubscription);
                return;
            } else {
                drain();
                return;
            }
        }
        Throwable acquire = ERROR.getAcquire(this);
        if (acquire == ExceptionHelper.TERMINATED) {
            multicastSubscription.actual.onComplete();
        } else {
            multicastSubscription.actual.onError(acquire);
        }
    }

    @Override // java.util.concurrent.Flow.Subscriber
    public void onSubscribe(Flow.Subscription subscription) {
        if (SubscriptionHelper.replace(this, UPSTREAM, subscription)) {
            if (subscription instanceof FusedSubscription) {
                FusedSubscription fusedSubscription = (FusedSubscription) subscription;
                int requestFusion = fusedSubscription.requestFusion(3);
                if (requestFusion == 1) {
                    this.sourceFused = requestFusion;
                    QUEUE.setRelease(this, fusedSubscription);
                    ERROR.setRelease(this, ExceptionHelper.TERMINATED);
                    drain();
                    return;
                }
                if (requestFusion == 2) {
                    this.sourceFused = requestFusion;
                    QUEUE.setRelease(this, fusedSubscription);
                    subscription.request(this.prefetch);
                    return;
                }
            }
            int i = this.prefetch;
            if (i == 1) {
                QUEUE.setRelease(this, new SpscOneQueue());
            } else {
                QUEUE.setRelease(this, new SpscArrayQueue(i));
            }
            subscription.request(i);
        }
    }

    public void start() {
        onSubscribe(new BooleanSubscription());
    }

    public boolean prepare(Flow.Subscription subscription) {
        return UPSTREAM.compareAndSet(this, null, subscription);
    }

    public boolean hasTerminated() {
        return ERROR.getAcquire(this) != null;
    }

    @Override // java.util.concurrent.Flow.Subscriber
    public void onNext(T t) {
        if (this.error != null) {
            return;
        }
        if (t != null) {
            if (!this.queue.offer(t)) {
                onError(new IllegalStateException("Not all consumers are ready to receive items"));
                return;
            }
        } else if (this.sourceFused == 0) {
            throw new NullPointerException("item == null");
        }
        drain();
    }

    public boolean tryOnNext(T t) {
        if (this.error != null) {
            return true;
        }
        if (this.sourceFused != 0) {
            throw new IllegalStateException("MulticastProcessor already consuming items from an upstream source");
        }
        if (!this.queue.offer(t)) {
            return false;
        }
        drain();
        return true;
    }

    @Override // java.util.concurrent.Flow.Subscriber
    public void onError(Throwable th) {
        Objects.requireNonNull(th, "throwable == null");
        if (this.error == null && ERROR.compareAndSet(this, null, th)) {
            drain();
        } else {
            FolyamPlugins.onError(th);
        }
    }

    @Override // java.util.concurrent.Flow.Subscriber
    public void onComplete() {
        if (ERROR.compareAndSet(this, null, ExceptionHelper.TERMINATED)) {
            drain();
        }
    }

    @Override // hu.akarnokd.reactive4javaflow.functionals.AutoDisposable, java.lang.AutoCloseable
    public void close() {
        SubscriptionHelper.cancel(this, UPSTREAM);
        if (ERROR.getAcquire(this) == null && ERROR.compareAndSet(this, null, new CancellationException("MulticastProcessor closed"))) {
            drain();
        }
    }

    boolean add(MulticastSubscription<T> multicastSubscription) {
        MulticastSubscription[] acquire;
        MulticastSubscription[] multicastSubscriptionArr;
        do {
            acquire = SUBSCRIBERS.getAcquire(this);
            if (acquire == TERMINATED) {
                return false;
            }
            int length = acquire.length;
            multicastSubscriptionArr = new MulticastSubscription[length + 1];
            System.arraycopy(acquire, 0, multicastSubscriptionArr, 0, length);
            multicastSubscriptionArr[length] = multicastSubscription;
        } while (!SUBSCRIBERS.compareAndSet(this, acquire, multicastSubscriptionArr));
        return true;
    }

    void remove(MulticastSubscription<T> multicastSubscription) {
        MulticastSubscription[] acquire;
        MulticastSubscription[] multicastSubscriptionArr;
        do {
            acquire = SUBSCRIBERS.getAcquire(this);
            int length = acquire.length;
            if (length == 0) {
                return;
            }
            int i = -1;
            int i2 = 0;
            while (true) {
                if (i2 >= length) {
                    break;
                }
                if (multicastSubscription == acquire[i2]) {
                    i = i2;
                    break;
                }
                i2++;
            }
            if (i < 0) {
                return;
            }
            if (length == 1) {
                multicastSubscriptionArr = EMPTY;
            } else {
                multicastSubscriptionArr = new MulticastSubscription[length - 1];
                System.arraycopy(acquire, 0, multicastSubscriptionArr, 0, i);
                System.arraycopy(acquire, i + 1, multicastSubscriptionArr, i, (length - i) - 1);
            }
        } while (!SUBSCRIBERS.compareAndSet(this, acquire, multicastSubscriptionArr));
    }

    void drain() {
        if (WIP.getAndAdd(this, 1) != 0) {
            return;
        }
        do {
            FusedQueue acquire = QUEUE.getAcquire(this);
            if (acquire != null) {
                if (this.sourceFused == 1) {
                    drainSync(acquire);
                    return;
                } else {
                    drainNormal(acquire);
                    return;
                }
            }
        } while (WIP.getAndAdd(this, -1) - 1 != 0);
    }

    void drainSync(FusedQueue<T> fusedQueue) {
        int i = 1;
        while (true) {
            MulticastSubscription[] acquire = SUBSCRIBERS.getAcquire(this);
            int length = acquire.length;
            int i2 = 0;
            int i3 = 0;
            for (MulticastSubscription multicastSubscription : acquire) {
                long acquire2 = multicastSubscription.getAcquire();
                if (acquire2 != Long.MIN_VALUE) {
                    i3++;
                    if (acquire2 - multicastSubscription.emitted != 0) {
                        i2++;
                    }
                }
            }
            if (i3 != 0 && i3 == i2) {
                try {
                    T poll = fusedQueue.poll();
                    if (poll == null) {
                        terminate(ERROR.getAcquire(this));
                        return;
                    }
                    for (MulticastSubscription multicastSubscription2 : acquire) {
                        multicastSubscription2.onNext(poll);
                    }
                } catch (Throwable th) {
                    th = th;
                    this.upstream.cancel();
                    if (ExceptionHelper.addThrowable(this, ERROR, th)) {
                        th = ERROR.getAcquire(this);
                    } else {
                        ERROR.setRelease(this, th);
                    }
                    terminate(th);
                    return;
                }
            } else if (fusedQueue.isEmpty()) {
                terminate(ERROR.getAcquire(this));
                return;
            } else {
                i = WIP.getAndAdd(this, -i) - i;
                if (i == 0) {
                    return;
                }
            }
        }
    }

    void terminate(Throwable th) {
        if (th == ExceptionHelper.TERMINATED) {
            for (MulticastSubscription multicastSubscription : SUBSCRIBERS.getAndSet(this, TERMINATED)) {
                multicastSubscription.onComplete();
            }
            return;
        }
        for (MulticastSubscription multicastSubscription2 : SUBSCRIBERS.getAndSet(this, TERMINATED)) {
            multicastSubscription2.onError(th);
        }
    }

    void drainNormal(FusedQueue<T> fusedQueue) {
        int i = 1;
        int i2 = this.prefetch;
        int i3 = i2 - (i2 >> 2);
        int i4 = this.consumed;
        while (true) {
            MulticastSubscription[] acquire = SUBSCRIBERS.getAcquire(this);
            int length = acquire.length;
            int i5 = 0;
            int i6 = 0;
            for (MulticastSubscription multicastSubscription : acquire) {
                long acquire2 = multicastSubscription.getAcquire();
                if (acquire2 != Long.MIN_VALUE) {
                    i6++;
                    if (acquire2 - multicastSubscription.emitted != 0) {
                        i5++;
                    }
                }
            }
            boolean z = ERROR.getAcquire(this) != null;
            if (i6 != 0 && i6 == i5) {
                try {
                    T poll = fusedQueue.poll();
                    boolean z2 = poll == null;
                    if (z && z2) {
                        terminate(ERROR.getAcquire(this));
                        return;
                    }
                    if (!z2) {
                        for (MulticastSubscription multicastSubscription2 : acquire) {
                            multicastSubscription2.onNext(poll);
                        }
                        i4++;
                        if (i4 == i3) {
                            i4 = 0;
                            this.upstream.request(i3);
                        }
                    }
                } catch (Throwable th) {
                    th = th;
                    this.upstream.cancel();
                    if (ExceptionHelper.addThrowable(this, ERROR, th)) {
                        th = ERROR.getAcquire(this);
                    } else {
                        ERROR.setRelease(this, th);
                    }
                    terminate(th);
                    return;
                }
            } else if (z && fusedQueue.isEmpty()) {
                terminate(ERROR.getAcquire(this));
                return;
            }
            this.consumed = i4;
            i = WIP.getAndAdd(this, -i) - i;
            if (i == 0) {
                return;
            }
        }
    }
}
