package org.apache.pulsar.functions.runtime.shaded.org.apache.bookkeeper.stream.storage.impl.cluster;

import java.time.Duration;
import java.util.Set;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import org.apache.pulsar.functions.runtime.shaded.org.apache.bookkeeper.discover.RegistrationClient;
import org.apache.pulsar.functions.runtime.shaded.org.apache.bookkeeper.net.BookieSocketAddress;
import org.apache.pulsar.functions.runtime.shaded.org.apache.bookkeeper.stream.proto.cluster.ClusterAssignmentData;
import org.apache.pulsar.functions.runtime.shaded.org.apache.bookkeeper.stream.proto.cluster.ClusterMetadata;
import org.apache.pulsar.functions.runtime.shaded.org.apache.bookkeeper.stream.storage.api.cluster.ClusterControllerLeader;
import org.apache.pulsar.functions.runtime.shaded.org.apache.bookkeeper.stream.storage.api.cluster.ClusterMetadataStore;
import org.apache.pulsar.functions.runtime.shaded.org.apache.bookkeeper.stream.storage.impl.sc.StorageContainerController;
import org.apache.pulsar.functions.runtime.shaded.org.apache.bookkeeper.versioning.Versioned;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/pulsar/functions/runtime/shaded/org/apache/bookkeeper/stream/storage/impl/cluster/ClusterControllerLeaderImpl.class */
/**
 * Controller-leader implementation that watches the set of writable servers and
 * recomputes the storage container assignment whenever that set changes.
 *
 * <p>The leader loop runs inside {@link #processAsLeader()}. Server-set changes are
 * delivered through {@link #onBookiesChanged(Versioned)}, which records the new set
 * and releases a permit that wakes the loop up. The loop can be paused and resumed
 * through {@link #suspend()} and {@link #resume()}.
 */
public class ClusterControllerLeaderImpl implements ClusterControllerLeader, RegistrationClient.RegistrationListener {
    private static final Logger log = LoggerFactory.getLogger(ClusterControllerLeaderImpl.class);

    private final ClusterMetadataStore clusterMetadataStore;
    private final StorageContainerController scController;
    private final RegistrationClient regClient;
    // Latest server set reported by the registration client; written by the listener
    // callback and read by the leader loop.
    private volatile Set<BookieSocketAddress> availableServers;
    // Minimum interval enforced between a successful assignment and the next attempt.
    private final Duration scheduleDuration;
    // Monitor guarding the `suspended` flag; the leader loop waits on it while suspended.
    private final Object suspensionLock = new Object();
    private volatile boolean suspended = false;
    // A permit means "there is a server change to process".
    private final Semaphore performServerChangesPermits = new Semaphore(0);
    // Wall-clock millis of the last assignment update; negative until the first update.
    private long lastSuccessfulAssigmentAt = -1;

    /**
     * Creates a controller leader.
     *
     * @param clusterMetadataStore store used to read cluster metadata and read/write assignment data
     * @param storageContainerController controller used to compute the ideal assignment state
     * @param registrationClient client used to watch the writable servers
     * @param duration minimum interval between a successful assignment and the next attempt
     */
    public ClusterControllerLeaderImpl(ClusterMetadataStore clusterMetadataStore,
                                       StorageContainerController storageContainerController,
                                       RegistrationClient registrationClient,
                                       Duration duration) {
        this.clusterMetadataStore = clusterMetadataStore;
        this.scController = storageContainerController;
        this.regClient = registrationClient;
        this.scheduleDuration = duration;
    }

    /**
     * Marks the leader as suspended and notifies any thread waiting on the suspension lock.
     */
    @Override
    public void suspend() {
        synchronized (this.suspensionLock) {
            this.suspended = true;
            this.suspensionLock.notifyAll();
        }
    }

    /**
     * @return {@code true} if the leader is currently suspended
     */
    boolean isSuspended() {
        return this.suspended;
    }

    /**
     * Clears the suspended flag and wakes up the leader loop if it is waiting in
     * {@link #checkSuspension()}.
     */
    @Override
    public void resume() {
        synchronized (this.suspensionLock) {
            this.suspended = false;
            this.suspensionLock.notifyAll();
        }
    }

    /**
     * Runs the leader loop: registers this instance as a watcher of writable servers and
     * then processes server changes until an exception forces the leader to give up.
     *
     * <p>This method never returns normally.
     *
     * @throws InterruptedException if the loop is interrupted; the watcher is unregistered first
     * @throws Exception if registering the watcher fails, or if processing a server change
     *     fails while the leader is not suspended; in the latter case the watcher is
     *     unregistered first
     */
    @Override
    public void processAsLeader() throws Exception {
        log.info("Become controller leader to monitor servers for assigning storage containers.");

        // Seed one permit so the first loop iteration runs without waiting for a change event.
        this.performServerChangesPermits.release();

        // Start monitoring the servers. Only failures of the watch registration itself are
        // reported as "fails to watch servers".
        try {
            this.regClient.watchWritableBookies(this).get();
        } catch (Exception e) {
            log.warn("Controller leader fails to watch servers : {}, giving up leadership", e.getMessage());
            throw e;
        }

        // Loop here reacting to server changes until an exception ends leadership.
        while (true) {
            try {
                checkSuspension();
                processServerChange();
            } catch (InterruptedException ie) {
                log.warn("Controller leader is interrupted, giving up leadership");
                this.regClient.unwatchWritableBookies(this);
                throw ie;
            } catch (Exception e) {
                if (!this.suspended) {
                    log.warn("Controller leader encountered exceptions on processing server changes, giving up leadership");
                    this.regClient.unwatchWritableBookies(this);
                    throw e;
                }
                // While suspended, leadership is kept: the next iteration blocks in
                // checkSuspension() until resume() is called or the thread is interrupted.
                log.debug("Ignoring exception on processing server changes while suspended", e);
            }
        }
    }

    /**
     * Blocks the calling thread for as long as the leader is suspended.
     *
     * @throws InterruptedException if interrupted while waiting to be resumed
     */
    private void checkSuspension() throws InterruptedException {
        synchronized (this.suspensionLock) {
            while (this.suspended) {
                log.info("Controller leader is suspended, waiting for to be resumed");
                this.suspensionLock.wait();
                log.info("Controller leader is woke up from suspension");
            }
        }
    }

    /**
     * Waits for a server-change permit, enforces the schedule interval, and then
     * recomputes and (if changed) persists the container assignment.
     *
     * @throws InterruptedException if interrupted while waiting for a permit or sleeping
     */
    private void processServerChange() throws InterruptedException {
        // Block until there is at least one pending server change.
        this.performServerChangesPermits.acquire();

        long elapsedMs = System.currentTimeMillis() - this.lastSuccessfulAssigmentAt;
        long remainingMs = this.scheduleDuration.toMillis() - elapsedMs;
        if (remainingMs > 0) {
            log.info("Waiting {} milliseconds for controller to assign containers", remainingMs);
            TimeUnit.MILLISECONDS.sleep(remainingMs);
        }

        // Collapse every change that arrived while waiting into this single pass.
        this.performServerChangesPermits.drainPermits();

        // Read the volatile field once so the rest of the pass sees a consistent set.
        Set<BookieSocketAddress> serversSnapshot = this.availableServers;
        if (null == serversSnapshot || serversSnapshot.isEmpty()) {
            if (this.lastSuccessfulAssigmentAt < 0) {
                // No assignment has been made yet and no servers are known: back off and
                // re-arm a permit so the loop retries.
                log.info("No servers is alive yet. Backoff 200ms and retry.");
                TimeUnit.MILLISECONDS.sleep(200L);
                this.performServerChangesPermits.release();
            }
            // Otherwise an assignment already exists; with an empty server set nothing is
            // recomputed and the loop waits for the next change event.
            return;
        }

        ClusterMetadata clusterMetadata = this.clusterMetadataStore.getClusterMetadata();
        ClusterAssignmentData currentState = this.clusterMetadataStore.getClusterAssignmentData();
        ClusterAssignmentData newState =
            this.scController.computeIdealState(clusterMetadata, currentState, serversSnapshot);

        if (newState.equals(currentState)) {
            if (log.isDebugEnabled()) {
                log.debug("Assignment state is unchanged - {}", newState);
            }
            return;
        }

        // The timestamp is recorded before the store update, matching the original ordering.
        this.lastSuccessfulAssigmentAt = System.currentTimeMillis();
        this.clusterMetadataStore.updateClusterAssignmentData(newState);
    }

    /**
     * Records the new server set and releases a permit so the leader loop processes it.
     *
     * @param versioned the new set of servers, wrapped with its version
     */
    @Override
    public void onBookiesChanged(Versioned<Set<BookieSocketAddress>> versioned) {
        log.info("Cluster topology is changed - new cluster : {}", versioned);
        this.availableServers = versioned.getValue();
        this.performServerChangesPermits.release();
    }

    /**
     * @return the semaphore whose permits signal pending server changes
     */
    Semaphore getPerformServerChangesPermits() {
        return this.performServerChangesPermits;
    }

    /**
     * @return the monitor object used to coordinate suspension and resumption
     */
    Object getSuspensionLock() {
        return this.suspensionLock;
    }

    /**
     * @return wall-clock millis of the last assignment update, or a negative value if none happened yet
     */
    long getLastSuccessfulAssigmentAt() {
        return this.lastSuccessfulAssigmentAt;
    }
}
