package dlshade.org.apache.distributedlog.impl;

import dlshade.com.google.common.collect.ImmutableList;
import dlshade.org.apache.bookkeeper.common.concurrent.FutureEventListener;
import dlshade.org.apache.bookkeeper.common.util.OrderedScheduler;
import dlshade.org.apache.bookkeeper.versioning.LongVersion;
import dlshade.org.apache.bookkeeper.versioning.Version;
import dlshade.org.apache.bookkeeper.versioning.Versioned;
import dlshade.org.apache.distributedlog.DistributedLogConfiguration;
import dlshade.org.apache.distributedlog.LogSegmentMetadata;
import dlshade.org.apache.distributedlog.ZooKeeperClient;
import dlshade.org.apache.distributedlog.callback.LogSegmentNamesListener;
import dlshade.org.apache.distributedlog.exceptions.LogNotFoundException;
import dlshade.org.apache.distributedlog.exceptions.LogSegmentNotFoundException;
import dlshade.org.apache.distributedlog.exceptions.ZKException;
import dlshade.org.apache.distributedlog.logsegment.LogSegmentMetadataStore;
import dlshade.org.apache.distributedlog.metadata.LogMetadata;
import dlshade.org.apache.distributedlog.metadata.LogMetadataForWriter;
import dlshade.org.apache.distributedlog.util.DLUtils;
import dlshade.org.apache.distributedlog.util.Transaction;
import dlshade.org.apache.distributedlog.util.Utils;
import dlshade.org.apache.distributedlog.zk.DefaultZKOp;
import dlshade.org.apache.distributedlog.zk.ZKTransaction;
import dlshade.org.apache.distributedlog.zk.ZKVersionedSetOp;
import dlshade.org.apache.zookeeper.AsyncCallback;
import dlshade.org.apache.zookeeper.CreateMode;
import dlshade.org.apache.zookeeper.KeeperException;
import dlshade.org.apache.zookeeper.Op;
import dlshade.org.apache.zookeeper.WatchedEvent;
import dlshade.org.apache.zookeeper.Watcher;
import dlshade.org.apache.zookeeper.data.Stat;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.function.BiConsumer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:dlshade/org/apache/distributedlog/impl/ZKLogSegmentMetadataStore.class */
public class ZKLogSegmentMetadataStore implements LogSegmentMetadataStore, Watcher, AsyncCallback.Children2Callback {
    private static final Logger logger;
    private static final List<String> EMPTY_LIST;
    final DistributedLogConfiguration conf;
    final int minZKBackoffMs;
    final int maxZKBackoffMs;
    final boolean skipMinVersionCheck;
    final ZooKeeperClient zkc;
    final OrderedScheduler scheduler;
    static final /* synthetic */ boolean $assertionsDisabled;
    boolean closed = false;
    final ConcurrentMap<String, Map<LogSegmentNamesListener, VersionedLogSegmentNamesListener>> listeners = new ConcurrentHashMap();
    final ReentrantReadWriteLock closeLock = new ReentrantReadWriteLock();

    /* loaded from: input_file:dlshade/org/apache/distributedlog/impl/ZKLogSegmentMetadataStore$ReadLogSegmentsTask.class */
    private static class ReadLogSegmentsTask implements Runnable, FutureEventListener<Versioned<List<String>>> {
        private final String logSegmentsPath;
        private final ZKLogSegmentMetadataStore store;
        private int currentZKBackOffMs;

        ReadLogSegmentsTask(String str, ZKLogSegmentMetadataStore zKLogSegmentMetadataStore) {
            this.logSegmentsPath = str;
            this.store = zKLogSegmentMetadataStore;
            this.currentZKBackOffMs = this.store.minZKBackoffMs;
        }

        @Override // dlshade.org.apache.bookkeeper.common.concurrent.FutureEventListener
        public void onSuccess(Versioned<List<String>> versioned) {
            this.currentZKBackOffMs = this.store.minZKBackoffMs;
            this.store.notifyLogSegmentsUpdated(this.logSegmentsPath, this.store.listeners.get(this.logSegmentsPath), versioned);
        }

        @Override // dlshade.org.apache.bookkeeper.common.concurrent.FutureEventListener
        public void onFailure(Throwable th) {
            if (th instanceof LogNotFoundException) {
                this.store.notifyLogStreamDeleted(this.logSegmentsPath, this.store.listeners.remove(this.logSegmentsPath));
                return;
            }
            int i = this.currentZKBackOffMs;
            this.currentZKBackOffMs = Math.min(2 * this.currentZKBackOffMs, this.store.maxZKBackoffMs);
            this.store.scheduleTask(this.logSegmentsPath, this, i);
        }

        @Override // java.lang.Runnable
        public void run() {
            if (null != this.store.listeners.get(this.logSegmentsPath)) {
                this.store.zkGetLogSegmentNames(this.logSegmentsPath, this.store).whenComplete((BiConsumer<? super Versioned<List<String>>, ? super Throwable>) this);
            } else {
                ZKLogSegmentMetadataStore.logger.debug("Log segments listener for {} has been removed.", this.logSegmentsPath);
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:dlshade/org/apache/distributedlog/impl/ZKLogSegmentMetadataStore$VersionedLogSegmentNamesListener.class */
    public static class VersionedLogSegmentNamesListener {
        private final LogSegmentNamesListener listener;
        private Versioned<List<String>> lastNotifiedLogSegments = new Versioned<>(ZKLogSegmentMetadataStore.EMPTY_LIST, Version.NEW);

        VersionedLogSegmentNamesListener(LogSegmentNamesListener logSegmentNamesListener) {
            this.listener = logSegmentNamesListener;
        }

        synchronized void onSegmentsUpdated(Versioned<List<String>> versioned) {
            if (this.lastNotifiedLogSegments.getVersion() == Version.NEW || this.lastNotifiedLogSegments.getVersion().compare(versioned.getVersion()) == Version.Occurred.BEFORE) {
                this.lastNotifiedLogSegments = versioned;
                this.listener.onSegmentsUpdated(versioned);
            }
        }

        public int hashCode() {
            return this.listener.hashCode();
        }

        public boolean equals(Object obj) {
            if (obj instanceof VersionedLogSegmentNamesListener) {
                return this.listener.equals(((VersionedLogSegmentNamesListener) obj).listener);
            }
            return false;
        }

        public String toString() {
            return this.listener.toString();
        }
    }

    public ZKLogSegmentMetadataStore(DistributedLogConfiguration distributedLogConfiguration, ZooKeeperClient zooKeeperClient, OrderedScheduler orderedScheduler) {
        this.conf = distributedLogConfiguration;
        this.zkc = zooKeeperClient;
        this.scheduler = orderedScheduler;
        this.minZKBackoffMs = distributedLogConfiguration.getZKRetryBackoffStartMillis();
        this.maxZKBackoffMs = distributedLogConfiguration.getZKRetryBackoffMaxMillis();
        this.skipMinVersionCheck = distributedLogConfiguration.getDLLedgerMetadataSkipMinVersionCheck();
    }

    protected void scheduleTask(Object obj, Runnable runnable, long j) {
        this.closeLock.readLock().lock();
        try {
            if (this.closed) {
                return;
            }
            this.scheduler.scheduleOrdered(obj, runnable, j, TimeUnit.MILLISECONDS);
            this.closeLock.readLock().unlock();
        } finally {
            this.closeLock.readLock().unlock();
        }
    }

    protected void submitTask(Object obj, Runnable runnable) {
        this.closeLock.readLock().lock();
        try {
            if (this.closed) {
                return;
            }
            this.scheduler.executeOrdered(obj, runnable);
        } finally {
            this.closeLock.readLock().unlock();
        }
    }

    @Override // dlshade.org.apache.distributedlog.logsegment.LogSegmentMetadataStore
    public void storeMaxLogSegmentSequenceNumber(Transaction<Object> transaction, LogMetadata logMetadata, Versioned<Long> versioned, Transaction.OpListener<Version> opListener) {
        Version version = versioned.getVersion();
        if (!$assertionsDisabled && !(version instanceof LongVersion)) {
            throw new AssertionError();
        }
        LongVersion longVersion = (LongVersion) version;
        transaction.addOp(new ZKVersionedSetOp(Op.setData(logMetadata.getLogSegmentsPath(), DLUtils.serializeLogSegmentSequenceNumber(versioned.getValue().longValue()), (int) longVersion.getLongVersion()), opListener));
    }

    @Override // dlshade.org.apache.distributedlog.logsegment.LogSegmentMetadataStore
    public void storeMaxTxnId(Transaction<Object> transaction, LogMetadataForWriter logMetadataForWriter, Versioned<Long> versioned, Transaction.OpListener<Version> opListener) {
        Version version = versioned.getVersion();
        if (!$assertionsDisabled && !(version instanceof LongVersion)) {
            throw new AssertionError();
        }
        LongVersion longVersion = (LongVersion) version;
        transaction.addOp(new ZKVersionedSetOp(Op.setData(logMetadataForWriter.getMaxTxIdPath(), DLUtils.serializeTransactionId(versioned.getValue().longValue()), (int) longVersion.getLongVersion()), opListener));
    }

    @Override // dlshade.org.apache.distributedlog.logsegment.LogSegmentMetadataStore
    public Transaction<Object> transaction() {
        return new ZKTransaction(this.zkc);
    }

    @Override // dlshade.org.apache.distributedlog.logsegment.LogSegmentMetadataStore
    public void createLogSegment(Transaction<Object> transaction, LogSegmentMetadata logSegmentMetadata, Transaction.OpListener<Void> opListener) {
        transaction.addOp(DefaultZKOp.of(Op.create(logSegmentMetadata.getZkPath(), logSegmentMetadata.getFinalisedData().getBytes(StandardCharsets.UTF_8), this.zkc.getDefaultACL(), CreateMode.PERSISTENT), opListener));
    }

    @Override // dlshade.org.apache.distributedlog.logsegment.LogSegmentMetadataStore
    public void deleteLogSegment(Transaction<Object> transaction, final LogSegmentMetadata logSegmentMetadata, final Transaction.OpListener<Void> opListener) {
        Op delete = Op.delete(logSegmentMetadata.getZkPath(), -1);
        logger.info("Delete segment : {}", logSegmentMetadata);
        transaction.addOp(DefaultZKOp.of(delete, new Transaction.OpListener<Void>() { // from class: dlshade.org.apache.distributedlog.impl.ZKLogSegmentMetadataStore.1
            @Override // dlshade.org.apache.distributedlog.util.Transaction.OpListener
            public void onCommit(Void r4) {
                if (null != opListener) {
                    opListener.onCommit(r4);
                }
            }

            @Override // dlshade.org.apache.distributedlog.util.Transaction.OpListener
            public void onAbort(Throwable th) {
                KeeperException.Code keeperExceptionCode;
                ZKLogSegmentMetadataStore.logger.info("Aborted transaction on deleting segment {}", logSegmentMetadata);
                if (th instanceof KeeperException) {
                    keeperExceptionCode = ((KeeperException) th).code();
                } else {
                    if (!(th instanceof ZKException)) {
                        abortListener(th);
                        return;
                    }
                    keeperExceptionCode = ((ZKException) th).getKeeperExceptionCode();
                }
                if (KeeperException.Code.NONODE == keeperExceptionCode) {
                    abortListener(new LogSegmentNotFoundException(logSegmentMetadata.getZkPath()));
                } else {
                    abortListener(th);
                }
            }

            private void abortListener(Throwable th) {
                if (null != opListener) {
                    opListener.onAbort(th);
                }
            }
        }));
    }

    @Override // dlshade.org.apache.distributedlog.logsegment.LogSegmentMetadataStore
    public void updateLogSegment(Transaction<Object> transaction, LogSegmentMetadata logSegmentMetadata) {
        transaction.addOp(DefaultZKOp.of(Op.setData(logSegmentMetadata.getZkPath(), logSegmentMetadata.getFinalisedData().getBytes(StandardCharsets.UTF_8), -1), null));
    }

    @Override // dlshade.org.apache.zookeeper.Watcher
    public void process(WatchedEvent watchedEvent) {
        if (Watcher.Event.EventType.None == watchedEvent.getType() && Watcher.Event.KeeperState.Expired == watchedEvent.getState()) {
            for (String str : new HashSet(this.listeners.keySet())) {
                scheduleTask(str, new ReadLogSegmentsTask(str, this), 0L);
            }
            return;
        }
        String path = watchedEvent.getPath();
        if (null == path) {
            return;
        }
        switch (watchedEvent.getType()) {
            case NodeDeleted:
                notifyLogStreamDeleted(path, this.listeners.remove(path));
                return;
            case NodeChildrenChanged:
                new ReadLogSegmentsTask(path, this).run();
                return;
            default:
                return;
        }
    }

    @Override // dlshade.org.apache.distributedlog.logsegment.LogSegmentMetadataStore
    public CompletableFuture<LogSegmentMetadata> getLogSegment(String str) {
        return LogSegmentMetadata.read(this.zkc, str, this.skipMinVersionCheck);
    }

    CompletableFuture<Versioned<List<String>>> zkGetLogSegmentNames(String str, Watcher watcher) {
        CompletableFuture<Versioned<List<String>>> completableFuture = new CompletableFuture<>();
        try {
            this.zkc.get().getChildren(str, watcher, this, completableFuture);
        } catch (ZooKeeperClient.ZooKeeperConnectionException e) {
            completableFuture.completeExceptionally(Utils.zkException(e, str));
        } catch (InterruptedException e2) {
            Thread.currentThread().interrupt();
            completableFuture.completeExceptionally(Utils.zkException(e2, str));
        }
        return completableFuture;
    }

    @Override // dlshade.org.apache.zookeeper.AsyncCallback.Children2Callback
    public void processResult(int i, String str, Object obj, List<String> list, Stat stat) {
        CompletableFuture completableFuture = (CompletableFuture) obj;
        if (KeeperException.Code.OK.intValue() == i) {
            completableFuture.complete(new Versioned(list, new LongVersion(stat.getCversion())));
        } else if (KeeperException.Code.NONODE.intValue() == i) {
            completableFuture.completeExceptionally(new LogNotFoundException("Log " + str + " not found"));
        } else {
            completableFuture.completeExceptionally(new ZKException("Failed to get log segments from " + str, KeeperException.Code.get(i)));
        }
    }

    @Override // dlshade.org.apache.distributedlog.logsegment.LogSegmentMetadataStore
    public CompletableFuture<Versioned<List<String>>> getLogSegmentNames(String str, LogSegmentNamesListener logSegmentNamesListener) {
        Watcher watcher;
        if (null == logSegmentNamesListener) {
            watcher = null;
        } else {
            this.closeLock.readLock().lock();
            try {
                if (this.closed) {
                    watcher = null;
                } else {
                    Map<LogSegmentNamesListener, VersionedLogSegmentNamesListener> map = this.listeners.get(str);
                    if (null == map) {
                        HashMap hashMap = new HashMap();
                        Map<LogSegmentNamesListener, VersionedLogSegmentNamesListener> putIfAbsent = this.listeners.putIfAbsent(str, hashMap);
                        map = null != putIfAbsent ? putIfAbsent : hashMap;
                    }
                    synchronized (map) {
                        map.put(logSegmentNamesListener, new VersionedLogSegmentNamesListener(logSegmentNamesListener));
                        if (!this.listeners.containsKey(str) && null != this.listeners.putIfAbsent(str, map)) {
                            logger.debug("Listener set is already found for log segments path {}", str);
                        }
                    }
                    watcher = this;
                }
            } finally {
                this.closeLock.readLock().unlock();
            }
        }
        CompletableFuture<Versioned<List<String>>> zkGetLogSegmentNames = zkGetLogSegmentNames(str, watcher);
        if (null != logSegmentNamesListener) {
            zkGetLogSegmentNames.whenComplete((BiConsumer<? super Versioned<List<String>>, ? super Throwable>) new ReadLogSegmentsTask(str, this));
        }
        return zkGetLogSegmentNames(str, watcher);
    }

    @Override // dlshade.org.apache.distributedlog.logsegment.LogSegmentMetadataStore
    public void unregisterLogSegmentListener(String str, LogSegmentNamesListener logSegmentNamesListener) {
        this.closeLock.readLock().lock();
        try {
            if (this.closed) {
                return;
            }
            Map<LogSegmentNamesListener, VersionedLogSegmentNamesListener> map = this.listeners.get(str);
            if (null == map) {
                this.closeLock.readLock().unlock();
                return;
            }
            synchronized (map) {
                map.remove(logSegmentNamesListener);
                if (map.isEmpty()) {
                    this.listeners.remove(str, map);
                }
            }
            this.closeLock.readLock().unlock();
        } finally {
            this.closeLock.readLock().unlock();
        }
    }

    @Override // java.io.Closeable, java.lang.AutoCloseable
    public void close() throws IOException {
        this.closeLock.writeLock().lock();
        try {
            if (this.closed) {
                return;
            }
            this.closed = true;
        } finally {
            this.closeLock.writeLock().unlock();
        }
    }

    void notifyLogStreamDeleted(String str, Map<LogSegmentNamesListener, VersionedLogSegmentNamesListener> map) {
        if (null == map) {
            return;
        }
        submitTask(str, () -> {
            synchronized (map) {
                Iterator it = map.keySet().iterator();
                while (it.hasNext()) {
                    ((LogSegmentNamesListener) it.next()).onLogStreamDeleted();
                }
            }
        });
    }

    void notifyLogSegmentsUpdated(String str, Map<LogSegmentNamesListener, VersionedLogSegmentNamesListener> map, Versioned<List<String>> versioned) {
        if (null == map) {
            return;
        }
        submitTask(str, () -> {
            synchronized (map) {
                Iterator it = map.values().iterator();
                while (it.hasNext()) {
                    ((VersionedLogSegmentNamesListener) it.next()).onSegmentsUpdated(versioned);
                }
            }
        });
    }

    static {
        $assertionsDisabled = !ZKLogSegmentMetadataStore.class.desiredAssertionStatus();
        logger = LoggerFactory.getLogger(ZKLogSegmentMetadataStore.class);
        EMPTY_LIST = ImmutableList.of();
    }
}
