package org.apache.pulsar.functions.runtime.shaded.org.apache.distributedlog.bk;

import java.io.IOException;
import java.net.URI;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.pulsar.functions.runtime.shaded.com.google.common.collect.Lists;
import org.apache.pulsar.functions.runtime.shaded.org.apache.bookkeeper.client.LedgerHandle;
import org.apache.pulsar.functions.runtime.shaded.org.apache.distributedlog.BookKeeperClient;
import org.apache.pulsar.functions.runtime.shaded.org.apache.distributedlog.BookKeeperClientBuilder;
import org.apache.pulsar.functions.runtime.shaded.org.apache.distributedlog.DistributedLogConfiguration;
import org.apache.pulsar.functions.runtime.shaded.org.apache.distributedlog.TestDistributedLogBase;
import org.apache.pulsar.functions.runtime.shaded.org.apache.distributedlog.TestZooKeeperClientBuilder;
import org.apache.pulsar.functions.runtime.shaded.org.apache.distributedlog.ZooKeeperClient;
import org.apache.pulsar.functions.runtime.shaded.org.apache.distributedlog.bk.SimpleLedgerAllocator;
import org.apache.pulsar.functions.runtime.shaded.org.apache.distributedlog.util.Transaction;
import org.apache.pulsar.functions.runtime.shaded.org.apache.distributedlog.util.Utils;
import org.apache.pulsar.functions.runtime.shaded.org.apache.distributedlog.zk.ZKTransaction;
import org.apache.pulsar.functions.runtime.shaded.org.apache.zookeeper.data.Stat;
import org.apache.pulsar.functions.runtime.shaded.org.junit.After;
import org.apache.pulsar.functions.runtime.shaded.org.junit.Assert;
import org.apache.pulsar.functions.runtime.shaded.org.junit.Before;
import org.apache.pulsar.functions.runtime.shaded.org.junit.Rule;
import org.apache.pulsar.functions.runtime.shaded.org.junit.Test;
import org.apache.pulsar.functions.runtime.shaded.org.junit.rules.TestName;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/pulsar/functions/runtime/shaded/org/apache/distributedlog/bk/TestLedgerAllocatorPool.class */
public class TestLedgerAllocatorPool extends TestDistributedLogBase {
    private static final String ledgersPath = "/ledgers";
    private ZooKeeperClient zkc;
    private BookKeeperClient bkc;
    private ScheduledExecutorService allocationExecutor;
    private static final Logger logger = LoggerFactory.getLogger((Class<?>) TestLedgerAllocatorPool.class);
    private static final Transaction.OpListener<LedgerHandle> NULL_LISTENER = new Transaction.OpListener<LedgerHandle>() { // from class: org.apache.pulsar.functions.runtime.shaded.org.apache.distributedlog.bk.TestLedgerAllocatorPool.1
        @Override // org.apache.pulsar.functions.runtime.shaded.org.apache.distributedlog.util.Transaction.OpListener
        public void onCommit(LedgerHandle ledgerHandle) {
        }

        @Override // org.apache.pulsar.functions.runtime.shaded.org.apache.distributedlog.util.Transaction.OpListener
        public void onAbort(Throwable th) {
        }
    };

    @Rule
    public TestName runtime = new TestName();
    private DistributedLogConfiguration dlConf = new DistributedLogConfiguration();

    private URI createURI(String str) {
        return URI.create("distributedlog://" + zkServers + str);
    }

    @Override // org.apache.pulsar.functions.runtime.shaded.org.apache.distributedlog.TestDistributedLogBase
    @Before
    public void setup() throws Exception {
        this.zkc = TestZooKeeperClientBuilder.newBuilder().uri(createURI("/")).build();
        this.bkc = BookKeeperClientBuilder.newBuilder().name("bkc").dlConfig(this.dlConf).ledgersPath("/ledgers").zkc(this.zkc).build();
        this.allocationExecutor = Executors.newSingleThreadScheduledExecutor();
    }

    @Override // org.apache.pulsar.functions.runtime.shaded.org.apache.distributedlog.TestDistributedLogBase
    @After
    public void teardown() throws Exception {
        this.bkc.close();
        this.zkc.close();
        this.allocationExecutor.shutdown();
    }

    /* JADX INFO: Access modifiers changed from: private */
    public ZKTransaction newTxn() {
        return new ZKTransaction(this.zkc);
    }

    private void validatePoolSize(LedgerAllocatorPool ledgerAllocatorPool, int i, int i2, int i3, int i4) {
        Assert.assertEquals(i, ledgerAllocatorPool.pendingListSize());
        Assert.assertEquals(i2, ledgerAllocatorPool.allocatingListSize());
        Assert.assertEquals(i3, ledgerAllocatorPool.obtainMapSize());
        Assert.assertEquals(i4, ledgerAllocatorPool.rescueSize());
    }

    @Test(timeout = 60000)
    public void testNonAvailableAllocator() throws Exception {
        DistributedLogConfiguration distributedLogConfiguration = new DistributedLogConfiguration();
        distributedLogConfiguration.addConfiguration(this.dlConf);
        distributedLogConfiguration.setEnsembleSize(2 * numBookies);
        distributedLogConfiguration.setWriteQuorumSize(2 * numBookies);
        LedgerAllocatorPool ledgerAllocatorPool = new LedgerAllocatorPool("/nonAvailableAllocator", 3, distributedLogConfiguration, this.zkc, this.bkc, this.allocationExecutor);
        for (int i = 0; i < 3; i++) {
            try {
                ledgerAllocatorPool.allocate();
                Utils.ioResult(ledgerAllocatorPool.tryObtain(newTxn(), NULL_LISTENER));
                Assert.fail("Should fail to allocate ledger if there are enought bookies");
            } catch (SimpleLedgerAllocator.AllocationException e) {
                Assert.assertEquals(SimpleLedgerAllocator.Phase.ERROR, e.getPhase());
            }
        }
        for (int i2 = 0; i2 < 3; i2++) {
            try {
                ledgerAllocatorPool.allocate();
                Utils.ioResult(ledgerAllocatorPool.tryObtain(newTxn(), NULL_LISTENER));
                Assert.fail("Should fail to allocate ledger if there aren't available allocators");
            } catch (SimpleLedgerAllocator.AllocationException e2) {
                Assert.assertEquals(SimpleLedgerAllocator.Phase.ERROR, e2.getPhase());
            } catch (IOException e3) {
            }
        }
        Utils.close(ledgerAllocatorPool);
    }

    @Test(timeout = 60000)
    public void testRescueAllocators() throws Exception {
        LedgerAllocatorPool ledgerAllocatorPool = new LedgerAllocatorPool("/rescueAllocators", 3, this.dlConf, this.zkc, this.bkc, this.allocationExecutor);
        ArrayList newArrayListWithExpectedSize = Lists.newArrayListWithExpectedSize(3);
        ArrayList newArrayListWithExpectedSize2 = Lists.newArrayListWithExpectedSize(3);
        for (int i = 0; i < 3; i++) {
            ZKTransaction newTxn = newTxn();
            ledgerAllocatorPool.allocate();
            LedgerHandle ledgerHandle = (LedgerHandle) Utils.ioResult(ledgerAllocatorPool.tryObtain(newTxn, NULL_LISTENER));
            String str = ledgerAllocatorPool.getLedgerAllocator(ledgerHandle).allocatePath;
            logger.info("Allocated ledger {} from path {}", Long.valueOf(ledgerHandle.getId()), str);
            newArrayListWithExpectedSize.add(newTxn);
            newArrayListWithExpectedSize2.add(str);
        }
        for (int i2 = 0; i2 < 3; i2++) {
            ZKTransaction zKTransaction = (ZKTransaction) newArrayListWithExpectedSize.get(i2);
            String str2 = (String) newArrayListWithExpectedSize2.get(i2);
            Utils.ioResult(zKTransaction.execute());
            this.zkc.get().setData(str2, this.zkc.get().getData(str2, false, new Stat()), -1);
        }
        int i3 = 0;
        HashSet hashSet = new HashSet();
        while (i3 < 2 * 3) {
            try {
                ledgerAllocatorPool.allocate();
                ZKTransaction newTxn2 = newTxn();
                LedgerHandle ledgerHandle2 = (LedgerHandle) Utils.ioResult(ledgerAllocatorPool.tryObtain(newTxn2, NULL_LISTENER));
                String str3 = ledgerAllocatorPool.getLedgerAllocator(ledgerHandle2).allocatePath;
                logger.info("Allocated ledger {} from path {}", Long.valueOf(ledgerHandle2.getId()), str3);
                hashSet.add(str3);
                Utils.ioResult(newTxn2.execute());
                i3++;
            } catch (IOException e) {
            }
        }
        Assert.assertEquals(2 * 3, i3);
        Assert.assertEquals(3, hashSet.size());
        Utils.close(ledgerAllocatorPool);
    }

    @Test(timeout = 60000)
    public void testAllocateWhenNoAllocator() throws Exception {
        LedgerAllocatorPool ledgerAllocatorPool = new LedgerAllocatorPool("/allocateWhenNoAllocator", 0, this.dlConf, this.zkc, this.bkc, this.allocationExecutor);
        try {
            ledgerAllocatorPool.allocate();
            Assert.fail("Should fail to allocate ledger if there isn't allocator.");
        } catch (SimpleLedgerAllocator.AllocationException e) {
            Assert.fail("Should fail to allocate ledger if there isn't allocator.");
        } catch (IOException e2) {
        }
        Utils.close(ledgerAllocatorPool);
    }

    @Test(timeout = 60000)
    public void testObtainWhenNoAllocator() throws Exception {
        LedgerAllocatorPool ledgerAllocatorPool = new LedgerAllocatorPool("/obtainWhenNoAllocator", 0, this.dlConf, this.zkc, this.bkc, this.allocationExecutor);
        try {
            Utils.ioResult(ledgerAllocatorPool.tryObtain(newTxn(), NULL_LISTENER));
            Assert.fail("Should fail obtain ledger handle if there is no allocator.");
        } catch (SimpleLedgerAllocator.AllocationException e) {
            Assert.fail("Should fail obtain ledger handle if there is no allocator.");
        } catch (IOException e2) {
        }
        Utils.close(ledgerAllocatorPool);
    }

    @Test(timeout = 60000)
    public void testAllocateMultipleLedgers() throws Exception {
        LedgerAllocatorPool ledgerAllocatorPool = new LedgerAllocatorPool("/" + this.runtime.getMethodName(), 5, this.dlConf, this.zkc, this.bkc, this.allocationExecutor);
        HashSet hashSet = new HashSet();
        for (int i = 0; i < 20; i++) {
            ledgerAllocatorPool.allocate();
            ZKTransaction newTxn = newTxn();
            LedgerHandle ledgerHandle = (LedgerHandle) Utils.ioResult(ledgerAllocatorPool.tryObtain(newTxn, NULL_LISTENER));
            Utils.ioResult(newTxn.execute());
            hashSet.add(ledgerHandle);
        }
        Assert.assertEquals(20, hashSet.size());
    }

    @Test(timeout = 60000)
    public void testConcurrentAllocation() throws Exception {
        final LedgerAllocatorPool ledgerAllocatorPool = new LedgerAllocatorPool("/concurrentAllocation", 5, this.dlConf, this.zkc, this.bkc, this.allocationExecutor);
        final ConcurrentHashMap concurrentHashMap = new ConcurrentHashMap();
        final AtomicInteger atomicInteger = new AtomicInteger(0);
        Thread[] threadArr = new Thread[5];
        for (int i = 0; i < 5; i++) {
            final int i2 = i;
            threadArr[i] = new Thread() { // from class: org.apache.pulsar.functions.runtime.shaded.org.apache.distributedlog.bk.TestLedgerAllocatorPool.2
                int numLedgers = 50;

                @Override // java.lang.Thread, java.lang.Runnable
                public void run() {
                    for (int i3 = 0; i3 < this.numLedgers; i3++) {
                        try {
                            ledgerAllocatorPool.allocate();
                            ZKTransaction newTxn = TestLedgerAllocatorPool.this.newTxn();
                            LedgerHandle ledgerHandle = (LedgerHandle) Utils.ioResult(ledgerAllocatorPool.tryObtain(newTxn, TestLedgerAllocatorPool.NULL_LISTENER));
                            Utils.ioResult(newTxn.execute());
                            ledgerHandle.close();
                            concurrentHashMap.putIfAbsent(Long.valueOf(ledgerHandle.getId()), ledgerHandle);
                            TestLedgerAllocatorPool.logger.info("[thread {}] allocate {}th ledger {}", Integer.valueOf(i2), Integer.valueOf(i3), Long.valueOf(ledgerHandle.getId()));
                        } catch (Exception e) {
                            atomicInteger.incrementAndGet();
                            return;
                        }
                    }
                }
            };
        }
        for (Thread thread : threadArr) {
            thread.start();
        }
        for (Thread thread2 : threadArr) {
            thread2.join();
        }
        Assert.assertEquals(0L, atomicInteger.get());
        Assert.assertEquals(250L, concurrentHashMap.size());
        Utils.close(ledgerAllocatorPool);
    }
}
