package dlshade.org.apache.distributedlog.impl.logsegment;

import dlshade.com.google.common.annotations.VisibleForTesting;
import dlshade.com.google.common.collect.Lists;
import dlshade.org.apache.bookkeeper.client.AsyncCallback;
import dlshade.org.apache.bookkeeper.client.BookKeeper;
import dlshade.org.apache.bookkeeper.client.LedgerEntry;
import dlshade.org.apache.bookkeeper.client.LedgerHandle;
import dlshade.org.apache.bookkeeper.client.api.BKException;
import dlshade.org.apache.bookkeeper.common.concurrent.FutureUtils;
import dlshade.org.apache.bookkeeper.common.util.OrderedScheduler;
import dlshade.org.apache.bookkeeper.common.util.SafeRunnable;
import dlshade.org.apache.bookkeeper.stats.Counter;
import dlshade.org.apache.bookkeeper.stats.StatsLogger;
import dlshade.org.apache.distributedlog.DistributedLogConfiguration;
import dlshade.org.apache.distributedlog.Entry;
import dlshade.org.apache.distributedlog.LogSegmentMetadata;
import dlshade.org.apache.distributedlog.exceptions.BKTransmitException;
import dlshade.org.apache.distributedlog.exceptions.DLIllegalStateException;
import dlshade.org.apache.distributedlog.exceptions.DLInterruptedException;
import dlshade.org.apache.distributedlog.exceptions.EndOfLogSegmentException;
import dlshade.org.apache.distributedlog.exceptions.ReadCancelledException;
import dlshade.org.apache.distributedlog.injector.AsyncFailureInjector;
import dlshade.org.apache.distributedlog.logsegment.LogSegmentEntryReader;
import io.netty.util.ReferenceCountUtil;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Enumeration;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CopyOnWriteArraySet;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
import java.util.concurrent.atomic.AtomicLongFieldUpdater;
import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:dlshade/org/apache/distributedlog/impl/logsegment/BKLogSegmentEntryReader.class */
public class BKLogSegmentEntryReader implements SafeRunnable, LogSegmentEntryReader, AsyncCallback.OpenCallback {
    private final BookKeeper bk;
    private final DistributedLogConfiguration conf;
    private final OrderedScheduler scheduler;
    private final long lssn;
    private final long startSequenceId;
    private final boolean envelopeEntries;
    private final boolean deserializeRecordSet;
    private final int numPrefetchEntries;
    private final int maxPrefetchEntries;
    private LogSegmentMetadata metadata;
    private LedgerHandle lh;
    private CacheEntry outstandingLongPoll;
    private long nextEntryId;
    private int readAheadWaitTime;
    private final int maxReadBackoffTime;
    private final boolean skipBrokenEntries;
    final LinkedBlockingQueue<CacheEntry> readAheadEntries;
    final LinkedList<PendingReadRequest> readQueue;
    private final AsyncFailureInjector failureInjector;
    private final Counter skippedBrokenEntriesCounter;
    private static final Logger logger = LoggerFactory.getLogger(BKLogSegmentEntryReader.class);
    private static final AtomicReferenceFieldUpdater<BKLogSegmentEntryReader, Throwable> lastExceptionUpdater = AtomicReferenceFieldUpdater.newUpdater(BKLogSegmentEntryReader.class, Throwable.class, "lastException");
    private static final AtomicLongFieldUpdater<BKLogSegmentEntryReader> scheduleCountUpdater = AtomicLongFieldUpdater.newUpdater(BKLogSegmentEntryReader.class, "scheduleCount");
    private static final AtomicIntegerFieldUpdater<BKLogSegmentEntryReader> numReadErrorsUpdater = AtomicIntegerFieldUpdater.newUpdater(BKLogSegmentEntryReader.class, "numReadErrors");
    private CompletableFuture<Void> closePromise = null;
    private volatile Throwable lastException = null;
    private volatile long scheduleCount = 0;
    private volatile boolean hasCaughtupOnInprogress = false;
    private final CopyOnWriteArraySet<LogSegmentEntryReader.StateChangeListener> stateChangeListeners = new CopyOnWriteArraySet<>();
    private volatile int numReadErrors = 0;
    int cachedEntries = 0;
    int numOutstandingEntries = 0;
    private final List<LedgerHandle> openLedgerHandles = Lists.newArrayList();

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:dlshade/org/apache/distributedlog/impl/logsegment/BKLogSegmentEntryReader$CacheEntry.class */
    public class CacheEntry implements SafeRunnable, AsyncCallback.ReadCallback, AsyncCallback.ReadLastConfirmedAndEntryCallback {
        protected final long entryId;
        private boolean done;
        private LedgerEntry entry;
        private int rc;

        private CacheEntry(long j) {
            this.entryId = j;
            this.entry = null;
            this.rc = BKException.Code.UnexpectedConditionException;
            this.done = false;
        }

        long getEntryId() {
            return this.entryId;
        }

        synchronized boolean isDone() {
            return this.done;
        }

        synchronized void release() {
            if (null != this.entry) {
                this.entry.getEntryBuffer().release();
                this.entry = null;
            }
        }

        void release(LedgerEntry ledgerEntry) {
            if (null != ledgerEntry) {
                ledgerEntry.getEntryBuffer().release();
            }
        }

        void complete(LedgerEntry ledgerEntry) {
            if (BKLogSegmentEntryReader.this.isClosed()) {
                release(ledgerEntry);
            }
            synchronized (this) {
                if (this.done) {
                    return;
                }
                this.rc = 0;
                this.entry = ledgerEntry;
                setDone(true);
            }
        }

        void completeExceptionally(int i) {
            synchronized (this) {
                if (this.done) {
                    return;
                }
                this.rc = i;
                setDone(false);
            }
        }

        void setDone(boolean z) {
            synchronized (this) {
                this.done = true;
            }
            BKLogSegmentEntryReader.this.onReadEntryDone(z);
        }

        synchronized boolean isSuccess() {
            return 0 == this.rc;
        }

        synchronized LedgerEntry getEntry() {
            this.entry.getEntryBuffer().retain();
            return this.entry;
        }

        synchronized int getRc() {
            return this.rc;
        }

        @Override // dlshade.org.apache.bookkeeper.client.AsyncCallback.ReadCallback
        public void readComplete(int i, LedgerHandle ledgerHandle, Enumeration<LedgerEntry> enumeration, Object obj) {
            if (BKLogSegmentEntryReader.this.failureInjector.shouldInjectCorruption(this.entryId, this.entryId)) {
                i = -5;
            }
            processReadEntries(i, ledgerHandle, enumeration, obj);
        }

        void processReadEntries(int i, LedgerHandle ledgerHandle, Enumeration<LedgerEntry> enumeration, Object obj) {
            if (isDone() || !checkReturnCodeAndHandleFailure(i, false)) {
                return;
            }
            LedgerEntry ledgerEntry = null;
            while (true) {
                LedgerEntry ledgerEntry2 = ledgerEntry;
                if (!enumeration.hasMoreElements()) {
                    if (null == ledgerEntry2 || ledgerEntry2.getEntryId() != this.entryId) {
                        completeExceptionally(BKException.Code.UnexpectedConditionException);
                        return;
                    } else {
                        complete(ledgerEntry2);
                        return;
                    }
                }
                if (null != ledgerEntry2) {
                    completeExceptionally(BKException.Code.UnexpectedConditionException);
                    return;
                }
                ledgerEntry = enumeration.nextElement();
            }
        }

        @Override // dlshade.org.apache.bookkeeper.client.AsyncCallback.ReadLastConfirmedAndEntryCallback
        public void readLastConfirmedAndEntryComplete(int i, long j, LedgerEntry ledgerEntry, Object obj) {
            if (BKLogSegmentEntryReader.this.failureInjector.shouldInjectCorruption(this.entryId, this.entryId)) {
                i = -5;
            }
            processReadEntry(i, j, ledgerEntry, obj);
        }

        void processReadEntry(int i, long j, LedgerEntry ledgerEntry, Object obj) {
            if (!isDone() && checkReturnCodeAndHandleFailure(i, true)) {
                if (null == ledgerEntry || this.entryId != j) {
                    BKLogSegmentEntryReader.this.issueRead(this);
                } else {
                    complete(ledgerEntry);
                }
            }
        }

        boolean checkReturnCodeAndHandleFailure(int i, boolean z) {
            if (0 == i) {
                BKLogSegmentEntryReader.numReadErrorsUpdater.set(BKLogSegmentEntryReader.this, 0);
                return true;
            }
            if (-8 != i && (!z || -7 != i)) {
                completeExceptionally(i);
                return false;
            }
            BKLogSegmentEntryReader.this.scheduler.scheduleOrdered(Long.valueOf(BKLogSegmentEntryReader.this.getSegment().getLogSegmentId()), this, Math.min(Math.max(1, BKLogSegmentEntryReader.numReadErrorsUpdater.incrementAndGet(BKLogSegmentEntryReader.this)) * BKLogSegmentEntryReader.this.readAheadWaitTime, BKLogSegmentEntryReader.this.maxReadBackoffTime), TimeUnit.MILLISECONDS);
            return false;
        }

        @Override // dlshade.org.apache.bookkeeper.common.util.SafeRunnable
        public void safeRun() {
            BKLogSegmentEntryReader.this.issueRead(this);
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:dlshade/org/apache/distributedlog/impl/logsegment/BKLogSegmentEntryReader$PendingReadRequest.class */
    public class PendingReadRequest {
        private final int numEntries;
        private final List<Entry.Reader> entries;
        private final CompletableFuture<List<Entry.Reader>> promise;

        PendingReadRequest(int i) {
            this.numEntries = i;
            if (i == 1) {
                this.entries = new ArrayList(1);
            } else {
                this.entries = new ArrayList();
            }
            this.promise = new CompletableFuture<>();
        }

        CompletableFuture<List<Entry.Reader>> getPromise() {
            return this.promise;
        }

        void completeExceptionally(Throwable th) {
            FutureUtils.completeExceptionally(this.promise, th);
        }

        void addEntry(Entry.Reader reader) {
            this.entries.add(reader);
        }

        void complete() {
            FutureUtils.complete(this.promise, this.entries);
            BKLogSegmentEntryReader.this.onEntriesConsumed(this.entries.size());
        }

        boolean hasReadEntries() {
            return this.entries.size() > 0;
        }

        boolean hasReadEnoughEntries() {
            return this.entries.size() >= this.numEntries;
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public BKLogSegmentEntryReader(LogSegmentMetadata logSegmentMetadata, LedgerHandle ledgerHandle, long j, BookKeeper bookKeeper, OrderedScheduler orderedScheduler, DistributedLogConfiguration distributedLogConfiguration, StatsLogger statsLogger, AsyncFailureInjector asyncFailureInjector) {
        this.metadata = logSegmentMetadata;
        this.lssn = logSegmentMetadata.getLogSegmentSequenceNumber();
        this.startSequenceId = logSegmentMetadata.getStartSequenceId();
        this.envelopeEntries = logSegmentMetadata.getEnvelopeEntries();
        this.deserializeRecordSet = distributedLogConfiguration.getDeserializeRecordSetOnReads();
        this.lh = ledgerHandle;
        this.nextEntryId = Math.max(j, 0L);
        this.bk = bookKeeper;
        this.conf = distributedLogConfiguration;
        this.numPrefetchEntries = distributedLogConfiguration.getNumPrefetchEntriesPerLogSegment();
        this.maxPrefetchEntries = distributedLogConfiguration.getMaxPrefetchEntriesPerLogSegment();
        this.scheduler = orderedScheduler;
        this.openLedgerHandles.add(ledgerHandle);
        this.outstandingLongPoll = null;
        this.readAheadEntries = new LinkedBlockingQueue<>();
        this.readQueue = new LinkedList<>();
        this.readAheadWaitTime = distributedLogConfiguration.getReadAheadWaitTime();
        this.maxReadBackoffTime = 4 * distributedLogConfiguration.getReadAheadWaitTime();
        this.skipBrokenEntries = distributedLogConfiguration.getReadAheadSkipBrokenEntries();
        this.failureInjector = asyncFailureInjector;
        this.skippedBrokenEntriesCounter = statsLogger.getCounter("skipped_broken_entries");
    }

    @VisibleForTesting
    public synchronized CacheEntry getOutstandingLongPoll() {
        return this.outstandingLongPoll;
    }

    @VisibleForTesting
    LinkedBlockingQueue<CacheEntry> getReadAheadEntries() {
        return this.readAheadEntries;
    }

    synchronized LedgerHandle getLh() {
        return this.lh;
    }

    @Override // dlshade.org.apache.distributedlog.logsegment.LogSegmentEntryReader
    public synchronized LogSegmentMetadata getSegment() {
        return this.metadata;
    }

    @VisibleForTesting
    synchronized long getNextEntryId() {
        return this.nextEntryId;
    }

    @Override // dlshade.org.apache.distributedlog.logsegment.LogSegmentEntryReader
    public void start() {
        prefetchIfNecessary();
    }

    @Override // dlshade.org.apache.distributedlog.logsegment.LogSegmentEntryReader
    public boolean hasCaughtUpOnInprogress() {
        return this.hasCaughtupOnInprogress;
    }

    @Override // dlshade.org.apache.distributedlog.logsegment.LogSegmentEntryReader
    public LogSegmentEntryReader registerListener(LogSegmentEntryReader.StateChangeListener stateChangeListener) {
        this.stateChangeListeners.add(stateChangeListener);
        return this;
    }

    @Override // dlshade.org.apache.distributedlog.logsegment.LogSegmentEntryReader
    public LogSegmentEntryReader unregisterListener(LogSegmentEntryReader.StateChangeListener stateChangeListener) {
        this.stateChangeListeners.remove(stateChangeListener);
        return this;
    }

    private void notifyCaughtupOnInprogress() {
        Iterator<LogSegmentEntryReader.StateChangeListener> it = this.stateChangeListeners.iterator();
        while (it.hasNext()) {
            it.next().onCaughtupOnInprogress();
        }
    }

    @Override // dlshade.org.apache.distributedlog.logsegment.LogSegmentEntryReader
    public synchronized void onLogSegmentMetadataUpdated(LogSegmentMetadata logSegmentMetadata) {
        if (this.metadata == logSegmentMetadata || LogSegmentMetadata.COMPARATOR.compare(this.metadata, logSegmentMetadata) == 0 || !this.metadata.isInProgress() || logSegmentMetadata.isInProgress()) {
            return;
        }
        this.bk.asyncOpenLedger(logSegmentMetadata.getLogSegmentId(), BookKeeper.DigestType.CRC32, this.conf.getBKDigestPW().getBytes(StandardCharsets.UTF_8), this, logSegmentMetadata);
    }

    @Override // dlshade.org.apache.bookkeeper.client.AsyncCallback.OpenCallback
    public void openComplete(int i, LedgerHandle ledgerHandle, Object obj) {
        LogSegmentMetadata logSegmentMetadata = (LogSegmentMetadata) obj;
        if (0 != i) {
            failOrRetryOpenLedger(i, logSegmentMetadata);
            return;
        }
        synchronized (this) {
            if (isClosed()) {
                ledgerHandle.asyncClose(new AsyncCallback.CloseCallback() { // from class: dlshade.org.apache.distributedlog.impl.logsegment.BKLogSegmentEntryReader.1
                    @Override // dlshade.org.apache.bookkeeper.client.AsyncCallback.CloseCallback
                    public void closeComplete(int i2, LedgerHandle ledgerHandle2, Object obj2) {
                        BKLogSegmentEntryReader.logger.debug("Close the open ledger {} since the log segment reader is already closed", Long.valueOf(ledgerHandle2.getId()));
                    }
                }, null);
                return;
            }
            this.metadata = logSegmentMetadata;
            this.lh = ledgerHandle;
            this.openLedgerHandles.add(ledgerHandle);
            CacheEntry cacheEntry = this.outstandingLongPoll;
            if (null != cacheEntry) {
                issueRead(cacheEntry);
            }
            notifyReaders();
        }
    }

    private void failOrRetryOpenLedger(int i, LogSegmentMetadata logSegmentMetadata) {
        if (isClosed()) {
            return;
        }
        if (isBeyondLastAddConfirmed()) {
            completeExceptionally(new BKTransmitException("Failed to open ledger for reading log segment " + getSegment(), i), true);
        } else {
            this.scheduler.scheduleOrdered(Long.valueOf(logSegmentMetadata.getLogSegmentId()), () -> {
                onLogSegmentMetadataUpdated(logSegmentMetadata);
            }, this.conf.getZKRetryBackoffStartMillis(), TimeUnit.MILLISECONDS);
        }
    }

    private boolean checkClosedOrInError() {
        Throwable th = lastExceptionUpdater.get(this);
        if (null == th) {
            return false;
        }
        cancelAllPendingReads(th);
        return true;
    }

    private void completeExceptionally(Throwable th, boolean z) {
        lastExceptionUpdater.compareAndSet(this, null, th);
        if (z) {
            notifyReaders();
        }
    }

    private void notifyReaders() {
        processReadRequests();
    }

    private void cancelAllPendingReads(Throwable th) {
        ArrayList newArrayListWithExpectedSize;
        synchronized (this.readQueue) {
            newArrayListWithExpectedSize = Lists.newArrayListWithExpectedSize(this.readQueue.size());
            newArrayListWithExpectedSize.addAll(this.readQueue);
            this.readQueue.clear();
        }
        Iterator it = newArrayListWithExpectedSize.iterator();
        while (it.hasNext()) {
            ((PendingReadRequest) it.next()).completeExceptionally(th);
        }
    }

    private void releaseAllCachedEntries() {
        synchronized (this) {
            CacheEntry poll = this.readAheadEntries.poll();
            while (null != poll) {
                poll.release();
                poll = this.readAheadEntries.poll();
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    public void onReadEntryDone(boolean z) {
        synchronized (this) {
            this.numOutstandingEntries--;
        }
        notifyReaders();
        if (z) {
            prefetchIfNecessary();
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    public void onEntriesConsumed(int i) {
        synchronized (this) {
            this.cachedEntries -= i;
        }
        prefetchIfNecessary();
    }

    private void prefetchIfNecessary() {
        synchronized (this) {
            if (this.cachedEntries >= this.maxPrefetchEntries) {
                return;
            }
            int i = this.numPrefetchEntries - this.numOutstandingEntries;
            if (i <= 0) {
                return;
            }
            ArrayList arrayList = new ArrayList(i);
            for (int i2 = 0; i2 < i && this.cachedEntries < this.maxPrefetchEntries && ((!isLedgerClosed() || this.nextEntryId <= getLastAddConfirmed()) && (isLedgerClosed() || this.nextEntryId <= getLastAddConfirmed() + 1)); i2++) {
                CacheEntry cacheEntry = new CacheEntry(this.nextEntryId);
                arrayList.add(cacheEntry);
                this.readAheadEntries.add(cacheEntry);
                this.numOutstandingEntries++;
                this.cachedEntries++;
                this.nextEntryId++;
            }
            Iterator it = arrayList.iterator();
            while (it.hasNext()) {
                issueRead((CacheEntry) it.next());
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    public void issueRead(CacheEntry cacheEntry) {
        if (isClosed()) {
            return;
        }
        if (isLedgerClosed()) {
            if (isNotBeyondLastAddConfirmed(cacheEntry.getEntryId())) {
                issueSimpleRead(cacheEntry);
                return;
            } else {
                notifyReaders();
                return;
            }
        }
        if (isNotBeyondLastAddConfirmed(cacheEntry.getEntryId())) {
            issueSimpleRead(cacheEntry);
        } else {
            issueLongPollRead(cacheEntry);
        }
    }

    private void issueSimpleRead(CacheEntry cacheEntry) {
        getLh().asyncReadEntries(cacheEntry.entryId, cacheEntry.entryId, cacheEntry, null);
    }

    private void issueLongPollRead(CacheEntry cacheEntry) {
        synchronized (this) {
            this.outstandingLongPoll = cacheEntry;
        }
        if (!this.hasCaughtupOnInprogress) {
            this.hasCaughtupOnInprogress = true;
            notifyCaughtupOnInprogress();
        }
        getLh().asyncReadLastConfirmedAndEntry(cacheEntry.entryId, this.conf.getReadLACLongPollTimeout(), false, cacheEntry, null);
    }

    Entry.Reader processReadEntry(LedgerEntry ledgerEntry) throws IOException {
        return Entry.newBuilder().setLogSegmentInfo(this.lssn, this.startSequenceId).setEntryId(ledgerEntry.getEntryId()).setEnvelopeEntry(this.envelopeEntries).deserializeRecordSet(this.deserializeRecordSet).setEntry(ledgerEntry.getEntryBuffer()).buildReader();
    }

    @Override // dlshade.org.apache.distributedlog.logsegment.LogSegmentEntryReader
    public CompletableFuture<List<Entry.Reader>> readNext(int i) {
        boolean isEmpty;
        PendingReadRequest pendingReadRequest = new PendingReadRequest(i);
        if (checkClosedOrInError()) {
            pendingReadRequest.completeExceptionally(lastExceptionUpdater.get(this));
        } else {
            synchronized (this.readQueue) {
                isEmpty = this.readQueue.isEmpty();
                this.readQueue.add(pendingReadRequest);
            }
            if (isEmpty) {
                processReadRequests();
            }
        }
        return pendingReadRequest.getPromise();
    }

    private void processReadRequests() {
        if (!isClosed() && 0 == scheduleCountUpdater.getAndIncrement(this)) {
            this.scheduler.executeOrdered(getSegment().getLogSegmentId(), this);
        }
    }

    @Override // dlshade.org.apache.bookkeeper.common.util.SafeRunnable
    public void safeRun() {
        PendingReadRequest peek;
        PendingReadRequest poll;
        long j = scheduleCountUpdater.get(this);
        while (true) {
            synchronized (this.readQueue) {
                peek = this.readQueue.peek();
            }
            if (null == peek) {
                scheduleCountUpdater.set(this, 0L);
                return;
            }
            if (null == lastExceptionUpdater.get(this) && peek.getPromise().isCancelled()) {
                completeExceptionally(new DLInterruptedException("Interrupted on reading log segment " + getSegment() + " : " + peek.getPromise().isCancelled()), false);
            }
            if (checkClosedOrInError()) {
                return;
            }
            readEntriesFromReadAheadCache(peek);
            if (peek.hasReadEntries()) {
                synchronized (this.readQueue) {
                    poll = this.readQueue.poll();
                }
                if (null == poll || peek != poll) {
                    DLIllegalStateException dLIllegalStateException = new DLIllegalStateException("Unexpected condition at reading from " + getSegment());
                    peek.completeExceptionally(dLIllegalStateException);
                    if (null != poll) {
                        poll.completeExceptionally(dLIllegalStateException);
                    }
                    completeExceptionally(dLIllegalStateException, false);
                } else {
                    poll.complete();
                }
            } else if (0 == j) {
                return;
            } else {
                j = scheduleCountUpdater.decrementAndGet(this);
            }
        }
    }

    private void readEntriesFromReadAheadCache(PendingReadRequest pendingReadRequest) {
        CacheEntry peek;
        boolean z;
        while (!pendingReadRequest.hasReadEnoughEntries()) {
            synchronized (this) {
                peek = this.readAheadEntries.peek();
                z = null == peek && isEndOfLogSegment();
            }
            if (z) {
                completeExceptionally(new EndOfLogSegmentException(getSegment().getZNodeName()), false);
                return;
            }
            if (null == peek) {
                return;
            }
            if (!peek.isDone()) {
                if (isEndOfLogSegment(peek.getEntryId())) {
                    completeExceptionally(new EndOfLogSegmentException(getSegment().getZNodeName()), false);
                    return;
                }
                return;
            }
            if (peek.isSuccess()) {
                CacheEntry poll = this.readAheadEntries.poll();
                if (peek != poll) {
                    completeExceptionally(new DLIllegalStateException("Unexpected condition at reading from " + getSegment()), false);
                    ReferenceCountUtil.safeRelease(poll);
                    return;
                }
                try {
                    try {
                        pendingReadRequest.addEntry(processReadEntry(peek.getEntry()));
                        ReferenceCountUtil.safeRelease(poll);
                    } catch (IOException e) {
                        completeExceptionally(e, false);
                        ReferenceCountUtil.safeRelease(poll);
                        return;
                    }
                } catch (Throwable th) {
                    ReferenceCountUtil.safeRelease(poll);
                    throw th;
                }
            } else if (!this.skipBrokenEntries || -5 != peek.getRc()) {
                completeExceptionally(new BKTransmitException("Encountered issue on reading entry " + peek.getEntryId() + " @ log segment " + getSegment(), peek.getRc()), false);
                return;
            } else {
                this.skippedBrokenEntriesCounter.inc();
                this.readAheadEntries.poll().release();
            }
        }
    }

    private synchronized boolean isEndOfLogSegment() {
        return isEndOfLogSegment(this.nextEntryId);
    }

    private boolean isEndOfLogSegment(long j) {
        return isLedgerClosed() && j > getLastAddConfirmed();
    }

    @Override // dlshade.org.apache.distributedlog.logsegment.LogSegmentEntryReader
    public synchronized boolean isBeyondLastAddConfirmed() {
        return isBeyondLastAddConfirmed(this.nextEntryId);
    }

    private boolean isBeyondLastAddConfirmed(long j) {
        return j > getLastAddConfirmed();
    }

    private boolean isNotBeyondLastAddConfirmed(long j) {
        return j <= getLastAddConfirmed();
    }

    private boolean isLedgerClosed() {
        return getLh().isClosed();
    }

    @Override // dlshade.org.apache.distributedlog.logsegment.LogSegmentEntryReader
    public long getLastAddConfirmed() {
        return getLh().getLastAddConfirmed();
    }

    synchronized boolean isClosed() {
        return null != this.closePromise;
    }

    @Override // dlshade.org.apache.distributedlog.io.AsyncCloseable
    public CompletableFuture<Void> asyncClose() {
        synchronized (this) {
            if (null != this.closePromise) {
                return this.closePromise;
            }
            CompletableFuture<Void> completableFuture = new CompletableFuture<>();
            this.closePromise = completableFuture;
            LedgerHandle[] ledgerHandleArr = (LedgerHandle[]) this.openLedgerHandles.toArray(new LedgerHandle[this.openLedgerHandles.size()]);
            ReadCancelledException readCancelledException = new ReadCancelledException(getSegment().getZNodeName(), "Reader was closed");
            completeExceptionally(readCancelledException, false);
            releaseAllCachedEntries();
            cancelAllPendingReads(readCancelledException);
            FutureUtils.proxyTo(BKUtils.closeLedgers(ledgerHandleArr), completableFuture);
            return completableFuture;
        }
    }
}
