package org.apache.pulsar.functions.runtime.shaded.org.apache.distributedlog;

import java.io.IOException;
import java.net.URI;
import java.util.Collection;
import java.util.HashSet;
import java.util.Iterator;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.pulsar.functions.runtime.shaded.com.google.common.collect.Sets;
import org.apache.pulsar.functions.runtime.shaded.org.apache.commons.configuration.tree.DefaultExpressionEngine;
import org.apache.pulsar.functions.runtime.shaded.org.apache.distributedlog.api.DistributedLogManager;
import org.apache.pulsar.functions.runtime.shaded.org.apache.distributedlog.api.LogReader;
import org.apache.pulsar.functions.runtime.shaded.org.apache.distributedlog.api.LogWriter;
import org.apache.pulsar.functions.runtime.shaded.org.apache.distributedlog.api.namespace.Namespace;
import org.apache.pulsar.functions.runtime.shaded.org.apache.distributedlog.api.namespace.NamespaceBuilder;
import org.apache.pulsar.functions.runtime.shaded.org.apache.distributedlog.callback.NamespaceListener;
import org.apache.pulsar.functions.runtime.shaded.org.apache.distributedlog.exceptions.AlreadyClosedException;
import org.apache.pulsar.functions.runtime.shaded.org.apache.distributedlog.exceptions.InvalidStreamNameException;
import org.apache.pulsar.functions.runtime.shaded.org.apache.distributedlog.exceptions.LockingException;
import org.apache.pulsar.functions.runtime.shaded.org.apache.distributedlog.exceptions.ZKException;
import org.apache.pulsar.functions.runtime.shaded.org.apache.distributedlog.impl.BKNamespaceDriver;
import org.apache.pulsar.functions.runtime.shaded.org.apache.distributedlog.util.DLUtils;
import org.apache.pulsar.functions.runtime.shaded.org.apache.zookeeper.CreateMode;
import org.apache.pulsar.functions.runtime.shaded.org.apache.zookeeper.KeeperException;
import org.apache.pulsar.functions.runtime.shaded.org.apache.zookeeper.ZooDefs;
import org.apache.pulsar.functions.runtime.shaded.org.apache.zookeeper.data.Stat;
import org.apache.pulsar.functions.runtime.shaded.org.junit.After;
import org.apache.pulsar.functions.runtime.shaded.org.junit.Assert;
import org.apache.pulsar.functions.runtime.shaded.org.junit.Before;
import org.apache.pulsar.functions.runtime.shaded.org.junit.Rule;
import org.apache.pulsar.functions.runtime.shaded.org.junit.Test;
import org.apache.pulsar.functions.runtime.shaded.org.junit.rules.TestName;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/pulsar/functions/runtime/shaded/org/apache/distributedlog/TestBKDistributedLogNamespace.class */
/**
 * Tests for the DistributedLog {@link Namespace} obtained through {@link NamespaceBuilder}:
 * log creation, stream name validation, namespace listeners, ZooKeeper ACL handling,
 * ledger allocator pool path validation and fail-fast behaviour after close.
 *
 * <p>Every test derives its namespace URI from the running method name so tests do not
 * share ZooKeeper paths.
 */
public class TestBKDistributedLogNamespace extends TestDistributedLogBase {

    /** Exposes the name of the running test method; used to build per-test URIs. */
    @Rule
    public TestName runtime = new TestName();
    static final Logger LOG = LoggerFactory.getLogger(TestBKDistributedLogNamespace.class);
    /** Base configuration shared by the tests; tests that need changes copy it first. */
    protected static DistributedLogConfiguration conf = new DistributedLogConfiguration()
            .setLockTimeout(10)
            .setEnableLedgerAllocatorPool(true)
            .setLedgerAllocatorPoolName("test");
    private ZooKeeperClient zooKeeperClient;

    /** Builds the ZooKeeper client (rooted at "/") that the tests use to prepare paths. */
    @Override
    @Before
    public void setup() throws Exception {
        this.zooKeeperClient = TestZooKeeperClientBuilder.newBuilder()
                .uri(createDLMURI("/"))
                .build();
    }

    /** Closes the ZooKeeper client created in {@link #setup()}. */
    @Override
    @After
    public void teardown() throws Exception {
        // JUnit still runs @After when @Before failed; avoid masking that failure with an NPE.
        if (this.zooKeeperClient != null) {
            this.zooKeeperClient.close();
        }
    }

    /** Log name with a leading slash. */
    @Test(timeout = 60000)
    public void testCreateLogPath0() throws Exception {
        createLogPathTest("/create/log/path/" + this.runtime.getMethodName());
    }

    /** Log name without a leading slash. */
    @Test(timeout = 60000)
    public void testCreateLogPath1() throws Exception {
        createLogPathTest("create/log/path/" + this.runtime.getMethodName());
    }

    /**
     * Opens {@code str} in a namespace configured with {@code createStreamIfNotExists=false}
     * and verifies that writing to the (never created) log fails with an {@link IOException}.
     *
     * @param str name of the log to open
     */
    private void createLogPathTest(String str) throws Exception {
        URI uri = createDLMURI("/" + this.runtime.getMethodName());
        ensureURICreated(this.zooKeeperClient.get(), uri);
        DistributedLogConfiguration noAutoCreateConf = new DistributedLogConfiguration();
        noAutoCreateConf.addConfiguration(conf);
        noAutoCreateConf.setCreateStreamIfNotExists(false);
        Namespace namespace = NamespaceBuilder.newBuilder().conf(noAutoCreateConf).uri(uri).build();
        try {
            DistributedLogManager dlm = namespace.openLog(str);
            try {
                LogWriter writer = dlm.startLogSegmentNonPartitioned();
                writer.write(DLMTestUtil.getLogRecordInstance(1L));
                writer.commit();
                Assert.fail("Should fail to write data if stream doesn't exist.");
            } catch (IOException expected) {
                // expected: the stream was never created and auto-creation is disabled
            } finally {
                dlm.close();
            }
        } finally {
            namespace.close();
        }
    }

    /**
     * With auto-creation disabled, writing must fail until the log has been created
     * explicitly through {@link Namespace#createLog(String)}.
     */
    @Test(timeout = 60000)
    public void testCreateIfNotExists() throws Exception {
        final String streamName = "test-stream";
        URI uri = createDLMURI("/" + this.runtime.getMethodName());
        ensureURICreated(this.zooKeeperClient.get(), uri);
        DistributedLogConfiguration noAutoCreateConf = new DistributedLogConfiguration();
        noAutoCreateConf.addConfiguration(conf);
        noAutoCreateConf.setCreateStreamIfNotExists(false);
        Namespace namespace = NamespaceBuilder.newBuilder().conf(noAutoCreateConf).uri(uri).build();
        try {
            DistributedLogManager dlm = namespace.openLog(streamName);
            try {
                dlm.startLogSegmentNonPartitioned().write(DLMTestUtil.getLogRecordInstance(1L));
                Assert.fail("Should fail to write data if stream doesn't exist.");
            } catch (IOException expected) {
                // expected: the stream does not exist yet
            } finally {
                dlm.close();
            }
            namespace.createLog(streamName);
            writeSingleRecord(namespace, streamName);
        } finally {
            namespace.close();
        }
    }

    /**
     * Reserved names (leading '.'), names containing "/ " and names containing control
     * characters must be rejected; valid names must be usable and listed by the namespace.
     */
    @Test(timeout = 60000)
    public void testInvalidStreamName() throws Exception {
        Assert.assertFalse(DLUtils.isReservedStreamName("test"));
        Assert.assertTrue(DLUtils.isReservedStreamName(".test"));
        Namespace namespace = NamespaceBuilder.newBuilder()
                .conf(conf)
                .uri(createDLMURI("/" + this.runtime.getMethodName()))
                .build();
        try {
            assertOpenLogRejected(namespace, ".test1");
            writeSingleRecord(namespace, "test1");
            assertOpenLogRejected(namespace, ".test2");
            assertOpenLogRejected(namespace, "/ test2");
            // NUL as the first character
            assertOpenLogRejected(namespace, "\u0000aaaaa");
            // control character (code point 16) in the middle of the name
            assertOpenLogRejected(namespace, "aaa\u0010aa");
            writeSingleRecord(namespace, "test_2-3");

            HashSet<String> logs = Sets.newHashSet(namespace.getLogs());
            Assert.assertEquals(2, logs.size());
            Assert.assertTrue(logs.contains("test1"));
            Assert.assertTrue(logs.contains("test_2-3"));
        } finally {
            namespace.close();
        }
    }

    /** Asserts that opening {@code name} fails with {@link InvalidStreamNameException}. */
    private static void assertOpenLogRejected(Namespace namespace, String name) throws Exception {
        try {
            namespace.openLog(name);
            Assert.fail("Should fail to create invalid stream " + name);
        } catch (InvalidStreamNameException expected) {
            // expected
        }
    }

    /** Opens {@code name}, writes one record through a fresh writer and closes both. */
    private static void writeSingleRecord(Namespace namespace, String name) throws Exception {
        DistributedLogManager dlm = namespace.openLog(name);
        try {
            LogWriter writer = dlm.startLogSegmentNonPartitioned();
            try {
                writer.write(DLMTestUtil.getLogRecordInstance(1L));
            } finally {
                writer.close();
            }
        } finally {
            dlm.close();
        }
    }

    /**
     * Registers a {@link NamespaceListener} and expects three notifications: one before any
     * log exists and one after each of the two log creations. The n-th notification must
     * carry exactly n-1 streams.
     */
    @Test(timeout = 60000)
    public void testNamespaceListener() throws Exception {
        final int expectedUpdates = 3;
        URI uri = createDLMURI("/" + this.runtime.getMethodName());
        this.zooKeeperClient.get().create(
                uri.getPath(), new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
        Namespace namespace = NamespaceBuilder.newBuilder().conf(conf).uri(uri).build();
        try {
            // One latch per expected notification so the test can step through them in order.
            final CountDownLatch[] latches = new CountDownLatch[expectedUpdates];
            for (int i = 0; i < expectedUpdates; i++) {
                latches[i] = new CountDownLatch(1);
            }
            final AtomicInteger numUpdates = new AtomicInteger(0);
            final AtomicInteger numFailures = new AtomicInteger(0);
            final AtomicReference<Collection<String>> receivedStreams =
                    new AtomicReference<Collection<String>>(null);
            namespace.registerNamespaceListener(new NamespaceListener() {
                @Override
                public void onStreamsChanged(Iterator<String> streams) {
                    HashSet<String> streamSet = Sets.newHashSet(streams);
                    int updates = numUpdates.incrementAndGet();
                    if (streamSet.size() != updates - 1) {
                        numFailures.incrementAndGet();
                    }
                    receivedStreams.set(streamSet);
                    latches[updates - 1].countDown();
                }
            });
            latches[0].await();
            namespace.createLog("test1");
            latches[1].await();
            namespace.createLog("test2");
            latches[2].await();

            Assert.assertEquals(0, numFailures.get());
            Collection<String> lastReceived = receivedStreams.get();
            Assert.assertNotNull(lastReceived);
            Assert.assertEquals(2, lastReceived.size());
            HashSet<String> streamSet = new HashSet<String>(lastReceived);
            Assert.assertEquals(2, streamSet.size());
            Assert.assertTrue(streamSet.contains("test1"));
            Assert.assertTrue(streamSet.contains("test2"));
        } finally {
            namespace.close();
        }
    }

    /**
     * Creates (if necessary) the log {@code str3} under namespace path {@code str} using the
     * ZooKeeper ACL id {@code str2}, and writes ten records to it.
     *
     * @param str  namespace path, passed to {@code createDLMURI}
     * @param str2 ZooKeeper ACL id to configure
     * @param str3 name of the log to open and write to
     */
    private void initDlogMeta(String str, String str2, String str3) throws Exception {
        URI uri = createDLMURI(str);
        DistributedLogConfiguration aclConf = new DistributedLogConfiguration();
        aclConf.addConfiguration(conf);
        aclConf.setCreateStreamIfNotExists(true);
        aclConf.setZkAclId(str2);
        Namespace namespace = NamespaceBuilder.newBuilder().conf(aclConf).uri(uri).build();
        // Callers deliberately provoke failures here, so release everything on every path.
        try {
            DistributedLogManager dlm = namespace.openLog(str3);
            try {
                LogWriter writer = dlm.startLogSegmentNonPartitioned();
                try {
                    for (int i = 0; i < 10; i++) {
                        writer.write(DLMTestUtil.getLogRecordInstance(1L));
                    }
                } finally {
                    writer.close();
                }
            } finally {
                dlm.close();
            }
        } finally {
            namespace.close();
        }
    }

    /** A ZooKeeper client without the ACL credentials must not be able to modify log metadata. */
    @Test(timeout = 60000)
    public void testAclPermsZkAccessConflict() throws Exception {
        String namespacePath = "/" + this.runtime.getMethodName();
        initDlogMeta(namespacePath, "test-un", "test-stream");
        URI uri = createDLMURI(namespacePath);
        String streamPath = uri.getPath() + "/test-stream";
        ZooKeeperClient unprivClient = TestZooKeeperClientBuilder.newBuilder().name("unpriv").uri(uri).build();
        try {
            try {
                unprivClient.get().create(
                        streamPath + "/test-garbage", new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
                Assert.fail("write should have failed due to perms");
            } catch (KeeperException.NoAuthException e) {
                LOG.info("caught exception trying to write with no perms", e);
            }
            try {
                unprivClient.get().setData(streamPath, new byte[0], 0);
                Assert.fail("write should have failed due to perms");
            } catch (KeeperException.NoAuthException e) {
                LOG.info("caught exception trying to write with no perms", e);
            }
        } finally {
            unprivClient.close();
        }
    }

    /** A ZooKeeper client without the ACL credentials can still read log metadata. */
    @Test(timeout = 60000)
    public void testAclPermsZkAccessNoConflict() throws Exception {
        String namespacePath = "/" + this.runtime.getMethodName();
        initDlogMeta(namespacePath, "test-un", "test-stream");
        URI uri = createDLMURI(namespacePath);
        String streamPath = uri.getPath() + "/test-stream";
        ZooKeeperClient unprivClient = TestZooKeeperClientBuilder.newBuilder().name("unpriv").uri(uri).build();
        try {
            unprivClient.get().getChildren(streamPath, false, new Stat());
            unprivClient.get().getData(streamPath, false, new Stat());
        } finally {
            unprivClient.close();
        }
    }

    /**
     * Writing to an existing log with a different ACL id must fail with a {@link ZKException}
     * carrying {@code NOAUTH}; the original ACL id must keep working afterwards.
     */
    @Test(timeout = 60000)
    public void testAclModifyPermsDlmConflict() throws Exception {
        String namespacePath = "/" + this.runtime.getMethodName();
        initDlogMeta(namespacePath, "test-un", "test-stream");
        try {
            initDlogMeta(namespacePath, "not-test-un", "test-stream");
            Assert.fail("Write should have failed due to perms");
        } catch (ZKException e) {
            LOG.info("Caught exception trying to write with no perms", e);
            Assert.assertEquals(KeeperException.Code.NOAUTH, e.getKeeperExceptionCode());
        } catch (Exception e) {
            LOG.info("Caught wrong exception trying to write with no perms", e);
            // The expected type is ZKException (see the catch above), so report that one.
            Assert.fail("Wrong exception " + e.getClass().getName() + " expected " + ZKException.class.getName());
        }
        initDlogMeta(namespacePath, "test-un", "test-stream");
    }

    /** Re-initializing the same log with the same ACL id must succeed. */
    @Test(timeout = 60000)
    public void testAclModifyPermsDlmNoConflict() throws Exception {
        String namespacePath = "/" + this.runtime.getMethodName();
        initDlogMeta(namespacePath, "test-un", "test-stream");
        initDlogMeta(namespacePath, "test-un", "test-stream");
    }

    /**
     * Asserts that {@link BKNamespaceDriver#validateAndGetFullLedgerAllocatorPoolPath} rejects
     * the given configuration with an {@link IOException}.
     */
    static void validateBadAllocatorConfiguration(DistributedLogConfiguration distributedLogConfiguration, URI uri) throws Exception {
        try {
            BKNamespaceDriver.validateAndGetFullLedgerAllocatorPoolPath(distributedLogConfiguration, uri);
            Assert.fail("Should throw exception when bad allocator configuration provided");
        } catch (IOException expected) {
            // expected
        }
    }

    /** Invalid allocator pool paths, and a missing pool name, must be rejected. */
    @Test(timeout = 60000)
    public void testValidateAndGetFullLedgerAllocatorPoolPath() throws Exception {
        DistributedLogConfiguration allocatorConf = new DistributedLogConfiguration();
        allocatorConf.setEnableLedgerAllocatorPool(true);
        URI uri = createDLMURI("/" + this.runtime.getMethodName());

        // Each of these pool paths is invalid even with a valid pool name.
        allocatorConf.setLedgerAllocatorPoolName("test");
        String[] badPoolPaths = {"test", ".", "..", "./", ".test/"};
        for (String badPoolPath : badPoolPaths) {
            allocatorConf.setLedgerAllocatorPoolPath(badPoolPath);
            validateBadAllocatorConfiguration(allocatorConf, uri);
        }

        // A null pool name must be rejected as well.
        allocatorConf.setLedgerAllocatorPoolPath(".test");
        allocatorConf.setLedgerAllocatorPoolName(null);
        validateBadAllocatorConfiguration(allocatorConf, uri);
    }

    /**
     * Exercises a namespace normally (create, write, read, delete), closes it, and then
     * verifies that every namespace operation fails with {@link AlreadyClosedException}.
     */
    @Test(timeout = 60000)
    public void testUseNamespaceAfterCloseShouldFailFast() throws Exception {
        final String streamName = "test-stream";
        final long numRecords = 3;
        Namespace namespace = NamespaceBuilder.newBuilder()
                .conf(conf)
                .uri(createDLMURI("/" + this.runtime.getMethodName()))
                .build();
        try {
            namespace.createLog(streamName);
            Assert.assertTrue(namespace.logExists(streamName));
            DistributedLogManager dlm = namespace.openLog(streamName);
            try {
                BKAsyncLogWriter writer = (BKAsyncLogWriter) dlm.startAsyncLogSegmentNonPartitioned();
                for (long txId = 0; txId < numRecords; txId++) {
                    writer.write(DLMTestUtil.getLargeLogRecordInstance(txId));
                }
                writer.closeAndComplete();
                LogReader reader = dlm.getInputStream(0L);
                try {
                    for (long txId = 0; txId < numRecords; txId++) {
                        Assert.assertEquals(txId, reader.readNext(false).getTransactionId());
                    }
                } finally {
                    reader.close();
                }
            } finally {
                dlm.close();
            }
            namespace.deleteLog(streamName);
            Assert.assertFalse(namespace.logExists(streamName));
        } finally {
            namespace.close();
        }

        // From here on the namespace is closed: every operation must fail fast.
        try {
            namespace.createLog(streamName);
            Assert.fail("Should throw exception after namespace is closed");
        } catch (AlreadyClosedException expected) {
            // expected
        }
        try {
            namespace.openLog(streamName);
            Assert.fail("Should throw exception after namespace is closed");
        } catch (AlreadyClosedException expected) {
            // expected
        }
        try {
            namespace.logExists(streamName);
            Assert.fail("Should throw exception after namespace is closed");
        } catch (AlreadyClosedException expected) {
            // expected
        }
        try {
            namespace.getLogs();
            Assert.fail("Should throw exception after namespace is closed");
        } catch (AlreadyClosedException expected) {
            // expected
        }
        try {
            namespace.deleteLog(streamName);
            Assert.fail("Should throw exception after namespace is closed");
        } catch (AlreadyClosedException expected) {
            // expected
        }
        try {
            namespace.createAccessControlManager();
            Assert.fail("Should throw exception after namespace is closed");
        } catch (AlreadyClosedException expected) {
            // expected
        }
    }
}
