package org.jtrim2.stream;

import java.util.Objects;
import org.jtrim2.cancel.CancellationToken;
import org.jtrim2.utils.ExceptionHelper;

/* JADX INFO: Access modifiers changed from: package-private */
/* loaded from: input_file:org/jtrim2/stream/AsyncSourceProducer.class */
public final class AsyncSourceProducer<T> implements SeqProducer<T> {
    private final PollableElementSource<? extends T> source;

    public AsyncSourceProducer(PollableElementSource<? extends T> pollableElementSource) {
        this.source = (PollableElementSource) Objects.requireNonNull(pollableElementSource, "source");
    }

    @Override // org.jtrim2.stream.SeqProducer
    public void transferAll(CancellationToken cancellationToken, ElementConsumer<? super T> elementConsumer) throws Exception {
        Objects.requireNonNull(cancellationToken, "cancelToken");
        Objects.requireNonNull(elementConsumer, "consumer");
        Throwable th = null;
        while (true) {
            try {
                T next = this.source.getNext(cancellationToken);
                if (next == null) {
                    break;
                } else {
                    elementConsumer.processElement(next);
                }
            } catch (Throwable th2) {
                th = th2;
            }
        }
        this.source.finish(th);
        ExceptionHelper.rethrowCheckedIfNotNull(th, Exception.class);
    }
}
