package io.servicetalk.concurrent.api;

import io.servicetalk.concurrent.Cancellable;
import io.servicetalk.concurrent.PublisherSource;
import io.servicetalk.concurrent.internal.ConcurrentSubscription;
import io.servicetalk.concurrent.internal.ConcurrentUtils;
import io.servicetalk.concurrent.internal.DelayedSubscription;
import io.servicetalk.concurrent.internal.FlowControlUtils;
import io.servicetalk.concurrent.internal.QueueFullException;
import io.servicetalk.concurrent.internal.SubscriberUtils;
import io.servicetalk.concurrent.internal.TerminalNotification;
import io.servicetalk.utils.internal.PlatformDependent;
import java.util.Objects;
import java.util.Queue;
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
import java.util.concurrent.atomic.AtomicLongFieldUpdater;
import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
import java.util.function.Function;
import java.util.function.Supplier;
import javax.annotation.Nullable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* JADX INFO: Access modifiers changed from: package-private */
/* loaded from: input_file:io/servicetalk/concurrent/api/PublisherFlatMapMerge.class */
public final class PublisherFlatMapMerge<T, R> extends AbstractAsynchronousPublisherOperator<T, R> {
    /** Default concurrency limit; the three-argument constructor passes the same value (16). */
    static final int FLAT_MAP_DEFAULT_CONCURRENCY = 16;
    private static final Logger LOGGER = LoggerFactory.getLogger(PublisherFlatMapMerge.class);
    // NOTE(review): not referenced by name in the visible code; the demand reservation logic uses a literal 1,
    // presumably this constant inlined by the compiler - confirm.
    private static final int MIN_MAPPED_DEMAND = 1;
    /** Maps each upstream item to the publisher that gets subscribed and merged; must not return {@code null}. */
    private final Function<? super T, ? extends Publisher<? extends R>> mapper;
    /** Amount requested from upstream on subscribe and divisor of the per-source demand quota; validated {@code > 0}. */
    private final int maxConcurrency;
    /** {@code 0} makes the first mapped-source error cancel and terminate immediately; validated {@code >= 0}. */
    private final int maxDelayedErrors;

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:io/servicetalk/concurrent/api/PublisherFlatMapMerge$FlatMapSubscriber.class */
    public static final class FlatMapSubscriber<T, R> implements PublisherSource.Subscriber<T>, PublisherSource.Subscription {
        /** Sentinel signal: a mapped source terminated, so one more item may be requested from upstream. */
        private static final Object MAPPED_SOURCE_COMPLETE;
        // Field updaters for the volatile fields below; all are bound in the static initializer.
        private static final AtomicReferenceFieldUpdater<FlatMapSubscriber, Throwable> pendingErrorUpdater;
        private static final AtomicIntegerFieldUpdater<FlatMapSubscriber> pendingErrorCountUpdater;
        private static final AtomicIntegerFieldUpdater<FlatMapSubscriber> emittingLockUpdater;
        private static final AtomicLongFieldUpdater<FlatMapSubscriber> mappedDemandUpdater;
        private static final AtomicLongFieldUpdater<FlatMapSubscriber> pendingDemandUpdater;
        private static final AtomicIntegerFieldUpdater<FlatMapSubscriber> activeMappedSourcesUpdater;

        /** First error reported by a mapped source; when set it is delivered instead of the queued terminal. */
        @Nullable
        private volatile Throwable pendingError;
        /** Counter handed to {@code CompositeExceptionUtils.addPendingError} together with {@code maxDelayedErrors}. */
        private volatile int pendingErrorCount;
        /** Lock word used with {@code ConcurrentUtils.tryAcquireLock/releaseLock} to serialize delivery to {@link #target}. */
        private volatile int emittingLock;
        /** Number of subscribed mapped sources; negated by {@code terminateActiveMappedSources()} once upstream completes. */
        private volatile int activeMappedSources;
        /** Downstream demand not yet consumed by emitted items; set to {@code -1} by {@link #cancel()}. */
        private volatile long pendingDemand;
        /** Demand available for distribution to mapped sources; may be driven negative when handing out the minimum quota. */
        private volatile long mappedDemand;
        /** Set once a terminal signal has been delivered to {@link #target}; later signals are dropped. */
        // NOTE(review): non-volatile - presumably only accessed while holding the emitting lock; confirm.
        private boolean targetTerminated;

        /** Error received through the upstream {@code onError}, if any. */
        @Nullable
        private Throwable upstreamError;

        /** Upstream subscription, wrapped with {@code ConcurrentSubscription.wrap} in {@code onSubscribe}. */
        @Nullable
        private PublisherSource.Subscription subscription;
        /** The downstream subscriber that receives the merged items. */
        private final PublisherSource.Subscriber<? super R> target;
        /** Queue of signals that could not be delivered directly (items, terminals, hungry subscribers, sentinels). */
        private final Queue<Object> signals = PlatformDependent.newUnboundedMpscQueue(4);
        /** The operator instance providing {@code mapper}, {@code maxConcurrency} and {@code maxDelayedErrors}. */
        private final PublisherFlatMapMerge<T, R> source;
        /** Holds the subscribers of the currently active mapped sources so they can be cancelled together. */
        private final CancellableSet cancellableSet;
        static final /* synthetic */ boolean $assertionsDisabled;

        /* JADX INFO: Access modifiers changed from: private */
        /* loaded from: input_file:io/servicetalk/concurrent/api/PublisherFlatMapMerge$FlatMapSubscriber$FlatMapPublisherSubscriber.class */
        public static final class FlatMapPublisherSubscriber<T, R> implements PublisherSource.Subscriber<R>, Cancellable {
            /** Updater for {@link #innerPendingDemand}; bound in the static initializer. */
            private static final AtomicIntegerFieldUpdater<FlatMapPublisherSubscriber> pendingDemandUpdater;
            /** Value stored in {@link #innerPendingDemand} once this subscriber has seen a terminal signal. */
            private static final int TERMINATED = -2;
            /** The merge subscriber that owns this mapped-source subscriber. */
            private final FlatMapSubscriber<T, R> parent;
            /** Subscription to the mapped publisher; the real subscription is supplied later in {@code onSubscribe}. */
            private final DelayedSubscription subscription = new DelayedSubscription();
            /** Items still expected from the mapped source for the last granted quota; negative values mark invalid/terminated state. */
            private volatile int innerPendingDemand;
            /** {@code true} while signals of this source were routed through the parent's queue; reset by {@code request(int)}. */
            private boolean signalsQueued;
            static final /* synthetic */ boolean $assertionsDisabled;

            /**
             * Creates the subscriber for a single mapped publisher.
             *
             * @param flatMapSubscriber the owning merge subscriber to which all signals are forwarded.
             */
            FlatMapPublisherSubscriber(FlatMapSubscriber<T, R> flatMapSubscriber) {
                parent = flatMapSubscriber;
            }

            /** Cancels the delayed subscription to the mapped publisher. */
            @Override // io.servicetalk.concurrent.Cancellable
            public void cancel() {
                subscription.cancel();
            }

            /**
             * Grants this mapped source a new demand quota.
             * <p>
             * Clears the "signals queued" marker, then installs {@code i} as the pending demand only if the current
             * pending demand is exactly {@code 0}; on success the same amount is requested from the mapped source.
             *
             * @param i the quota to grant, must be positive.
             * @return {@code true} if the quota was installed and requested, {@code false} if the pending demand was
             * not {@code 0} (the caller is expected to give the quota back).
             */
            boolean request(int i) {
                if (!$assertionsDisabled && i <= 0) {
                    throw new AssertionError();
                }
                signalsQueued = false;
                final boolean granted = pendingDemandUpdater.compareAndSet(this, 0, i);
                if (granted) {
                    subscription.request(i);
                }
                return granted;
            }

            /** Records that a signal of this source went through the parent's queue instead of the direct path. */
            void markSignalsQueued() {
                signalsQueued = true;
            }

            /**
             * @return {@code true} if a signal of this source has been queued since the last {@code request(int)}.
             */
            boolean hasSignalsQueued() {
                return signalsQueued;
            }

            /**
             * Supplies the real subscription (wrapped by {@code ConcurrentSubscription.wrap}) to the delayed
             * subscription and then asks the parent for an initial demand quota.
             */
            @Override // io.servicetalk.concurrent.PublisherSource.Subscriber
            public void onSubscribe(PublisherSource.Subscription subscription) {
                final PublisherSource.Subscription wrapped = ConcurrentSubscription.wrap(subscription);
                this.subscription.delayedSubscription(wrapped);
                parent.distributeMappedDemand(this);
            }

            /**
             * Forwards an item of the mapped source to the parent.
             * <p>
             * The pending demand is decremented first; a negative result means the source emitted more than was
             * requested and is rejected via {@code handleInvalidDemand}. When the item consumed the last unit of
             * the granted quota, this subscriber itself is emitted as a "needs more demand" signal, regardless of
             * whether forwarding the item threw.
             *
             * @param r the item, may be {@code null}.
             */
            @Override // io.servicetalk.concurrent.PublisherSource.Subscriber
            public void onNext(@Nullable R r) {
                final int remaining = pendingDemandUpdater.decrementAndGet(this);
                if (remaining < 0) {
                    handleInvalidDemand(remaining, r);
                }
                try {
                    parent.tryEmitItem(SubscriberApiUtils.wrapNull(r), true, this);
                } finally {
                    // A real finally block guarantees the replenish signal is emitted exactly once. The previous
                    // catch-and-rethrow duplicate emitted it a second time if the replenish call itself threw.
                    if (remaining == 0) {
                        parent.tryEmitItem(this, false, this);
                    }
                }
            }

            /**
             * Rejects an {@code onNext} that arrived without outstanding demand.
             * <p>
             * Attempts to reset the pending demand from the observed negative value: to {@code 0} when the value
             * is above {@code -2} (the terminated marker), otherwise to {@code -2}. The result of that attempt is
             * ignored and the violation is always reported by throwing.
             *
             * @param i the negative pending demand observed after the decrement.
             * @param r the item that is being discarded.
             * @throws IllegalStateException always.
             */
            private void handleInvalidDemand(int i, @Nullable R r) {
                final int resetValue;
                if (i > -2) {
                    resetValue = 0;
                } else {
                    resetValue = -2;
                }
                pendingDemandUpdater.compareAndSet(this, i, resetValue);
                throw new IllegalStateException("Too many onNext signals for Subscriber: " + this + " pendingDemand: " + i + " discarding: " + r);
            }

            /**
             * Handles the failure of the mapped source.
             * <p>
             * The pending demand is swapped to {@code -2} (terminated); if it was already negative the signal is
             * logged as a duplicate terminal and ignored. Afterwards:
             * <ul>
             * <li>With {@code maxDelayedErrors == 0}: the first error to be installed as the parent's pending error
             * cancels everything and emits an error terminal; later errors are dropped.</li>
             * <li>Otherwise: the error is installed as, or added to, the parent's pending error. If this was the
             * last active source after upstream completion the error terminal is enqueued, else a "mapped source
             * complete" signal is emitted.</li>
             * </ul>
             *
             * @param th the error reported by the mapped source.
             */
            @Override // io.servicetalk.concurrent.PublisherSource.Subscriber
            public void onError(Throwable th) {
                final int unusedDemand = pendingDemandUpdater.getAndSet(this, -2);
                if (unusedDemand < 0) {
                    SubscriberUtils.logDuplicateTerminal(this, th);
                    return;
                }
                Throwable currPendingError = parent.pendingError;
                final int maxDelayedErrors = parent.source.maxDelayedErrors;
                if (maxDelayedErrors == 0) {
                    if (currPendingError == null && FlatMapSubscriber.pendingErrorUpdater.compareAndSet(parent, null, th)) {
                        try {
                            parent.doCancel(true);
                        } finally {
                            // Emit exactly once, even if cancellation throws. The previous catch-and-rethrow
                            // duplicate emitted the terminal a second time if the first emission itself threw.
                            parent.tryEmitItem(TerminalNotification.error(th), false, this);
                        }
                    }
                    return;
                }

                if (currPendingError != null) {
                    CompositeExceptionUtils.addPendingError(FlatMapSubscriber.pendingErrorCountUpdater, parent, maxDelayedErrors, currPendingError, th);
                } else if (FlatMapSubscriber.pendingErrorUpdater.compareAndSet(parent, null, th)) {
                    currPendingError = th;
                } else {
                    // Lost the race to install the first error: attach this one to the winner.
                    currPendingError = parent.pendingError;
                    if (!$assertionsDisabled && currPendingError == null) {
                        throw new AssertionError();
                    }
                    CompositeExceptionUtils.addPendingError(FlatMapSubscriber.pendingErrorCountUpdater, parent, maxDelayedErrors, currPendingError, th);
                }

                if (parent.removeSubscriber(this, unusedDemand)) {
                    parent.enqueueAndDrain(TerminalNotification.error(currPendingError));
                } else {
                    parent.tryEmitItem(FlatMapSubscriber.MAPPED_SOURCE_COMPLETE, false, this);
                }
            }

            /**
             * Handles completion of the mapped source.
             * <p>
             * The pending demand is swapped to {@code -2} (terminated); a previously negative value means a
             * terminal was already seen and the signal is only logged. Otherwise this subscriber is removed from
             * the parent: if that made the whole merge complete, the completion terminal is enqueued, else a
             * "mapped source complete" signal is emitted.
             */
            @Override // io.servicetalk.concurrent.PublisherSource.Subscriber
            public void onComplete() {
                final int unusedDemand = pendingDemandUpdater.getAndSet(this, -2);
                if (unusedDemand < 0) {
                    SubscriberUtils.logDuplicateTerminal(this);
                    return;
                }
                if (parent.removeSubscriber(this, unusedDemand)) {
                    parent.enqueueAndDrain(TerminalNotification.complete());
                } else {
                    parent.tryEmitItem(FlatMapSubscriber.MAPPED_SOURCE_COMPLETE, false, this);
                }
            }

            // Static initialisation: caches the assertion status of the top-level class and binds the demand
            // updater to the volatile "innerPendingDemand" field.
            static {
                $assertionsDisabled = !PublisherFlatMapMerge.class.desiredAssertionStatus();
                pendingDemandUpdater = AtomicIntegerFieldUpdater.newUpdater(FlatMapPublisherSubscriber.class, "innerPendingDemand");
            }
        }

        FlatMapSubscriber(PublisherFlatMapMerge<T, R> publisherFlatMapMerge, PublisherSource.Subscriber<? super R> subscriber) {
            this.source = publisherFlatMapMerge;
            this.target = subscriber;
            this.cancellableSet = new CancellableSet(Math.min(16, ((PublisherFlatMapMerge) publisherFlatMapMerge).maxConcurrency));
        }

        /**
         * Cancels the merge: marks the downstream demand as cancelled ({@code -1}) and then cancels the upstream
         * subscription together with the mapped sources.
         */
        @Override // io.servicetalk.concurrent.Cancellable
        public void cancel() {
            pendingDemand = -1L;
            doCancel(true);
        }

        /**
         * Accepts downstream demand.
         * <p>
         * An invalid {@code j} is passed unchanged to the upstream subscription. A valid {@code j} is added to the
         * pending downstream demand (draining the queue if the demand was previously {@code 0}) and to the demand
         * available for mapped sources.
         *
         * @param j the requested amount.
         */
        @Override // io.servicetalk.concurrent.PublisherSource.Subscription
        public void request(long j) {
            if (!$assertionsDisabled && subscription == null) {
                throw new AssertionError();
            }
            if (SubscriberUtils.isRequestNValid(j)) {
                final long previousDemand = pendingDemandUpdater.getAndAccumulate(this, j, FlowControlUtils::addWithOverflowProtectionIfNotNegative);
                if (previousDemand == 0) {
                    // Going from no demand to some demand: queued signals may now be deliverable.
                    drainPending();
                }
                incMappedDemand(j);
            } else {
                subscription.request(j);
            }
        }

        /**
         * Receives the upstream subscription.
         * <p>
         * Unless {@code SubscriberUtils.checkDuplicateSubscription} rejects it, the subscription is wrapped and
         * stored, this object is handed to the target as its subscription, and {@code maxConcurrency} items are
         * requested from upstream.
         */
        @Override // io.servicetalk.concurrent.PublisherSource.Subscriber
        public void onSubscribe(PublisherSource.Subscription subscription) {
            if (!SubscriberUtils.checkDuplicateSubscription(this.subscription, subscription)) {
                return;
            }
            this.subscription = ConcurrentSubscription.wrap(subscription);
            target.onSubscribe(this);
            this.subscription.request(source.maxConcurrency);
        }

        /**
         * Maps an upstream item to a publisher and subscribes to it.
         * <p>
         * The new mapped subscriber is subscribed only if it could be added to the cancellable set and the
         * incremented active-source count is positive.
         *
         * @param t the upstream item, may be {@code null}.
         * @throws NullPointerException if the mapper returns {@code null}.
         */
        @Override // io.servicetalk.concurrent.PublisherSource.Subscriber
        public void onNext(@Nullable T t) {
            final Publisher<? extends R> mapped = Objects.requireNonNull(source.mapper.apply(t),
                    (Supplier<String>) () -> "Mapper " + source.mapper + " returned null");
            final FlatMapPublisherSubscriber<T, R> mappedSubscriber = new FlatMapPublisherSubscriber<>(this);
            if (cancellableSet.add(mappedSubscriber) && activeMappedSourcesUpdater.incrementAndGet(this) > 0) {
                mapped.subscribeInternal(mappedSubscriber);
            }
        }

        /**
         * Handles an upstream failure: remembers the error, invokes {@code doCancel(false)} (i.e. without
         * cancelling the already terminated upstream) and, whatever that call does, enqueues the error terminal
         * and drains the queue.
         *
         * @param th the upstream error.
         */
        @Override // io.servicetalk.concurrent.PublisherSource.Subscriber
        public void onError(Throwable th) {
            upstreamError = th;
            try {
                doCancel(false);
            } finally {
                enqueueAndDrain(TerminalNotification.error(th));
            }
        }

        /**
         * Handles upstream completion. The completion terminal is enqueued right away only when no mapped source
         * is active any more; otherwise the last mapped source to terminate triggers it.
         */
        @Override // io.servicetalk.concurrent.PublisherSource.Subscriber
        public void onComplete() {
            final boolean noActiveSources = terminateActiveMappedSources();
            if (noActiveSources) {
                enqueueAndDrain(TerminalNotification.complete());
            }
        }

        /**
         * Adds {@code j} to the demand available for mapped sources, using
         * {@code FlowControlUtils.addWithUnderOverflowProtection} as the accumulator.
         *
         * @param j the amount to add, must be positive.
         */
        private void incMappedDemand(long j) {
            if (!$assertionsDisabled && j <= 0) {
                throw new AssertionError();
            }
            mappedDemandUpdater.getAndAccumulate(this, j, FlowControlUtils::addWithUnderOverflowProtection);
        }

        /**
         * Atomically reserves a demand quota for one mapped source.
         * <p>
         * With positive available demand the quota is {@code calculateRequestNQuota(available)}. With none
         * available a quota of {@code 1} is still handed out, which drives {@code mappedDemand} negative.
         *
         * @return the reserved quota, always at least {@code 1}.
         */
        private int reserveMappedDemandQuota() {
            for (;;) {
                final long available = mappedDemand;
                final int quota = available > 0 ? calculateRequestNQuota(available) : 1;
                if (mappedDemandUpdater.compareAndSet(this, available, available - quota)) {
                    return quota;
                }
            }
        }

        /* JADX INFO: Access modifiers changed from: private */
        /**
         * Reserves a demand quota and offers it to the given mapped subscriber; if the subscriber does not accept
         * it, the quota is returned to the shared mapped demand.
         *
         * @param flatMapPublisherSubscriber the mapped subscriber that needs more demand.
         */
        public void distributeMappedDemand(FlatMapPublisherSubscriber<T, R> flatMapPublisherSubscriber) {
            final int quota = reserveMappedDemandQuota();
            if (!flatMapPublisherSubscriber.request(quota)) {
                incMappedDemand(quota);
            }
        }

        /**
         * Computes the share of the available demand for a single mapped source: the available demand divided by
         * {@code maxConcurrency}, raised to at least {@code 1} and capped at {@link Integer#MAX_VALUE}.
         *
         * @param j the currently available mapped demand.
         * @return the quota for one mapped source.
         */
        private int calculateRequestNQuota(long j) {
            final long fairShare = j / source.maxConcurrency;
            final long atLeastOne = Math.max(fairShare, 1L);
            return (int) Math.min((long) Integer.MAX_VALUE, atLeastOne);
        }

        /* JADX INFO: Access modifiers changed from: private */
        /**
         * Cancels the mapped sources and, optionally, the upstream subscription.
         * <p>
         * The mapped sources held in the cancellable set are always cancelled, even if cancelling the upstream
         * subscription throws. Previously the whole body was guarded by {@code z}, which made
         * {@code doCancel(false)} (used when upstream itself failed) a no-op and left the mapped sources running.
         *
         * @param z {@code true} to also cancel the upstream subscription, {@code false} when upstream has already
         * terminated and only the mapped sources need to be cancelled.
         */
        public void doCancel(boolean z) {
            try {
                if (z) {
                    if (!$assertionsDisabled && this.subscription == null) {
                        throw new AssertionError();
                    }
                    this.subscription.cancel();
                }
            } finally {
                this.cancellableSet.cancel();
            }
        }

        /**
         * Tries to consume one unit of downstream demand.
         *
         * @return {@code true} if the demand was positive and has been decremented, {@code false} if it was zero
         * or negative.
         */
        private boolean tryDecrementPendingDemand() {
            for (;;) {
                final long current = pendingDemand;
                if (current <= 0) {
                    return false;
                }
                if (pendingDemandUpdater.compareAndSet(this, current, current - 1)) {
                    return true;
                }
            }
        }

        /* JADX INFO: Access modifiers changed from: private */
        /**
         * Delivers a signal of a mapped source, bypassing the queue when possible.
         * <p>
         * The signal is queued (and the source marked as having queued signals) when the source already has
         * queued signals, when the signal needs downstream demand but none is available, or when the emitting
         * lock is held by someone else. A {@code MAPPED_SOURCE_COMPLETE} signal that does not need queueing just
         * requests one more item from upstream. Everything else is sent to the target under the emitting lock.
         *
         * @param obj the signal: a (null-wrapped) item, a terminal notification, a mapped subscriber asking for
         * demand, or {@code MAPPED_SOURCE_COMPLETE}.
         * @param z {@code true} if delivering {@code obj} consumes one unit of downstream demand.
         * @param flatMapPublisherSubscriber the mapped subscriber that produced the signal.
         */
        public void tryEmitItem(Object obj, boolean z, FlatMapPublisherSubscriber<T, R> flatMapPublisherSubscriber) {
            if (flatMapPublisherSubscriber.hasSignalsQueued() || (z && !tryDecrementPendingDemand())) {
                // Order must be kept relative to already queued signals, or there is no demand: use the queue.
                flatMapPublisherSubscriber.markSignalsQueued();
                enqueueAndDrain(obj);
            } else if (obj == MAPPED_SOURCE_COMPLETE) {
                try {
                    requestMoreFromUpstream(1);
                } catch (Throwable cause) {
                    onErrorNotHoldingLock(cause);
                }
            } else if (ConcurrentUtils.tryAcquireLock(emittingLockUpdater, this)) {
                // Fast path: no concurrent emitter, deliver directly.
                try {
                    final boolean demandConsumed = sendToTarget(obj);
                    if (!$assertionsDisabled && demandConsumed != z && !targetTerminated) {
                        throw new AssertionError();
                    }
                } finally {
                    // Release exactly once. The previous catch-and-rethrow duplicate released the lock a second
                    // time when drainPending() threw after the first release.
                    // A false return is treated as "work was queued while the lock was held" - presumably
                    // signalled by a contending thread - so the queue is drained.
                    if (!ConcurrentUtils.releaseLock(emittingLockUpdater, this)) {
                        drainPending();
                    }
                }
            } else {
                // Slow path: someone else is emitting. Return the demand reserved above and go through the queue.
                if (z) {
                    pendingDemandUpdater.getAndAccumulate(this, 1L, FlowControlUtils::addWithOverflowProtectionIfNotNegative);
                }
                flatMapPublisherSubscriber.markSignalsQueued();
                enqueueAndDrain(obj);
            }
        }

        /**
         * Terminates the merge with {@code th}: cancels upstream and mapped sources and then, whatever the outcome
         * of the cancellation, sends the error terminal to the target. The emitting lock is not released here.
         *
         * @param th the failure to deliver.
         */
        private void onErrorHoldingLock(Throwable th) {
            try {
                doCancel(true);
            } finally {
                final TerminalNotification terminal = TerminalNotification.error(th);
                sendToTarget(terminal);
            }
        }

        /**
         * Terminates the merge with {@code th} from a context that does not own the emitting lock.
         * <p>
         * If the lock can be acquired the error is delivered directly; otherwise everything is cancelled and the
         * error terminal goes through the queue.
         *
         * @param th the failure to deliver.
         */
        private void onErrorNotHoldingLock(Throwable th) {
            if (!ConcurrentUtils.tryAcquireLock(emittingLockUpdater, this)) {
                try {
                    doCancel(true);
                } finally {
                    enqueueAndDrain(TerminalNotification.error(th));
                }
                return;
            }
            onErrorHoldingLock(th);
        }

        /**
         * Adds a signal to the queue, reporting a failed offer through {@code enqueueFailed}.
         *
         * @param obj the signal to queue.
         */
        private void enqueueItem(Object obj) {
            if (!signals.offer(obj)) {
                enqueueFailed(obj);
            }
        }

        /* JADX INFO: Access modifiers changed from: private */
        /**
         * Queues a signal and then attempts to drain the queue towards the target.
         *
         * @param obj the signal to queue.
         */
        public void enqueueAndDrain(Object obj) {
            enqueueItem(obj);
            drainPending();
        }

        /**
         * Reports a rejected queue offer: logs the item at error level and throws.
         *
         * @param obj the signal that could not be queued.
         * @throws QueueFullException always.
         */
        private static void enqueueFailed(Object obj) {
            LOGGER.error("Queue should be unbounded, but an offer failed for item {}!", obj);
            throw new QueueFullException("pending");
        }

        /**
         * Drains queued signals to the target while holding the emitting lock.
         * <p>
         * Each pass takes all pending downstream demand, delivers queued signals until that demand is used up or
         * the queue is empty, and returns unused demand. When the demand was used up exactly, leading
         * non-item signals (source-complete sentinels, hungry subscribers, a terminal) are still processed because
         * they need no demand. The loop repeats as long as releasing the lock reports {@code false}. Any failure
         * terminates the merge via {@code onErrorHoldingLock} and leaves the lock unreleased.
         */
        private void drainPending() {
            Object peek;
            Object poll;
            // z: whether another acquire attempt is needed (true initially, then "release reported false").
            boolean z = true;
            while (z && ConcurrentUtils.tryAcquireLock(emittingLockUpdater, this)) {
                try {
                    // i: number of MAPPED_SOURCE_COMPLETE sentinels seen, i.e. items to re-request from upstream.
                    int i = 0;
                    // Take all of the downstream demand; whatever is not used is given back below.
                    long andSet = pendingDemandUpdater.getAndSet(this, 0L);
                    if (andSet < 0) {
                        // Negative demand (cancel() stores -1): restore the value and deliver nothing.
                        this.pendingDemand = andSet;
                    } else {
                        // j: number of items that actually consumed demand.
                        long j = 0;
                        while (j < andSet && (poll = this.signals.poll()) != null) {
                            if (poll == MAPPED_SOURCE_COMPLETE) {
                                i++;
                            } else if (sendToTarget(poll)) {
                                j++;
                            }
                        }
                        if (j == andSet) {
                            // Demand exhausted: only signals that do not consume demand may still be processed.
                            while (true) {
                                peek = this.signals.peek();
                                if (peek != MAPPED_SOURCE_COMPLETE) {
                                    if (!(peek instanceof FlatMapPublisherSubscriber)) {
                                        break;
                                    }
                                    this.signals.poll();
                                    distributeMappedDemand((FlatMapPublisherSubscriber) peek);
                                } else {
                                    this.signals.poll();
                                    i++;
                                }
                            }
                            if (peek instanceof TerminalNotification) {
                                sendToTarget(peek);
                            } else {
                                // An item (or nothing) is at the head: still surface an upstream error, if any.
                                sendToTargetIfPrematureError();
                            }
                        } else {
                            if (!$assertionsDisabled && j >= andSet) {
                                throw new AssertionError();
                            }
                            // Queue ran empty first: give the unused demand back.
                            pendingDemandUpdater.accumulateAndGet(this, andSet - j, FlowControlUtils::addWithOverflowProtectionIfNotNegative);
                        }
                    }
                    if (i != 0) {
                        requestMoreFromUpstream(i);
                    }
                    z = !ConcurrentUtils.releaseLock(emittingLockUpdater, this);
                } catch (Throwable th) {
                    // Returning without releasing keeps the lock held after the premature termination.
                    onErrorHoldingLock(th);
                    return;
                }
            }
        }

        /**
         * Requests {@code i} more items from the upstream subscription.
         *
         * @param i the number of items to request, must be positive.
         */
        private void requestMoreFromUpstream(int i) {
            if (!$assertionsDisabled && i <= 0) {
                throw new AssertionError();
            }
            if (!$assertionsDisabled && subscription == null) {
                throw new AssertionError();
            }
            subscription.request(i);
        }

        /**
         * Delivers a single signal to the target.
         * <ul>
         * <li>After termination nothing is delivered.</li>
         * <li>A terminal notification clears the queue, marks the target terminated and delivers either the
         * pending mapped-source error (if any) or the notification itself.</li>
         * <li>A mapped subscriber is treated as a request for more demand.</li>
         * <li>Anything else is unwrapped and delivered through {@code onNext}.</li>
         * </ul>
         *
         * @param obj the signal; must not be {@code MAPPED_SOURCE_COMPLETE}.
         * @return {@code true} only if an item was delivered through {@code onNext}, i.e. demand was consumed.
         */
        private boolean sendToTarget(Object obj) {
            if (!$assertionsDisabled && obj == MAPPED_SOURCE_COMPLETE) {
                throw new AssertionError();
            }
            if (targetTerminated) {
                return false;
            }
            if (obj instanceof TerminalNotification) {
                signals.clear();
                targetTerminated = true;
                final Throwable delayedError = pendingError;
                if (delayedError == null) {
                    ((TerminalNotification) obj).terminate(target);
                } else {
                    target.onError(delayedError);
                }
                return false;
            }
            if (obj instanceof FlatMapPublisherSubscriber) {
                distributeMappedDemand((FlatMapPublisherSubscriber) obj);
                return false;
            }
            target.onNext(SubscriberApiUtils.unwrapNullUnchecked(obj));
            return true;
        }

        /**
         * If upstream failed and the target has not been terminated yet, clears the queue, marks the target
         * terminated and delivers the upstream error.
         */
        private void sendToTargetIfPrematureError() {
            if (upstreamError != null && !targetTerminated) {
                signals.clear();
                targetTerminated = true;
                target.onError(upstreamError);
            }
        }

        /**
         * Marks upstream as complete by negating the active mapped source count.
         *
         * @return {@code true} if no mapped source was active at that moment.
         */
        private boolean terminateActiveMappedSources() {
            for (;;) {
                final int active = activeMappedSources;
                if (!$assertionsDisabled && active < 0) {
                    throw new AssertionError();
                }
                if (activeMappedSourcesUpdater.compareAndSet(this, active, -active)) {
                    return active == 0;
                }
            }
        }

        /**
         * Accounts for one terminated mapped source by moving the active count one step towards zero (down while
         * it is positive, up once it has been negated by upstream completion).
         *
         * @return {@code true} if the count was {@code -1}, i.e. upstream had completed and this was the last
         * active mapped source.
         */
        private boolean decrementActiveMappedSources() {
            for (;;) {
                final int active = activeMappedSources;
                if (!$assertionsDisabled && active == 0) {
                    throw new AssertionError();
                }
                final int next = active > 0 ? active - 1 : active + 1;
                if (activeMappedSourcesUpdater.compareAndSet(this, active, next)) {
                    return active == -1;
                }
            }
        }

        /* JADX INFO: Access modifiers changed from: private */
        /**
         * Removes a terminated mapped subscriber.
         * <p>
         * If the subscriber was still in the cancellable set and it was the last active source after upstream
         * completion, {@code true} is returned. Otherwise any unused demand of the subscriber is returned to the
         * shared mapped demand.
         *
         * @param flatMapPublisherSubscriber the subscriber to remove.
         * @param i the demand the subscriber had been granted but did not use.
         * @return {@code true} if the merge as a whole is now complete.
         */
        public boolean removeSubscriber(FlatMapPublisherSubscriber<T, R> flatMapPublisherSubscriber, int i) {
            final boolean lastSource = cancellableSet.remove(flatMapPublisherSubscriber) && decrementActiveMappedSources();
            if (lastSource) {
                return true;
            }
            if (i > 0) {
                incMappedDemand(i);
            }
            return false;
        }

        // Static initialisation: caches the assertion status of the top-level class, creates the sentinel object
        // and binds each field updater to the volatile field named in its string argument.
        static {
            $assertionsDisabled = !PublisherFlatMapMerge.class.desiredAssertionStatus();
            MAPPED_SOURCE_COMPLETE = new Object();
            pendingErrorUpdater = AtomicReferenceFieldUpdater.newUpdater(FlatMapSubscriber.class, Throwable.class, "pendingError");
            pendingErrorCountUpdater = AtomicIntegerFieldUpdater.newUpdater(FlatMapSubscriber.class, "pendingErrorCount");
            emittingLockUpdater = AtomicIntegerFieldUpdater.newUpdater(FlatMapSubscriber.class, "emittingLock");
            mappedDemandUpdater = AtomicLongFieldUpdater.newUpdater(FlatMapSubscriber.class, "mappedDemand");
            pendingDemandUpdater = AtomicLongFieldUpdater.newUpdater(FlatMapSubscriber.class, "pendingDemand");
            activeMappedSourcesUpdater = AtomicIntegerFieldUpdater.newUpdater(FlatMapSubscriber.class, "activeMappedSources");
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Creates the operator with a concurrency limit of 16 (the value of {@code FLAT_MAP_DEFAULT_CONCURRENCY}).
     *
     * @param publisher the upstream publisher.
     * @param function maps each upstream item to the publisher to merge.
     * @param z passed to {@code CompositeExceptionUtils.maxDelayedErrors} by the delegate constructor to derive
     * the delayed-error limit.
     */
    public PublisherFlatMapMerge(Publisher<T> publisher, Function<? super T, ? extends Publisher<? extends R>> function, boolean z) {
        this(publisher, function, z, 16);
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Creates the operator, deriving the delayed-error limit from {@code z} via
     * {@code CompositeExceptionUtils.maxDelayedErrors}.
     *
     * @param publisher the upstream publisher.
     * @param function maps each upstream item to the publisher to merge.
     * @param z the flag handed to {@code CompositeExceptionUtils.maxDelayedErrors}.
     * @param i the maximum concurrency, must be positive.
     */
    public PublisherFlatMapMerge(Publisher<T> publisher, Function<? super T, ? extends Publisher<? extends R>> function, boolean z, int i) {
        this(publisher, function, CompositeExceptionUtils.maxDelayedErrors(z), i);
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Creates the operator with explicit limits.
     *
     * @param publisher the upstream publisher.
     * @param function maps each upstream item to the publisher to merge.
     * @param i the delayed-error limit ({@code maxDelayedErrors}), must not be negative.
     * @param i2 the maximum concurrency ({@code maxConcurrency}), must be positive.
     * @throws IllegalArgumentException if {@code i2 <= 0} or {@code i < 0}.
     * @throws NullPointerException if {@code function} is {@code null}.
     */
    public PublisherFlatMapMerge(Publisher<T> publisher, Function<? super T, ? extends Publisher<? extends R>> function, int i, int i2) {
        super(publisher);
        // Validation order is significant: concurrency first, then the error limit, then the mapper.
        if (i2 <= 0) {
            throw new IllegalArgumentException("maxConcurrency: " + i2 + " (expected >0)");
        }
        if (i < 0) {
            throw new IllegalArgumentException("maxDelayedErrors: " + i + " (expected >=0)");
        }
        mapper = Objects.requireNonNull(function);
        maxConcurrency = i2;
        maxDelayedErrors = i;
    }

    /**
     * Wraps the downstream subscriber in a new {@code FlatMapSubscriber} bound to this operator.
     *
     * @param subscriber the downstream subscriber.
     * @return the subscriber to attach to the upstream publisher.
     */
    @Override // io.servicetalk.concurrent.api.PublisherOperator, java.util.function.Function
    public PublisherSource.Subscriber<? super T> apply(PublisherSource.Subscriber<? super R> subscriber) {
        return new FlatMapSubscriber<>(this, subscriber);
    }
}
