package org.apache.pulsar.functions.runtime.shaded.org.apache.distributedlog;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.CancellationException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CopyOnWriteArraySet;
import java.util.concurrent.TimeUnit;
import java.util.function.BiConsumer;
import org.apache.pulsar.functions.runtime.shaded.com.google.common.annotations.VisibleForTesting;
import org.apache.pulsar.functions.runtime.shaded.com.google.common.base.Optional;
import org.apache.pulsar.functions.runtime.shaded.javax.annotation.Nullable;
import org.apache.pulsar.functions.runtime.shaded.org.apache.bookkeeper.common.concurrent.FutureEventListener;
import org.apache.pulsar.functions.runtime.shaded.org.apache.bookkeeper.common.concurrent.FutureUtils;
import org.apache.pulsar.functions.runtime.shaded.org.apache.bookkeeper.common.util.OrderedScheduler;
import org.apache.pulsar.functions.runtime.shaded.org.apache.bookkeeper.stats.AlertStatsLogger;
import org.apache.pulsar.functions.runtime.shaded.org.apache.bookkeeper.stats.NullStatsLogger;
import org.apache.pulsar.functions.runtime.shaded.org.apache.bookkeeper.stats.StatsLogger;
import org.apache.pulsar.functions.runtime.shaded.org.apache.bookkeeper.versioning.Version;
import org.apache.pulsar.functions.runtime.shaded.org.apache.bookkeeper.versioning.Versioned;
import org.apache.pulsar.functions.runtime.shaded.org.apache.distributedlog.callback.LogSegmentListener;
import org.apache.pulsar.functions.runtime.shaded.org.apache.distributedlog.callback.LogSegmentNamesListener;
import org.apache.pulsar.functions.runtime.shaded.org.apache.distributedlog.config.DynamicDistributedLogConfiguration;
import org.apache.pulsar.functions.runtime.shaded.org.apache.distributedlog.exceptions.DLIllegalStateException;
import org.apache.pulsar.functions.runtime.shaded.org.apache.distributedlog.exceptions.LockingException;
import org.apache.pulsar.functions.runtime.shaded.org.apache.distributedlog.exceptions.LogNotFoundException;
import org.apache.pulsar.functions.runtime.shaded.org.apache.distributedlog.exceptions.LogSegmentNotFoundException;
import org.apache.pulsar.functions.runtime.shaded.org.apache.distributedlog.exceptions.UnexpectedException;
import org.apache.pulsar.functions.runtime.shaded.org.apache.distributedlog.lock.DistributedLock;
import org.apache.pulsar.functions.runtime.shaded.org.apache.distributedlog.logsegment.LogSegmentEntryStore;
import org.apache.pulsar.functions.runtime.shaded.org.apache.distributedlog.logsegment.LogSegmentFilter;
import org.apache.pulsar.functions.runtime.shaded.org.apache.distributedlog.logsegment.LogSegmentMetadataCache;
import org.apache.pulsar.functions.runtime.shaded.org.apache.distributedlog.metadata.LogMetadataForReader;
import org.apache.pulsar.functions.runtime.shaded.org.apache.distributedlog.metadata.LogStreamMetadataStore;
import org.apache.pulsar.functions.runtime.shaded.org.apache.distributedlog.util.Utils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* JADX INFO: Access modifiers changed from: package-private */
/* loaded from: input_file:org/apache/pulsar/functions/runtime/shaded/org/apache/distributedlog/BKLogReadHandler.class */
/**
 * Read-side handler for a single log stream.
 *
 * <p>On top of {@link BKLogHandler} this class:
 * <ul>
 *   <li>creates and acquires the reader lock for the stream ({@link #lockStream()}),</li>
 *   <li>fetches the log segment list, retrying on failures that are not treated as permanent
 *       ({@link #asyncStartFetchLogSegments()}),</li>
 *   <li>receives log segment name updates as a {@link LogSegmentNamesListener} and fans the
 *       resolved segment lists out to the registered {@link LogSegmentListener}s.</li>
 * </ul>
 */
public class BKLogReadHandler extends BKLogHandler implements LogSegmentNamesListener {
    static final Logger LOG = LoggerFactory.getLogger(BKLogReadHandler.class);

    /** Reader-side log metadata; used to resolve the read lock path and to create the read lock. */
    protected final LogMetadataForReader logMetadataForReader;
    /** Dynamic configuration handed in at construction time. */
    protected final DynamicDistributedLogConfiguration dynConf;
    /** Optional subscriber id; passed along when resolving the read lock path and creating the lock. */
    private final Optional<String> subscriberId;
    /** The read lock; stays {@code null} until the lock creation started by {@link #lockStream()} succeeds. */
    private DistributedLock readLock;
    /** Cached result of the first {@link #lockStream()} call; {@code null} until then. Guarded by {@code this}. */
    private CompletableFuture<Void> lockAcquireFuture;
    /** Callback notified about reader errors; may be {@code null}. */
    protected final AsyncNotification readerStateNotification;
    /** When {@code true}, registered listeners are not notified about segment updates or stream deletion. */
    protected boolean logSegmentsNotificationDisabled;
    /** Listeners that receive resolved log segment lists and stream-deleted events. */
    protected final CopyOnWriteArraySet<LogSegmentListener> listeners;
    /** Last segment list delivered to listeners; starts with a {@code null} value at {@link Version#NEW}. */
    protected Versioned<List<LogSegmentMetadata>> lastNotifiedLogSegments;
    /** Per-log stats logger, or {@link NullStatsLogger#INSTANCE} when per-log stats are disabled. */
    private final StatsLogger perLogStatsLogger;

    /**
     * Creates the read handler.
     *
     * @param logMetadataForReader reader-side metadata of the log
     * @param optional optional subscriber id used for the read lock
     * @param distributedLogConfiguration static configuration, forwarded to {@link BKLogHandler}
     * @param dynamicDistributedLogConfiguration dynamic configuration
     * @param logStreamMetadataStore stream metadata store, forwarded to {@link BKLogHandler}
     * @param logSegmentMetadataCache segment metadata cache, forwarded to {@link BKLogHandler}
     * @param logSegmentEntryStore segment entry store, forwarded to {@link BKLogHandler}
     * @param orderedScheduler scheduler, forwarded to {@link BKLogHandler}
     * @param alertStatsLogger alert stats logger, forwarded to {@link BKLogHandler}
     * @param statsLogger stats logger, forwarded to {@link BKLogHandler}
     * @param statsLogger2 per-log stats logger; only kept when {@code z} is {@code true}
     * @param str forwarded as the last argument of the {@link BKLogHandler} constructor
     *            (NOTE(review): presumably the lock client id - confirm against BKLogHandler)
     * @param asyncNotification callback notified on reader errors; may be {@code null}
     * @param z whether {@code statsLogger2} should be used; otherwise a null stats logger is used
     */
    public BKLogReadHandler(LogMetadataForReader logMetadataForReader, Optional<String> optional, DistributedLogConfiguration distributedLogConfiguration, DynamicDistributedLogConfiguration dynamicDistributedLogConfiguration, LogStreamMetadataStore logStreamMetadataStore, LogSegmentMetadataCache logSegmentMetadataCache, LogSegmentEntryStore logSegmentEntryStore, OrderedScheduler orderedScheduler, AlertStatsLogger alertStatsLogger, StatsLogger statsLogger, StatsLogger statsLogger2, String str, AsyncNotification asyncNotification, boolean z) {
        super(logMetadataForReader, distributedLogConfiguration, logStreamMetadataStore, logSegmentMetadataCache, logSegmentEntryStore, orderedScheduler, statsLogger, alertStatsLogger, str);
        this.logSegmentsNotificationDisabled = false;
        this.listeners = new CopyOnWriteArraySet<>();
        this.lastNotifiedLogSegments = new Versioned<>(null, Version.NEW);
        this.logMetadataForReader = logMetadataForReader;
        this.dynConf = dynamicDistributedLogConfiguration;
        this.perLogStatsLogger = z ? statsLogger2 : NullStatsLogger.INSTANCE;
        this.readerStateNotification = asyncNotification;
        this.subscriberId = optional;
    }

    /**
     * Returns the read lock path for this handler's subscriber id.
     *
     * @return the path reported by the reader metadata
     */
    @VisibleForTesting
    public String getReadLockPath() {
        return this.logMetadataForReader.getReadLockPath(this.subscriberId);
    }

    /**
     * Asks the stream metadata store whether the log exists.
     *
     * @return the future returned by the metadata store for this log's URI and name
     */
    public CompletableFuture<Void> checkLogStreamExists() {
        return this.streamMetadataStore.logExists(this.logMetadata.getUri(), this.logMetadata.getLogName());
    }

    /**
     * Creates the read lock and starts acquiring it. Only the first call starts the work; later
     * calls return the same cached future, including after it has failed or been cancelled.
     *
     * @return future completed when the lock is acquired, or failed when creation or acquisition fails
     */
    public synchronized CompletableFuture<Void> lockStream() {
        if (null == this.lockAcquireFuture) {
            this.lockAcquireFuture = this.streamMetadataStore
                    .createReadLock(this.logMetadataForReader, this.subscriberId)
                    .thenCompose(distributedLock -> {
                        try {
                            // Remember the lock before acquiring so asyncClose() can release it.
                            this.readLock = distributedLock;
                            LOG.info("acquiring readlock {} at {}", getLockClientId(), getReadLockPath());
                            return acquireLockOnExecutorThread(distributedLock);
                        } catch (LockingException e) {
                            return FutureUtils.exception(e);
                        }
                    });
        }
        return this.lockAcquireFuture;
    }

    /**
     * Starts the asynchronous acquisition of {@code distributedLock} and bridges its outcome to a
     * fresh future.
     *
     * <p>The outcome is relayed through {@code whenCompleteAsync}, so the returned future is not
     * completed inline by whichever thread completes the lock's own future. Cancelling the
     * returned future cancels the underlying acquisition.
     *
     * @param distributedLock the lock to acquire
     * @return future completed with {@code null} on success or with the acquisition failure
     * @throws LockingException declared for callers; not thrown by the code in this method itself
     */
    CompletableFuture<Void> acquireLockOnExecutorThread(DistributedLock distributedLock) throws LockingException {
        final CompletableFuture<? extends DistributedLock> asyncAcquire = distributedLock.asyncAcquire();
        final CompletableFuture<Void> acquirePromise = new CompletableFuture<>();
        // Propagate a cancellation of the returned future back to the lock acquisition.
        acquirePromise.whenComplete((ignored, cause) -> {
            if (cause instanceof CancellationException) {
                asyncAcquire.cancel(true);
            }
        });
        asyncAcquire.whenCompleteAsync(new FutureEventListener<DistributedLock>() {
            @Override
            public void onSuccess(DistributedLock acquiredLock) {
                BKLogReadHandler.LOG.info("acquired readlock {} at {}", BKLogReadHandler.this.getLockClientId(), BKLogReadHandler.this.getReadLockPath());
                acquirePromise.complete(null);
            }

            @Override
            public void onFailure(Throwable cause) {
                BKLogReadHandler.LOG.info("failed to acquire readlock {} at {}", BKLogReadHandler.this.getLockClientId(), BKLogReadHandler.this.getReadLockPath(), cause);
                acquirePromise.completeExceptionally(cause);
            }
        });
        return acquirePromise;
    }

    /**
     * Verifies that this handler still owns the read lock.
     *
     * @throws DLIllegalStateException if {@link #lockStream()} has not been called, has not
     *         finished yet, or finished without a read lock ever being created
     * @throws LockingException if the ownership check of the read lock fails
     */
    public void checkReadLock() throws DLIllegalStateException, LockingException {
        final DistributedLock lock;
        synchronized (this) {
            if (null == this.lockAcquireFuture || !this.lockAcquireFuture.isDone()) {
                throw new DLIllegalStateException("Attempt to check for lock before it has been acquired successfully");
            }
            lock = this.readLock;
        }
        // isDone() is also true for a failed or cancelled future. If lock creation itself failed,
        // or the future was cancelled before creation finished, readLock was never assigned;
        // report that as an illegal state instead of a NullPointerException.
        if (null == lock) {
            throw new DLIllegalStateException("Attempt to check for lock that was never created: read lock acquisition did not succeed");
        }
        lock.checkOwnership();
    }

    /**
     * Cancels a still-pending lock acquisition, closes the read lock and then unregisters this
     * handler as a log segment listener from the metadata store.
     *
     * @return future completed once the close sequence and the listener removal are done
     */
    @Override // org.apache.pulsar.functions.runtime.shaded.org.apache.distributedlog.io.AsyncCloseable
    public CompletableFuture<Void> asyncClose() {
        final DistributedLock lockToClose;
        synchronized (this) {
            if (null != this.lockAcquireFuture && !this.lockAcquireFuture.isDone()) {
                this.lockAcquireFuture.cancel(true);
            }
            // May still be null when no lock was ever created.
            // NOTE(review): presumably Utils.closeSequence tolerates null entries - confirm.
            lockToClose = this.readLock;
        }
        return Utils.closeSequence(this.scheduler, lockToClose).thenApply(ignored -> {
            this.metadataStore.unregisterLogSegmentListener(this.logMetadata.getLogSegmentsPath(), this);
            return null;
        });
    }

    /**
     * Aborting is the same as closing for the read handler.
     *
     * @return the future returned by {@link #asyncClose()}
     */
    @Override // org.apache.pulsar.functions.runtime.shaded.org.apache.distributedlog.BKLogHandler, org.apache.pulsar.functions.runtime.shaded.org.apache.distributedlog.io.AsyncAbortable
    public CompletableFuture<Void> asyncAbort() {
        return asyncClose();
    }

    /**
     * Fetches the log segment list from the store, registering this handler as the names listener.
     *
     * @return future completed with the fetched segments, or failed with a permanent fetch error
     */
    public CompletableFuture<Versioned<List<LogSegmentMetadata>>> asyncStartFetchLogSegments() {
        CompletableFuture<Versioned<List<LogSegmentMetadata>>> result = new CompletableFuture<>();
        asyncStartFetchLogSegments(result);
        return result;
    }

    /**
     * Performs one fetch attempt and completes {@code completableFuture} with its outcome.
     *
     * <p>Permanent failures (see {@link #isPermanentFetchFailure(Throwable)}) are recorded,
     * reported to the reader and fail the future. Any other failure schedules another attempt
     * after {@code conf.getZKRetryBackoffMaxMillis()} milliseconds and leaves the future pending.
     *
     * @param completableFuture the future to complete
     */
    void asyncStartFetchLogSegments(final CompletableFuture<Versioned<List<LogSegmentMetadata>>> completableFuture) {
        readLogSegmentsFromStore(LogSegmentMetadata.COMPARATOR, LogSegmentFilter.DEFAULT_FILTER, this).whenComplete(new FutureEventListener<Versioned<List<LogSegmentMetadata>>>() {
            @Override
            public void onFailure(Throwable cause) {
                if (!isPermanentFetchFailure(cause)) {
                    BKLogReadHandler.this.scheduler.schedule(
                            () -> BKLogReadHandler.this.asyncStartFetchLogSegments(completableFuture),
                            BKLogReadHandler.this.conf.getZKRetryBackoffMaxMillis(),
                            TimeUnit.MILLISECONDS);
                    return;
                }
                BKLogReadHandler.this.recordPermanentFetchFailure(cause);
                FutureUtils.completeExceptionally(completableFuture, cause);
            }

            @Override
            public void onSuccess(Versioned<List<LogSegmentMetadata>> segments) {
                FutureUtils.complete(completableFuture, segments);
            }
        });
    }

    /** Turns off listener notifications for segment updates and stream deletion. */
    @VisibleForTesting
    public void disableReadAheadLogSegmentsNotification() {
        this.logSegmentsNotificationDisabled = true;
    }

    /**
     * Handles a change of the log segment names.
     *
     * <p>The update is ignored unless nothing has been notified yet or the last notified version
     * occurred before the incoming one. Otherwise the segment metadata for the names is read
     * from the store; on success the listeners are notified if the result is still newer than
     * the last notified version. Non-permanent read failures re-run this method with the same
     * update after {@code conf.getZKRetryBackoffMaxMillis()} milliseconds.
     *
     * @param versioned the versioned list of segment names
     */
    @Override // org.apache.pulsar.functions.runtime.shaded.org.apache.distributedlog.callback.LogSegmentNamesListener
    public void onSegmentsUpdated(final Versioned<List<String>> versioned) {
        synchronized (this) {
            final boolean isNewer = this.lastNotifiedLogSegments.getVersion() == Version.NEW
                    || this.lastNotifiedLogSegments.getVersion().compare(versioned.getVersion()) == Version.Occurred.BEFORE;
            if (!isNewer) {
                return;
            }
            CompletableFuture<Versioned<List<LogSegmentMetadata>>> readResult = new CompletableFuture<>();
            readResult.whenComplete(new FutureEventListener<Versioned<List<LogSegmentMetadata>>>() {
                @Override
                public void onFailure(Throwable cause) {
                    if (!isPermanentFetchFailure(cause)) {
                        BKLogReadHandler.this.scheduler.schedule(
                                () -> BKLogReadHandler.this.onSegmentsUpdated(versioned),
                                BKLogReadHandler.this.conf.getZKRetryBackoffMaxMillis(),
                                TimeUnit.MILLISECONDS);
                    } else {
                        BKLogReadHandler.this.recordPermanentFetchFailure(cause);
                    }
                }

                @Override
                public void onSuccess(Versioned<List<LogSegmentMetadata>> segments) {
                    List<LogSegmentMetadata> toNotify = null;
                    synchronized (BKLogReadHandler.this) {
                        // Re-check under the lock: a newer update may have been notified while
                        // this read was in flight.
                        Versioned<List<LogSegmentMetadata>> lastNotified = BKLogReadHandler.this.lastNotifiedLogSegments;
                        if (lastNotified.getVersion() == Version.NEW || lastNotified.getVersion().compare(segments.getVersion()) == Version.Occurred.BEFORE) {
                            BKLogReadHandler.this.lastNotifiedLogSegments = segments;
                            toNotify = segments.getValue();
                        }
                    }
                    // Listeners are invoked outside the synchronized block.
                    if (null != toNotify) {
                        BKLogReadHandler.this.notifyUpdatedLogSegments(toNotify);
                    }
                }
            });
            readLogSegmentsFromStore(versioned, LogSegmentMetadata.COMPARATOR, LogSegmentFilter.DEFAULT_FILTER, readResult);
        }
    }

    /** Forwards the stream-deleted event to the registered listeners. */
    @Override // org.apache.pulsar.functions.runtime.shaded.org.apache.distributedlog.callback.LogSegmentNamesListener
    public void onLogStreamDeleted() {
        notifyLogStreamDeleted();
    }

    /**
     * Registers a listener; a {@code null} listener is ignored.
     *
     * @param logSegmentListener the listener to add, may be {@code null}
     */
    public void registerListener(@Nullable LogSegmentListener logSegmentListener) {
        if (null != logSegmentListener) {
            this.listeners.add(logSegmentListener);
        }
    }

    /**
     * Removes a listener; a {@code null} listener is ignored.
     *
     * @param logSegmentListener the listener to remove, may be {@code null}
     */
    public void unregisterListener(@Nullable LogSegmentListener logSegmentListener) {
        if (null != logSegmentListener) {
            this.listeners.remove(logSegmentListener);
        }
    }

    /**
     * Delivers the segment list to every registered listener unless notifications are disabled.
     * Each listener receives its own copy, sorted with {@code LogSegmentMetadata.COMPARATOR}.
     *
     * @param list the segments to deliver
     */
    protected void notifyUpdatedLogSegments(List<LogSegmentMetadata> list) {
        if (this.logSegmentsNotificationDisabled) {
            return;
        }
        for (LogSegmentListener listener : this.listeners) {
            // A separate copy per listener, so one listener cannot affect what another sees.
            List<LogSegmentMetadata> sortedCopy = new ArrayList<>(list);
            Collections.sort(sortedCopy, LogSegmentMetadata.COMPARATOR);
            listener.onSegmentsUpdated(sortedCopy);
        }
    }

    /** Tells every registered listener that the stream was deleted, unless notifications are disabled. */
    protected void notifyLogStreamDeleted() {
        if (this.logSegmentsNotificationDisabled) {
            return;
        }
        for (LogSegmentListener listener : this.listeners) {
            listener.onLogStreamDeleted();
        }
    }

    /**
     * Reports an error to the reader state notification, if one was supplied.
     *
     * @param th the error to report
     */
    protected void notifyReaderOnError(Throwable th) {
        if (null != this.readerStateNotification) {
            this.readerStateNotification.notifyOnError(th);
        }
    }

    /** Returns whether a segment fetch failure is treated as permanent, i.e. is not retried. */
    private static boolean isPermanentFetchFailure(Throwable cause) {
        return cause instanceof LogNotFoundException
                || cause instanceof LogSegmentNotFoundException
                || cause instanceof UnexpectedException;
    }

    /** Records a permanent fetch failure as the metadata exception (first one wins) and notifies the reader. */
    private void recordPermanentFetchFailure(Throwable cause) {
        // The cast is carried over from the original code; it is only reached for the exception
        // types accepted by isPermanentFetchFailure.
        BKLogHandler.METADATA_EXCEPTION_UPDATER.compareAndSet(this, null, (IOException) cause);
        notifyReaderOnError(cause);
    }
}
