package org.apache.pulsar.metadata.coordination.impl;

import java.util.EnumSet;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import lombok.Generated;
import org.apache.bookkeeper.common.concurrent.FutureUtils;
import org.apache.pulsar.common.util.Backoff;
import org.apache.pulsar.common.util.BackoffBuilder;
import org.apache.pulsar.common.util.FutureUtil;
import org.apache.pulsar.metadata.api.GetResult;
import org.apache.pulsar.metadata.api.MetadataSerde;
import org.apache.pulsar.metadata.api.MetadataStoreException;
import org.apache.pulsar.metadata.api.coordination.ResourceLock;
import org.apache.pulsar.metadata.api.extended.CreateOption;
import org.apache.pulsar.metadata.api.extended.MetadataStoreExtended;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:META-INF/bundled-dependencies/pulsar-metadata-4.0.2.jar:org/apache/pulsar/metadata/coordination/impl/ResourceLockImpl.class */
/**
 * {@link ResourceLock} implementation that represents the lock as an entry written at {@code path}
 * in a {@link MetadataStoreExtended} with {@link CreateOption#Ephemeral}.
 *
 * <p>Mutable state ({@code state}, {@code version}, {@code value}, {@code revalidateTask},
 * {@code revalidateAfterReconnection}) is read and written while holding this object's monitor.
 * Value updates and revalidations are funnelled through a {@link FutureUtil.Sequencer}.
 *
 * <p>Note: {@link #hashCode()} is derived from the path while {@code equals} is inherited from
 * {@link Object}; this is kept as-is so that identity-based lookups by callers do not change.
 *
 * @param <T> type of the value stored alongside the lock
 */
public class ResourceLockImpl<T> implements ResourceLock<T> {

    @Generated
    private static final Logger log = LoggerFactory.getLogger(ResourceLockImpl.class);

    private final MetadataStoreExtended store;
    private final MetadataSerde<T> serde;
    private final String path;
    private volatile T value;
    private final ScheduledExecutorService executor;

    /** Pending retry scheduled by {@link #silentRevalidateOnce()}, cancelled by {@link #release()}. */
    private ScheduledFuture<?> revalidateTask;

    /** Set when a revalidation failed for a reason other than the lock being lost. */
    private boolean revalidateAfterReconnection = false;

    /** Version expected by the next put/delete; {@code -1} until a put succeeds or after a reset. */
    private long version = -1;

    /** Completed when the lock is released or marked as expired. */
    private final CompletableFuture<Void> expiredFuture = new CompletableFuture<>();

    private final FutureUtil.Sequencer<Void> sequencer = FutureUtil.Sequencer.create();
    private State state = State.Init;
    private final Backoff backoff = new BackoffBuilder()
            .setInitialTime(100, TimeUnit.MILLISECONDS)
            .setMax(60, TimeUnit.SECONDS)
            .create();

    /**
     * Lifecycle of the lock. Visibility is kept as found in this file (the decompiler notes it was
     * originally {@code private}).
     */
    public enum State {
        /** Initial state, before the first successful put. */
        Init,
        /** The entry was written successfully by {@code acquireWithNoRevalidation}. */
        Valid,
        /** {@link #release()} was called and the delete has not been confirmed yet. */
        Releasing,
        /** The entry was deleted, or revalidation found the lock was lost. */
        Released
    }

    /**
     * Creates a lock handle; nothing is written to the store until {@link #acquire(Object)} is called.
     *
     * @param metadataStoreExtended    store holding the lock entry
     * @param metadataSerde            serializer/deserializer for the lock value
     * @param str                      path of the lock entry
     * @param scheduledExecutorService executor used to schedule revalidation retries
     */
    public ResourceLockImpl(MetadataStoreExtended metadataStoreExtended, MetadataSerde<T> metadataSerde, String str,
                            ScheduledExecutorService scheduledExecutorService) {
        this.store = metadataStoreExtended;
        this.serde = metadataSerde;
        this.path = str;
        this.executor = scheduledExecutorService;
    }

    /** @return the value recorded by the last successful acquire/revalidate, or {@code null} if none */
    @Override // org.apache.pulsar.metadata.api.coordination.ResourceLock
    public synchronized T getValue() {
        return this.value;
    }

    /**
     * Replaces the lock value. The update is queued on the sequencer and fails with
     * {@link IllegalStateException} if the lock is not {@link State#Valid} when it runs.
     *
     * @param t the new value
     * @return future completed once the new value has been written
     */
    @Override // org.apache.pulsar.metadata.api.coordination.ResourceLock
    public synchronized CompletableFuture<Void> updateValue(T t) {
        return this.sequencer.sequential(() -> {
            synchronized (this) {
                if (this.state != State.Valid) {
                    return CompletableFuture.failedFuture(
                            new IllegalStateException("Lock was not in valid state: " + this.state));
                }
                return acquire(t);
            }
        });
    }

    /**
     * Deletes the lock entry. A missing entry is treated as a successful release; any other
     * failure is propagated and the state stays {@link State#Releasing}.
     *
     * @return future completed when the lock has been released
     */
    @Override // org.apache.pulsar.metadata.api.coordination.ResourceLock
    public synchronized CompletableFuture<Void> release() {
        if (this.state == State.Released) {
            return CompletableFuture.completedFuture(null);
        }
        this.state = State.Releasing;
        if (this.revalidateTask != null) {
            this.revalidateTask.cancel(true);
        }
        CompletableFuture<Void> result = new CompletableFuture<>();
        this.store.delete(this.path, Optional.of(this.version))
                .thenRun(() -> markReleased(result))
                .exceptionally(ex -> {
                    if (FutureUtil.unwrapCompletionException(ex) instanceof MetadataStoreException.NotFoundException) {
                        // The entry is already gone, so there is nothing left to release.
                        markReleased(result);
                    } else {
                        result.completeExceptionally(ex);
                    }
                    return null;
                });
        return result;
    }

    /** Moves to {@link State#Released} and completes both the expiry future and {@code result}. */
    private void markReleased(CompletableFuture<Void> result) {
        synchronized (this) {
            this.state = State.Released;
        }
        this.expiredFuture.complete(null);
        result.complete(null);
    }

    /** @return future completed when the lock is released or marked as expired */
    @Override // org.apache.pulsar.metadata.api.coordination.ResourceLock
    public CompletableFuture<Void> getLockExpiredFuture() {
        return this.expiredFuture;
    }

    /** @return the path of the lock entry */
    @Override // org.apache.pulsar.metadata.api.coordination.ResourceLock
    public String getPath() {
        return this.path;
    }

    /** Hash code derived from the lock path only. */
    @Override
    public int hashCode() {
        return this.path.hashCode();
    }

    /**
     * Writes the lock entry; if the store reports the lock as busy, falls back to
     * {@code revalidate} to find out whether the existing entry can be taken over.
     *
     * <p>Visibility is kept as found in this file (the decompiler notes it was originally
     * package-private).
     *
     * @param t the value to store with the lock
     * @return future completed when the lock is held with value {@code t}
     */
    public synchronized CompletableFuture<Void> acquire(T t) {
        CompletableFuture<Void> result = new CompletableFuture<>();
        acquireWithNoRevalidation(t)
                .thenRun(() -> result.complete(null))
                .exceptionally(acquireEx -> {
                    Throwable cause = FutureUtil.unwrapCompletionException(acquireEx);
                    if (cause instanceof MetadataStoreException.LockBusyException) {
                        // The nested handler needs its own parameter name: a lambda parameter
                        // may not redeclare a variable of the enclosing lambda.
                        revalidate(t)
                                .thenAccept(ignored -> result.complete(null))
                                .exceptionally(revalidateEx -> {
                                    result.completeExceptionally(revalidateEx);
                                    return null;
                                });
                    } else {
                        result.completeExceptionally(cause);
                    }
                    return null;
                });
        return result;
    }

    /**
     * Serializes {@code t} and puts it at the lock path as an ephemeral entry, expecting the
     * currently recorded version. On success the state becomes {@link State#Valid} and the new
     * version/value are recorded. A bad-version failure is reported as
     * {@link MetadataStoreException.LockBusyException}.
     */
    private CompletableFuture<Void> acquireWithNoRevalidation(T t) {
        if (log.isDebugEnabled()) {
            log.debug("acquireWithNoRevalidation,newValue={},version={}", t, this.version);
        }
        try {
            byte[] payload = this.serde.serialize(this.path, t);
            CompletableFuture<Void> result = new CompletableFuture<>();
            this.store.put(this.path, payload, Optional.of(this.version), EnumSet.of(CreateOption.Ephemeral))
                    .thenAccept(stat -> {
                        synchronized (this) {
                            this.state = State.Valid;
                            this.version = stat.getVersion();
                            this.value = t;
                        }
                        log.info("Acquired resource lock on {}", this.path);
                        result.complete(null);
                    })
                    .exceptionally(ex -> {
                        Throwable cause = FutureUtil.unwrapCompletionException(ex);
                        if (cause instanceof MetadataStoreException.BadVersionException) {
                            result.completeExceptionally(new MetadataStoreException.LockBusyException(
                                    "Resource at " + this.path + " is already locked"));
                        } else {
                            result.completeExceptionally(cause);
                        }
                        return null;
                    });
            return result;
        } catch (Throwable e) {
            // Serialization (or a synchronous store failure) is surfaced through the future.
            return FutureUtils.exception(e);
        }
    }

    /**
     * Logs that the lock was invalidated and triggers a single revalidation attempt.
     * Visibility is kept as found in this file (originally package-private per the decompiler note).
     */
    public synchronized void lockWasInvalidated() {
        log.info("Lock on resource {} was invalidated. state {}", this.path, this.state);
        silentRevalidateOnce();
    }

    /**
     * Runs a revalidation if a previous attempt failed and flagged the lock for revalidation.
     * Visibility is kept as found in this file (originally package-private per the decompiler note).
     *
     * @return the revalidation future, or an already-completed future when nothing is pending
     */
    public synchronized CompletableFuture<Void> revalidateIfNeededAfterReconnection() {
        if (!this.revalidateAfterReconnection) {
            return CompletableFuture.completedFuture(null);
        }
        this.revalidateAfterReconnection = false;
        log.warn("Revalidate lock at {} after reconnection", this.path);
        return silentRevalidateOnce();
    }

    /**
     * Revalidates the lock once without propagating failures to the caller. A bad-version or
     * lock-busy failure marks the lock as expired; any other failure flags the lock for
     * revalidation and schedules a retry after the next backoff delay.
     * Visibility is kept as found in this file (originally package-private per the decompiler note).
     *
     * @return future that completes normally whether or not the revalidation succeeded
     */
    public synchronized CompletableFuture<Void> silentRevalidateOnce() {
        if (this.state != State.Valid) {
            return CompletableFuture.completedFuture(null);
        }
        return this.sequencer.sequential(() -> revalidate(this.value))
                .thenRun(() -> {
                    log.info("Successfully revalidated the lock on {}", this.path);
                    this.backoff.reset();
                })
                .exceptionally(ex -> {
                    synchronized (this) {
                        Throwable realCause = FutureUtil.unwrapCompletionException(ex);
                        if (realCause instanceof MetadataStoreException.BadVersionException
                                || realCause instanceof MetadataStoreException.LockBusyException) {
                            log.warn("Failed to revalidate the lock at {}. Marked as expired. {}", this.path,
                                    realCause.getMessage());
                            this.state = State.Released;
                            this.expiredFuture.complete(null);
                        } else {
                            this.revalidateAfterReconnection = true;
                            long delayMillis = this.backoff.next();
                            log.warn("Failed to revalidate the lock at {}: {} - Retrying in {} seconds", this.path,
                                    realCause.getMessage(), delayMillis / 1000.0d);
                            this.revalidateTask =
                                    this.executor.schedule(this::silentRevalidateOnce, delayMillis,
                                            TimeUnit.MILLISECONDS);
                        }
                    }
                    return null;
                });
    }

    /**
     * Reads the current entry at the lock path and reconciles it with {@code t}:
     * <ul>
     *   <li>no entry: reset the version and acquire again;</li>
     *   <li>non-ephemeral entry: fail with {@link MetadataStoreException.LockBusyException};</li>
     *   <li>same value, created by self: adopt the stored version;</li>
     *   <li>same value, not created by self: delete the stale entry and re-acquire;</li>
     *   <li>different value, created by self: delete and re-acquire;</li>
     *   <li>different value, not created by self: fail with
     *       {@link MetadataStoreException.LockBusyException}.</li>
     * </ul>
     * Fails with {@link IllegalStateException} unless the state is {@link State#Valid} or
     * {@link State#Init}.
     */
    private synchronized CompletableFuture<Void> revalidate(T t) {
        if (this.state != State.Valid && this.state != State.Init) {
            return CompletableFuture.failedFuture(
                    new IllegalStateException("Lock was not in valid state: " + this.state));
        }
        if (log.isDebugEnabled()) {
            log.debug("doRevalidate with newValue={}, version={}", t, this.version);
        }
        return this.store.get(this.path).thenCompose(existing -> {
            if (!existing.isPresent()) {
                setVersion(-1L);
                return acquireWithNoRevalidation(t)
                        .thenRun(() -> log.info("Successfully re-acquired missing lock at {}", this.path));
            }
            GetResult res = existing.get();
            if (!res.getStat().isEphemeral()) {
                return FutureUtils.exception(new MetadataStoreException.LockBusyException(
                        "Path " + this.path + " is already created as non-ephemeral"));
            }
            try {
                T existingValue = this.serde.deserialize(this.path, res.getValue(), res.getStat());
                synchronized (this) {
                    boolean sameValue = t.equals(existingValue);
                    boolean createdBySelf = res.getStat().isCreatedBySelf();
                    if (sameValue && createdBySelf) {
                        this.version = res.getStat().getVersion();
                        this.value = t;
                        return CompletableFuture.completedFuture(null);
                    }
                    if (sameValue) {
                        log.info("Deleting stale lock at {}", this.path);
                        return deleteAndReacquire(res.getStat().getVersion(), t,
                                "Successfully re-acquired stale lock at {}");
                    }
                    if (createdBySelf) {
                        return deleteAndReacquire(res.getStat().getVersion(), t,
                                "Successfully re-acquired lock at {}");
                    }
                    return FutureUtils.exception(new MetadataStoreException.LockBusyException(
                            "Resource at " + this.path + " is already locked"));
                }
            } catch (Throwable e) {
                return FutureUtils.exception(e);
            }
        });
    }

    /**
     * Deletes the entry at the lock path expecting {@code existingVersion}, resets the recorded
     * version to {@code -1}, acquires the lock again with {@code t}, and logs
     * {@code successLogFormat} (a one-placeholder format receiving the path) on success.
     */
    private CompletableFuture<Void> deleteAndReacquire(long existingVersion, T t, String successLogFormat) {
        return this.store.delete(this.path, Optional.of(existingVersion))
                .thenRun(() -> setVersion(-1L))
                .thenCompose(ignored -> acquireWithNoRevalidation(t))
                .thenRun(() -> log.info(successLogFormat, this.path));
    }

    /** Records the version expected by the next put/delete. */
    private synchronized void setVersion(long j) {
        this.version = j;
    }
}
