package org.javaync.io;

import java.nio.channels.AsynchronousFileChannel;
import java.util.concurrent.ConcurrentLinkedDeque;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.logging.Level;
import java.util.logging.Logger;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;

/* loaded from: input_file:org/javaync/io/AsyncFileReaderLines.class */
/**
 * A Reactive Streams {@link Subscription} that buffers the lines produced by
 * {@link AbstractAsyncFileReaderLines} and hands them to a single {@link Subscriber}
 * according to the demand signalled through {@link #request(long)}.
 *
 * <p>While the file is still being read, lines are emitted from {@link #onProduceLine(String)}.
 * Once {@link #onComplete()} has been called, the remaining buffered lines are flushed by
 * whichever caller (the completion callback or a later {@code request}) wins the
 * {@link #onEmit} state machine.
 *
 * <p>NOTE(review): the reader callbacks and {@code request} are presumably invoked on
 * different threads (the read is started on an {@link AsynchronousFileChannel}); the
 * threading model of the superclass is not visible here, so the shared state below is
 * written to be safe under that assumption.
 */
public class AsyncFileReaderLines extends AbstractAsyncFileReaderLines implements Subscription {
    /** {@link #onEmit} state: nobody is flushing buffered lines. */
    private static final int IDLE = 0;
    /** {@link #onEmit} state: a flusher is emitting and will re-check demand afterwards. */
    private static final int EMITTING = 1;
    /** {@link #onEmit} state: the flusher is deciding whether there is more to emit. */
    private static final int CHECKING = 3;

    private final Subscriber<? super String> sub;
    private final AtomicInteger onEmit = new AtomicInteger(IDLE);
    private final ConcurrentLinkedDeque<String> lines = new ConcurrentLinkedDeque<>();
    /** Outstanding demand; saturates at {@link Long#MAX_VALUE}. */
    private final AtomicLong requests = new AtomicLong();
    /** Set once a terminal signal (onError or onComplete) has been claimed. */
    private final AtomicBoolean terminated = new AtomicBoolean();
    /**
     * {@code true} until {@link #onComplete()} is called. Volatile because it is written by
     * the completion callback and read by {@link #request(long)}; without it a requester
     * could keep seeing {@code true} and never flush the lines left in the buffer.
     */
    private volatile boolean hasNext = true;

    /**
     * Stores the subscriber and immediately starts reading via
     * {@code readLines(asynchronousFileChannel, i)}.
     *
     * @param subscriber              receiver of the lines and of the terminal signal
     * @param asynchronousFileChannel channel passed on to {@code readLines}
     * @param i                       second argument of {@code readLines}
     *                                (presumably the read buffer size; confirm in the superclass)
     */
    public AsyncFileReaderLines(Subscriber<? super String> subscriber, AsynchronousFileChannel asynchronousFileChannel, int i) {
        this.sub = subscriber;
        readLines(asynchronousFileChannel, i);
    }

    /**
     * Forwards a read failure to the subscriber, unless a terminal signal was already sent.
     *
     * @param th the failure to report
     */
    @Override // org.javaync.io.AbstractAsyncFileReaderLines
    protected void onError(Throwable th) {
        if (this.terminated.compareAndSet(false, true)) {
            this.sub.onError(th);
        }
    }

    /**
     * Marks the source as exhausted and flushes whatever buffered lines the current demand
     * allows; completion is signalled once the buffer is empty.
     */
    @Override // org.javaync.io.AbstractAsyncFileReaderLines
    protected void onComplete() {
        this.hasNext = false;
        tryFlushPendingLines();
    }

    /**
     * Buffers a newly read line and emits buffered lines while there is demand.
     *
     * @param str the line that was read
     */
    @Override // org.javaync.io.AbstractAsyncFileReaderLines
    protected void onProduceLine(String str) {
        this.lines.offer(str);
        while (!isCancelled() && this.requests.get() > 0 && !this.lines.isEmpty()) {
            emitLine();
        }
    }

    /**
     * Emits the head of the buffer and consumes one unit of demand. Callers check that the
     * buffer is non-empty first, so an empty poll means another thread took the line and the
     * subscription is terminated with an error.
     */
    private void emitLine() {
        String line = this.lines.poll();
        if (line == null) {
            terminateDueTo(new IllegalStateException("Unexpected race occur on lines offer. No other thread should concurrently should be taking lines!"));
            return;
        }
        this.sub.onNext(line);
        this.requests.decrementAndGet();
    }

    /**
     * Adds {@code j} to the outstanding demand. While the source is still producing, emission
     * is left to {@link #onProduceLine(String)}; after the source has completed, the buffered
     * lines are flushed from here.
     *
     * @param j number of additional lines requested; a non-positive value terminates the
     *          subscription with an {@link IllegalArgumentException} sent to {@code onError}
     */
    public void request(long j) {
        if (isCancelled()) {
            return;
        }
        doRequest(j);
        if (this.hasNext) {
            return;
        }
        tryFlushPendingLines();
    }

    /**
     * Flushes buffered lines if no other caller is doing so, then signals completion when
     * the buffer is empty.
     *
     * <p>Hand-off protocol on {@link #onEmit}: seeing {@code EMITTING} means the active
     * flusher will re-check demand after its current line, so this caller can leave; seeing
     * {@code CHECKING} means the flusher may have read the demand before this caller's
     * update, so this caller spins until the flusher either resumes or goes idle.
     */
    private void tryFlushPendingLines() {
        while (true) {
            int witness = this.onEmit.compareAndExchange(IDLE, EMITTING);
            if (witness == IDLE) {
                break; // we own the flush
            }
            if (witness == EMITTING) {
                return; // the active flusher will pick up the new demand
            }
            Thread.onSpinWait();
        }
        while (toContinue()) {
            emitLine();
        }
        this.onEmit.set(IDLE);
        if (this.lines.isEmpty()) {
            cancel();
            // Two flushers can both observe an empty buffer after the state is released, and
            // an error may already have been signalled; only one terminal signal may go out.
            if (this.terminated.compareAndSet(false, true)) {
                this.sub.onComplete();
            }
        }
    }

    /**
     * Publishes {@code CHECKING}, evaluates whether another line can be emitted, and switches
     * back to {@code EMITTING} if so. On {@code false} the state is left at {@code CHECKING}
     * for the caller to reset.
     *
     * @return {@code true} if not cancelled, demand is positive and the buffer is non-empty
     */
    private boolean toContinue() {
        this.onEmit.set(CHECKING);
        boolean more = !isCancelled() && this.requests.get() > 0 && !this.lines.isEmpty();
        if (more) {
            this.onEmit.set(EMITTING);
        }
        return more;
    }

    /**
     * Validates and records demand. The addition is a single atomic update so that concurrent
     * requests and the emitter's decrement cannot interleave with the overflow check; a sum
     * that overflows is clamped to {@link Long#MAX_VALUE}.
     *
     * @param j number of additional lines requested
     */
    private void doRequest(long j) {
        if (j < 1) {
            terminateDueTo(new IllegalArgumentException(this.sub + " violated the Reactive Streams rule 3.9 by requesting a non-positive number of elements."));
            return;
        }
        this.requests.accumulateAndGet(j, (current, added) -> {
            long sum = current + added;
            return sum < 1 ? Long.MAX_VALUE : sum;
        });
    }

    /**
     * Cancels the subscription and reports {@code th} through {@code onError}, unless a
     * terminal signal was already sent. An exception thrown by the subscriber's
     * {@code onError} is logged instead of propagated.
     *
     * @param th the reason for terminating
     */
    private void terminateDueTo(Throwable th) {
        cancel();
        if (!this.terminated.compareAndSet(false, true)) {
            return;
        }
        try {
            this.sub.onError(th);
        } catch (Exception e) {
            Logger.getGlobal().log(Level.SEVERE, "Violated the Reactive Streams rule 2.13", new IllegalStateException(this.sub + " violated the Reactive Streams rule 2.13 by throwing an exception from onError.", e));
        }
    }
}
