package io.smallrye.mutiny.operators.multi.processors;

import io.smallrye.mutiny.helpers.ParameterValidation;
import io.smallrye.mutiny.helpers.Subscriptions;
import io.smallrye.mutiny.helpers.queues.Queues;
import io.smallrye.mutiny.operators.AbstractMulti;
import io.smallrye.mutiny.subscription.BackPressureFailure;
import io.smallrye.mutiny.subscription.MultiSubscriber;
import java.util.Objects;
import java.util.Queue;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
import org.reactivestreams.Processor;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;

/* loaded from: input_file:BOOT-INF/lib/mutiny-1.7.0.jar:io/smallrye/mutiny/operators/multi/processors/UnicastProcessor.class */
public class UnicastProcessor<T> extends AbstractMulti<T> implements Processor<T, T>, Subscription {
    private final Runnable onTermination;
    private final Queue<T> queue;
    private static final AtomicReferenceFieldUpdater<UnicastProcessor, Subscriber> DOWNSTREAM_UPDATER = AtomicReferenceFieldUpdater.newUpdater(UnicastProcessor.class, Subscriber.class, "downstream");
    private volatile boolean hasUpstream;
    private volatile boolean done = false;
    private volatile Throwable failure = null;
    private volatile boolean cancelled = false;
    private volatile Subscriber<? super T> downstream = null;
    private final AtomicInteger wip = new AtomicInteger();
    private final AtomicLong requested = new AtomicLong();

    public static <I> UnicastProcessor<I> create() {
        return new UnicastProcessor<>((Queue) Queues.unbounded(Queues.BUFFER_S).get(), null);
    }

    public static <I> UnicastProcessor<I> create(Queue<I> queue, Runnable runnable) {
        return new UnicastProcessor<>(queue, runnable);
    }

    private UnicastProcessor(Queue<T> queue, Runnable runnable) {
        this.queue = (Queue) ParameterValidation.nonNull(queue, "queue");
        this.onTermination = runnable;
    }

    private void onTerminate() {
        if (this.onTermination != null) {
            this.onTermination.run();
        }
    }

    void drainWithDownstream(Subscriber<? super T> subscriber) {
        long j;
        int i = 1;
        Queue<T> queue = this.queue;
        do {
            long j2 = this.requested.get();
            long j3 = 0;
            while (true) {
                j = j3;
                if (j2 == j) {
                    break;
                }
                T poll = queue.poll();
                boolean z = poll == null;
                if (isCancelledOrDone(this.done, z)) {
                    return;
                }
                if (z) {
                    break;
                }
                subscriber.onNext(poll);
                j3 = j + 1;
            }
            if (j2 == j && isCancelledOrDone(this.done, queue.isEmpty())) {
                return;
            }
            if (j != 0 && j2 != Long.MAX_VALUE) {
                this.requested.addAndGet(-j);
            }
            i = this.wip.addAndGet(-i);
        } while (i != 0);
    }

    private void drain() {
        if (this.wip.getAndIncrement() != 0) {
            return;
        }
        int i = 1;
        do {
            Subscriber<? super T> subscriber = this.downstream;
            if (subscriber != null) {
                drainWithDownstream(subscriber);
                return;
            }
            i = this.wip.addAndGet(-i);
        } while (i != 0);
    }

    private boolean isCancelledOrDone(boolean z, boolean z2) {
        Subscriber<? super T> subscriber = this.downstream;
        if (this.cancelled) {
            this.queue.clear();
            return true;
        }
        if (!z || !z2) {
            return false;
        }
        Throwable th = this.failure;
        if (th != null) {
            subscriber.onError(th);
            return true;
        }
        subscriber.onComplete();
        return true;
    }

    @Override // org.reactivestreams.Subscriber
    public void onSubscribe(Subscription subscription) {
        if (this.hasUpstream) {
            subscription.cancel();
        } else if (isDoneOrCancelled()) {
            subscription.cancel();
        } else {
            this.hasUpstream = true;
            subscription.request(Long.MAX_VALUE);
        }
    }

    @Override // io.smallrye.mutiny.operators.AbstractMulti
    public void subscribe(MultiSubscriber<? super T> multiSubscriber) {
        ParameterValidation.nonNull(multiSubscriber, "downstream");
        if (!DOWNSTREAM_UPDATER.compareAndSet(this, null, multiSubscriber)) {
            Subscriptions.fail(multiSubscriber, new IllegalStateException("Already subscribed"));
            return;
        }
        multiSubscriber.onSubscribe(this);
        if (this.cancelled) {
            return;
        }
        drain();
    }

    @Override // org.reactivestreams.Subscriber
    public synchronized void onNext(T t) {
        if (isDoneOrCancelled()) {
            return;
        }
        if (this.queue.offer(t)) {
            drain();
        } else {
            onError(new BackPressureFailure("the queue is full"));
        }
    }

    private boolean isDoneOrCancelled() {
        return this.done || this.cancelled;
    }

    @Override // org.reactivestreams.Subscriber
    public void onError(Throwable th) {
        Objects.requireNonNull(th);
        if (isDoneOrCancelled()) {
            return;
        }
        onTerminate();
        this.failure = th;
        this.done = true;
        drain();
    }

    @Override // org.reactivestreams.Subscriber
    public void onComplete() {
        if (isDoneOrCancelled()) {
            return;
        }
        onTerminate();
        this.done = true;
        drain();
    }

    @Override // org.reactivestreams.Subscription
    public void request(long j) {
        if (j > 0) {
            Subscriptions.add(this.requested, j);
            drain();
        }
    }

    @Override // org.reactivestreams.Subscription, io.smallrye.mutiny.subscription.Cancellable
    public void cancel() {
        if (this.cancelled) {
            return;
        }
        this.cancelled = true;
        if (DOWNSTREAM_UPDATER.getAndSet(this, null) != null) {
            onTerminate();
            if (this.wip.getAndIncrement() == 0) {
                this.queue.clear();
            }
        }
    }

    public boolean hasSubscriber() {
        return this.downstream != null;
    }

    public SerializedProcessor<T, T> serialized() {
        return new SerializedProcessor<>(this);
    }
}
