package org.apache.pulsar.client.api;

import com.google.common.collect.Sets;
import java.lang.reflect.Field;
import java.net.URL;
import java.util.Collections;
import java.util.HashSet;
import java.util.Optional;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.pulsar.broker.PulsarService;
import org.apache.pulsar.broker.ServiceConfiguration;
import org.apache.pulsar.broker.loadbalance.LoadManager;
import org.apache.pulsar.broker.loadbalance.ResourceUnit;
import org.apache.pulsar.broker.loadbalance.impl.ModularLoadManagerImpl;
import org.apache.pulsar.broker.loadbalance.impl.SimpleLoadManagerImpl;
import org.apache.pulsar.broker.service.BrokerService;
import org.apache.pulsar.broker.service.nonpersistent.NonPersistentReplicator;
import org.apache.pulsar.broker.service.nonpersistent.NonPersistentTopic;
import org.apache.pulsar.client.admin.PulsarAdmin;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.impl.ConsumerImpl;
import org.apache.pulsar.client.impl.MultiTopicsConsumerImpl;
import org.apache.pulsar.client.impl.PartitionedProducerImpl;
import org.apache.pulsar.client.impl.ProducerImpl;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.policies.data.ClusterData;
import org.apache.pulsar.common.policies.data.ConsumerStats;
import org.apache.pulsar.common.policies.data.NonPersistentPublisherStats;
import org.apache.pulsar.common.policies.data.NonPersistentSubscriptionStats;
import org.apache.pulsar.common.policies.data.SubscriptionStats;
import org.apache.pulsar.common.policies.data.TenantInfoImpl;
import org.apache.pulsar.common.policies.data.stats.NonPersistentTopicStatsImpl;
import org.apache.pulsar.zookeeper.LocalBookkeeperEnsemble;
import org.apache.pulsar.zookeeper.ZookeeperServerTest;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testng.Assert;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.DataProvider;
import org.testng.annotations.Test;

@Test(groups = {"broker-api"})
/* loaded from: input_file:org/apache/pulsar/client/api/NonPersistentTopicTest.class */
public class NonPersistentTopicTest extends ProducerConsumerBase {
    /** SLF4J logger shared by the tests and by {@link ReplicationClusterManager}. */
    private static final Logger log = LoggerFactory.getLogger(NonPersistentTopicTest.class);
    // Same literal that ReplicationClusterManager uses as the name of its first cluster ("r1").
    // NOTE(review): no use of this field is visible in this part of the file - confirm it is still needed.
    private final String configClusterName = "r1";

    /* loaded from: input_file:org/apache/pulsar/client/api/NonPersistentTopicTest$ReplicationClusterManager.class */
    /**
     * Test fixture that starts three independent brokers, registered as clusters
     * {@code r1}, {@code r2} and {@code r3}, each backed by its own
     * {@link LocalBookkeeperEnsemble} and all sharing one configuration-store
     * ZooKeeper ({@link #globalZkS}).
     *
     * <p>Call {@link #setupReplicationCluster()} before use and
     * {@link #shutdownReplicationCluster()} afterwards.
     */
    class ReplicationClusterManager {
        /** Backlog-quota check interval, in seconds, applied to every broker started here. */
        static final int TIME_TO_CHECK_BACKLOG_QUOTA = 5;

        URL url1;
        PulsarService pulsar1;
        BrokerService ns1;
        PulsarAdmin admin1;
        LocalBookkeeperEnsemble bkEnsemble1;
        URL url2;
        ServiceConfiguration config2;
        PulsarService pulsar2;
        BrokerService ns2;
        PulsarAdmin admin2;
        LocalBookkeeperEnsemble bkEnsemble2;
        URL url3;
        ServiceConfiguration config3;
        PulsarService pulsar3;
        BrokerService ns3;
        PulsarAdmin admin3;
        LocalBookkeeperEnsemble bkEnsemble3;
        ZookeeperServerTest globalZkS;
        ExecutorService executor =
                new ThreadPoolExecutor(5, 20, 30, TimeUnit.SECONDS, new LinkedBlockingQueue<>());

        ReplicationClusterManager() {
        }

        /** @return the inactive-topic purge frequency, in seconds, configured on every broker. */
        public int getBrokerServicePurgeInactiveFrequency() {
            return 60;
        }

        /** @return whether the brokers should delete inactive topics; always {@code false} here. */
        public boolean isBrokerServicePurgeInactiveTopic() {
            return false;
        }

        /**
         * Starts the shared ZooKeeper, then the three bookkeeper ensembles and brokers,
         * registers the clusters/tenant/namespace through {@link #admin1}, and verifies
         * that the cluster metadata is visible through {@link #admin2}.
         *
         * @throws Exception if any component fails to start or an admin call fails
         */
        void setupReplicationCluster() throws Exception {
            NonPersistentTopicTest.log.info("--- Starting ReplicatorTestBase::setup ---");
            this.globalZkS = new ZookeeperServerTest(0);
            this.globalZkS.start();

            // Cluster r1
            this.bkEnsemble1 = new LocalBookkeeperEnsemble(3, 0, () -> 0);
            this.bkEnsemble1.start();
            this.pulsar1 = new PulsarService(newBrokerConfig("r1", this.bkEnsemble1));
            this.pulsar1.start();
            this.ns1 = this.pulsar1.getBrokerService();
            this.url1 = new URL(this.pulsar1.getWebServiceAddress());
            this.admin1 = PulsarAdmin.builder().serviceHttpUrl(this.url1.toString()).build();

            // Cluster r2
            this.bkEnsemble2 = new LocalBookkeeperEnsemble(3, 0, () -> 0);
            this.bkEnsemble2.start();
            this.config2 = newBrokerConfig("r2", this.bkEnsemble2);
            this.pulsar2 = new PulsarService(this.config2);
            this.pulsar2.start();
            this.ns2 = this.pulsar2.getBrokerService();
            this.url2 = new URL(this.pulsar2.getWebServiceAddress());
            this.admin2 = PulsarAdmin.builder().serviceHttpUrl(this.url2.toString()).build();

            // Cluster r3
            this.bkEnsemble3 = new LocalBookkeeperEnsemble(3, 0, () -> 0);
            this.bkEnsemble3.start();
            this.config3 = newBrokerConfig("r3", this.bkEnsemble3);
            this.pulsar3 = new PulsarService(this.config3);
            this.pulsar3.start();
            this.ns3 = this.pulsar3.getBrokerService();
            this.url3 = new URL(this.pulsar3.getWebServiceAddress());
            this.admin3 = PulsarAdmin.builder().serviceHttpUrl(this.url3.toString()).build();

            // Each cluster is registered with the URLs of its OWN broker. Previously r2 and r3
            // were given pulsar1's TLS broker URL, an apparent copy-paste slip.
            registerCluster("r1", this.url1, this.pulsar1);
            registerCluster("r2", this.url2, this.pulsar2);
            registerCluster("r3", this.url3, this.pulsar3);
            this.admin1.clusters().createCluster("global",
                    ClusterData.builder().serviceUrl("http://global:8080").build());
            this.admin1.tenants().createTenant("pulsar", new TenantInfoImpl(
                    Sets.newHashSet("appid1", "appid2", "appid3"),
                    Sets.newHashSet("r1", "r2", "r3")));
            this.admin1.namespaces().createNamespace("pulsar/global/ns");
            this.admin1.namespaces().setNamespaceReplicationClusters("pulsar/global/ns",
                    Sets.newHashSet("r1", "r2", "r3"));

            // Metadata written through admin1 must be readable through another cluster's admin.
            Assert.assertEquals(this.admin2.clusters().getCluster("r1").getServiceUrl(), this.url1.toString());
            Assert.assertEquals(this.admin2.clusters().getCluster("r2").getServiceUrl(), this.url2.toString());
            Assert.assertEquals(this.admin2.clusters().getCluster("r3").getServiceUrl(), this.url3.toString());
            Assert.assertEquals(this.admin2.clusters().getCluster("r1").getBrokerServiceUrl(),
                    this.pulsar1.getSafeBrokerServiceUrl());
            Assert.assertEquals(this.admin2.clusters().getCluster("r2").getBrokerServiceUrl(),
                    this.pulsar2.getSafeBrokerServiceUrl());
            Assert.assertEquals(this.admin2.clusters().getCluster("r3").getBrokerServiceUrl(),
                    this.pulsar3.getSafeBrokerServiceUrl());
            Thread.sleep(100L);
            NonPersistentTopicTest.log.info("--- ReplicatorTestBase::setup completed ---");
        }

        /**
         * Builds the broker configuration shared by all three clusters. Requires
         * {@link #globalZkS} to be started already, since its port is embedded in the
         * configuration-store address.
         *
         * <p>The backlog-quota check interval is now applied uniformly; previously the
         * {@code r3} configuration did not set it, unlike {@code r1} and {@code r2}.
         *
         * @param clusterName name of the cluster the broker belongs to
         * @param bkEnsemble  the already-started local ensemble whose ZooKeeper the broker uses
         * @return a new configuration for a broker of {@code clusterName}
         */
        private ServiceConfiguration newBrokerConfig(String clusterName, LocalBookkeeperEnsemble bkEnsemble) {
            ServiceConfiguration config = new ServiceConfiguration();
            config.setClusterName(clusterName);
            config.setAdvertisedAddress("localhost");
            config.setWebServicePort(Optional.of(0));
            config.setZookeeperServers("127.0.0.1:" + bkEnsemble.getZookeeperPort());
            config.setConfigurationStoreServers("127.0.0.1:" + this.globalZkS.getZookeeperPort() + "/foo");
            config.setBrokerDeleteInactiveTopicsEnabled(isBrokerServicePurgeInactiveTopic());
            config.setBrokerDeleteInactiveTopicsFrequencySeconds(
                    inSec(getBrokerServicePurgeInactiveFrequency(), TimeUnit.SECONDS));
            config.setBrokerShutdownTimeoutMs(0L);
            config.setBrokerServicePort(Optional.of(0));
            config.setBacklogQuotaCheckIntervalInSeconds(TIME_TO_CHECK_BACKLOG_QUOTA);
            config.setAllowAutoTopicCreationType("non-partitioned");
            return config;
        }

        /**
         * Registers a cluster through {@link #admin1} using the given broker's own URLs.
         *
         * @param name   cluster name to register
         * @param webUrl web-service URL of the cluster's broker
         * @param broker the broker whose binary and TLS service URLs are advertised
         * @throws Exception if the admin call fails
         */
        private void registerCluster(String name, URL webUrl, PulsarService broker) throws Exception {
            this.admin1.clusters().createCluster(name, ClusterData.builder()
                    .serviceUrl(webUrl.toString())
                    .brokerServiceUrl(broker.getSafeBrokerServiceUrl())
                    .brokerServiceUrlTls(broker.getBrokerServiceUrlTls())
                    .build());
        }

        /** Converts {@code i}, expressed in {@code timeUnit}, to whole seconds. */
        private int inSec(int i, TimeUnit timeUnit) {
            return (int) TimeUnit.SECONDS.convert(i, timeUnit);
        }

        /**
         * Stops everything started by {@link #setupReplicationCluster()}: the executor,
         * the admin clients, the brokers (r3 first, r1 last), the bookkeeper ensembles
         * and finally the shared ZooKeeper.
         *
         * @throws Exception if any component fails to close; later components are then left running
         */
        void shutdownReplicationCluster() throws Exception {
            NonPersistentTopicTest.log.info("--- Shutting down ---");
            this.executor.shutdownNow();
            this.admin1.close();
            this.admin2.close();
            this.admin3.close();
            this.pulsar3.close();
            this.ns3.close();
            this.pulsar2.close();
            this.ns2.close();
            this.pulsar1.close();
            this.ns1.close();
            this.bkEnsemble1.stop();
            this.bkEnsemble2.stop();
            this.bkEnsemble3.stop();
            this.globalZkS.stop();
        }
    }

    /**
     * Supplies the subscription types that the parameterized tests run against.
     *
     * @return one row per subscription type: {@code Shared} and {@code Exclusive}
     */
    @DataProvider(name = "subscriptionType")
    public Object[][] getSubscriptionType() {
        // A two-dimensional literal is required: a bare Object[] is not assignable to Object[][].
        return new Object[][] {
            {SubscriptionType.Shared},
            {SubscriptionType.Exclusive},
        };
    }

    /**
     * Supplies the load-manager implementations that the parameterized tests run against.
     *
     * @return one row per implementation, each holding the class's canonical name
     */
    @DataProvider(name = "loadManager")
    public Object[][] getLoadManager() {
        // A two-dimensional literal is required: a bare Object[] is not assignable to Object[][].
        return new Object[][] {
            {SimpleLoadManagerImpl.class.getCanonicalName()},
            {ModularLoadManagerImpl.class.getCanonicalName()},
        };
    }

    /**
     * Per-test initialisation: runs the superclass's {@code internalSetup()} followed by
     * {@code producerBaseSetup()}. Invoked by TestNG before every test method, and also
     * called directly by tests in this class that restart the environment after changing
     * {@code conf}.
     *
     * @throws Exception if either superclass step fails
     */
    @Override // org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest
    @BeforeMethod
    protected void setup() throws Exception {
        super.internalSetup();
        super.producerBaseSetup();
    }

    /**
     * Per-test teardown: delegates to the superclass's {@code internalCleanup()}.
     * {@code alwaysRun = true} makes TestNG invoke it even when the test or its setup failed.
     * Also called directly by tests in this class before they restart with a modified {@code conf}.
     *
     * @throws Exception if the superclass cleanup fails
     */
    @Override // org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest
    @AfterMethod(alwaysRun = true)
    protected void cleanup() throws Exception {
        super.internalCleanup();
    }

    /**
     * With automatic topic creation disabled, subscribing to or producing on a
     * partition-style non-persistent topic name whose partitioned topic was never created
     * must fail with {@code TopicDoesNotExistException}.
     */
    @Test(timeOut = 90000)
    public void testNonPersistentPartitionsAreNotAutoCreatedWhenThePartitionedTopicDoesNotExist() throws Exception {
        final String partitionName = "non-persistent://public/default/issue-9173-partition-0";
        final boolean autoCreationBefore = this.conf.isAllowAutoTopicCreation();
        try {
            // Restart the test environment so the changed setting takes effect.
            cleanup();
            this.conf.setAllowAutoTopicCreation(false);
            setup();

            Assert.assertThrows(PulsarClientException.TopicDoesNotExistException.class,
                    () -> this.pulsarClient.newConsumer()
                            .topic(partitionName)
                            .subscriptionName("sub-issue-9173")
                            .subscribe());
            Assert.assertThrows(PulsarClientException.TopicDoesNotExistException.class,
                    () -> this.pulsarClient.newProducer()
                            .topic(partitionName)
                            .create());
        } finally {
            this.conf.setAllowAutoTopicCreation(autoCreationBefore);
        }
    }

    /**
     * With automatic topic creation disabled, a consumer and a producer on an existing
     * non-persistent partitioned topic must still get one internal consumer/producer per
     * partition.
     */
    @Test(timeOut = 90000)
    public void testAutoCreateNonPersistentPartitionsWhenThePartitionedTopicExists() throws Exception {
        final String topic = "non-persistent://public/default/issue-9173";
        final int numPartitions = 3;
        final boolean autoCreationBefore = this.conf.isAllowAutoTopicCreation();
        try {
            // Restart the test environment so the changed setting takes effect.
            cleanup();
            this.conf.setAllowAutoTopicCreation(false);
            setup();
            this.admin.topics().createPartitionedTopic(topic, numPartitions);

            // try-with-resources closes the client objects even when an assertion fails.
            try (Consumer<byte[]> consumer = this.pulsarClient.newConsumer()
                    .topic(topic)
                    .subscriptionName("sub-issue-9173")
                    .subscribe()) {
                // The builder returns the Consumer interface; the per-partition view needs the impl type.
                Assert.assertEquals(((MultiTopicsConsumerImpl<byte[]>) consumer).getConsumers().size(),
                        numPartitions);
                try (Producer<byte[]> producer = this.pulsarClient.newProducer().topic(topic).create()) {
                    Assert.assertEquals(((PartitionedProducerImpl<byte[]>) producer).getProducers().size(),
                            numPartitions);
                }
            }
        } finally {
            this.conf.setAllowAutoTopicCreation(autoCreationBefore);
        }
    }

    /**
     * Publishes 500 messages to a non-persistent topic and expects the single subscriber
     * to receive all of them, in order and without duplicates.
     *
     * @param subscriptionType subscription type under test, supplied by the data provider
     */
    @Test(dataProvider = "subscriptionType")
    public void testNonPersistentTopic(SubscriptionType subscriptionType) throws Exception {
        log.info("-- Starting {} test --", this.methodName);
        final String topic = "non-persistent://my-property/my-ns/unacked-topic";
        final int totalMessages = 500;

        // try-with-resources closes producer, then consumer, even when an assertion fails.
        try (Consumer<byte[]> consumer = this.pulsarClient.newConsumer()
                     .topic(topic)
                     .subscriptionName("subscriber-1")
                     .subscriptionType(subscriptionType)
                     .subscribe();
             Producer<byte[]> producer = this.pulsarClient.newProducer().topic(topic).create()) {

            for (int i = 0; i < totalMessages; i++) {
                producer.send(("my-message-" + i).getBytes());
                Thread.sleep(10L);
            }

            HashSet<String> received = new HashSet<>();
            for (int i = 0; i < totalMessages; i++) {
                Message<byte[]> message = consumer.receive(1, TimeUnit.SECONDS);
                if (message == null) {
                    // Nothing arrived within the timeout: stop and let the size assertion report it.
                    break;
                }
                consumer.acknowledge(message);
                String payload = new String(message.getData());
                log.debug("Received message: [{}]", payload);
                testMessageOrderAndDuplicates(received, payload, "my-message-" + i);
            }
            Assert.assertEquals(received.size(), totalMessages);
        }
        log.info("-- Exiting {} test --", this.methodName);
    }

    /**
     * Same as the non-partitioned case, but on a 5-partition non-persistent topic with a
     * single-partition-routing, non-batching producer: all 500 messages must arrive in
     * order and without duplicates.
     *
     * @param subscriptionType subscription type under test, supplied by the data provider
     */
    @Test(dataProvider = "subscriptionType")
    public void testPartitionedNonPersistentTopic(SubscriptionType subscriptionType) throws Exception {
        log.info("-- Starting {} test --", this.methodName);
        final String topic = "non-persistent://my-property/my-ns/partitioned-topic";
        final int totalMessages = 500;
        this.admin.topics().createPartitionedTopic(topic, 5);

        // try-with-resources closes producer, then consumer, even when an assertion fails.
        try (Consumer<byte[]> consumer = this.pulsarClient.newConsumer()
                     .topic(topic)
                     .subscriptionName("subscriber-1")
                     .subscriptionType(subscriptionType)
                     .subscribe();
             Producer<byte[]> producer = this.pulsarClient.newProducer()
                     .topic(topic)
                     .enableBatching(false)
                     .messageRoutingMode(MessageRoutingMode.SinglePartition)
                     .create()) {

            for (int i = 0; i < totalMessages; i++) {
                producer.send(("my-message-" + i).getBytes());
                Thread.sleep(10L);
            }

            HashSet<String> received = new HashSet<>();
            for (int i = 0; i < totalMessages; i++) {
                Message<byte[]> message = consumer.receive(1, TimeUnit.SECONDS);
                if (message == null) {
                    // Nothing arrived within the timeout: stop and let the size assertion report it.
                    break;
                }
                consumer.acknowledge(message);
                String payload = new String(message.getData());
                log.debug("Received message: [{}]", payload);
                testMessageOrderAndDuplicates(received, payload, "my-message-" + i);
            }
            Assert.assertEquals(received.size(), totalMessages);
        }
        log.info("-- Exiting {} test --", this.methodName);
    }

    /**
     * Partitioned non-persistent topic consumed through a dedicated client that is built
     * from the broker's binary service URL. Verifies that every partition topic is
     * loaded on the broker and that all 500 messages arrive in order without duplicates.
     *
     * @param subscriptionType subscription type under test, supplied by the data provider
     */
    @Test(dataProvider = "subscriptionType")
    public void testPartitionedNonPersistentTopicWithTcpLookup(SubscriptionType subscriptionType) throws Exception {
        log.info("-- Starting {} test --", this.methodName);
        final String topic = "non-persistent://my-property/my-ns/partitioned-topic";
        final int numPartitions = 5;
        final int totalMessages = 500;
        this.admin.topics().createPartitionedTopic(topic, numPartitions);

        // Resources close in reverse order (producer, consumer, client), also on failure.
        // The producer deliberately stays on the shared this.pulsarClient, as before.
        try (PulsarClient client = PulsarClient.builder()
                     .serviceUrl(this.pulsar.getBrokerServiceUrl())
                     .statsInterval(0L, TimeUnit.SECONDS)
                     .build();
             Consumer<byte[]> consumer = client.newConsumer()
                     .topic(topic)
                     .subscriptionName("subscriber-1")
                     .subscriptionType(subscriptionType)
                     .subscribe();
             Producer<byte[]> producer = this.pulsarClient.newProducer()
                     .topic(topic)
                     .enableBatching(false)
                     .messageRoutingMode(MessageRoutingMode.SinglePartition)
                     .create()) {

            TopicName topicName = TopicName.get(topic);
            for (int i = 0; i < numPartitions; i++) {
                String partition = topicName.getPartition(i).toString();
                // getTopicReference returns an Optional, so a null check could never fail;
                // assert on presence instead.
                Assert.assertTrue(this.pulsar.getBrokerService().getTopicReference(partition).isPresent(),
                        "partition topic not loaded: " + partition);
            }

            for (int i = 0; i < totalMessages; i++) {
                producer.send(("my-message-" + i).getBytes());
                Thread.sleep(10L);
            }

            HashSet<String> received = new HashSet<>();
            for (int i = 0; i < totalMessages; i++) {
                Message<byte[]> message = consumer.receive(1, TimeUnit.SECONDS);
                if (message == null) {
                    // Nothing arrived within the timeout: stop and let the size assertion report it.
                    break;
                }
                consumer.acknowledge(message);
                String payload = new String(message.getData());
                log.debug("Received message: [{}]", payload);
                testMessageOrderAndDuplicates(received, payload, "my-message-" + i);
            }
            Assert.assertEquals(received.size(), totalMessages);
        }
        log.info("-- Exiting {} test --", this.methodName);
    }

    /**
     * A consumer with a receiver queue of 10 that does not read while 50 messages are
     * published is expected to end up with exactly 10 messages: the first ones, in order.
     *
     * @param subscriptionType subscription type under test, supplied by the data provider
     */
    @Test(dataProvider = "subscriptionType")
    public void testConsumerInternalQueueMaxOut(SubscriptionType subscriptionType) throws Exception {
        log.info("-- Starting {} test --", this.methodName);
        final String topic = "non-persistent://my-property/my-ns/unacked-topic";
        final int totalMessages = 50;
        final int queueSize = 10;

        // try-with-resources closes producer, then consumer, even when an assertion fails.
        try (Consumer<byte[]> consumer = this.pulsarClient.newConsumer()
                     .topic(topic)
                     .receiverQueueSize(queueSize)
                     .subscriptionName("subscriber-1")
                     .subscriptionType(subscriptionType)
                     .subscribe();
             Producer<byte[]> producer = this.pulsarClient.newProducer().topic(topic).create()) {

            for (int i = 0; i < totalMessages; i++) {
                producer.send(("my-message-" + i).getBytes());
                Thread.sleep(10L);
            }

            HashSet<String> received = new HashSet<>();
            for (int i = 0; i < totalMessages; i++) {
                Message<byte[]> message = consumer.receive(1, TimeUnit.SECONDS);
                if (message == null) {
                    // Queue drained and nothing further delivered within the timeout.
                    break;
                }
                consumer.acknowledge(message);
                String payload = new String(message.getData());
                log.debug("Received message: [{}]", payload);
                testMessageOrderAndDuplicates(received, payload, "my-message-" + i);
            }
            Assert.assertEquals(received.size(), queueSize);
        }
        log.info("-- Exiting {} test --", this.methodName);
    }

    /**
     * Restarts the broker with {@code maxConcurrentNonPersistentMessagePerConnection = 1}
     * and sends 10 messages concurrently from a 5-thread pool. None of the sends may fail
     * on the client side.
     */
    @Test
    public void testProducerRateLimit() throws Exception {
        final String topic = "non-persistent://my-property/my-ns/unacked-topic";
        final int totalProduceMessages = 10;
        final int limitBefore = this.conf.getMaxConcurrentNonPersistentMessagePerConnection();
        try {
            this.conf.setMaxConcurrentNonPersistentMessagePerConnection(1);
            stopBroker();
            startBroker();

            final ExecutorService executor = Executors.newFixedThreadPool(5);
            // Producer and consumer are closed first, then the pool is shut down, on every exit path.
            try (Consumer<byte[]> consumer = this.pulsarClient.newConsumer()
                         .topic(topic)
                         .subscriptionName("subscriber-1")
                         .subscribe();
                 Producer<byte[]> producer = this.pulsarClient.newProducer().topic(topic).create()) {

                final AtomicBoolean failed = new AtomicBoolean(false);
                final byte[] payload = "testData".getBytes();
                final CountDownLatch latch = new CountDownLatch(totalProduceMessages);
                for (int i = 0; i < totalProduceMessages; i++) {
                    executor.submit(() -> {
                        try {
                            producer.send(payload);
                        } catch (Exception e) {
                            log.error("Failed to send message", e);
                            failed.set(true);
                        } finally {
                            // Always count down so await() cannot hang if the task dies unexpectedly.
                            latch.countDown();
                        }
                    });
                }
                latch.await();

                HashSet<String> received = new HashSet<>();
                for (int i = 0; i < totalProduceMessages; i++) {
                    Message<byte[]> message = consumer.receive(500, TimeUnit.MILLISECONDS);
                    if (message == null) {
                        break;
                    }
                    received.add(new String(message.getData()));
                }

                // Publishing must not fail on the client side.
                Assert.assertFalse(failed.get());
                // NOTE(review): every message carries the same "testData" payload, so this set holds
                // at most one element and the check below cannot detect whether messages were dropped.
                // Counting received messages would be stricter - confirm the intended expectation.
                Assert.assertNotEquals(Integer.valueOf(received.size()), Integer.valueOf(totalProduceMessages));
            } finally {
                executor.shutdownNow();
            }
        } finally {
            this.conf.setMaxConcurrentNonPersistentMessagePerConnection(limitBefore);
        }
    }

    /**
     * Two consumers on a Shared subscription and two on a Failover subscription attach to
     * the same non-persistent topic. After 500 messages are published, the union of what
     * each subscription's consumers received must contain all 500 distinct payloads.
     */
    @Test
    public void testMultipleSubscription() throws Exception {
        log.info("-- Starting {} test --", this.methodName);
        final String topic = "non-persistent://my-property/my-ns/unacked-topic";
        final int totalMessages = 500;

        // try-with-resources closes every client object even when an assertion fails.
        try (Consumer<byte[]> sharedConsumer1 = this.pulsarClient.newConsumer().topic(topic)
                     .subscriptionName("subscriber-shared").subscriptionType(SubscriptionType.Shared).subscribe();
             Consumer<byte[]> sharedConsumer2 = this.pulsarClient.newConsumer().topic(topic)
                     .subscriptionName("subscriber-shared").subscriptionType(SubscriptionType.Shared).subscribe();
             Consumer<byte[]> failoverConsumer1 = this.pulsarClient.newConsumer().topic(topic)
                     .subscriptionName("subscriber-fo").subscriptionType(SubscriptionType.Failover).subscribe();
             Consumer<byte[]> failoverConsumer2 = this.pulsarClient.newConsumer().topic(topic)
                     .subscriptionName("subscriber-fo").subscriptionType(SubscriptionType.Failover).subscribe();
             Producer<byte[]> producer = this.pulsarClient.newProducer().topic(topic).create()) {

            for (int i = 0; i < totalMessages; i++) {
                producer.send(("my-message-" + i).getBytes());
                Thread.sleep(10L);
            }

            HashSet<String> received = new HashSet<>();

            // Shared subscription: messages are split between the two consumers.
            drainPayloads(sharedConsumer1, received, totalMessages);
            drainPayloads(sharedConsumer2, received, totalMessages);
            Assert.assertEquals(received.size(), totalMessages);

            // Failover subscription: check the union of both consumers the same way.
            received.clear();
            drainPayloads(failoverConsumer1, received, totalMessages);
            drainPayloads(failoverConsumer2, received, totalMessages);
            Assert.assertEquals(received.size(), totalMessages);
        }
        log.info("-- Exiting {} test --", this.methodName);
    }

    /**
     * Reads up to {@code maxMessages} messages from {@code consumer}, waiting at most 500 ms
     * for each, and adds every payload (decoded as a String) to {@code sink}. Stops early
     * the first time a receive times out.
     */
    private static void drainPayloads(Consumer<byte[]> consumer, HashSet<String> sink, int maxMessages)
            throws PulsarClientException {
        for (int i = 0; i < maxMessages; i++) {
            Message<byte[]> message = consumer.receive(500, TimeUnit.MILLISECONDS);
            if (message == null) {
                return;
            }
            sink.add(new String(message.getData()));
        }
    }

    /**
     * Checks the broker-side stats of a non-persistent topic: one subscription with one
     * consumer before publishing, and positive outbound rate/throughput with no
     * redelivery after 100 messages have been published.
     */
    @Test
    public void testTopicStats() throws Exception {
        final String topic = "non-persistent://my-property/my-ns/unacked-topic";

        // try-with-resources closes the consumer (and the producer below) even when an assertion fails.
        try (Consumer<byte[]> consumer = this.pulsarClient.newConsumer()
                .topic(topic)
                .subscriptionType(SubscriptionType.Shared)
                .subscriptionName("non-persistent")
                .subscribe()) {
            Thread.sleep(100L);

            NonPersistentTopic topicRef =
                    (NonPersistentTopic) this.pulsar.getBrokerService().getTopicReference(topic).get();
            Assert.assertNotNull(topicRef);

            rolloverPerIntervalStats(this.pulsar);
            NonPersistentTopicStatsImpl statsBefore = topicRef.getStats(false, false);
            // Assert the count before taking the first entry, so a missing subscription
            // fails as an assertion and not as a NoSuchElementException.
            Assert.assertEquals(statsBefore.getSubscriptions().keySet().size(), 1);
            SubscriptionStats subBefore =
                    (SubscriptionStats) statsBefore.getSubscriptions().values().iterator().next();
            Assert.assertEquals(subBefore.getConsumers().size(), 1);

            try (Producer<byte[]> producer = this.pulsarClient.newProducer().topic(topic).create()) {
                Thread.sleep(100L);
                for (int i = 0; i < 100; i++) {
                    producer.send(("my-message-" + i).getBytes());
                }
                Thread.sleep(100L);

                rolloverPerIntervalStats(this.pulsar);
                SubscriptionStats subAfter = (SubscriptionStats) topicRef.getStats(false, false)
                        .getSubscriptions().values().iterator().next();
                Assert.assertTrue(subAfter.getMsgRateOut() > 0.0d);
                Assert.assertEquals(subAfter.getConsumers().size(), 1);
                Assert.assertTrue(subAfter.getMsgThroughputOut() > 0.0d);
                ConsumerStats consumerStats = (ConsumerStats) subAfter.getConsumers().get(0);
                Assert.assertTrue(consumerStats.getMsgRateOut() > 0.0d);
                Assert.assertTrue(consumerStats.getMsgThroughputOut() > 0.0d);
                Assert.assertEquals(subAfter.getMsgRateRedeliver(), 0.0d);
            }
        }
    }

    /* JADX WARN: Finally extract failed */
    @Test
    public void testReplicator() throws Exception {
        Message receive;
        Message receive2;
        Message receive3;
        Message receive4;
        ReplicationClusterManager replicationClusterManager = new ReplicationClusterManager();
        replicationClusterManager.setupReplicationCluster();
        try {
            PulsarClient build = PulsarClient.builder().serviceUrl(replicationClusterManager.url1.toString()).build();
            try {
                build = PulsarClient.builder().serviceUrl(replicationClusterManager.url2.toString()).build();
                try {
                    build = PulsarClient.builder().serviceUrl(replicationClusterManager.url3.toString()).build();
                    try {
                        ConsumerImpl subscribe = build.newConsumer().topic(new String[]{"non-persistent://pulsar/global/ns/nonPersistentTopic"}).subscriptionName("subscriber-1").subscribe();
                        ConsumerImpl subscribe2 = build.newConsumer().topic(new String[]{"non-persistent://pulsar/global/ns/nonPersistentTopic"}).subscriptionName("subscriber-2").subscribe();
                        ConsumerImpl subscribe3 = build.newConsumer().topic(new String[]{"non-persistent://pulsar/global/ns/nonPersistentTopic"}).subscriptionName("subscriber-1").subscribe();
                        ConsumerImpl subscribe4 = build.newConsumer().topic(new String[]{"non-persistent://pulsar/global/ns/nonPersistentTopic"}).subscriptionName("subscriber-1").subscribe();
                        Producer create = build.newProducer().topic("non-persistent://pulsar/global/ns/nonPersistentTopic").enableBatching(false).messageRoutingMode(MessageRoutingMode.SinglePartition).create();
                        Thread.sleep(100L);
                        PulsarService pulsarService = replicationClusterManager.pulsar1;
                        NonPersistentTopic nonPersistentTopic = (NonPersistentTopic) replicationClusterManager.pulsar1.getBrokerService().getTopicReference("non-persistent://pulsar/global/ns/nonPersistentTopic").get();
                        NonPersistentReplicator persistentReplicator = nonPersistentTopic.getPersistentReplicator("r2");
                        NonPersistentReplicator persistentReplicator2 = nonPersistentTopic.getPersistentReplicator("r3");
                        Assert.assertNotNull(nonPersistentTopic);
                        Assert.assertNotNull(persistentReplicator);
                        Assert.assertNotNull(persistentReplicator2);
                        rolloverPerIntervalStats(pulsarService);
                        NonPersistentTopicStatsImpl stats = nonPersistentTopic.getStats(false, false);
                        SubscriptionStats subscriptionStats = (SubscriptionStats) stats.getSubscriptions().values().iterator().next();
                        Assert.assertEquals(stats.getSubscriptions().keySet().size(), 2);
                        Assert.assertEquals(subscriptionStats.getConsumers().size(), 1);
                        Thread.sleep(100L);
                        for (int i = 0; i < 100; i++) {
                            create.send(("my-message-" + i).getBytes());
                        }
                        HashSet newHashSet = Sets.newHashSet();
                        for (int i2 = 0; i2 < 100 && (receive4 = subscribe.receive(300, TimeUnit.MILLISECONDS)) != null; i2++) {
                            testMessageOrderAndDuplicates(newHashSet, new String(receive4.getData()), "my-message-" + i2);
                        }
                        Assert.assertEquals(newHashSet.size(), 100);
                        newHashSet.clear();
                        for (int i3 = 0; i3 < 100 && (receive3 = subscribe2.receive(300, TimeUnit.MILLISECONDS)) != null; i3++) {
                            testMessageOrderAndDuplicates(newHashSet, new String(receive3.getData()), "my-message-" + i3);
                        }
                        Assert.assertEquals(newHashSet.size(), 100);
                        newHashSet.clear();
                        for (int i4 = 0; i4 < 100 && (receive2 = subscribe3.receive(300, TimeUnit.MILLISECONDS)) != null; i4++) {
                            testMessageOrderAndDuplicates(newHashSet, new String(receive2.getData()), "my-message-" + i4);
                        }
                        Assert.assertEquals(newHashSet.size(), 100);
                        newHashSet.clear();
                        for (int i5 = 0; i5 < 100 && (receive = subscribe4.receive(300, TimeUnit.MILLISECONDS)) != null; i5++) {
                            testMessageOrderAndDuplicates(newHashSet, new String(receive.getData()), "my-message-" + i5);
                        }
                        Assert.assertEquals(newHashSet.size(), 100);
                        Thread.sleep(100L);
                        rolloverPerIntervalStats(pulsarService);
                        SubscriptionStats subscriptionStats2 = (SubscriptionStats) nonPersistentTopic.getStats(false, false).getSubscriptions().values().iterator().next();
                        Assert.assertTrue(subscriptionStats2.getMsgRateOut() > 0.0d);
                        Assert.assertEquals(subscriptionStats2.getConsumers().size(), 1);
                        Assert.assertTrue(subscriptionStats2.getMsgThroughputOut() > 0.0d);
                        Assert.assertTrue(((ConsumerStats) subscriptionStats2.getConsumers().get(0)).getMsgRateOut() > 0.0d);
                        Assert.assertTrue(((ConsumerStats) subscriptionStats2.getConsumers().get(0)).getMsgThroughputOut() > 0.0d);
                        Assert.assertEquals(subscriptionStats2.getMsgRateRedeliver(), 0.0d);
                        create.close();
                        subscribe.close();
                        subscribe3.close();
                        subscribe4.close();
                        if (Collections.singletonList(build).get(0) != null) {
                            build.close();
                        }
                        if (Collections.singletonList(build).get(0) != null) {
                            build.close();
                        }
                        if (Collections.singletonList(build).get(0) != null) {
                            build.close();
                        }
                    } finally {
                        if (Collections.singletonList(build).get(0) != null) {
                            build.close();
                        }
                    }
                } finally {
                }
            } catch (Throwable th) {
                throw th;
            }
        } finally {
            replicationClusterManager.shutdownReplicationCluster();
        }
    }

    /**
     * Restarts the broker with non-persistent topics disabled and the given load manager, then
     * checks that a non-persistent topic is neither assigned a resource unit nor loaded.
     *
     * <p>The load manager held in {@code PulsarService}'s private {@code loadManager} field is
     * swapped, via reflection, for a freshly created and started instance before the checks run.
     * The configuration values changed here are put back when the method exits, whether it
     * passes or fails.
     *
     * @param str load manager class name supplied by the {@code "loadManager"} data provider
     * @throws Exception if the broker restart, the reflective field access or the load manager
     *                   swap fails
     */
    @Test(dataProvider = "loadManager")
    public void testLoadManagerAssignmentForNonPersistentTestAssignment(String str) throws Exception {
        final String topicName = "non-persistent://my-property/my-ns/loadManager";
        final String originalLoadManagerClass = this.conf.getLoadManagerClassName();
        final boolean originalNonPersistentEnabled = this.conf.isEnableNonPersistentTopics();
        try {
            stopBroker();
            this.conf.setEnableNonPersistentTopics(false);
            this.conf.setLoadManagerClassName(str);
            startBroker();

            // Replace the broker's load manager with a new, started instance and stop the old one.
            Field loadManagerField = PulsarService.class.getDeclaredField("loadManager");
            loadManagerField.setAccessible(true);
            @SuppressWarnings("unchecked")
            AtomicReference<LoadManager> loadManagerRef =
                    (AtomicReference<LoadManager>) loadManagerField.get(this.pulsar);
            LoadManager replacement = LoadManager.create(this.pulsar);
            replacement.start();
            LoadManager previous = loadManagerRef.getAndSet(replacement);
            previous.stop();

            ResourceUnit selectedBroker = null;
            try {
                LoadManager activeLoadManager = (LoadManager) this.pulsar.getLoadManager().get();
                selectedBroker = (ResourceUnit) activeLoadManager
                        .getLeastLoaded(this.pulsar.getNamespaceService().getBundle(TopicName.get(topicName)))
                        .get();
            } catch (Exception ignored) {
                // A failed lookup leaves selectedBroker null, which the assertion below expects.
            }
            Assert.assertNull(selectedBroker);

            try {
                Producer<?> producer = (Producer<?>) this.pulsarClient.newProducer()
                        .topic(topicName)
                        .createAsync()
                        .get(1L, TimeUnit.SECONDS);
                producer.close();
                Assert.fail("topic loading should have failed");
            } catch (Exception ignored) {
                // Expected: any failure to create the producer within one second is accepted.
            }
            Assert.assertFalse(this.pulsar.getBrokerService().getTopicReference(topicName).isPresent());
        } finally {
            this.conf.setEnableNonPersistentTopics(originalNonPersistentEnabled);
            this.conf.setLoadManagerClassName(originalLoadManagerClass);
        }
    }

    /**
     * Restarts the broker with non-persistent topics disabled and checks that creating a producer
     * on a non-persistent topic does not result in the topic being loaded.
     *
     * <p>The original {@code enableNonPersistentTopics} setting is put back when the method exits,
     * whether it passes or fails.
     *
     * @throws Exception if restarting the broker fails
     */
    @Test
    public void testNonPersistentTopicUnderPersistentNamespace() throws Exception {
        final String topicName = "non-persistent://my-property/my-ns/persitentNamespace";
        final boolean originalNonPersistentEnabled = this.conf.isEnableNonPersistentTopics();
        try {
            this.conf.setEnableNonPersistentTopics(false);
            stopBroker();
            startBroker();

            try {
                Producer<?> producer = (Producer<?>) this.pulsarClient.newProducer()
                        .topic(topicName)
                        .createAsync()
                        .get(1L, TimeUnit.SECONDS);
                producer.close();
                Assert.fail("topic loading should have failed");
            } catch (Exception ignored) {
                // Expected: any failure to create the producer within one second is accepted.
            }
            Assert.assertFalse(this.pulsar.getBrokerService().getTopicReference(topicName).isPresent());
        } finally {
            this.conf.setEnableNonPersistentTopics(originalNonPersistentEnabled);
        }
    }

    /**
     * Restarts the broker with persistent topics disabled (non-persistent topics enabled) and the
     * given load manager, then checks that a persistent topic is neither assigned a resource unit
     * nor loaded.
     *
     * <p>The load manager held in {@code PulsarService}'s private {@code loadManager} field is
     * swapped, via reflection, for a freshly created and started instance before the checks run.
     * The configuration values changed here are put back when the method exits, whether it
     * passes or fails.
     *
     * @param str load manager class name supplied by the {@code "loadManager"} data provider
     * @throws Exception if the broker restart, the reflective field access or the load manager
     *                   swap fails
     */
    @Test(dataProvider = "loadManager")
    public void testNonPersistentBrokerModeRejectPersistentTopic(String str) throws Exception {
        final String topicName = "persistent://my-property/my-ns/loadManager";
        final String originalLoadManagerClass = this.conf.getLoadManagerClassName();
        final boolean originalPersistentEnabled = this.conf.isEnablePersistentTopics();
        final boolean originalNonPersistentEnabled = this.conf.isEnableNonPersistentTopics();
        try {
            stopBroker();
            this.conf.setEnableNonPersistentTopics(true);
            this.conf.setEnablePersistentTopics(false);
            this.conf.setLoadManagerClassName(str);
            startBroker();

            // Replace the broker's load manager with a new, started instance and stop the old one.
            Field loadManagerField = PulsarService.class.getDeclaredField("loadManager");
            loadManagerField.setAccessible(true);
            @SuppressWarnings("unchecked")
            AtomicReference<LoadManager> loadManagerRef =
                    (AtomicReference<LoadManager>) loadManagerField.get(this.pulsar);
            LoadManager replacement = LoadManager.create(this.pulsar);
            replacement.start();
            LoadManager previous = loadManagerRef.getAndSet(replacement);
            previous.stop();

            ResourceUnit selectedBroker = null;
            try {
                LoadManager activeLoadManager = (LoadManager) this.pulsar.getLoadManager().get();
                selectedBroker = (ResourceUnit) activeLoadManager
                        .getLeastLoaded(this.pulsar.getNamespaceService().getBundle(TopicName.get(topicName)))
                        .get();
            } catch (Exception ignored) {
                // A failed lookup leaves selectedBroker null, which the assertion below expects.
            }
            Assert.assertNull(selectedBroker);

            try {
                Producer<?> producer = (Producer<?>) this.pulsarClient.newProducer()
                        .topic(topicName)
                        .createAsync()
                        .get(1L, TimeUnit.SECONDS);
                producer.close();
                Assert.fail("topic loading should have failed");
            } catch (Exception ignored) {
                // Expected: any failure to create the producer within one second is accepted.
            }
            Assert.assertFalse(this.pulsar.getBrokerService().getTopicReference(topicName).isPresent());
        } finally {
            this.conf.setEnablePersistentTopics(originalPersistentEnabled);
            this.conf.setEnableNonPersistentTopics(originalNonPersistentEnabled);
            this.conf.setLoadManagerClassName(originalLoadManagerClass);
        }
    }

    /**
     * Checks that a non-persistent topic reports a positive message-drop rate for its publisher
     * and for both of its subscriptions.
     *
     * <p>The broker is restarted with {@code maxConcurrentNonPersistentMessagePerConnection} set to
     * 1. Two consumers (receiver queue size 1, the second one {@code Shared}) subscribe to the
     * topic, and 200 unbatched messages are sent asynchronously from a five-thread pool. Once every
     * send has completed, successfully or not, the broker's rates are refreshed and the drop rates
     * are asserted.
     *
     * <p>The consumers and the producer are opened in a try-with-resources statement and the
     * thread pool is shut down in a {@code finally} block, so none of them is left open when an
     * assertion or a broker call fails. The original configuration value is restored on exit.
     *
     * @throws Exception if the broker restart, client creation or stats retrieval fails
     */
    @Test
    public void testMsgDropStat() throws Exception {
        final String topicName = "non-persistent://my-property/my-ns/stats-topic";
        final int totalMessages = 200;
        final int originalMaxConcurrent = this.conf.getMaxConcurrentNonPersistentMessagePerConnection();
        try {
            this.conf.setMaxConcurrentNonPersistentMessagePerConnection(1);
            stopBroker();
            startBroker();

            try (Consumer<byte[]> exclusiveConsumer = this.pulsarClient.newConsumer()
                         .topic(topicName)
                         .subscriptionName("subscriber-1")
                         .receiverQueueSize(1)
                         .subscribe();
                 Consumer<byte[]> sharedConsumer = this.pulsarClient.newConsumer()
                         .topic(topicName)
                         .subscriptionName("subscriber-2")
                         .receiverQueueSize(1)
                         .subscriptionType(SubscriptionType.Shared)
                         .subscribe();
                 Producer<byte[]> producer = this.pulsarClient.newProducer()
                         .topic(topicName)
                         .enableBatching(false)
                         .messageRoutingMode(MessageRoutingMode.SinglePartition)
                         .create()) {
                ExecutorService senders = Executors.newFixedThreadPool(5);
                try {
                    byte[] payload = "testData".getBytes();
                    CountDownLatch allSendsCompleted = new CountDownLatch(totalMessages);
                    for (int i = 0; i < totalMessages; i++) {
                        senders.submit(() -> {
                            // Count every completion, failed or not, so the latch always reaches zero.
                            producer.sendAsync(payload).handle((messageId, failure) -> {
                                allSendsCompleted.countDown();
                                return null;
                            });
                        });
                    }
                    allSendsCompleted.await();

                    NonPersistentTopic topic = (NonPersistentTopic) this.pulsar.getBrokerService()
                            .getOrCreateTopic(topicName)
                            .get();
                    this.pulsar.getBrokerService().updateRates();
                    NonPersistentTopicStatsImpl stats = topic.getStats(false, false);

                    NonPersistentPublisherStats publisherStats =
                            (NonPersistentPublisherStats) stats.getPublishers().get(0);
                    NonPersistentSubscriptionStats firstSubscriptionStats =
                            (NonPersistentSubscriptionStats) stats.getSubscriptions().get("subscriber-1");
                    NonPersistentSubscriptionStats secondSubscriptionStats =
                            (NonPersistentSubscriptionStats) stats.getSubscriptions().get("subscriber-2");

                    Assert.assertTrue(publisherStats.getMsgDropRate() > 0.0d);
                    Assert.assertTrue(firstSubscriptionStats.getMsgDropRate() > 0.0d);
                    Assert.assertTrue(secondSubscriptionStats.getMsgDropRate() > 0.0d);
                } finally {
                    senders.shutdownNow();
                }
            }
        } finally {
            this.conf.setMaxConcurrentNonPersistentMessagePerConnection(originalMaxConcurrent);
        }
    }

    /**
     * Submits a call to {@code getBrokerService().updateRates()} to the given service's executor
     * and blocks until that task has finished.
     *
     * <p>Failures are logged rather than propagated. If the wait is interrupted, the calling
     * thread's interrupt flag is set again so that the interruption is not lost to callers.
     *
     * @param pulsarService the service whose executor runs the update and whose broker service
     *                      is updated
     */
    private void rolloverPerIntervalStats(PulsarService pulsarService) {
        try {
            pulsarService.getExecutor().submit(() -> {
                pulsarService.getBrokerService().updateRates();
            }).get();
        } catch (Exception e) {
            if (e instanceof InterruptedException) {
                // Catching the exception clears the flag; restore it for whoever runs next.
                Thread.currentThread().interrupt();
            }
            log.error("Stats executor error", e);
        }
    }
}
