package ratpack.stream.internal;

import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import org.reactivestreams.Subscriber;
import ratpack.exec.Promise;
import ratpack.func.Function;
import ratpack.stream.TransformablePublisher;
import ratpack.stream.YieldRequest;

/* loaded from: input_file:ratpack/stream/internal/FlatYieldingPublisher.class */
/**
 * A publisher whose items are obtained by invoking a promise-returning function once per
 * unit of subscriber demand.
 *
 * <p>For every requested item the {@code producer} function is called with a
 * {@link YieldRequest} built from the subscription number and a per-subscription request
 * counter. A promise that yields {@code null} completes the stream; a non-null value is
 * emitted to the subscriber; a failure (thrown by the function or delivered by the promise)
 * is forwarded as an error.
 *
 * @param <T> the type of item emitted
 */
public class FlatYieldingPublisher<T> implements TransformablePublisher<T> {
    private final Function<? super YieldRequest, ? extends Promise<? extends T>> producer;

    /** Hands out a distinct number to each subscription created by this publisher. */
    private final AtomicLong subscriptionCounter = new AtomicLong();

    /**
     * Per-subscriber state: tracks outstanding demand and makes sure at most one producer
     * promise is in flight at any time.
     */
    private class Subscription extends SubscriptionSupport<T> {
        private final long subscriptionNum;
        private final AtomicInteger requestCounter;
        /** Demand requested by the subscriber that has not yet been turned into a producer call. */
        private final AtomicLong waiting;
        /** Guard flag: {@code true} while a drain pass or a producer promise is in progress. */
        private final AtomicBoolean draining;

        public Subscription(Subscriber<? super T> subscriber) {
            super(subscriber);
            this.subscriptionNum = FlatYieldingPublisher.this.subscriptionCounter.getAndIncrement();
            this.requestCounter = new AtomicInteger();
            this.waiting = new AtomicLong();
            this.draining = new AtomicBoolean();
            // Fields must be initialised before start(); NOTE(review): start() presumably
            // signals onSubscribe and may trigger doRequest() immediately - confirm in SubscriptionSupport.
            start();
        }

        /**
         * Records additional demand and attempts to satisfy it.
         *
         * <p>The addition saturates at {@link Long#MAX_VALUE} instead of wrapping around, so
         * repeated large requests (e.g. {@code request(Long.MAX_VALUE)} twice) cannot turn the
         * counter negative and stall the stream.
         *
         * @param j the number of additional items requested
         */
        @Override
        protected void doRequest(long j) {
            this.waiting.accumulateAndGet(j, (current, requested) -> {
                long sum = current + requested;
                // Two positive operands producing a negative sum means the addition overflowed.
                return (current > 0 && requested > 0 && sum < 0) ? Long.MAX_VALUE : sum;
            });
            drain();
        }

        /**
         * Tries to consume one unit of demand by invoking the producer.
         *
         * <p>Only one caller at a time gets past the {@code draining} flag. The flag is held
         * until the producer's promise yields a result (or the producer throws), which keeps
         * producer invocations strictly sequential.
         */
        private void drain() {
            if (isStopped() || !this.draining.compareAndSet(false, true)) {
                return;
            }
            if (this.waiting.getAndDecrement() > 0) {
                try {
                    FlatYieldingPublisher.this.producer
                        .apply(new DefaultYieldRequest(this.subscriptionNum, this.requestCounter.getAndIncrement()))
                        // Release the guard once the promise has a result (success or failure).
                        // NOTE(review): relies on wiretap running before the then() callback so the
                        // nested drain() below can re-acquire the flag - confirm against Promise docs.
                        .wiretap(result -> this.draining.set(false))
                        .onError(this::onError)
                        .then(item -> {
                            if (item == null) {
                                // A null value is the producer's end-of-stream signal.
                                onComplete();
                            } else {
                                onNext(item);
                                drain();
                            }
                        });
                } catch (Throwable th) {
                    // The producer function itself failed, so no promise will release the guard.
                    this.draining.set(false);
                    onError(th);
                    return;
                }
            } else {
                // There was no demand after all: undo the speculative decrement and release the guard.
                this.waiting.addAndGet(1L);
                this.draining.set(false);
            }
            // Demand may have arrived while the guard was held by this pass; pick it up now.
            if (this.waiting.get() > 0) {
                drain();
            }
        }
    }

    /**
     * Creates a publisher backed by the given producer function.
     *
     * @param function called once per requested item; the promise it returns supplies the
     *                 item, or {@code null} to complete the stream
     */
    public FlatYieldingPublisher(Function<? super YieldRequest, ? extends Promise<? extends T>> function) {
        this.producer = function;
    }

    /**
     * Creates a new {@code Subscription} for the subscriber; the subscription starts itself
     * from its constructor.
     *
     * @param subscriber the subscriber to attach
     */
    @Override
    public void subscribe(Subscriber<? super T> subscriber) {
        new Subscription(subscriber);
    }
}
