package io.rsocket.resume;

import io.netty.buffer.ByteBuf;
import java.util.Queue;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Consumer;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.Disposable;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Operators;
import reactor.util.concurrent.Queues;

/* JADX INFO: Access modifiers changed from: package-private */
/* loaded from: input_file:BOOT-INF/lib/rsocket-core-1.0.0-RC2.jar:io/rsocket/resume/UpstreamFramesSubscriber.class */
/**
 * Subscriber for upstream frames that forwards each frame to a consumer, or parks it in an
 * internal cache while a resume is in progress.
 *
 * <p>Demand requested from the upstream {@link Subscription} is the matched minimum of two
 * independent demand sources supplied at construction time: values from each source are
 * accumulated separately and only the amount available on both sides is requested upstream.
 *
 * <p>Demand bookkeeping ({@code request}, {@code downStreamRequestN},
 * {@code resumeSaveStreamRequestN}) is guarded by {@code synchronized (this)}.
 */
public class UpstreamFramesSubscriber implements Subscriber<ByteBuf>, Disposable {
    private static final Logger logger = LoggerFactory.getLogger(UpstreamFramesSubscriber.class);

    /** Flipped to {@code true} exactly once by {@link #dispose()}. */
    private final AtomicBoolean disposed = new AtomicBoolean();
    /** Receives every frame that is not being cached. */
    private final Consumer<ByteBuf> itemConsumer;
    /** Subscription to the first demand source passed to the constructor. */
    private final Disposable downstreamRequestDisposable;
    /** Subscription to the second demand source passed to the constructor. */
    private final Disposable resumeSaveStreamDisposable;
    /** Upstream subscription; {@code null} until {@link #onSubscribe(Subscription)} is called. */
    private volatile Subscription subs;
    /** While {@code true}, incoming frames are cached and no demand is sent upstream. */
    private volatile boolean resumeStarted;
    /** Frames received while {@link #resumeStarted} was {@code true}. */
    private final Queue<ByteBuf> framesCache;
    /** Matched demand that has not yet been requested from upstream. */
    private long request;
    /** Demand from the first source that has not yet been matched by the second. */
    private long downStreamRequestN;
    /** Demand from the second source that has not yet been matched by the first. */
    private long resumeSaveStreamRequestN;

    /**
     * Creates the subscriber and immediately subscribes to both demand sources.
     *
     * @param i        value handed to {@code Queues.unbounded(int)} when creating the frame cache
     * @param flux     demand source whose values are added to the downstream demand counter
     * @param flux2    demand source whose values are added to the resume-save demand counter
     * @param consumer receives frames that are not cached
     */
    public UpstreamFramesSubscriber(int i, Flux<Long> flux, Flux<Long> flux2, Consumer<ByteBuf> consumer) {
        this.itemConsumer = consumer;
        // Typed factory call instead of a raw (Queue) cast.
        this.framesCache = Queues.<ByteBuf>unbounded(i).get();
        this.downstreamRequestDisposable = flux.subscribe(n -> requestN(0L, n.longValue()));
        this.resumeSaveStreamDisposable = flux2.subscribe(n -> requestN(n.longValue(), 0L));
    }

    /**
     * Stores the upstream subscription. Cancels it right away if this subscriber has already
     * been disposed; otherwise forwards any demand accumulated so far.
     */
    @Override // org.reactivestreams.Subscriber
    public void onSubscribe(Subscription subscription) {
        this.subs = subscription;
        if (isDisposed()) {
            subscription.cancel();
        } else {
            doRequest();
        }
    }

    /** Caches or forwards the frame depending on whether a resume is in progress. */
    @Override // org.reactivestreams.Subscriber
    public void onNext(ByteBuf byteBuf) {
        processFrame(byteBuf);
    }

    /** Disposes this subscriber; the error itself is not propagated or logged here. */
    @Override // org.reactivestreams.Subscriber
    public void onError(Throwable th) {
        dispose();
    }

    /** Disposes this subscriber. */
    @Override // org.reactivestreams.Subscriber
    public void onComplete() {
        dispose();
    }

    /** Marks a resume as in progress: frames are cached and upstream demand is withheld. */
    public void resumeStart() {
        this.resumeStarted = true;
    }

    /**
     * Ends the resume phase: hands every cached frame to the consumer in queue order, clears the
     * resume flag, then forwards any demand accumulated in the meantime.
     */
    public void resumeComplete() {
        ByteBuf frame;
        while ((frame = this.framesCache.poll()) != null) {
            this.itemConsumer.accept(frame);
        }
        this.resumeStarted = false;
        doRequest();
    }

    /**
     * Releases cached frames, cancels the upstream subscription (if any) and disposes both demand
     * source subscriptions. Only the first call has any effect.
     */
    @Override // reactor.core.Disposable
    public void dispose() {
        if (!this.disposed.compareAndSet(false, true)) {
            return;
        }
        releaseCache();
        Subscription upstream = this.subs;
        if (upstream != null) {
            upstream.cancel();
        }
        this.resumeSaveStreamDisposable.dispose();
        this.downstreamRequestDisposable.dispose();
    }

    @Override // reactor.core.Disposable
    public boolean isDisposed() {
        return this.disposed.get();
    }

    /**
     * Adds demand from either source and moves the amount available on both sides into the
     * pending upstream request.
     *
     * @param j  additional demand from the resume-save source
     * @param j2 additional demand from the downstream source
     */
    private void requestN(long j, long j2) {
        synchronized (this) {
            this.downStreamRequestN = Operators.addCap(this.downStreamRequestN, j2);
            this.resumeSaveStreamRequestN = Operators.addCap(this.resumeSaveStreamRequestN, j);
            long min = Math.min(this.downStreamRequestN, this.resumeSaveStreamRequestN);
            if (min > 0) {
                this.downStreamRequestN -= min;
                this.resumeSaveStreamRequestN -= min;
                logger.debug("Upstream subscriber requestN: {}", min);
                this.request = Operators.addCap(this.request, min);
            }
        }
        doRequest();
    }

    /**
     * Requests the pending demand from upstream. Does nothing when there is no subscription yet
     * or while a resume is in progress; the demand stays pending until a later call.
     */
    private void doRequest() {
        Subscription upstream = this.subs;
        if (upstream == null || this.resumeStarted) {
            return;
        }
        synchronized (this) {
            long pending = this.request;
            if (pending > 0) {
                upstream.request(pending);
                this.request = 0L;
            }
        }
    }

    /**
     * Drains the frame cache, releasing each frame that is still referenced.
     *
     * <p>Every queued frame is polled and released once. Polling only the head and releasing it
     * in a loop would over-release that buffer and leave the remaining frames unreleased.
     */
    private void releaseCache() {
        ByteBuf frame;
        while ((frame = this.framesCache.poll()) != null) {
            // Skip buffers that have already been fully released elsewhere.
            if (frame.refCnt() > 0) {
                frame.release();
            }
        }
    }

    /** Caches the frame while a resume is in progress; otherwise hands it to the consumer. */
    private void processFrame(ByteBuf byteBuf) {
        if (this.resumeStarted) {
            this.framesCache.offer(byteBuf);
        } else {
            this.itemConsumer.accept(byteBuf);
        }
    }
}
