package org.apache.bookkeeper.bookie.datainteg;

import io.reactivex.rxjava3.core.Completable;
import io.reactivex.rxjava3.core.Flowable;
import io.reactivex.rxjava3.core.Scheduler;
import io.reactivex.rxjava3.disposables.Disposable;
import java.io.IOException;
import java.util.Iterator;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.function.BiFunction;
import java.util.function.Function;
import org.apache.bookkeeper.client.BKException;
import org.apache.bookkeeper.client.api.LedgerMetadata;
import org.apache.bookkeeper.meta.LedgerManager;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:META-INF/bundled-dependencies/bookkeeper-server-4.17.1.jar:org/apache/bookkeeper/bookie/datainteg/MetadataAsyncIterator.class */
/**
 * Walks every ledger id exposed by a {@link LedgerManager} and applies a caller-supplied
 * asynchronous callback to each ledger's metadata.
 *
 * <p>Ledger ids are produced lazily on the configured {@link Scheduler}; at most
 * {@code maxInFlight} ledgers are being processed at any one time.
 */
public class MetadataAsyncIterator {
    private static final Logger log = LoggerFactory.getLogger(MetadataAsyncIterator.class);

    /** Scheduler on which the ledger-id generator is subscribed. */
    private final Scheduler scheduler;
    /** Source of ledger ranges and ledger metadata. */
    private final LedgerManager ledgerManager;
    /** Timeout handed to {@code LedgerManager.getLedgerRanges}, in milliseconds. */
    private final long zkTimeoutMs;
    /** Maximum number of ledgers processed concurrently by {@link #forEach}. */
    private final int maxInFlight;

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * Flattens a {@link LedgerManager.LedgerRangeIterator} (an iterator of ledger-id ranges)
     * into a plain sequence of ledger ids.
     *
     * <p>Callers must invoke {@link #hasNext()} and see {@code true} before each call to
     * {@link #next()}.
     */
    public static class FlatIterator {
        final LedgerManager.LedgerRangeIterator ranges;
        /** Iterator over the ids of the range currently being drained; null until first use. */
        Iterator<Long> range = null;

        FlatIterator(LedgerManager.LedgerRangeIterator ledgerRangeIterator) {
            this.ranges = ledgerRangeIterator;
        }

        /**
         * Reports whether another ledger id is available, advancing to the next range if the
         * current one is exhausted.
         *
         * @return {@code true} if {@link #next()} can be called
         * @throws IOException if the underlying range iterator fails
         */
        boolean hasNext() throws IOException {
            // Loop rather than advance once: an empty range must not be mistaken for the end
            // of the iteration while further ranges are still available.
            while ((this.range == null || !this.range.hasNext()) && this.ranges.hasNext()) {
                this.range = this.ranges.next().getLedgers().iterator();
            }
            return this.range != null && this.range.hasNext();
        }

        /**
         * Returns the next ledger id from the current range.
         *
         * @return the next ledger id
         * @throws IOException declared for symmetry with {@link #hasNext()}
         */
        Long next() throws IOException {
            return this.range.next();
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Creates an iterator over the metadata of all ledgers known to {@code ledgerManager}.
     *
     * @param scheduler scheduler on which ledger ids are generated
     * @param ledgerManager source of ledger ranges and metadata
     * @param maxInFlight maximum number of ledgers processed concurrently
     * @param zkTimeout timeout used when listing ledger ranges, expressed in {@code zkTimeoutUnit}
     * @param zkTimeoutUnit unit of {@code zkTimeout}
     */
    public MetadataAsyncIterator(Scheduler scheduler, LedgerManager ledgerManager, int maxInFlight,
                                 int zkTimeout, TimeUnit zkTimeoutUnit) {
        this.scheduler = scheduler;
        this.ledgerManager = ledgerManager;
        this.maxInFlight = maxInFlight;
        this.zkTimeoutMs = zkTimeoutUnit.toMillis(zkTimeout);
    }

    /**
     * Applies {@code consumer} to the id and metadata of every ledger.
     *
     * <p>Processing stops at the first failure (errors are not delayed). Completing or
     * cancelling the returned future disposes the underlying subscription.
     *
     * @param consumer callback invoked with each ledger id and its metadata; the future it
     *     returns signals when processing of that ledger has finished
     * @return a future that completes normally once every ledger has been processed, or
     *     exceptionally with the unwrapped cause of the first failure
     */
    public CompletableFuture<Void> forEach(BiFunction<Long, LedgerMetadata, CompletableFuture<Void>> consumer) {
        CompletableFuture<Void> promise = new CompletableFuture<>();
        // The explicit type witness pins the element type to Long; without it the lambdas
        // below do not give the compiler enough information to infer it.
        Disposable subscription = Flowable.<Long, FlatIterator>generate(
                () -> new FlatIterator(this.ledgerManager.getLedgerRanges(this.zkTimeoutMs)),
                (ledgerIds, emitter) -> {
                    try {
                        if (ledgerIds.hasNext()) {
                            emitter.onNext(ledgerIds.next());
                        } else {
                            emitter.onComplete();
                        }
                    } catch (Exception e) {
                        emitter.onError(e);
                    }
                })
            .subscribeOn(this.scheduler)
            .flatMapCompletable(
                ledgerId -> Completable.fromCompletionStage(processOne(ledgerId.longValue(), consumer)),
                false /* delayErrors */,
                this.maxInFlight)
            .subscribe(
                () -> promise.complete(null),
                error -> promise.completeExceptionally(unwrap(error)));
        // Stop generating ledger ids once the promise is done, however it was completed.
        promise.whenComplete((ignoredResult, ignoredError) -> subscription.dispose());
        return promise;
    }

    /**
     * Reads the metadata of one ledger and hands it to {@code consumer}.
     *
     * <p>Every failure is logged. A {@link BKException.BKNoSuchLedgerExistsOnMetadataServerException}
     * is then swallowed and the ledger is treated as successfully processed; any other failure
     * is rethrown wrapped in a {@link CompletionException}.
     *
     * @param ledgerId id of the ledger to process
     * @param consumer callback invoked with the ledger id and its metadata
     * @return a future tracking the processing of this ledger
     */
    private CompletableFuture<Void> processOne(long ledgerId,
                                               BiFunction<Long, LedgerMetadata, CompletableFuture<Void>> consumer) {
        return this.ledgerManager.readLedgerMetadata(ledgerId)
            .thenApply(versioned -> versioned.getValue())
            .thenCompose(metadata -> consumer.apply(Long.valueOf(ledgerId), metadata))
            .exceptionally(error -> {
                Throwable cause = unwrap(error);
                log.warn("Got exception processing ledger {}", Long.valueOf(ledgerId), cause);
                if (cause instanceof BKException.BKNoSuchLedgerExistsOnMetadataServerException) {
                    return null;
                }
                throw new CompletionException(cause);
            });
    }

    /**
     * Strips {@link CompletionException} and {@link ExecutionException} wrappers from a throwable.
     *
     * @param th the throwable to unwrap; may be {@code null}
     * @return the innermost throwable that is not one of the wrapper types, or the last wrapper
     *     itself when it carries no cause; {@code null} only if {@code th} is {@code null}
     */
    static Throwable unwrap(Throwable th) {
        Throwable current = th;
        // Stop at a wrapper without a cause instead of returning null, which callers would
        // otherwise pass on to CompletableFuture.completeExceptionally (it rejects null).
        while ((current instanceof CompletionException || current instanceof ExecutionException)
                && current.getCause() != null) {
            current = current.getCause();
        }
        return current;
    }
}
