package io.georocket.storage;

import io.vertx.core.AsyncResult;
import io.vertx.core.Handler;
import java.util.concurrent.atomic.AtomicLong;
import rx.Observable;
import rx.Producer;
import rx.Subscriber;
import rx.internal.operators.BackpressureUtils;

/* loaded from: input_file:io/georocket/storage/RxAsyncCursor.class */
/**
 * An {@link AsyncCursor} that forwards every call to a wrapped cursor and can
 * additionally expose the wrapped cursor's items as an RxJava {@link Observable}.
 *
 * @param <T> the type of the items returned by the cursor
 */
public class RxAsyncCursor<T> implements AsyncCursor<T> {
    /** The cursor all calls are forwarded to. */
    private final AsyncCursor<T> delegate;

    /**
     * Creates a wrapper around the given cursor.
     *
     * @param asyncCursor the cursor to forward all calls to
     */
    public RxAsyncCursor(AsyncCursor<T> asyncCursor) {
        this.delegate = asyncCursor;
    }

    /**
     * @return the wrapped cursor that was passed to the constructor
     */
    public AsyncCursor<T> getDelegate() {
        return this.delegate;
    }

    /**
     * Forwards to the wrapped cursor.
     *
     * @return whatever the wrapped cursor's {@code hasNext()} returns
     */
    public boolean hasNext() {
        return this.delegate.hasNext();
    }

    /**
     * Forwards to the wrapped cursor.
     *
     * @param handler the handler passed on to the wrapped cursor's {@code next}
     */
    public void next(Handler<AsyncResult<T>> handler) {
        this.delegate.next(handler);
    }

    /**
     * Exposes this cursor as an {@link Observable} that honours the demand
     * signalled by its subscriber.
     *
     * <p>Each subscriber gets its own {@link Producer} but all of them read
     * from this same cursor. Items are fetched one at a time through
     * {@link #next(Handler)}: the next fetch is started only after the previous
     * item has been handed to the subscriber and only while there is still
     * outstanding demand. A failed fetch is reported through {@code onError}
     * and stops the emission. {@code onCompleted} is signalled when
     * {@link #hasNext()} returns {@code false} while demand is outstanding.</p>
     *
     * @return an observable emitting the items of this cursor
     */
    public Observable<T> toObservable() {
        return Observable.unsafeCreate(subscriber -> {
            subscriber.setProducer(new Producer() {
                /** Number of items requested by the subscriber but not yet emitted. */
                private final AtomicLong requested = new AtomicLong();

                /**
                 * Records additional demand and starts emitting if no
                 * emission is currently in progress.
                 *
                 * @param n the number of additional items requested;
                 *          values less than or equal to zero are ignored
                 */
                public void request(long n) {
                    if (n <= 0 || subscriber.isUnsubscribed()) {
                        return;
                    }
                    // Only the caller that raises the demand from zero starts
                    // a drain; otherwise a drain is already responsible for
                    // serving the demand just added.
                    if (BackpressureUtils.getAndAddRequest(requested, n) == 0) {
                        drain();
                    }
                }

                /**
                 * Emits the next item if there is outstanding demand and
                 * re-schedules itself after every emitted item.
                 */
                private void drain() {
                    if (requested.get() <= 0) {
                        return;
                    }

                    if (!RxAsyncCursor.this.hasNext()) {
                        if (!subscriber.isUnsubscribed()) {
                            subscriber.onCompleted();
                        }
                        return;
                    }

                    RxAsyncCursor.this.next(asyncResult -> {
                        if (subscriber.isUnsubscribed()) {
                            return;
                        }
                        if (asyncResult.failed()) {
                            subscriber.onError(asyncResult.cause());
                            return;
                        }
                        subscriber.onNext(asyncResult.result());
                        // Decrement only after onNext has returned: a
                        // request() issued from inside onNext then still sees
                        // non-zero demand and does not start a second,
                        // concurrent drain.
                        requested.decrementAndGet();
                        drain();
                    });
                }
            });
        });
    }
}
