package software.amazon.smithy.java.core.serde;

import java.util.Objects;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Flow;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Stream;

/* loaded from: input_file:software/amazon/smithy/java/core/serde/BufferingFlatMapProcessor.class */
public abstract class BufferingFlatMapProcessor<I, O> implements Flow.Processor<I, O>, Flow.Subscription {
    private static final Throwable COMPLETE_SENTINEL = new RuntimeException();
    private final Flow.Publisher<I> publisher;
    private volatile Flow.Subscription upstreamSubscription;
    private volatile Flow.Subscriber<? super O> downstream;
    private final AtomicReference<Throwable> terminalEvent = new AtomicReference<>();
    private final AtomicLong pendingRequests = new AtomicLong();
    private final AtomicInteger pendingFlushes = new AtomicInteger();
    private final BlockingQueue<O> queue = new LinkedBlockingQueue();
    private boolean terminated = false;

    public BufferingFlatMapProcessor(Flow.Publisher<I> publisher) {
        this.publisher = publisher;
        publisher.subscribe(this);
    }

    protected abstract Stream<O> map(I i);

    @Override // java.util.concurrent.Flow.Subscriber
    public final void onSubscribe(Flow.Subscription subscription) {
        this.upstreamSubscription = subscription;
        if (this.pendingRequests.get() <= 0 || this.pendingFlushes.get() != 0) {
            return;
        }
        flush();
    }

    @Override // java.util.concurrent.Flow.Publisher
    public final void subscribe(Flow.Subscriber<? super O> subscriber) {
        this.downstream = subscriber;
        subscriber.onSubscribe(this);
    }

    @Override // java.util.concurrent.Flow.Subscriber
    public final void onNext(I i) {
        try {
            Stream<O> map = map(i);
            BlockingQueue<O> blockingQueue = this.queue;
            Objects.requireNonNull(blockingQueue);
            map.forEach(blockingQueue::add);
            flush();
        } catch (Exception e) {
            onError(new SerializationException("Malformed input", e));
        }
    }

    @Override // java.util.concurrent.Flow.Subscriber
    public final void onError(Throwable th) {
        this.upstreamSubscription.cancel();
        this.terminalEvent.compareAndSet(null, th);
        flush();
    }

    @Override // java.util.concurrent.Flow.Subscriber
    public final void onComplete() {
        this.terminalEvent.compareAndSet(null, COMPLETE_SENTINEL);
        flush();
    }

    @Override // java.util.concurrent.Flow.Subscription
    public final void request(long j) {
        if (j <= 0) {
            onError(new IllegalArgumentException("got a request for " + j + " items"));
        } else {
            accumulate(this.pendingRequests, j);
            flush();
        }
    }

    private void flush() {
        if (this.upstreamSubscription == null || this.downstream == null) {
            onError(new IllegalStateException("flush() requested before upstream and downstream fully wired."));
            return;
        }
        if (this.pendingFlushes.getAndIncrement() > 0 || this.terminated) {
            return;
        }
        int i = 1;
        while (true) {
            int i2 = i;
            if (i2 <= 0) {
                return;
            }
            long j = this.pendingRequests.get();
            Flow.Subscriber<? super O> subscriber = this.downstream;
            long sendMessages = sendMessages(subscriber, j);
            boolean isEmpty = this.queue.isEmpty();
            Throwable th = this.terminalEvent.get();
            if (th != null && attemptTermination(subscriber, th, isEmpty)) {
                this.terminated = true;
                return;
            }
            if (sendMessages > 0) {
                accumulate(this.pendingRequests, -sendMessages);
                j = accumulate(j, -sendMessages);
            }
            if (j > 0) {
                this.upstreamSubscription.request(1L);
            }
            i = this.pendingFlushes.addAndGet(-i2);
        }
    }

    protected void handleError(Throwable th, Flow.Subscriber<? super O> subscriber) {
        subscriber.onError(th);
    }

    private boolean attemptTermination(Flow.Subscriber<? super O> subscriber, Throwable th, boolean z) {
        if (!z || subscriber == null) {
            return false;
        }
        if (th == COMPLETE_SENTINEL) {
            subscriber.onComplete();
            return true;
        }
        handleError(th, subscriber);
        return true;
    }

    private long sendMessages(Flow.Subscriber<? super O> subscriber, long j) {
        O poll;
        long j2 = 0;
        if (subscriber != null) {
            while (j2 < j && (poll = this.queue.poll()) != null) {
                j2++;
                subscriber.onNext(poll);
            }
        }
        return j2;
    }

    @Override // java.util.concurrent.Flow.Subscription
    public final void cancel() {
        this.upstreamSubscription.cancel();
    }

    private static long accumulate(long j, long j2) {
        if (j == Long.MAX_VALUE || j2 == Long.MAX_VALUE) {
            return Long.MAX_VALUE;
        }
        try {
            return Math.addExact(j, j2);
        } catch (ArithmeticException e) {
            return Long.MAX_VALUE;
        }
    }

    private static long accumulate(AtomicLong atomicLong, long j) {
        return atomicLong.accumulateAndGet(j, BufferingFlatMapProcessor::accumulate);
    }
}
