package org.apache.flink.shaded.net.snowflake.ingest.internal.apache.iceberg.util;

import java.util.Map;
import java.util.Optional;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import org.apache.flink.shaded.net.snowflake.ingest.internal.apache.iceberg.CatalogProperties;
import org.apache.flink.shaded.net.snowflake.ingest.internal.apache.iceberg.LockManager;
import org.apache.flink.shaded.net.snowflake.ingest.internal.apache.iceberg.MetadataColumns;
import org.apache.flink.shaded.net.snowflake.ingest.internal.apache.iceberg.common.DynConstructors;
import org.apache.flink.shaded.net.snowflake.ingest.internal.apache.iceberg.relocated.com.google.common.annotations.VisibleForTesting;
import org.apache.flink.shaded.net.snowflake.ingest.internal.apache.iceberg.relocated.com.google.common.collect.Maps;
import org.apache.flink.shaded.net.snowflake.ingest.internal.apache.iceberg.relocated.com.google.common.util.concurrent.MoreExecutors;
import org.apache.flink.shaded.net.snowflake.ingest.internal.apache.iceberg.relocated.com.google.common.util.concurrent.ThreadFactoryBuilder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/flink/shaded/net/snowflake/ingest/internal/apache/iceberg/util/LockManagers.class */
public class LockManagers {
    private static final LockManager LOCK_MANAGER_DEFAULT = new InMemoryLockManager(Maps.newHashMap());

    /* loaded from: input_file:org/apache/flink/shaded/net/snowflake/ingest/internal/apache/iceberg/util/LockManagers$BaseLockManager.class */
    public static abstract class BaseLockManager implements LockManager {
        private static volatile ScheduledExecutorService scheduler;
        private long acquireTimeoutMs;
        private long acquireIntervalMs;
        private long heartbeatIntervalMs;
        private long heartbeatTimeoutMs;
        private int heartbeatThreads;

        public long heartbeatTimeoutMs() {
            return this.heartbeatTimeoutMs;
        }

        public long heartbeatIntervalMs() {
            return this.heartbeatIntervalMs;
        }

        public long acquireIntervalMs() {
            return this.acquireIntervalMs;
        }

        public long acquireTimeoutMs() {
            return this.acquireTimeoutMs;
        }

        public int heartbeatThreads() {
            return this.heartbeatThreads;
        }

        public ScheduledExecutorService scheduler() {
            if (scheduler == null) {
                synchronized (BaseLockManager.class) {
                    if (scheduler == null) {
                        scheduler = MoreExecutors.getExitingScheduledExecutorService((ScheduledThreadPoolExecutor) Executors.newScheduledThreadPool(heartbeatThreads(), new ThreadFactoryBuilder().setDaemon(true).setNameFormat("iceberg-lock-manager-%d").build()));
                    }
                }
            }
            return scheduler;
        }

        @Override // org.apache.flink.shaded.net.snowflake.ingest.internal.apache.iceberg.LockManager
        public void initialize(Map<String, String> map) {
            this.acquireTimeoutMs = PropertyUtil.propertyAsLong(map, CatalogProperties.LOCK_ACQUIRE_TIMEOUT_MS, CatalogProperties.LOCK_ACQUIRE_TIMEOUT_MS_DEFAULT);
            this.acquireIntervalMs = PropertyUtil.propertyAsLong(map, CatalogProperties.LOCK_ACQUIRE_INTERVAL_MS, CatalogProperties.LOCK_ACQUIRE_INTERVAL_MS_DEFAULT);
            this.heartbeatIntervalMs = PropertyUtil.propertyAsLong(map, CatalogProperties.LOCK_HEARTBEAT_INTERVAL_MS, CatalogProperties.LOCK_HEARTBEAT_INTERVAL_MS_DEFAULT);
            this.heartbeatTimeoutMs = PropertyUtil.propertyAsLong(map, CatalogProperties.LOCK_HEARTBEAT_TIMEOUT_MS, CatalogProperties.LOCK_HEARTBEAT_TIMEOUT_MS_DEFAULT);
            this.heartbeatThreads = PropertyUtil.propertyAsInt(map, CatalogProperties.LOCK_HEARTBEAT_THREADS, 4);
        }

        @Override // java.lang.AutoCloseable
        public void close() throws Exception {
            if (scheduler != null) {
                scheduler.shutdownNow().forEach(runnable -> {
                    if (runnable instanceof Future) {
                        ((Future) runnable).cancel(true);
                    }
                });
                scheduler = null;
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/flink/shaded/net/snowflake/ingest/internal/apache/iceberg/util/LockManagers$InMemoryLockContent.class */
    public static class InMemoryLockContent {
        private final String ownerId;
        private final long expireMs;

        InMemoryLockContent(String str, long j) {
            this.ownerId = str;
            this.expireMs = j;
        }

        public long expireMs() {
            return this.expireMs;
        }

        public String ownerId() {
            return this.ownerId;
        }
    }

    /* loaded from: input_file:org/apache/flink/shaded/net/snowflake/ingest/internal/apache/iceberg/util/LockManagers$InMemoryLockManager.class */
    static class InMemoryLockManager extends BaseLockManager {
        private static final Logger LOG = LoggerFactory.getLogger(InMemoryLockManager.class);
        private static final Map<String, InMemoryLockContent> LOCKS = Maps.newConcurrentMap();
        private static final Map<String, ScheduledFuture<?>> HEARTBEATS = Maps.newHashMap();

        InMemoryLockManager(Map<String, String> map) {
            initialize(map);
        }

        @VisibleForTesting
        void acquireOnce(String str, String str2) {
            boolean replace;
            InMemoryLockContent inMemoryLockContent = LOCKS.get(str);
            if (inMemoryLockContent != null && inMemoryLockContent.expireMs() > System.currentTimeMillis()) {
                throw new IllegalStateException(String.format("Lock for %s currently held by %s, expiration: %s", str, inMemoryLockContent.ownerId(), Long.valueOf(inMemoryLockContent.expireMs())));
            }
            long currentTimeMillis = System.currentTimeMillis() + heartbeatTimeoutMs();
            if (inMemoryLockContent == null) {
                replace = LOCKS.putIfAbsent(str, new InMemoryLockContent(str2, currentTimeMillis)) == null;
            } else {
                replace = LOCKS.replace(str, inMemoryLockContent, new InMemoryLockContent(str2, currentTimeMillis));
            }
            if (!replace) {
                throw new IllegalStateException("Unable to acquire lock " + str);
            }
            if (HEARTBEATS.containsKey(str)) {
                HEARTBEATS.remove(str).cancel(false);
            }
            HEARTBEATS.put(str, scheduler().scheduleAtFixedRate(() -> {
                try {
                    LOCKS.replace(str, LOCKS.get(str), new InMemoryLockContent(str2, System.currentTimeMillis() + heartbeatTimeoutMs()));
                } catch (NullPointerException e) {
                    throw new RuntimeException("Cannot heartbeat to a deleted lock " + str, e);
                }
            }, 0L, heartbeatIntervalMs(), TimeUnit.MILLISECONDS));
        }

        @Override // org.apache.flink.shaded.net.snowflake.ingest.internal.apache.iceberg.LockManager
        public boolean acquire(String str, String str2) {
            try {
                Tasks.foreach(str).retry(MetadataColumns.FILE_PATH_COLUMN_ID).onlyRetryOn(IllegalStateException.class).throwFailureWhenFinished().exponentialBackoff(acquireIntervalMs(), acquireIntervalMs(), acquireTimeoutMs(), 1.0d).run(str3 -> {
                    acquireOnce(str3, str2);
                });
                return true;
            } catch (IllegalStateException e) {
                return false;
            }
        }

        @Override // org.apache.flink.shaded.net.snowflake.ingest.internal.apache.iceberg.LockManager
        public boolean release(String str, String str2) {
            InMemoryLockContent inMemoryLockContent = LOCKS.get(str);
            if (inMemoryLockContent == null) {
                LOG.error("Cannot find lock for entity {}", str);
                return false;
            }
            if (!inMemoryLockContent.ownerId().equals(str2)) {
                LOG.error("Cannot unlock {} by {}, current owner: {}", new Object[]{str, str2, inMemoryLockContent.ownerId()});
                return false;
            }
            Optional.ofNullable(HEARTBEATS.remove(str)).ifPresent(scheduledFuture -> {
                scheduledFuture.cancel(false);
            });
            LOCKS.remove(str);
            return true;
        }

        @Override // org.apache.flink.shaded.net.snowflake.ingest.internal.apache.iceberg.util.LockManagers.BaseLockManager, java.lang.AutoCloseable
        public void close() throws Exception {
            HEARTBEATS.values().forEach(scheduledFuture -> {
                scheduledFuture.cancel(false);
            });
            HEARTBEATS.clear();
            LOCKS.clear();
            super.close();
        }
    }

    private LockManagers() {
    }

    public static LockManager defaultLockManager() {
        return LOCK_MANAGER_DEFAULT;
    }

    public static LockManager from(Map<String, String> map) {
        return map.containsKey(CatalogProperties.LOCK_IMPL) ? loadLockManager(map.get(CatalogProperties.LOCK_IMPL), map) : defaultLockManager();
    }

    private static LockManager loadLockManager(String str, Map<String, String> map) {
        try {
            try {
                LockManager lockManager = (LockManager) DynConstructors.builder(LockManager.class).hiddenImpl(str, new Class[0]).buildChecked().newInstance(new Object[0]);
                lockManager.initialize(map);
                return lockManager;
            } catch (ClassCastException e) {
                throw new IllegalArgumentException(String.format("Cannot initialize LockManager, %s does not implement LockManager.", str), e);
            }
        } catch (NoSuchMethodException e2) {
            throw new IllegalArgumentException(String.format("Cannot initialize LockManager, missing no-arg constructor: %s", str), e2);
        }
    }
}
