package dev.vortex.spark.read;

import java.util.Iterator;
import java.util.NoSuchElementException;
import java.util.Objects;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.ToLongFunction;

/* JADX INFO: Access modifiers changed from: package-private */
/* loaded from: input_file:dev/vortex/spark/read/PrefetchingIterator.class */
/**
 * Iterator that drains a delegate iterator on a background daemon thread and buffers the
 * fetched elements until the consumer asks for them.
 *
 * <p>The producer thread pauses while the summed size of the buffered elements (as reported by
 * the size function) is greater than {@code maxBufferSize}, and resumes once the consumer has
 * removed enough elements. Because the check is made before each fetch, the buffer may exceed
 * the limit by the size of one element.
 *
 * <p>If the delegate or the size function throws on the producer thread, the elements buffered
 * so far are still handed out, after which {@link #hasNext()} and {@link #next()} throw a
 * {@link RuntimeException} whose cause is the producer failure.
 *
 * <p>{@link #hasNext()} and {@link #next()} are written for a single consumer thread;
 * {@link #close()} may be called from any thread.
 *
 * @param <T> element type; {@code null} elements are not supported because the backing queue
 *     rejects them
 */
public final class PrefetchingIterator<T> implements Iterator<T>, AutoCloseable {
    /**
     * Per-instance monitor used to signal "buffer drained", "element available" and "closed".
     * It must not be static: a shared monitor lets one iterator's notification be consumed by
     * another iterator's producer.
     */
    private final Object lock = new Object();
    private final Iterator<T> delegate;
    private final long maxBufferSize;
    private final ToLongFunction<T> sizeFunc;
    private final BlockingQueue<T> fetched = new LinkedBlockingQueue<>();
    /** Set by {@link #close()} and by the producer when it stops for any reason. */
    private final AtomicBoolean closed = new AtomicBoolean(false);
    /** Sum of {@code sizeFunc} over the elements currently sitting in {@link #fetched}. */
    private final AtomicLong bufferBytes = new AtomicLong(0);
    private final Thread producerThread = new Thread(this::prefetchLoop, "vortex-prefetch-thread");
    /** Failure raised on the producer thread; written before {@link #closed} is set. */
    private volatile Throwable failure;

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Creates the iterator and immediately starts the background prefetch thread.
     *
     * @param delegate source of elements; read only from the prefetch thread
     * @param maxBufferSize buffered size above which the producer pauses
     * @param sizeFunc reports the size of one element; called once when the element is
     *     buffered and once when it is handed out, so it should return the same value both times
     * @throws NullPointerException if {@code delegate} or {@code sizeFunc} is {@code null}
     */
    public PrefetchingIterator(Iterator<T> delegate, long maxBufferSize, ToLongFunction<T> sizeFunc) {
        this.delegate = Objects.requireNonNull(delegate, "delegate");
        this.maxBufferSize = maxBufferSize;
        this.sizeFunc = Objects.requireNonNull(sizeFunc, "sizeFunc");
        this.producerThread.setDaemon(true);
        this.producerThread.start();
    }

    /** Producer body: moves elements from the delegate into the buffer until exhausted or closed. */
    private void prefetchLoop() {
        try {
            while (!this.closed.get() && this.delegate.hasNext()) {
                awaitBufferCapacity();
                if (this.closed.get()) {
                    break;
                }
                T next = this.delegate.next();
                this.bufferBytes.addAndGet(this.sizeFunc.applyAsLong(next));
                this.fetched.put(next);
                signalStateChange();
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            // An interrupt after close() is the normal shutdown path, not an error.
            if (!this.closed.get()) {
                this.failure = new RuntimeException("Prefetching interrupted", e);
            }
        } catch (Throwable t) {
            // Caught broadly on purpose: anything left uncaught here would look like a normal
            // end of data to the consumer. It is rethrown (wrapped) on the consumer thread.
            if (!this.closed.get()) {
                this.failure = t;
            }
        } finally {
            // Runs once, after the loop: marks end-of-stream and wakes a waiting consumer.
            this.closed.set(true);
            signalStateChange();
        }
    }

    /** Blocks the producer while the buffer is over its limit and the iterator is still open. */
    private void awaitBufferCapacity() throws InterruptedException {
        synchronized (this.lock) {
            // The condition is re-checked under the monitor so a notification sent between the
            // check and the wait cannot be missed.
            while (!this.closed.get() && this.bufferBytes.get() > this.maxBufferSize) {
                this.lock.wait();
            }
        }
    }

    /** Wakes every thread waiting on the monitor so it re-evaluates its condition. */
    private void signalStateChange() {
        synchronized (this.lock) {
            this.lock.notifyAll();
        }
    }

    /**
     * Waits until an element is buffered or the iterator has stopped.
     *
     * @return {@code true} if an element is available, {@code false} on a clean end of data
     * @throws RuntimeException if the buffer is empty and the producer stopped with a failure
     */
    private boolean awaitElement() throws InterruptedException {
        synchronized (this.lock) {
            while (this.fetched.isEmpty() && !this.closed.get()) {
                this.lock.wait();
            }
        }
        // Re-check the queue: the producer may have enqueued its last element between the
        // isEmpty() and closed reads above.
        if (!this.fetched.isEmpty()) {
            return true;
        }
        Throwable cause = this.failure;
        if (cause != null) {
            throw new RuntimeException("Prefetching failed", cause);
        }
        return false;
    }

    /**
     * Blocks until an element is available or the producer has stopped.
     *
     * @return {@code true} if {@link #next()} can return an element
     * @throws RuntimeException if the producer failed and no buffered elements remain, or if
     *     the calling thread is interrupted while waiting
     */
    @Override // java.util.Iterator
    public boolean hasNext() {
        try {
            return awaitElement();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new RuntimeException("Prefetch queue wait interrupted", e);
        }
    }

    /**
     * Returns the next buffered element, waiting for the producer if necessary.
     *
     * @return the next element
     * @throws NoSuchElementException if no elements remain
     * @throws RuntimeException if the producer failed and no buffered elements remain, or if
     *     the calling thread is interrupted while waiting
     */
    @Override // java.util.Iterator
    public T next() {
        try {
            if (!awaitElement()) {
                throw new NoSuchElementException();
            }
            T item = this.fetched.poll();
            if (item == null) {
                // Only reachable if another thread consumed the element in between.
                throw new NoSuchElementException();
            }
            this.bufferBytes.addAndGet(-this.sizeFunc.applyAsLong(item));
            signalStateChange();
            return item;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new RuntimeException("Prefetch queue take interrupted", e);
        }
    }

    /**
     * Stops prefetching: marks the iterator closed, interrupts the producer thread and wakes any
     * waiting thread. Elements already buffered remain retrievable. Does not wait for the
     * producer thread to terminate.
     */
    @Override // java.lang.AutoCloseable
    public void close() {
        this.closed.set(true);
        this.producerThread.interrupt();
        signalStateChange();
    }
}
