package org.apache.pulsar.functions.runtime.shaded.org.apache.distributedlog.lock;

import java.io.IOException;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.pulsar.functions.runtime.shaded.io.kubernetes.client.util.authenticators.OpenIDConnectAuthenticator;
import org.apache.pulsar.functions.runtime.shaded.org.apache.bookkeeper.common.util.OrderedScheduler;
import org.apache.pulsar.functions.runtime.shaded.org.apache.bookkeeper.stats.NullStatsLogger;
import org.apache.pulsar.functions.runtime.shaded.org.apache.commons.lang3.tuple.Pair;
import org.apache.pulsar.functions.runtime.shaded.org.apache.distributedlog.DLMTestUtil;
import org.apache.pulsar.functions.runtime.shaded.org.apache.distributedlog.ZooKeeperClient;
import org.apache.pulsar.functions.runtime.shaded.org.apache.distributedlog.ZooKeeperClientBuilder;
import org.apache.pulsar.functions.runtime.shaded.org.apache.distributedlog.ZooKeeperClientUtils;
import org.apache.pulsar.functions.runtime.shaded.org.apache.distributedlog.ZooKeeperClusterTestCase;
import org.apache.pulsar.functions.runtime.shaded.org.apache.distributedlog.exceptions.LockingException;
import org.apache.pulsar.functions.runtime.shaded.org.apache.distributedlog.exceptions.OwnershipAcquireFailedException;
import org.apache.pulsar.functions.runtime.shaded.org.apache.distributedlog.lock.ZKSessionLock;
import org.apache.pulsar.functions.runtime.shaded.org.apache.distributedlog.util.FailpointUtils;
import org.apache.pulsar.functions.runtime.shaded.org.apache.distributedlog.util.Utils;
import org.apache.pulsar.functions.runtime.shaded.org.apache.zookeeper.CreateMode;
import org.apache.pulsar.functions.runtime.shaded.org.apache.zookeeper.KeeperException;
import org.apache.pulsar.functions.runtime.shaded.org.apache.zookeeper.ZooDefs;
import org.apache.pulsar.functions.runtime.shaded.org.apache.zookeeper.ZooKeeper;
import org.apache.pulsar.functions.runtime.shaded.org.junit.After;
import org.apache.pulsar.functions.runtime.shaded.org.junit.Assert;
import org.apache.pulsar.functions.runtime.shaded.org.junit.Before;
import org.apache.pulsar.functions.runtime.shaded.org.junit.Rule;
import org.apache.pulsar.functions.runtime.shaded.org.junit.Test;
import org.apache.pulsar.functions.runtime.shaded.org.junit.rules.TestName;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/pulsar/functions/runtime/shaded/org/apache/distributedlog/lock/TestZKSessionLock.class */
public class TestZKSessionLock extends ZooKeeperClusterTestCase {

    @Rule
    public TestName testNames = new TestName();
    private static final Logger logger = LoggerFactory.getLogger((Class<?>) TestZKSessionLock.class);
    private static final int sessionTimeoutMs = 2000;
    private ZooKeeperClient zkc;
    private ZooKeeperClient zkc0;
    private OrderedScheduler lockStateExecutor;

    /* loaded from: input_file:org/apache/pulsar/functions/runtime/shaded/org/apache/distributedlog/lock/TestZKSessionLock$DelayFailpointAction.class */
    class DelayFailpointAction extends FailpointUtils.AbstractFailPointAction {
        long timeout;

        DelayFailpointAction(long j) {
            this.timeout = j;
        }

        @Override // org.apache.pulsar.functions.runtime.shaded.org.apache.distributedlog.util.FailpointUtils.FailPointAction
        public boolean checkFailPoint() throws IOException {
            try {
                Thread.sleep(this.timeout);
                return true;
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return true;
            }
        }
    }

    @Before
    public void setup() throws Exception {
        this.zkc = ZooKeeperClientBuilder.newBuilder().name("zkc").uri(DLMTestUtil.createDLMURI(zkPort, "/")).sessionTimeoutMs(2000).zkServers(zkServers).zkAclId(null).build();
        this.zkc0 = ZooKeeperClientBuilder.newBuilder().name("zkc0").uri(DLMTestUtil.createDLMURI(zkPort, "/")).sessionTimeoutMs(2000).zkServers(zkServers).zkAclId(null).build();
        this.lockStateExecutor = OrderedScheduler.newSchedulerBuilder().numThreads(1).build();
    }

    @After
    public void teardown() throws Exception {
        this.zkc.close();
        this.zkc0.close();
        this.lockStateExecutor.shutdown();
    }

    private static void createLockPath(ZooKeeper zooKeeper, String str) throws Exception {
        zooKeeper.create(str, new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
    }

    private static String createLockNodeV1(ZooKeeper zooKeeper, String str, String str2) throws Exception {
        return zooKeeper.create(ZKSessionLock.getLockPathPrefixV1(str), ZKSessionLock.serializeClientId(str2), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);
    }

    private static String createLockNodeV2(ZooKeeper zooKeeper, String str, String str2) throws Exception {
        return zooKeeper.create(ZKSessionLock.getLockPathPrefixV2(str, str2), ZKSessionLock.serializeClientId(str2), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);
    }

    private static String createLockNodeV3(ZooKeeper zooKeeper, String str, String str2) throws Exception {
        return zooKeeper.create(ZKSessionLock.getLockPathPrefixV3(str, str2, zooKeeper.getSessionId()), ZKSessionLock.serializeClientId(str2), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);
    }

    private static String createLockNodeWithBadNodeName(ZooKeeper zooKeeper, String str, String str2, String str3) throws Exception {
        return zooKeeper.create(str + "/" + str3, ZKSessionLock.serializeClientId(str2), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);
    }

    private static List<String> getLockWaiters(ZooKeeperClient zooKeeperClient, String str) throws Exception {
        List<String> children = zooKeeperClient.get().getChildren(str, false);
        Collections.sort(children, ZKSessionLock.MEMBER_COMPARATOR);
        return children;
    }

    @Test(timeout = 60000)
    public void testParseClientID() throws Exception {
        ZooKeeper zooKeeper = this.zkc.get();
        String str = "test-parse-clientid-" + System.currentTimeMillis();
        Pair of = Pair.of(str, Long.valueOf(zooKeeper.getSessionId()));
        createLockPath(zooKeeper, "/test-parse-clientid");
        String lockIdFromPath = ZKSessionLock.getLockIdFromPath(createLockNodeV1(zooKeeper, "/test-parse-clientid", str));
        String lockIdFromPath2 = ZKSessionLock.getLockIdFromPath(createLockNodeV2(zooKeeper, "/test-parse-clientid", str));
        String lockIdFromPath3 = ZKSessionLock.getLockIdFromPath(createLockNodeV3(zooKeeper, "/test-parse-clientid", str));
        Assert.assertEquals(of, Utils.ioResult(ZKSessionLock.asyncParseClientID(zooKeeper, "/test-parse-clientid", lockIdFromPath)));
        Assert.assertEquals(of, Utils.ioResult(ZKSessionLock.asyncParseClientID(zooKeeper, "/test-parse-clientid", lockIdFromPath2)));
        Assert.assertEquals(of, Utils.ioResult(ZKSessionLock.asyncParseClientID(zooKeeper, "/test-parse-clientid", lockIdFromPath3)));
        String lockIdFromPath4 = ZKSessionLock.getLockIdFromPath(createLockNodeWithBadNodeName(zooKeeper, "/test-parse-clientid", str, "member"));
        String lockIdFromPath5 = ZKSessionLock.getLockIdFromPath(createLockNodeWithBadNodeName(zooKeeper, "/test-parse-clientid", str, "member_badnode"));
        String lockIdFromPath6 = ZKSessionLock.getLockIdFromPath(createLockNodeWithBadNodeName(zooKeeper, "/test-parse-clientid", str, "member_badnode_badnode"));
        String lockIdFromPath7 = ZKSessionLock.getLockIdFromPath(createLockNodeWithBadNodeName(zooKeeper, "/test-parse-clientid", str, "member_badnode_badnode_badnode"));
        String lockIdFromPath8 = ZKSessionLock.getLockIdFromPath(createLockNodeWithBadNodeName(zooKeeper, "/test-parse-clientid", str, "member_badnode_badnode_badnode_badnode"));
        Assert.assertEquals(of, Utils.ioResult(ZKSessionLock.asyncParseClientID(zooKeeper, "/test-parse-clientid", lockIdFromPath4)));
        Assert.assertEquals(of, Utils.ioResult(ZKSessionLock.asyncParseClientID(zooKeeper, "/test-parse-clientid", lockIdFromPath5)));
        Assert.assertEquals(of, Utils.ioResult(ZKSessionLock.asyncParseClientID(zooKeeper, "/test-parse-clientid", lockIdFromPath6)));
        Assert.assertEquals(of, Utils.ioResult(ZKSessionLock.asyncParseClientID(zooKeeper, "/test-parse-clientid", lockIdFromPath7)));
        Assert.assertEquals(of, Utils.ioResult(ZKSessionLock.asyncParseClientID(zooKeeper, "/test-parse-clientid", lockIdFromPath8)));
        Assert.assertEquals(Pair.of("malformed", 12345678L), Utils.ioResult(ZKSessionLock.asyncParseClientID(zooKeeper, "/test-parse-clientid", ZKSessionLock.getLockIdFromPath(createLockNodeWithBadNodeName(zooKeeper, "/test-parse-clientid", str, "member_malformed_s12345678_999999")))));
    }

    @Test(timeout = 60000)
    public void testParseMemberID() throws Exception {
        Assert.assertEquals(2147483647L, ZKSessionLock.parseMemberID("badnode"));
        Assert.assertEquals(2147483647L, ZKSessionLock.parseMemberID("badnode_badnode"));
        Assert.assertEquals(0L, ZKSessionLock.parseMemberID("member_000000"));
        Assert.assertEquals(123L, ZKSessionLock.parseMemberID("member_000123"));
    }

    @Test(timeout = 60000)
    public void testAreLockWaitersInSameSession() throws Exception {
        ZooKeeper zooKeeper = this.zkc.get();
        createLockPath(zooKeeper, "/test-are-lock-waiters-in-same-session");
        String lockIdFromPath = ZKSessionLock.getLockIdFromPath(createLockNodeV3(zooKeeper, "/test-are-lock-waiters-in-same-session", "test-are-lock-waiters-in-same-session-1"));
        String lockIdFromPath2 = ZKSessionLock.getLockIdFromPath(createLockNodeV3(zooKeeper, "/test-are-lock-waiters-in-same-session", "test-are-lock-waiters-in-same-session-2"));
        String lockIdFromPath3 = ZKSessionLock.getLockIdFromPath(createLockNodeV3(zooKeeper, "/test-are-lock-waiters-in-same-session", "test-are-lock-waiters-in-same-session-1"));
        Assert.assertEquals(lockIdFromPath + " and " + lockIdFromPath3 + " should be in same session.", (Object) true, (Object) Boolean.valueOf(ZKSessionLock.areLockWaitersInSameSession(lockIdFromPath, lockIdFromPath3)));
        Assert.assertEquals(lockIdFromPath + " and " + lockIdFromPath2 + " should be not in same session.", (Object) false, (Object) Boolean.valueOf(ZKSessionLock.areLockWaitersInSameSession(lockIdFromPath, lockIdFromPath2)));
        Assert.assertEquals(lockIdFromPath3 + " and " + lockIdFromPath2 + " should be not in same session.", (Object) false, (Object) Boolean.valueOf(ZKSessionLock.areLockWaitersInSameSession(lockIdFromPath3, lockIdFromPath2)));
    }

    @Test(timeout = 60000)
    public void testExecuteLockAction() throws Exception {
        ZKSessionLock zKSessionLock = new ZKSessionLock(this.zkc, "/test-execute-lock-action", "test-execute-lock-action-" + System.currentTimeMillis(), this.lockStateExecutor);
        final AtomicInteger atomicInteger = new AtomicInteger(0);
        final CountDownLatch countDownLatch = new CountDownLatch(1);
        zKSessionLock.executeLockAction(zKSessionLock.getEpoch(), new LockAction() { // from class: org.apache.pulsar.functions.runtime.shaded.org.apache.distributedlog.lock.TestZKSessionLock.1
            @Override // org.apache.pulsar.functions.runtime.shaded.org.apache.distributedlog.lock.LockAction
            public void execute() {
                atomicInteger.incrementAndGet();
                countDownLatch.countDown();
            }

            @Override // org.apache.pulsar.functions.runtime.shaded.org.apache.distributedlog.lock.LockAction
            public String getActionName() {
                return "increment1";
            }
        });
        countDownLatch.await();
        Assert.assertEquals("counter should be increased in same epoch", 1L, atomicInteger.get());
        final CountDownLatch countDownLatch2 = new CountDownLatch(1);
        zKSessionLock.executeLockAction(zKSessionLock.getEpoch() + 1, new LockAction() { // from class: org.apache.pulsar.functions.runtime.shaded.org.apache.distributedlog.lock.TestZKSessionLock.2
            @Override // org.apache.pulsar.functions.runtime.shaded.org.apache.distributedlog.lock.LockAction
            public void execute() {
                atomicInteger.incrementAndGet();
            }

            @Override // org.apache.pulsar.functions.runtime.shaded.org.apache.distributedlog.lock.LockAction
            public String getActionName() {
                return "increment2";
            }
        });
        zKSessionLock.executeLockAction(zKSessionLock.getEpoch(), new LockAction() { // from class: org.apache.pulsar.functions.runtime.shaded.org.apache.distributedlog.lock.TestZKSessionLock.3
            @Override // org.apache.pulsar.functions.runtime.shaded.org.apache.distributedlog.lock.LockAction
            public void execute() {
                countDownLatch2.countDown();
            }

            @Override // org.apache.pulsar.functions.runtime.shaded.org.apache.distributedlog.lock.LockAction
            public String getActionName() {
                return "countdown";
            }
        });
        countDownLatch2.await();
        Assert.assertEquals("counter should not be increased in different epochs", 1L, atomicInteger.get());
        CompletableFuture completableFuture = new CompletableFuture();
        zKSessionLock.executeLockAction(zKSessionLock.getEpoch() + 1, new LockAction() { // from class: org.apache.pulsar.functions.runtime.shaded.org.apache.distributedlog.lock.TestZKSessionLock.4
            @Override // org.apache.pulsar.functions.runtime.shaded.org.apache.distributedlog.lock.LockAction
            public void execute() {
                atomicInteger.incrementAndGet();
            }

            @Override // org.apache.pulsar.functions.runtime.shaded.org.apache.distributedlog.lock.LockAction
            public String getActionName() {
                return "increment3";
            }
        }, completableFuture);
        try {
            Utils.ioResult(completableFuture);
            Assert.fail("Should satisfy promise with epoch changed exception.");
        } catch (EpochChangedException e) {
        }
        Assert.assertEquals("counter should not be increased in different epochs", 1L, atomicInteger.get());
        this.lockStateExecutor.shutdown();
    }

    @Test(timeout = 60000)
    public void testLockAfterUnlock() throws Exception {
        ZKSessionLock zKSessionLock = new ZKSessionLock(this.zkc, "/test-lock-after-unlock", "test-lock-after-unlock", this.lockStateExecutor);
        zKSessionLock.unlock();
        Assert.assertEquals(ZKSessionLock.State.CLOSED, zKSessionLock.getLockState());
        try {
            zKSessionLock.tryLock(0L, TimeUnit.MILLISECONDS);
            Assert.fail("Should fail on tryLock since lock state has changed.");
        } catch (LockStateChangedException e) {
        }
        Assert.assertEquals(ZKSessionLock.State.CLOSED, zKSessionLock.getLockState());
        try {
            zKSessionLock.tryLock(Long.MAX_VALUE, TimeUnit.MILLISECONDS);
            Assert.fail("Should fail on tryLock immediately if lock state has changed.");
        } catch (LockStateChangedException e2) {
        }
        Assert.assertEquals(ZKSessionLock.State.CLOSED, zKSessionLock.getLockState());
    }

    @Test(timeout = 60000)
    public void testUnlockTimeout() throws Exception {
        String methodName = this.testNames.getMethodName();
        String str = "/" + methodName;
        createLockPath(this.zkc.get(), str);
        ZKSessionLock zKSessionLock = new ZKSessionLock(this.zkc, str, methodName, this.lockStateExecutor, 1000L, NullStatsLogger.INSTANCE, new DistributedLockContext());
        zKSessionLock.tryLock(0L, TimeUnit.MILLISECONDS);
        Assert.assertEquals(ZKSessionLock.State.CLAIMED, zKSessionLock.getLockState());
        try {
            FailpointUtils.setFailpoint(FailpointUtils.FailPointName.FP_LockUnlockCleanup, new DelayFailpointAction(3600000L));
            zKSessionLock.unlock();
            Assert.assertEquals(ZKSessionLock.State.CLOSING, zKSessionLock.getLockState());
            FailpointUtils.removeFailpoint(FailpointUtils.FailPointName.FP_LockUnlockCleanup);
        } catch (Throwable th) {
            FailpointUtils.removeFailpoint(FailpointUtils.FailPointName.FP_LockUnlockCleanup);
            throw th;
        }
    }

    @Test(timeout = 60000)
    public void testTryCloseRaceCondition() throws Exception {
        String methodName = this.testNames.getMethodName();
        String str = "/" + methodName;
        createLockPath(this.zkc.get(), str);
        ZKSessionLock zKSessionLock = new ZKSessionLock(this.zkc, str, methodName, this.lockStateExecutor, 1000L, NullStatsLogger.INSTANCE, new DistributedLockContext());
        try {
            try {
                FailpointUtils.setFailpoint(FailpointUtils.FailPointName.FP_LockTryCloseRaceCondition, FailpointUtils.DEFAULT_ACTION);
                zKSessionLock.tryLock(0L, TimeUnit.MILLISECONDS);
                FailpointUtils.removeFailpoint(FailpointUtils.FailPointName.FP_LockTryCloseRaceCondition);
            } catch (LockClosedException e) {
                FailpointUtils.removeFailpoint(FailpointUtils.FailPointName.FP_LockTryCloseRaceCondition);
            }
            Assert.assertEquals(ZKSessionLock.State.CLOSED, zKSessionLock.getLockState());
            Assert.assertEquals(0L, getLockWaiters(this.zkc, str).size());
        } catch (Throwable th) {
            FailpointUtils.removeFailpoint(FailpointUtils.FailPointName.FP_LockTryCloseRaceCondition);
            throw th;
        }
    }

    @Test(timeout = 60000)
    public void testTryAcquireTimeout() throws Exception {
        String methodName = this.testNames.getMethodName();
        String str = "/" + methodName;
        createLockPath(this.zkc.get(), str);
        ZKSessionLock zKSessionLock = new ZKSessionLock(this.zkc, str, methodName, this.lockStateExecutor, 1L, NullStatsLogger.INSTANCE, new DistributedLockContext());
        try {
            try {
                FailpointUtils.setFailpoint(FailpointUtils.FailPointName.FP_LockTryAcquire, new DelayFailpointAction(3600000L));
                zKSessionLock.tryLock(0L, TimeUnit.MILLISECONDS);
                Assert.assertEquals(ZKSessionLock.State.CLOSED, zKSessionLock.getLockState());
                FailpointUtils.removeFailpoint(FailpointUtils.FailPointName.FP_LockTryAcquire);
            } catch (LockingException e) {
                FailpointUtils.removeFailpoint(FailpointUtils.FailPointName.FP_LockTryAcquire);
            } catch (Exception e2) {
                Assert.fail("expected locking exception");
                FailpointUtils.removeFailpoint(FailpointUtils.FailPointName.FP_LockTryAcquire);
            }
        } catch (Throwable th) {
            FailpointUtils.removeFailpoint(FailpointUtils.FailPointName.FP_LockTryAcquire);
            throw th;
        }
    }

    @Test(timeout = 60000)
    public void testBasicLockUnlock0() throws Exception {
        testBasicLockUnlock(0L);
    }

    @Test(timeout = 60000)
    public void testBasicLockUnlock1() throws Exception {
        testBasicLockUnlock(Long.MAX_VALUE);
    }

    private void testBasicLockUnlock(long j) throws Exception {
        String str = "/test-basic-lock-unlock-" + j + System.currentTimeMillis();
        createLockPath(this.zkc.get(), str);
        ZKSessionLock zKSessionLock = new ZKSessionLock(this.zkc, str, "test-basic-lock-unlock", this.lockStateExecutor);
        zKSessionLock.tryLock(j, TimeUnit.MILLISECONDS);
        Assert.assertEquals(ZKSessionLock.State.CLAIMED, zKSessionLock.getLockState());
        List<String> lockWaiters = getLockWaiters(this.zkc, str);
        Assert.assertEquals(1L, lockWaiters.size());
        Assert.assertEquals(zKSessionLock.getLockId(), Utils.ioResult(ZKSessionLock.asyncParseClientID(this.zkc.get(), str, lockWaiters.get(0))));
        try {
            zKSessionLock.tryLock(j, TimeUnit.MILLISECONDS);
            Assert.fail("Should fail on locking a failure lock.");
        } catch (LockStateChangedException e) {
        }
        Assert.assertEquals(ZKSessionLock.State.CLAIMED, zKSessionLock.getLockState());
        List<String> lockWaiters2 = getLockWaiters(this.zkc, str);
        Assert.assertEquals(1L, lockWaiters2.size());
        Assert.assertEquals(zKSessionLock.getLockId(), Utils.ioResult(ZKSessionLock.asyncParseClientID(this.zkc.get(), str, lockWaiters2.get(0))));
        zKSessionLock.unlock();
        Assert.assertEquals(ZKSessionLock.State.CLOSED, zKSessionLock.getLockState());
        Assert.assertEquals(0L, getLockWaiters(this.zkc, str).size());
    }

    @Test(timeout = 60000)
    public void testLockOnNonExistedLock() throws Exception {
        ZKSessionLock zKSessionLock = new ZKSessionLock(this.zkc, "/test-lock-on-non-existed-lock", "test-lock-on-non-existed-lock", this.lockStateExecutor);
        try {
            zKSessionLock.tryLock(0L, TimeUnit.MILLISECONDS);
            Assert.fail("Should fail on locking a non-existed lock.");
        } catch (LockingException e) {
            Throwable cause = e.getCause();
            Assert.assertTrue(cause instanceof KeeperException);
            Assert.assertEquals(KeeperException.Code.NONODE, ((KeeperException) cause).code());
        }
        Assert.assertEquals(ZKSessionLock.State.CLOSED, zKSessionLock.getLockState());
        try {
            zKSessionLock.tryLock(0L, TimeUnit.MILLISECONDS);
            Assert.fail("Should fail on locking a failure lock.");
        } catch (LockStateChangedException e2) {
        }
        Assert.assertEquals(ZKSessionLock.State.CLOSED, zKSessionLock.getLockState());
    }

    @Test(timeout = 60000)
    public void testLockWhenSomeoneHeldLock0() throws Exception {
        testLockWhenSomeoneHeldLock(0L);
    }

    @Test(timeout = 60000)
    public void testLockWhenSomeoneHeldLock1() throws Exception {
        testLockWhenSomeoneHeldLock(500L);
    }

    private void testLockWhenSomeoneHeldLock(long j) throws Exception {
        String str = "/test-lock-nowait-" + j + "-" + System.currentTimeMillis();
        String str2 = "test-lock-nowait-0-" + System.currentTimeMillis();
        String str3 = "test-lock-nowait-1-" + System.currentTimeMillis();
        String str4 = "test-lock-nowait-2-" + System.currentTimeMillis();
        createLockPath(this.zkc.get(), str);
        ZKSessionLock zKSessionLock = new ZKSessionLock(this.zkc0, str, str2, this.lockStateExecutor);
        ZKSessionLock zKSessionLock2 = new ZKSessionLock(this.zkc, str, str3, this.lockStateExecutor);
        zKSessionLock.tryLock(Long.MAX_VALUE, TimeUnit.MILLISECONDS);
        Assert.assertEquals(ZKSessionLock.State.CLAIMED, zKSessionLock.getLockState());
        List<String> lockWaiters = getLockWaiters(this.zkc0, str);
        Assert.assertEquals(1L, lockWaiters.size());
        Assert.assertEquals(zKSessionLock.getLockId(), Utils.ioResult(ZKSessionLock.asyncParseClientID(this.zkc0.get(), str, lockWaiters.get(0))));
        try {
            zKSessionLock2.tryLock(j, TimeUnit.MILLISECONDS);
            Assert.fail("lock1 should fail on locking since lock0 is holding the lock.");
        } catch (OwnershipAcquireFailedException e) {
            Assert.assertEquals(zKSessionLock.getLockId().getLeft(), e.getCurrentOwner());
        }
        Assert.assertEquals(ZKSessionLock.State.CLAIMED, zKSessionLock.getLockState());
        Assert.assertEquals(ZKSessionLock.State.CLOSED, zKSessionLock2.getLockState());
        List<String> lockWaiters2 = getLockWaiters(this.zkc0, str);
        Assert.assertEquals(1L, lockWaiters2.size());
        Assert.assertEquals(zKSessionLock.getLockId(), Utils.ioResult(ZKSessionLock.asyncParseClientID(this.zkc0.get(), str, lockWaiters2.get(0))));
        zKSessionLock.unlock();
        Assert.assertEquals(ZKSessionLock.State.CLOSED, zKSessionLock.getLockState());
        Assert.assertEquals(0L, getLockWaiters(this.zkc, str).size());
        ZKSessionLock zKSessionLock3 = new ZKSessionLock(this.zkc, str, str4, this.lockStateExecutor);
        zKSessionLock3.tryLock(j, TimeUnit.MILLISECONDS);
        Assert.assertEquals(ZKSessionLock.State.CLOSED, zKSessionLock.getLockState());
        Assert.assertEquals(ZKSessionLock.State.CLOSED, zKSessionLock2.getLockState());
        Assert.assertEquals(ZKSessionLock.State.CLAIMED, zKSessionLock3.getLockState());
        List<String> lockWaiters3 = getLockWaiters(this.zkc, str);
        Assert.assertEquals(1L, lockWaiters3.size());
        Assert.assertEquals(zKSessionLock3.getLockId(), Utils.ioResult(ZKSessionLock.asyncParseClientID(this.zkc.get(), str, lockWaiters3.get(0))));
        zKSessionLock3.unlock();
    }

    @Test(timeout = 60000)
    public void testLockWhenPreviousLockZnodeStillExists() throws Exception {
        String str = "/test-lock-when-previous-lock-znode-still-exists-" + System.currentTimeMillis();
        createLockPath(this.zkc.get(), str);
        ZKSessionLock zKSessionLock = new ZKSessionLock(this.zkc0, str, OpenIDConnectAuthenticator.OIDC_CLIENT_ID, this.lockStateExecutor);
        zKSessionLock.tryLock(Long.MAX_VALUE, TimeUnit.MILLISECONDS);
        DistributedLockContext distributedLockContext = new DistributedLockContext();
        distributedLockContext.addLockId(zKSessionLock.getLockId());
        ZKSessionLock zKSessionLock2 = new ZKSessionLock(this.zkc, str, OpenIDConnectAuthenticator.OIDC_CLIENT_ID, this.lockStateExecutor, 60000L, NullStatsLogger.INSTANCE, distributedLockContext);
        zKSessionLock2.tryLock(0L, TimeUnit.MILLISECONDS);
        Assert.assertEquals(ZKSessionLock.State.CLAIMED, zKSessionLock2.getLockState());
        zKSessionLock2.unlock();
        DistributedLockContext distributedLockContext2 = new DistributedLockContext();
        distributedLockContext2.addLockId(zKSessionLock.getLockId());
        ZKSessionLock zKSessionLock3 = new ZKSessionLock(this.zkc, str, OpenIDConnectAuthenticator.OIDC_CLIENT_ID, this.lockStateExecutor, 60000L, NullStatsLogger.INSTANCE, distributedLockContext2);
        zKSessionLock3.tryLock(Long.MAX_VALUE, TimeUnit.MILLISECONDS);
        Assert.assertEquals(ZKSessionLock.State.CLAIMED, zKSessionLock3.getLockState());
        zKSessionLock3.unlock();
        zKSessionLock.unlock();
    }

    @Test(timeout = 60000)
    public void testWaitForLockUnlock() throws Exception {
        testWaitForLockReleased("/test-wait-for-lock-unlock", true);
    }

    @Test(timeout = 60000)
    public void testWaitForLockExpired() throws Exception {
        testWaitForLockReleased("/test-wait-for-lock-expired", false);
    }

    private void testWaitForLockReleased(String str, boolean z) throws Exception {
        String str2 = "test-wait-for-lock-released-0-" + System.currentTimeMillis();
        String str3 = "test-wait-for-lock-released-1-" + System.currentTimeMillis();
        createLockPath(this.zkc.get(), str);
        ZKSessionLock zKSessionLock = new ZKSessionLock(this.zkc0, str, str2, this.lockStateExecutor);
        final ZKSessionLock zKSessionLock2 = new ZKSessionLock(this.zkc, str, str3, this.lockStateExecutor);
        zKSessionLock.tryLock(Long.MAX_VALUE, TimeUnit.MILLISECONDS);
        Assert.assertEquals(ZKSessionLock.State.CLAIMED, zKSessionLock.getLockState());
        List<String> lockWaiters = getLockWaiters(this.zkc0, str);
        Assert.assertEquals(1L, lockWaiters.size());
        Assert.assertEquals(zKSessionLock.getLockId(), Utils.ioResult(ZKSessionLock.asyncParseClientID(this.zkc0.get(), str, lockWaiters.get(0))));
        final CountDownLatch countDownLatch = new CountDownLatch(1);
        Thread thread = new Thread(new Runnable() { // from class: org.apache.pulsar.functions.runtime.shaded.org.apache.distributedlog.lock.TestZKSessionLock.5
            @Override // java.lang.Runnable
            public void run() {
                try {
                    zKSessionLock2.tryLock(Long.MAX_VALUE, TimeUnit.MILLISECONDS);
                    countDownLatch.countDown();
                } catch (LockingException e) {
                    TestZKSessionLock.logger.error("Failed on locking lock1 : ", (Throwable) e);
                }
            }
        }, "lock1-thread");
        thread.start();
        awaitWaiters(2, this.zkc, str);
        if (z) {
            zKSessionLock.unlock();
        } else {
            ZooKeeperClientUtils.expireSession(this.zkc0, zkServers, 2000);
        }
        countDownLatch.await();
        thread.join();
        if (z) {
            Assert.assertEquals(ZKSessionLock.State.CLOSED, zKSessionLock.getLockState());
        } else {
            Assert.assertEquals(ZKSessionLock.State.EXPIRED, zKSessionLock.getLockState());
        }
        Assert.assertEquals(ZKSessionLock.State.CLAIMED, zKSessionLock2.getLockState());
        List<String> lockWaiters2 = getLockWaiters(this.zkc, str);
        Assert.assertEquals(1L, lockWaiters2.size());
        Assert.assertEquals(zKSessionLock2.getLockId(), Utils.ioResult(ZKSessionLock.asyncParseClientID(this.zkc.get(), str, lockWaiters2.get(0))));
        zKSessionLock2.unlock();
    }

    @Test(timeout = 60000)
    public void testLockListenerOnExpired() throws Exception {
        String str = "test-lock-listener-on-expired-" + System.currentTimeMillis();
        createLockPath(this.zkc.get(), "/test-lock-listener-on-expired");
        final CountDownLatch countDownLatch = new CountDownLatch(1);
        ZKSessionLock lockListener = new ZKSessionLock(this.zkc, "/test-lock-listener-on-expired", str, this.lockStateExecutor).setLockListener(new LockListener() { // from class: org.apache.pulsar.functions.runtime.shaded.org.apache.distributedlog.lock.TestZKSessionLock.6
            @Override // org.apache.pulsar.functions.runtime.shaded.org.apache.distributedlog.lock.LockListener
            public void onExpired() {
                countDownLatch.countDown();
            }
        });
        lockListener.tryLock(Long.MAX_VALUE, TimeUnit.MILLISECONDS);
        Assert.assertEquals(ZKSessionLock.State.CLAIMED, lockListener.getLockState());
        List<String> lockWaiters = getLockWaiters(this.zkc, "/test-lock-listener-on-expired");
        Assert.assertEquals(1L, lockWaiters.size());
        Assert.assertEquals(lockListener.getLockId(), Utils.ioResult(ZKSessionLock.asyncParseClientID(this.zkc.get(), "/test-lock-listener-on-expired", lockWaiters.get(0))));
        ZooKeeperClientUtils.expireSession(this.zkc, zkServers, 2000);
        countDownLatch.await();
        Assert.assertEquals(ZKSessionLock.State.EXPIRED, lockListener.getLockState());
        Assert.assertEquals(0L, getLockWaiters(this.zkc, "/test-lock-listener-on-expired").size());
        try {
            lockListener.tryLock(0L, TimeUnit.MILLISECONDS);
            Assert.fail("Should fail on tryLock since lock state has changed.");
        } catch (LockStateChangedException e) {
        }
        lockListener.unlock();
    }

    @Test(timeout = 60000)
    public void testSessionExpiredBeforeLock0() throws Exception {
        testSessionExpiredBeforeLock(0L);
    }

    @Test(timeout = 60000)
    public void testSessionExpiredBeforeLock1() throws Exception {
        testSessionExpiredBeforeLock(Long.MAX_VALUE);
    }

    private void testSessionExpiredBeforeLock(long j) throws Exception {
        String str = "/test-session-expired-before-lock-" + j + "-" + System.currentTimeMillis();
        String str2 = "test-session-expired-before-lock-" + System.currentTimeMillis();
        createLockPath(this.zkc.get(), str);
        final AtomicInteger atomicInteger = new AtomicInteger(0);
        CountDownLatch countDownLatch = new CountDownLatch(1);
        ZKSessionLock lockListener = new ZKSessionLock(this.zkc, str, str2, this.lockStateExecutor).setLockListener(new LockListener() { // from class: org.apache.pulsar.functions.runtime.shaded.org.apache.distributedlog.lock.TestZKSessionLock.7
            @Override // org.apache.pulsar.functions.runtime.shaded.org.apache.distributedlog.lock.LockListener
            public void onExpired() {
                atomicInteger.incrementAndGet();
            }
        });
        ZooKeeperClientUtils.expireSession(this.zkc, zkServers, 2000);
        this.lockStateExecutor.executeOrdered(str, () -> {
            countDownLatch.countDown();
        });
        countDownLatch.await();
        Assert.assertEquals(ZKSessionLock.State.INIT, lockListener.getLockState());
        try {
            lockListener.tryLock(j, TimeUnit.MILLISECONDS);
            Assert.fail("Should fail locking using an expired lock");
        } catch (LockingException e) {
            Assert.assertTrue(e.getCause() instanceof KeeperException.SessionExpiredException);
        }
        Assert.assertEquals(ZKSessionLock.State.CLOSED, lockListener.getLockState());
        Assert.assertEquals(0L, getLockWaiters(this.zkc, str).size());
    }

    @Test(timeout = 60000)
    public void testSessionExpiredForLockWaiter() throws Exception {
        createLockPath(this.zkc.get(), "/test-session-expired-for-lock-waiter");
        ZKSessionLock zKSessionLock = new ZKSessionLock(this.zkc0, "/test-session-expired-for-lock-waiter", "test-session-expired-for-lock-waiter-0", this.lockStateExecutor);
        zKSessionLock.tryLock(Long.MAX_VALUE, TimeUnit.MILLISECONDS);
        Assert.assertEquals(ZKSessionLock.State.CLAIMED, zKSessionLock.getLockState());
        List<String> lockWaiters = getLockWaiters(this.zkc0, "/test-session-expired-for-lock-waiter");
        Assert.assertEquals(1L, lockWaiters.size());
        Assert.assertEquals(zKSessionLock.getLockId(), Utils.ioResult(ZKSessionLock.asyncParseClientID(this.zkc0.get(), "/test-session-expired-for-lock-waiter", lockWaiters.get(0))));
        final ZKSessionLock zKSessionLock2 = new ZKSessionLock(this.zkc, "/test-session-expired-for-lock-waiter", "test-session-expired-for-lock-waiter-1", this.lockStateExecutor);
        final CountDownLatch countDownLatch = new CountDownLatch(1);
        Thread thread = new Thread(new Runnable() { // from class: org.apache.pulsar.functions.runtime.shaded.org.apache.distributedlog.lock.TestZKSessionLock.8
            @Override // java.lang.Runnable
            public void run() {
                try {
                    zKSessionLock2.tryLock(Long.MAX_VALUE, TimeUnit.MILLISECONDS);
                } catch (OwnershipAcquireFailedException e) {
                    countDownLatch.countDown();
                } catch (LockingException e2) {
                    TestZKSessionLock.logger.error("Failed on locking lock1 : ", (Throwable) e2);
                }
            }
        }, "lock1-thread");
        thread.start();
        List<String> awaitWaiters = awaitWaiters(2, this.zkc, "/test-session-expired-for-lock-waiter");
        Assert.assertEquals(2L, awaitWaiters.size());
        Assert.assertEquals(ZKSessionLock.State.CLAIMED, zKSessionLock.getLockState());
        Assert.assertEquals(zKSessionLock.getLockId(), Utils.ioResult(ZKSessionLock.asyncParseClientID(this.zkc0.get(), "/test-session-expired-for-lock-waiter", awaitWaiters.get(0))));
        awaitState(ZKSessionLock.State.WAITING, zKSessionLock2);
        Assert.assertEquals(zKSessionLock2.getLockId(), Utils.ioResult(ZKSessionLock.asyncParseClientID(this.zkc.get(), "/test-session-expired-for-lock-waiter", awaitWaiters.get(1))));
        ZooKeeperClientUtils.expireSession(this.zkc, zkServers, 2000);
        countDownLatch.countDown();
        thread.join();
        Assert.assertEquals(ZKSessionLock.State.CLAIMED, zKSessionLock.getLockState());
        Assert.assertEquals(ZKSessionLock.State.CLOSED, zKSessionLock2.getLockState());
        List<String> lockWaiters2 = getLockWaiters(this.zkc0, "/test-session-expired-for-lock-waiter");
        Assert.assertEquals(1L, lockWaiters2.size());
        Assert.assertEquals(zKSessionLock.getLockId(), Utils.ioResult(ZKSessionLock.asyncParseClientID(this.zkc0.get(), "/test-session-expired-for-lock-waiter", lockWaiters2.get(0))));
    }

    public void awaitState(ZKSessionLock.State state, ZKSessionLock zKSessionLock) throws InterruptedException {
        while (zKSessionLock.getLockState() != state) {
            Thread.sleep(50L);
        }
    }

    public List<String> awaitWaiters(int i, ZooKeeperClient zooKeeperClient, String str) throws Exception {
        List<String> lockWaiters = getLockWaiters(zooKeeperClient, str);
        while (true) {
            List<String> list = lockWaiters;
            if (list.size() >= i) {
                return list;
            }
            Thread.sleep(50L);
            lockWaiters = getLockWaiters(zooKeeperClient, str);
        }
    }

    @Test(timeout = 60000)
    public void testLockUseSameClientIdButDifferentSessions0() throws Exception {
        testLockUseSameClientIdButDifferentSessions(true);
    }

    @Test(timeout = 60000)
    public void testLockUseSameClientIdButDifferentSessions1() throws Exception {
        testLockUseSameClientIdButDifferentSessions(false);
    }

    private void testLockUseSameClientIdButDifferentSessions(boolean z) throws Exception {
        String str = "/test-lock-use-same-client-id-but-different-sessions-" + z + System.currentTimeMillis();
        createLockPath(this.zkc.get(), str);
        ZKSessionLock zKSessionLock = new ZKSessionLock(this.zkc0, str, "test-lock-use-same-client-id-but-different-sessions", this.lockStateExecutor);
        zKSessionLock.tryLock(Long.MAX_VALUE, TimeUnit.MILLISECONDS);
        ZKSessionLock zKSessionLock2 = new ZKSessionLock(this.zkc, str, "test-lock-use-same-client-id-but-different-sessions", this.lockStateExecutor);
        try {
            zKSessionLock2.tryLock(0L, TimeUnit.MILLISECONDS);
            Assert.fail("Should fail locking since the lock is held in a different zk session.");
        } catch (OwnershipAcquireFailedException e) {
            Assert.assertEquals("test-lock-use-same-client-id-but-different-sessions", e.getCurrentOwner());
        }
        Assert.assertEquals(ZKSessionLock.State.CLOSED, zKSessionLock2.getLockState());
        List<String> lockWaiters = getLockWaiters(this.zkc0, str);
        Assert.assertEquals(1L, lockWaiters.size());
        Assert.assertEquals(zKSessionLock.getLockId(), Utils.ioResult(ZKSessionLock.asyncParseClientID(this.zkc0.get(), str, lockWaiters.get(0))));
        final ZKSessionLock zKSessionLock3 = new ZKSessionLock(this.zkc, str, "test-lock-use-same-client-id-but-different-sessions", this.lockStateExecutor);
        final CountDownLatch countDownLatch = new CountDownLatch(1);
        Thread thread = new Thread(new Runnable() { // from class: org.apache.pulsar.functions.runtime.shaded.org.apache.distributedlog.lock.TestZKSessionLock.9
            @Override // java.lang.Runnable
            public void run() {
                try {
                    zKSessionLock3.tryLock(Long.MAX_VALUE, TimeUnit.MILLISECONDS);
                    countDownLatch.countDown();
                } catch (LockingException e2) {
                    TestZKSessionLock.logger.error("Failed on locking lock1 : ", (Throwable) e2);
                }
            }
        }, "lock1-thread");
        thread.start();
        List<String> awaitWaiters = awaitWaiters(2, this.zkc, str);
        logger.info("Found {} lock waiters : {}", Integer.valueOf(awaitWaiters.size()), awaitWaiters);
        Assert.assertEquals(2L, awaitWaiters.size());
        Assert.assertEquals(ZKSessionLock.State.CLAIMED, zKSessionLock.getLockState());
        Assert.assertEquals(zKSessionLock.getLockId(), Utils.ioResult(ZKSessionLock.asyncParseClientID(this.zkc0.get(), str, awaitWaiters.get(0))));
        awaitState(ZKSessionLock.State.WAITING, zKSessionLock3);
        Assert.assertEquals(zKSessionLock3.getLockId(), Utils.ioResult(ZKSessionLock.asyncParseClientID(this.zkc.get(), str, awaitWaiters.get(1))));
        if (z) {
            zKSessionLock.unlock();
        } else {
            ZooKeeperClientUtils.expireSession(this.zkc0, zkServers, 2000);
        }
        countDownLatch.await();
        thread.join();
        if (z) {
            Assert.assertEquals(ZKSessionLock.State.CLOSED, zKSessionLock.getLockState());
        } else {
            Assert.assertEquals(ZKSessionLock.State.EXPIRED, zKSessionLock.getLockState());
        }
        Assert.assertEquals(ZKSessionLock.State.CLAIMED, zKSessionLock3.getLockState());
        List<String> lockWaiters2 = getLockWaiters(this.zkc, str);
        Assert.assertEquals(1L, lockWaiters2.size());
        Assert.assertEquals(zKSessionLock3.getLockId(), Utils.ioResult(ZKSessionLock.asyncParseClientID(this.zkc.get(), str, lockWaiters2.get(0))));
        zKSessionLock3.unlock();
    }

    @Test(timeout = 60000)
    public void testLockWithMultipleSiblingWaiters() throws Exception {
        createLockPath(this.zkc.get(), "/test-lock-with-multiple-sibling-waiters");
        ZKSessionLock zKSessionLock = new ZKSessionLock(this.zkc, "/test-lock-with-multiple-sibling-waiters", OpenIDConnectAuthenticator.OIDC_CLIENT_ID, this.lockStateExecutor);
        ZKSessionLock zKSessionLock2 = new ZKSessionLock(this.zkc, "/test-lock-with-multiple-sibling-waiters", OpenIDConnectAuthenticator.OIDC_CLIENT_ID, this.lockStateExecutor);
        ZKSessionLock zKSessionLock3 = new ZKSessionLock(this.zkc, "/test-lock-with-multiple-sibling-waiters", OpenIDConnectAuthenticator.OIDC_CLIENT_ID, this.lockStateExecutor);
        zKSessionLock.tryLock(Long.MAX_VALUE, TimeUnit.MILLISECONDS);
        zKSessionLock2.tryLock(Long.MAX_VALUE, TimeUnit.MILLISECONDS);
        zKSessionLock3.tryLock(Long.MAX_VALUE, TimeUnit.MILLISECONDS);
        Assert.assertEquals(3L, awaitWaiters(3, this.zkc, "/test-lock-with-multiple-sibling-waiters").size());
        Assert.assertEquals(ZKSessionLock.State.CLAIMED, zKSessionLock.getLockState());
        Assert.assertEquals(ZKSessionLock.State.CLAIMED, zKSessionLock2.getLockState());
        Assert.assertEquals(ZKSessionLock.State.CLAIMED, zKSessionLock3.getLockState());
        zKSessionLock.unlock();
        zKSessionLock2.unlock();
        zKSessionLock3.unlock();
    }

    @Test(timeout = 60000)
    public void testLockWhenSiblingUseDifferentLockId0() throws Exception {
        testLockWhenSiblingUseDifferentLockId(0L, true);
    }

    @Test(timeout = 60000)
    public void testLockWhenSiblingUseDifferentLockId1() throws Exception {
        testLockWhenSiblingUseDifferentLockId(0L, false);
    }

    @Test(timeout = 60000)
    public void testLockWhenSiblingUseDifferentLockId2() throws Exception {
        testLockWhenSiblingUseDifferentLockId(Long.MAX_VALUE, true);
    }

    @Test(timeout = 60000)
    public void testLockWhenSiblingUseDifferentLockId3() throws Exception {
        testLockWhenSiblingUseDifferentLockId(Long.MAX_VALUE, false);
    }

    private void testLockWhenSiblingUseDifferentLockId(long j, final boolean z) throws Exception {
        String str = "/test-lock-when-sibling-use-different-lock-id-" + j + "-" + z + "-" + System.currentTimeMillis();
        createLockPath(this.zkc.get(), str);
        ZKSessionLock zKSessionLock = new ZKSessionLock(this.zkc0, str, "client-id-0", this.lockStateExecutor);
        final ZKSessionLock zKSessionLock2 = new ZKSessionLock(this.zkc0, str, "client-id-0", this.lockStateExecutor);
        final ZKSessionLock zKSessionLock3 = new ZKSessionLock(this.zkc, str, "client-id-1", this.lockStateExecutor);
        zKSessionLock.tryLock(Long.MAX_VALUE, TimeUnit.MILLISECONDS);
        final CountDownLatch countDownLatch = new CountDownLatch(1);
        Thread thread = new Thread(new Runnable() { // from class: org.apache.pulsar.functions.runtime.shaded.org.apache.distributedlog.lock.TestZKSessionLock.10
            @Override // java.lang.Runnable
            public void run() {
                try {
                    zKSessionLock3.tryLock(Long.MAX_VALUE, TimeUnit.MILLISECONDS);
                    countDownLatch.countDown();
                } catch (LockingException e) {
                    TestZKSessionLock.logger.error("Failed on locking lock1 : ", (Throwable) e);
                }
            }
        }, "lock1-thread");
        thread.start();
        List<String> awaitWaiters = awaitWaiters(2, this.zkc, str);
        Assert.assertEquals(2L, awaitWaiters.size());
        Assert.assertEquals(ZKSessionLock.State.CLAIMED, zKSessionLock.getLockState());
        Assert.assertEquals(zKSessionLock.getLockId(), Utils.ioResult(ZKSessionLock.asyncParseClientID(this.zkc0.get(), str, awaitWaiters.get(0))));
        awaitState(ZKSessionLock.State.WAITING, zKSessionLock3);
        Assert.assertEquals(zKSessionLock3.getLockId(), Utils.ioResult(ZKSessionLock.asyncParseClientID(this.zkc.get(), str, awaitWaiters.get(1))));
        final CountDownLatch countDownLatch2 = new CountDownLatch(1);
        final AtomicReference atomicReference = new AtomicReference(null);
        Thread thread2 = null;
        if (j == 0) {
            try {
                zKSessionLock2.tryLock(0L, TimeUnit.MILLISECONDS);
                Assert.fail("Should fail on locking if sibling is using differnt lock id.");
            } catch (OwnershipAcquireFailedException e) {
                Assert.assertEquals("client-id-0", e.getCurrentOwner());
            }
            Assert.assertEquals(ZKSessionLock.State.CLOSED, zKSessionLock2.getLockState());
            List<String> lockWaiters = getLockWaiters(this.zkc, str);
            Assert.assertEquals(2L, lockWaiters.size());
            Assert.assertEquals(ZKSessionLock.State.CLAIMED, zKSessionLock.getLockState());
            Assert.assertEquals(zKSessionLock.getLockId(), Utils.ioResult(ZKSessionLock.asyncParseClientID(this.zkc0.get(), str, lockWaiters.get(0))));
            Assert.assertEquals(ZKSessionLock.State.WAITING, zKSessionLock3.getLockState());
            Assert.assertEquals(zKSessionLock3.getLockId(), Utils.ioResult(ZKSessionLock.asyncParseClientID(this.zkc.get(), str, lockWaiters.get(1))));
        } else {
            thread2 = new Thread(new Runnable() { // from class: org.apache.pulsar.functions.runtime.shaded.org.apache.distributedlog.lock.TestZKSessionLock.11
                @Override // java.lang.Runnable
                public void run() {
                    try {
                        zKSessionLock2.tryLock(Long.MAX_VALUE, TimeUnit.MILLISECONDS);
                        if (z) {
                            countDownLatch2.countDown();
                        }
                    } catch (OwnershipAcquireFailedException e2) {
                        if (z) {
                            return;
                        }
                        atomicReference.set(e2.getCurrentOwner());
                        countDownLatch2.countDown();
                    } catch (LockingException e3) {
                        TestZKSessionLock.logger.error("Failed on locking lock0_1 : ", (Throwable) e3);
                    }
                }
            }, "lock0-thread");
            thread2.start();
            List<String> awaitWaiters2 = awaitWaiters(3, this.zkc, str);
            Assert.assertEquals(3L, awaitWaiters2.size());
            Assert.assertEquals(ZKSessionLock.State.CLAIMED, zKSessionLock.getLockState());
            Assert.assertEquals(zKSessionLock.getLockId(), Utils.ioResult(ZKSessionLock.asyncParseClientID(this.zkc0.get(), str, awaitWaiters2.get(0))));
            awaitState(ZKSessionLock.State.WAITING, zKSessionLock3);
            Assert.assertEquals(zKSessionLock3.getLockId(), Utils.ioResult(ZKSessionLock.asyncParseClientID(this.zkc.get(), str, awaitWaiters2.get(1))));
            awaitState(ZKSessionLock.State.WAITING, zKSessionLock2);
            Assert.assertEquals(zKSessionLock2.getLockId(), Utils.ioResult(ZKSessionLock.asyncParseClientID(this.zkc0.get(), str, awaitWaiters2.get(2))));
        }
        if (z) {
            zKSessionLock.unlock();
        } else {
            ZooKeeperClientUtils.expireSession(this.zkc0, zkServers, 2000);
        }
        countDownLatch.await();
        thread.join();
        if (z) {
            Assert.assertEquals(ZKSessionLock.State.CLOSED, zKSessionLock.getLockState());
        } else {
            Assert.assertEquals(ZKSessionLock.State.EXPIRED, zKSessionLock.getLockState());
        }
        if (j == 0) {
            List<String> lockWaiters2 = getLockWaiters(this.zkc, str);
            Assert.assertEquals(1L, lockWaiters2.size());
            Assert.assertEquals(ZKSessionLock.State.CLAIMED, zKSessionLock3.getLockState());
            Assert.assertEquals(zKSessionLock3.getLockId(), Utils.ioResult(ZKSessionLock.asyncParseClientID(this.zkc.get(), str, lockWaiters2.get(0))));
        } else {
            Assert.assertNotNull(thread2);
            if (z) {
                List<String> lockWaiters3 = getLockWaiters(this.zkc, str);
                Assert.assertEquals(2L, lockWaiters3.size());
                Assert.assertEquals(ZKSessionLock.State.CLAIMED, zKSessionLock3.getLockState());
                Assert.assertEquals(zKSessionLock3.getLockId(), Utils.ioResult(ZKSessionLock.asyncParseClientID(this.zkc.get(), str, lockWaiters3.get(0))));
                Assert.assertEquals(ZKSessionLock.State.WAITING, zKSessionLock2.getLockState());
                Assert.assertEquals(zKSessionLock2.getLockId(), Utils.ioResult(ZKSessionLock.asyncParseClientID(this.zkc0.get(), str, lockWaiters3.get(1))));
            } else {
                countDownLatch2.await();
                thread2.join();
                Assert.assertEquals("client-id-0", atomicReference.get());
                Assert.assertEquals(ZKSessionLock.State.CLOSED, zKSessionLock2.getLockState());
                List<String> lockWaiters4 = getLockWaiters(this.zkc, str);
                Assert.assertEquals(1L, lockWaiters4.size());
                Assert.assertEquals(ZKSessionLock.State.CLAIMED, zKSessionLock3.getLockState());
                Assert.assertEquals(zKSessionLock3.getLockId(), Utils.ioResult(ZKSessionLock.asyncParseClientID(this.zkc.get(), str, lockWaiters4.get(0))));
            }
        }
        zKSessionLock3.unlock();
        if (j == 0 || !z) {
            return;
        }
        countDownLatch2.await();
        thread2.join();
        List<String> lockWaiters5 = getLockWaiters(this.zkc, str);
        Assert.assertEquals(1L, lockWaiters5.size());
        Assert.assertEquals(ZKSessionLock.State.CLAIMED, zKSessionLock2.getLockState());
        Assert.assertEquals(zKSessionLock2.getLockId(), Utils.ioResult(ZKSessionLock.asyncParseClientID(this.zkc0.get(), str, lockWaiters5.get(0))));
    }

    @Test(timeout = 60000)
    public void testLockWhenSiblingUseSameLockId0() throws Exception {
        testLockWhenSiblingUseSameLockId(0L, true);
    }

    @Test(timeout = 60000)
    public void testLockWhenSiblingUseSameLockId1() throws Exception {
        testLockWhenSiblingUseSameLockId(0L, false);
    }

    @Test(timeout = 60000)
    public void testLockWhenSiblingUseSameLockId2() throws Exception {
        testLockWhenSiblingUseSameLockId(Long.MAX_VALUE, true);
    }

    @Test(timeout = 60000)
    public void testLockWhenSiblingUseSameLockId3() throws Exception {
        testLockWhenSiblingUseSameLockId(Long.MAX_VALUE, false);
    }

    private void testLockWhenSiblingUseSameLockId(long j, boolean z) throws Exception {
        String str = "/test-lock-when-sibling-use-same-lock-id-" + j + "-" + z + "-" + System.currentTimeMillis();
        createLockPath(this.zkc.get(), str);
        ZKSessionLock zKSessionLock = new ZKSessionLock(this.zkc0, str, OpenIDConnectAuthenticator.OIDC_CLIENT_ID, this.lockStateExecutor);
        ZKSessionLock zKSessionLock2 = new ZKSessionLock(this.zkc0, str, OpenIDConnectAuthenticator.OIDC_CLIENT_ID, this.lockStateExecutor);
        zKSessionLock.tryLock(Long.MAX_VALUE, TimeUnit.MILLISECONDS);
        List<String> lockWaiters = getLockWaiters(this.zkc0, str);
        Assert.assertEquals(1L, lockWaiters.size());
        Assert.assertEquals(ZKSessionLock.State.CLAIMED, zKSessionLock.getLockState());
        Assert.assertEquals(zKSessionLock.getLockId(), Utils.ioResult(ZKSessionLock.asyncParseClientID(this.zkc0.get(), str, lockWaiters.get(0))));
        zKSessionLock2.tryLock(j, TimeUnit.MILLISECONDS);
        List<String> lockWaiters2 = getLockWaiters(this.zkc0, str);
        Assert.assertEquals(2L, lockWaiters2.size());
        Assert.assertEquals(ZKSessionLock.State.CLAIMED, zKSessionLock.getLockState());
        Assert.assertEquals(zKSessionLock.getLockId(), Utils.ioResult(ZKSessionLock.asyncParseClientID(this.zkc0.get(), str, lockWaiters2.get(0))));
        Assert.assertEquals(ZKSessionLock.State.CLAIMED, zKSessionLock2.getLockState());
        Assert.assertEquals(zKSessionLock2.getLockId(), Utils.ioResult(ZKSessionLock.asyncParseClientID(this.zkc0.get(), str, lockWaiters2.get(1))));
        if (z) {
            zKSessionLock.unlock();
            Assert.assertEquals(ZKSessionLock.State.CLOSED, zKSessionLock.getLockState());
            List<String> lockWaiters3 = getLockWaiters(this.zkc0, str);
            Assert.assertEquals(1L, lockWaiters3.size());
            Assert.assertEquals(ZKSessionLock.State.CLAIMED, zKSessionLock2.getLockState());
            Assert.assertEquals(zKSessionLock2.getLockId(), Utils.ioResult(ZKSessionLock.asyncParseClientID(this.zkc0.get(), str, lockWaiters3.get(0))));
            zKSessionLock2.unlock();
            return;
        }
        ZooKeeperClientUtils.expireSession(this.zkc0, zkServers, 2000);
        CountDownLatch countDownLatch = new CountDownLatch(1);
        this.lockStateExecutor.executeOrdered(str, () -> {
            countDownLatch.countDown();
        });
        countDownLatch.await();
        Assert.assertEquals(0L, getLockWaiters(this.zkc, str).size());
        Assert.assertEquals(ZKSessionLock.State.EXPIRED, zKSessionLock.getLockState());
        Assert.assertEquals(ZKSessionLock.State.EXPIRED, zKSessionLock2.getLockState());
    }
}
