package org.apache.bookkeeper.bookie.datainteg;

import com.google.common.collect.ImmutableSortedMap;
import io.reactivex.rxjava3.core.Flowable;
import io.reactivex.rxjava3.core.Maybe;
import io.reactivex.rxjava3.core.Scheduler;
import io.reactivex.rxjava3.core.Single;
import io.reactivex.rxjava3.disposables.Disposable;
import java.io.IOException;
import java.util.Comparator;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.NavigableMap;
import java.util.Optional;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Function;
import java.util.stream.Collectors;
import org.apache.bookkeeper.bookie.BookieException;
import org.apache.bookkeeper.bookie.LedgerStorage;
import org.apache.bookkeeper.bookie.datainteg.EntryCopier;
import org.apache.bookkeeper.client.BKException;
import org.apache.bookkeeper.client.BookKeeperAdmin;
import org.apache.bookkeeper.client.api.LedgerMetadata;
import org.apache.bookkeeper.common.concurrent.FutureUtils;
import org.apache.bookkeeper.meta.LedgerManager;
import org.apache.bookkeeper.net.BookieId;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:META-INF/bundled-dependencies/bookkeeper-server-4.16.3.jar:org/apache/bookkeeper/bookie/datainteg/DataIntegrityCheckImpl.class */
public class DataIntegrityCheckImpl implements DataIntegrityCheck {
    private static final Logger log = LoggerFactory.getLogger(DataIntegrityCheckImpl.class);

    /**
     * Limit of 300; the same value is passed to the metadata iterator and as the
     * concurrency bound of the per-ledger limbo recovery stage.
     */
    private static final int MAX_INFLIGHT = 300;

    /** Limit of 3000; the same value bounds concurrent entry copies within one ledger. */
    private static final int MAX_ENTRIES_INFLIGHT = 3000;

    /**
     * Timeout of 30, handed to the metadata iterator together with {@link TimeUnit#SECONDS}.
     * NOTE(review): presumably the metadata store (ZooKeeper) timeout, going by the name.
     */
    private static final int ZK_TIMEOUT_S = 30;

    /** Identity of the bookie whose ledgers are being checked. */
    private final BookieId bookieId;
    private final LedgerManager ledgerManager;
    private final LedgerStorage ledgerStorage;
    private final EntryCopier entryCopier;
    private final BookKeeperAdmin admin;
    /** RxJava scheduler on which the ledger and entry pipelines are subscribed. */
    private final Scheduler scheduler;

    /**
     * Most recently computed map of ledgers still to be checked; {@code null} until
     * the first call to {@code updateMetadataCache}.
     */
    private final AtomicReference<Map<Long, LedgerMetadata>> ledgersCacheRef = new AtomicReference<>(null);

    /** Result of the single pre-boot run; assigned and read only inside the synchronized runPreBootCheck. */
    private CompletableFuture<Void> preBootFuture;

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:META-INF/bundled-dependencies/bookkeeper-server-4.16.3.jar:org/apache/bookkeeper/bookie/datainteg/DataIntegrityCheckImpl$LedgerResult.class */
    /**
     * Outcome of checking a single ledger: the ledger id, the outcome state, and,
     * depending on the state, the ledger metadata and/or the failure cause.
     */
    public static class LedgerResult {

        /** The three outcomes a ledger check can have. */
        public enum State {
            MISSING,
            ERROR,
            OK
        }

        private final State state;
        private final long ledgerId;
        private final LedgerMetadata metadata;
        private final Throwable throwable;

        private LedgerResult(State outcome, long id, LedgerMetadata ledgerMetadata, Throwable cause) {
            this.state = outcome;
            this.ledgerId = id;
            this.metadata = ledgerMetadata;
            this.throwable = cause;
        }

        /** Creates a MISSING result; it carries neither metadata nor a cause. */
        static LedgerResult missing(long j) {
            return new LedgerResult(State.MISSING, j, null, null);
        }

        /** Creates an OK result carrying the given metadata. */
        public static LedgerResult ok(long j, LedgerMetadata ledgerMetadata) {
            return new LedgerResult(State.OK, j, ledgerMetadata, null);
        }

        /** Creates an ERROR result carrying the given metadata and the failure cause. */
        public static LedgerResult error(long j, LedgerMetadata ledgerMetadata, Throwable th) {
            return new LedgerResult(State.ERROR, j, ledgerMetadata, th);
        }

        /** @return true when this result is in the MISSING state */
        public boolean isMissing() {
            return State.MISSING == this.state;
        }

        /** @return true when this result is in the OK state */
        public boolean isOK() {
            return State.OK == this.state;
        }

        /** @return true when this result is in the ERROR state */
        public boolean isError() {
            return State.ERROR == this.state;
        }

        /** @return the id of the ledger this result describes */
        long getLedgerId() {
            return this.ledgerId;
        }

        /** @return the metadata attached to this result; null for MISSING results */
        LedgerMetadata getMetadata() {
            return this.metadata;
        }

        /** @return the failure cause; null unless created through {@code error} */
        public Throwable getThrowable() {
            return this.throwable;
        }
    }

    /**
     * Stores the collaborators used by the integrity check; nothing else happens at
     * construction time.
     *
     * @param bookieId identity of the bookie being checked
     * @param ledgerManager source of ledger metadata
     * @param ledgerStorage local storage that is inspected and repaired
     * @param entryCopier used to create the batches that copy missing entries
     * @param bookKeeperAdmin used to open ledgers that are in limbo
     * @param scheduler RxJava scheduler for the asynchronous pipelines
     */
    public DataIntegrityCheckImpl(
            BookieId bookieId,
            LedgerManager ledgerManager,
            LedgerStorage ledgerStorage,
            EntryCopier entryCopier,
            BookKeeperAdmin bookKeeperAdmin,
            Scheduler scheduler) {
        this.scheduler = scheduler;
        this.admin = bookKeeperAdmin;
        this.entryCopier = entryCopier;
        this.ledgerStorage = ledgerStorage;
        this.ledgerManager = ledgerManager;
        this.bookieId = bookieId;
    }

    /**
     * Starts the pre-boot sequence on the first call and hands back the same future
     * on every later call.
     *
     * @param str reason for the check; only used for logging by the pre-boot sequence
     * @return the future of the one pre-boot run
     */
    @Override
    public synchronized CompletableFuture<Void> runPreBootCheck(String str) {
        final CompletableFuture<Void> alreadyStarted = this.preBootFuture;
        if (alreadyStarted != null) {
            return alreadyStarted;
        }
        final CompletableFuture<Void> started = runPreBootSequence(str);
        this.preBootFuture = started;
        return started;
    }

    /**
     * Runs the pre-boot sequence.
     *
     * <p>Sets the NEEDS_INTEGRITY_CHECK storage flag, then walks all ledger metadata.
     * For each ledger whose ensembles include this bookie, the metadata is remembered
     * and the ledger is created in local storage (with an empty master key) if it does
     * not exist yet. Every ledger is then passed to {@code processPreBoot}. Once the
     * walk succeeds, storage is flushed and the collected metadata becomes the cache.
     *
     * @param str reason for running the check; only logged
     * @return a future that completes when the sequence is done, or fails with the
     *         first error encountered
     */
    private CompletableFuture<Void> runPreBootSequence(String str) {
        final String runId = UUID.randomUUID().toString();
        log.info("Event: {}, RunId: {}, Reason: {}", Events.PREBOOT_START, runId, str);

        // Only setting the flag can throw IOException on this thread; everything that
        // follows reports its failures through the returned future.
        try {
            this.ledgerStorage.setStorageStateFlag(LedgerStorage.StorageState.NEEDS_INTEGRITY_CHECK);
        } catch (IOException e) {
            log.error("Event: {}, RunId: {}", Events.PREBOOT_ERROR, runId, e);
            return FutureUtils.exception(e);
        }

        final MetadataAsyncIterator iterator = new MetadataAsyncIterator(
                this.scheduler, this.ledgerManager, MAX_INFLIGHT, ZK_TIMEOUT_S, TimeUnit.SECONDS);
        final CompletableFuture<Void> promise = new CompletableFuture<>();
        // Keys are sorted in descending ledger-id order.
        final Map<Long, LedgerMetadata> ledgersOnBookie =
                new ConcurrentSkipListMap<>(Comparator.<Long>naturalOrder().reversed());

        iterator.forEach((ledgerId, metadata) -> {
            if (ensemblesContainBookie(metadata, this.bookieId)) {
                ledgersOnBookie.put(ledgerId, metadata);
                try {
                    if (!this.ledgerStorage.ledgerExists(ledgerId)) {
                        this.ledgerStorage.setMasterKey(ledgerId, new byte[0]);
                    }
                } catch (IOException e) {
                    log.error("Event: {}, RunId: {}, LedgerId: {}", Events.ENSURE_LEDGER_ERROR, runId, ledgerId, e);
                    return FutureUtils.exception(e);
                }
            }
            return processPreBoot(ledgerId, metadata, runId);
        }).whenComplete((ignored, failure) -> {
            if (failure != null) {
                log.error("Event: {}, runId: {}", Events.PREBOOT_ERROR, runId, failure);
                promise.completeExceptionally(failure);
                return;
            }
            try {
                this.ledgerStorage.flush();
                updateMetadataCache(ledgersOnBookie);
                log.info("Event: {}, runId: {}, processed: {}", Events.PREBOOT_END, runId, ledgersOnBookie.size());
                promise.complete(null);
            } catch (Throwable t) {
                // The catch variable must not reuse the lambda parameter's name.
                log.error("Event: {}, runId: {}", Events.PREBOOT_ERROR, runId, t);
                promise.completeExceptionally(t);
            }
        });
        return promise;
    }

    /**
     * Reports whether the NEEDS_INTEGRITY_CHECK flag is set on the ledger storage.
     *
     * @return true when the flag is present among the storage state flags
     * @throws IOException if the storage state flags cannot be read
     */
    @Override
    public boolean needsFullCheck() throws IOException {
        final boolean flagged = this.ledgerStorage
                .getStorageStateFlags()
                .contains(LedgerStorage.StorageState.NEEDS_INTEGRITY_CHECK);
        return flagged;
    }

    /**
     * Runs a full integrity check over the cached (or freshly read) ledger metadata.
     *
     * <p>Each ledger is checked and recovered through {@code checkAndRecoverLedgers}.
     * Ledgers that come back OK or MISSING are removed from the working map; ledgers
     * that failed stay in it (with the metadata carried by the result) so they are
     * retried on the next run. Storage is then flushed, the NEEDS_INTEGRITY_CHECK
     * flag is cleared only when nothing is left to retry, and the remaining map
     * becomes the new metadata cache.
     *
     * @return a future that completes when the check has finished, or fails if the
     *         metadata could not be obtained or the final flush/flag update failed
     */
    @Override
    public CompletableFuture<Void> runFullCheck() {
        final String runId = UUID.randomUUID().toString();
        log.info("Event: {}, runId: {}", Events.FULL_CHECK_INIT, runId);
        return getCachedOrReadMetadata(runId).thenCompose(ledgers -> {
            log.info("Event: {}, runId: {}, ledgerCount: {}", Events.FULL_CHECK_START, runId, ledgers.size());
            return checkAndRecoverLedgers(ledgers, runId).thenApply(results -> {
                // Single pass: update the retry map and gather the summary figures.
                long okCount = 0;
                long errorCount = 0;
                long missingCount = 0;
                Throwable firstError = null;
                for (LedgerResult result : results) {
                    if (result.isMissing()) {
                        missingCount++;
                        ledgers.remove(result.getLedgerId());
                    } else if (result.isOK()) {
                        okCount++;
                        ledgers.remove(result.getLedgerId());
                    } else if (result.isError()) {
                        if (errorCount == 0) {
                            firstError = result.getThrowable();
                        }
                        errorCount++;
                        // Keep the ledger for the next run, using the metadata from the result.
                        ledgers.put(result.getLedgerId(), result.getMetadata());
                    }
                }
                if (errorCount > 0) {
                    log.error("Event: {}, runId: {}, ok: {}, error: {}, missing: {}, ledgersToRetry: {}",
                            Events.FULL_CHECK_END, runId, okCount, errorCount, missingCount,
                            ledgers.size(), firstError);
                } else {
                    log.info("Event: {}, runId: {}, ok: {}, error: 0, missing: {}, ledgersToRetry: {}",
                            Events.FULL_CHECK_END, runId, okCount, missingCount, ledgers.size());
                }
                return ledgers;
            });
        }).thenCompose(remaining -> {
            final CompletableFuture<Void> promise = new CompletableFuture<>();
            try {
                this.ledgerStorage.flush();
                if (remaining.isEmpty()) {
                    log.info("Event: {}, runId: {}", Events.CLEAR_INTEGCHECK_FLAG, runId);
                    this.ledgerStorage.clearStorageStateFlag(LedgerStorage.StorageState.NEEDS_INTEGRITY_CHECK);
                }
                updateMetadataCache(remaining);
                log.info("Event: {}, runId: {}", Events.FULL_CHECK_COMPLETE, runId);
                promise.complete(null);
            } catch (IOException e) {
                log.error("Event: {}, runId: {}", Events.FULL_CHECK_ERROR, runId, e);
                promise.completeExceptionally(e);
            }
            return promise;
        });
    }

    /**
     * Replaces the cached ledger metadata map with the given one. The map is stored
     * as-is; no copy is made.
     *
     * @param map the map to publish as the new cache
     */
    void updateMetadataCache(Map<Long, LedgerMetadata> map) {
        ledgersCacheRef.set(map);
    }

    /**
     * Returns the cached ledger metadata if a cache has been published, otherwise
     * reads all ledger metadata, keeps the ledgers whose ensembles include this
     * bookie, publishes that map as the cache and returns it.
     *
     * @param str run identifier; only used for logging
     * @return a future holding the map of ledger id to metadata; the map is the cached
     *         instance itself, not a copy
     */
    CompletableFuture<Map<Long, LedgerMetadata>> getCachedOrReadMetadata(String str) {
        final Map<Long, LedgerMetadata> cached = this.ledgersCacheRef.get();
        if (cached != null) {
            log.info("Event: {}, runId: {}, ledgerCount: {}", Events.USE_CACHED_METADATA, str, cached.size());
            return CompletableFuture.completedFuture(cached);
        }

        log.info("Event: {}, runId: {}", Events.REFRESH_METADATA, str);
        final MetadataAsyncIterator iterator = new MetadataAsyncIterator(
                this.scheduler, this.ledgerManager, MAX_INFLIGHT, ZK_TIMEOUT_S, TimeUnit.SECONDS);
        // Keys are sorted in descending ledger-id order.
        final Map<Long, LedgerMetadata> ledgersOnBookie =
                new ConcurrentSkipListMap<>(Comparator.<Long>naturalOrder().reversed());
        return iterator.forEach((ledgerId, metadata) -> {
            if (ensemblesContainBookie(metadata, this.bookieId)) {
                ledgersOnBookie.put(ledgerId, metadata);
            }
            return CompletableFuture.completedFuture(null);
        }).thenApply(ignored -> {
            updateMetadataCache(ledgersOnBookie);
            return ledgersOnBookie;
        });
    }

    /**
     * Pre-boot handling of one ledger.
     *
     * <p>Fails if the metadata has no ensembles. Otherwise, when the ledger is not
     * closed and this bookie is a member of the last ensemble, the ledger is put
     * into limbo state and fenced in local storage. Closed ledgers, and ledgers
     * whose last ensemble does not contain this bookie, are left untouched.
     *
     * @param j ledger id
     * @param ledgerMetadata metadata of the ledger
     * @param str run identifier; only used for logging
     * @return a future that is already completed, normally or exceptionally, on return
     */
    private CompletableFuture<Void> processPreBoot(long j, LedgerMetadata ledgerMetadata, String str) {
        final CompletableFuture<Void> promise = new CompletableFuture<>();
        final Map.Entry<Long, ? extends List<BookieId>> lastEnsemble = ledgerMetadata.getAllEnsembles().lastEntry();
        if (lastEnsemble == null) {
            log.error("Event: {}, runId: {}, metadata: {}, ledger: {}", Events.INVALID_METADATA, str, ledgerMetadata, j);
            promise.completeExceptionally(new IllegalStateException(
                    String.format("All metadata must have at least one ensemble, %d does not", j)));
            return promise;
        }

        final boolean mayReceiveWrites = !ledgerMetadata.isClosed() && lastEnsemble.getValue().contains(this.bookieId);
        if (!mayReceiveWrites) {
            promise.complete(null);
            return promise;
        }

        try {
            log.info("Event: {}, runId: {}, metadata: {}, ledger: {}", Events.MARK_LIMBO, str, ledgerMetadata, j);
            this.ledgerStorage.setLimboState(j);
            this.ledgerStorage.setFenced(j);
            promise.complete(null);
        } catch (IOException e) {
            // This failure fails the pre-boot check, so it is reported at ERROR level
            // like the other failure paths in this class.
            log.error("Event: {}, runId: {}, metadata: {}, ledger: {}", Events.LIMBO_OR_FENCE_ERROR, str, ledgerMetadata, j, e);
            promise.completeExceptionally(e);
        }
        return promise;
    }

    /**
     * Checks every ledger in the given map in two stages and collects one
     * {@link LedgerResult} per ledger.
     *
     * <p>Stage one recovers ledgers that are in limbo, running for up to
     * {@code MAX_INFLIGHT} ledgers at a time, and clears the limbo state of each
     * ledger that came back OK. Stage two checks and copies the entries of OK
     * ledgers, one ledger at a time. Failures in either stage are turned into ERROR
     * results instead of failing the stream.
     *
     * @param map ledgers to check, keyed by ledger id
     * @param str run identifier; only used for logging
     * @return a future holding the set of per-ledger results
     */
    CompletableFuture<Set<LedgerResult>> checkAndRecoverLedgers(Map<Long, LedgerMetadata> map, String str) {
        final CompletableFuture<Set<LedgerResult>> promise = new CompletableFuture<>();

        final Disposable subscription = Flowable.fromIterable(map.entrySet())
                .subscribeOn(this.scheduler, false)
                .flatMapSingle(ledgerEntry -> {
                    final long ledgerId = ledgerEntry.getKey();
                    final LedgerMetadata knownMetadata = ledgerEntry.getValue();
                    return recoverLedgerIfInLimbo(ledgerId, knownMetadata, str)
                            .map(recovered -> LedgerResult.ok(ledgerId, recovered))
                            .onErrorReturn(cause -> LedgerResult.error(ledgerId, knownMetadata, cause))
                            // An empty Maybe means the ledger is gone from the metadata store.
                            .defaultIfEmpty(LedgerResult.missing(ledgerId))
                            .flatMap(outcome -> {
                                if (!outcome.isOK()) {
                                    return Single.just(outcome);
                                }
                                try {
                                    this.ledgerStorage.clearLimboState(ledgerId);
                                    return Single.just(outcome);
                                } catch (IOException e) {
                                    return Single.just(
                                            LedgerResult.error(outcome.getLedgerId(), outcome.getMetadata(), e));
                                }
                            });
                }, true, MAX_INFLIGHT)
                .flatMapSingle(outcome -> {
                    if (!outcome.isOK()) {
                        return Single.just(outcome);
                    }
                    return checkAndRecoverLedgerEntries(outcome.getLedgerId(), outcome.getMetadata(), str)
                            .map(ignored -> LedgerResult.ok(outcome.getLedgerId(), outcome.getMetadata()))
                            .onErrorReturn(cause ->
                                    LedgerResult.error(outcome.getLedgerId(), outcome.getMetadata(), cause));
                }, true, 1)
                .collect(Collectors.toSet())
                .subscribe(promise::complete, promise::completeExceptionally);

        // Release the subscription once the future is completed, by whichever path.
        promise.whenComplete((ignoredResults, ignoredError) -> subscription.dispose());
        return promise;
    }

    /**
     * Recovers the ledger when local storage has it marked as in limbo.
     *
     * <p>The ledger is first created locally (with an empty master key) if it does
     * not exist. A ledger without limbo state yields the given metadata unchanged.
     * Otherwise the ledger is recovered through {@code recoverLedger}; if that fails
     * because the ledger no longer exists on the metadata server the result is
     * empty, and any other failure is passed on.
     *
     * @param j ledger id
     * @param ledgerMetadata metadata currently known for the ledger
     * @param str run identifier; only used for logging
     * @return the metadata to use, empty when the ledger is missing, or an error
     */
    Maybe<LedgerMetadata> recoverLedgerIfInLimbo(long j, LedgerMetadata ledgerMetadata, String str) {
        final boolean inLimbo;
        try {
            if (!this.ledgerStorage.ledgerExists(j)) {
                this.ledgerStorage.setMasterKey(j, new byte[0]);
            }
            inLimbo = this.ledgerStorage.hasLimboState(j);
        } catch (IOException e) {
            return Maybe.error(e);
        }

        if (!inLimbo) {
            return Maybe.just(ledgerMetadata);
        }

        log.info("Event: {}, runId: {}, metadata: {}, ledger: {}", Events.RECOVER_LIMBO_LEDGER, str, ledgerMetadata, Long.valueOf(j));
        return recoverLedger(j, str).toMaybe().onErrorResumeNext(failure -> {
            if (!(failure instanceof BKException.BKNoSuchLedgerExistsOnMetadataServerException)) {
                log.info("Event: {}, runId: {}, metadata: {}, ledger: {}", Events.RECOVER_LIMBO_LEDGER_ERROR, str, ledgerMetadata, Long.valueOf(j));
                return Maybe.error(failure);
            }
            log.info("Event: {}, runId: {}, metadata: {}, ledger: {}", Events.RECOVER_LIMBO_LEDGER_MISSING, str, ledgerMetadata, Long.valueOf(j));
            return Maybe.empty();
        });
    }

    /**
     * Opens the ledger through the admin client and emits the metadata read from
     * the resulting handle.
     *
     * <p>The handle is closed asynchronously; a close failure is only logged and
     * does not affect the emitted result. A non-zero return code from the open
     * call is emitted as the corresponding {@link BKException}.
     *
     * @param j ledger id
     * @param str run identifier; only used for logging
     * @return a Single emitting the ledger metadata taken from the opened handle
     */
    Single<LedgerMetadata> recoverLedger(long j, String str) {
        return Single.create(emitter -> this.admin.asyncOpenLedger(j, (rc, handle, ctx) -> {
            if (rc == 0) {
                final LedgerMetadata opened = handle.getLedgerMetadata();
                handle.closeAsync().whenComplete((ignored, closeFailure) -> {
                    if (closeFailure != null) {
                        log.warn("Event: {}, runId: {}, ledger: {}", Events.RECOVER_LIMBO_LEDGER_CLOSE_ERROR, str, Long.valueOf(j), closeFailure);
                    }
                });
                emitter.onSuccess(opened);
            } else {
                emitter.onError(BKException.create(rc));
            }
        }, null));
    }

    /**
     * Checks every entry of the ledger that this bookie should hold and copies the
     * ones that are missing from local storage.
     *
     * <p>Entries {@code 0..lastKnownEntry} are examined, where the last known entry
     * is the ledger's last entry id when it is closed, or the first entry id of the
     * last ensemble minus one when it is still open. Up to
     * {@code MAX_ENTRIES_INFLIGHT} entries are processed concurrently and errors
     * are delayed until all entries have been attempted.
     *
     * @param j ledger id
     * @param ledgerMetadata metadata of the ledger
     * @param str run identifier; only used for logging
     * @return a Single emitting the ledger id on success, or the error(s) raised
     *         while checking or copying
     */
    Single<Long> checkAndRecoverLedgerEntries(long j, LedgerMetadata ledgerMetadata, String str) {
        final WriteSets writeSets =
                new WriteSets(ledgerMetadata.getEnsembleSize(), ledgerMetadata.getWriteQuorumSize());
        // First entry id of each ensemble -> this bookie's index in that ensemble (-1 if absent).
        final NavigableMap<Long, Integer> bookieIndices = ledgerMetadata.getAllEnsembles().entrySet().stream()
                .collect(ImmutableSortedMap.toImmutableSortedMap(
                        Comparator.naturalOrder(),
                        ensemble -> ensemble.getKey(),
                        ensemble -> ensemble.getValue().indexOf(this.bookieId)));

        final long lastKnownEntry;
        if (ledgerMetadata.isClosed()) {
            lastKnownEntry = ledgerMetadata.getLastEntryId();
        } else {
            lastKnownEntry = ledgerMetadata.getAllEnsembles().lastEntry().getKey() - 1;
        }
        if (lastKnownEntry < 0) {
            return Single.just(j);
        }

        try {
            final EntryCopier.Batch batch = this.entryCopier.newBatch(j, ledgerMetadata);
            final AtomicLong byteCount = new AtomicLong(0L);
            final AtomicInteger copiedCount = new AtomicInteger(0);
            final AtomicInteger errorCount = new AtomicInteger(0);
            final AtomicReference<Throwable> firstError = new AtomicReference<>(null);
            log.info("Event: {}, runId: {}, metadata: {}, ledger: {}", Events.LEDGER_CHECK_AND_COPY_START, str, ledgerMetadata, j);
            return Flowable.rangeLong(0L, lastKnownEntry + 1)
                    .subscribeOn(this.scheduler, false)
                    .flatMapMaybe(entryId -> maybeCopyEntry(writeSets, bookieIndices, j, entryId, batch)
                            .doOnError(failure -> {
                                firstError.compareAndSet(null, failure);
                                errorCount.incrementAndGet();
                            }), true, MAX_ENTRIES_INFLIGHT)
                    .doOnNext(bytes -> {
                        byteCount.addAndGet(bytes);
                        copiedCount.incrementAndGet();
                    })
                    // count() yields a Single even when nothing had to be copied.
                    .count()
                    .doOnTerminate(() -> {
                        final Throwable cause = firstError.get();
                        if (cause != null) {
                            // "errors" is filled with the error count; the first failure is
                            // passed as the trailing exception argument.
                            log.warn("Event: {}, runId: {}, metadata: {}, ledger: {}, entries: {}, bytes: {}, errors: {}", Events.LEDGER_CHECK_AND_COPY_END, str, ledgerMetadata, j, copiedCount.get(), byteCount.get(), errorCount.get(), cause);
                        } else {
                            log.info("Event: {}, runId: {}, metadata: {}, ledger: {}, entries: {}, bytes: {}, errors: 0", Events.LEDGER_CHECK_AND_COPY_END, str, ledgerMetadata, j, copiedCount.get(), byteCount.get());
                        }
                    })
                    .map(ignored -> j);
        } catch (IOException e) {
            return Single.error(e);
        }
    }

    /**
     * Copies one entry from the batch's available source if the entry is missing
     * locally.
     *
     * @param writeSets write-set calculator for the ledger
     * @param navigableMap first entry id of each ensemble mapped to this bookie's index in it
     * @param j ledger id
     * @param j2 entry id
     * @param batch copy batch created for the ledger
     * @return the value produced by the copy, empty when the entry is not missing,
     *         or an error if the local check or the copy failed
     */
    Maybe<Long> maybeCopyEntry(WriteSets writeSets, NavigableMap<Long, Integer> navigableMap, long j, long j2, EntryCopier.Batch batch) {
        final boolean missing;
        try {
            missing = isEntryMissing(writeSets, navigableMap, j, j2);
        } catch (IOException | BookieException e) {
            return Maybe.error(e);
        }
        if (!missing) {
            return Maybe.empty();
        }
        return Maybe.fromCompletionStage(batch.copyFromAvailable(j2));
    }

    /**
     * Decides whether this bookie should hold the entry but does not.
     *
     * <p>The entry counts as missing only when this bookie is a member of the
     * ensemble covering the entry, its index is part of the entry's write set, and
     * local storage does not contain the entry. Storage is consulted only after the
     * first two conditions hold.
     *
     * @param writeSets write-set calculator for the ledger
     * @param navigableMap first entry id of each ensemble mapped to this bookie's index in it
     * @param j ledger id
     * @param j2 entry id
     * @return true when the entry should be stored here and is absent
     * @throws IOException declared on this method; presumably raised by the storage lookup
     * @throws BookieException declared on this method; presumably raised by the storage lookup
     */
    boolean isEntryMissing(WriteSets writeSets, NavigableMap<Long, Integer> navigableMap, long j, long j2) throws IOException, BookieException {
        final int bookieIndex = navigableMap.floorEntry(Long.valueOf(j2)).getValue().intValue();
        if (bookieIndex < 0) {
            // This bookie is not part of the ensemble that covers the entry.
            return false;
        }
        if (!writeSets.getForEntry(j2).contains(Integer.valueOf(bookieIndex))) {
            return false;
        }
        return !this.ledgerStorage.entryExists(j, j2);
    }

    /**
     * Tells whether the bookie appears in any ensemble of the ledger.
     *
     * @param ledgerMetadata metadata whose ensembles are searched
     * @param bookieId bookie to look for
     * @return true as soon as one ensemble contains the bookie, false otherwise
     */
    static boolean ensemblesContainBookie(LedgerMetadata ledgerMetadata, BookieId bookieId) {
        for (List<BookieId> ensemble : ledgerMetadata.getAllEnsembles().values()) {
            if (ensemble.contains(bookieId)) {
                return true;
            }
        }
        return false;
    }
}
