package org.apache.pulsar.websocket.proxy;

import com.google.common.collect.Sets;
import com.google.gson.Gson;
import com.google.gson.JsonObject;
import com.google.gson.JsonPrimitive;
import com.google.gson.reflect.TypeToken;
import java.net.URI;
import java.nio.charset.StandardCharsets;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.UUID;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import javax.ws.rs.client.Client;
import javax.ws.rs.client.ClientBuilder;
import javax.ws.rs.client.WebTarget;
import org.apache.pulsar.broker.BrokerTestUtil;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.ProducerAccessMode;
import org.apache.pulsar.client.api.ProducerConsumerBase;
import org.apache.pulsar.common.policies.data.AutoTopicCreationOverride;
import org.apache.pulsar.common.policies.data.BacklogQuota;
import org.apache.pulsar.common.policies.data.SubscriptionStats;
import org.apache.pulsar.common.policies.data.TopicType;
import org.apache.pulsar.common.stats.Metrics;
import org.apache.pulsar.metadata.impl.ZKMetadataStore;
import org.apache.pulsar.websocket.WebSocketService;
import org.apache.pulsar.websocket.service.ProxyServer;
import org.apache.pulsar.websocket.service.WebSocketProxyConfiguration;
import org.apache.pulsar.websocket.service.WebSocketServiceStarter;
import org.apache.pulsar.websocket.stats.ProxyTopicStat;
import org.awaitility.Awaitility;
import org.eclipse.jetty.websocket.api.Session;
import org.eclipse.jetty.websocket.api.UpgradeException;
import org.eclipse.jetty.websocket.client.ClientUpgradeRequest;
import org.eclipse.jetty.websocket.client.WebSocketClient;
import org.glassfish.jersey.client.ClientConfig;
import org.glassfish.jersey.logging.LoggingFeature;
import org.mockito.ArgumentMatchers;
import org.mockito.Mockito;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testng.Assert;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;

@Test(groups = {"websocket"})
/* loaded from: input_file:org/apache/pulsar/websocket/proxy/ProxyPublishConsumeTest.class */
public class ProxyPublishConsumeTest extends ProducerConsumerBase {
    protected String methodName;
    private ProxyServer proxyServer;
    private WebSocketService service;
    private static final int TIME_TO_CHECK_BACKLOG_QUOTA = 5;
    private static final Logger log = LoggerFactory.getLogger(ProxyPublishConsumeTest.class);

    @Override // org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest
    @BeforeMethod
    public void setup() throws Exception {
        this.conf.setBacklogQuotaCheckIntervalInSeconds(5);
        super.internalSetup();
        super.producerBaseSetup();
        WebSocketProxyConfiguration webSocketProxyConfiguration = new WebSocketProxyConfiguration();
        webSocketProxyConfiguration.setWebServicePort(Optional.of(0));
        webSocketProxyConfiguration.setClusterName("test");
        webSocketProxyConfiguration.setConfigurationStoreServers("GLOBAL_DUMMY_VALUE");
        this.service = (WebSocketService) BrokerTestUtil.spyWithClassAndConstructorArgs(WebSocketService.class, webSocketProxyConfiguration);
        ((WebSocketService) Mockito.doReturn(new ZKMetadataStore(this.mockZooKeeperGlobal)).when(this.service)).createMetadataStore(ArgumentMatchers.anyString(), ArgumentMatchers.anyInt());
        this.proxyServer = new ProxyServer(webSocketProxyConfiguration);
        WebSocketServiceStarter.start(this.proxyServer, this.service);
        log.info("Proxy Server Started");
    }

    @Override // org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest
    @AfterMethod(alwaysRun = true)
    protected void cleanup() throws Exception {
        super.resetConfig();
        super.internalCleanup();
        if (this.service != null) {
            this.service.close();
        }
        if (this.proxyServer != null) {
            this.proxyServer.stop();
        }
        log.info("Finished Cleaning Up Test setup");
    }

    @Test(timeOut = 10000)
    public void socketTest() throws Exception {
        int i;
        String str = "ws://localhost:" + this.proxyServer.getListenPortHTTP().get() + "/ws/v2/consumer/persistent/my-property/my-ns/my-topic1/my-sub1?subscriptionType=Failover";
        String str2 = "ws://localhost:" + this.proxyServer.getListenPortHTTP().get() + "/ws/v2/reader/persistent/my-property/my-ns/my-topic1";
        String str3 = "ws://localhost:" + this.proxyServer.getListenPortHTTP().get() + "/ws/v2/producer/persistent/my-property/my-ns/my-topic1/";
        URI create = URI.create(str);
        URI create2 = URI.create(str2);
        URI create3 = URI.create(str3);
        WebSocketClient webSocketClient = new WebSocketClient();
        SimpleConsumerSocket simpleConsumerSocket = new SimpleConsumerSocket();
        WebSocketClient webSocketClient2 = new WebSocketClient();
        SimpleConsumerSocket simpleConsumerSocket2 = new SimpleConsumerSocket();
        WebSocketClient webSocketClient3 = new WebSocketClient();
        SimpleConsumerSocket simpleConsumerSocket3 = new SimpleConsumerSocket();
        WebSocketClient webSocketClient4 = new WebSocketClient();
        SimpleProducerSocket simpleProducerSocket = new SimpleProducerSocket();
        try {
            webSocketClient.start();
            webSocketClient2.start();
            ClientUpgradeRequest clientUpgradeRequest = new ClientUpgradeRequest();
            ClientUpgradeRequest clientUpgradeRequest2 = new ClientUpgradeRequest();
            Future connect = webSocketClient.connect(simpleConsumerSocket, create, clientUpgradeRequest);
            Future connect2 = webSocketClient2.connect(simpleConsumerSocket2, create, clientUpgradeRequest2);
            log.info("Connecting to : {}", create);
            webSocketClient3.start();
            Future connect3 = webSocketClient3.connect(simpleConsumerSocket3, create2, new ClientUpgradeRequest());
            log.info("Connecting to : {}", create2);
            Assert.assertTrue(((Session) connect.get()).isOpen());
            Assert.assertTrue(((Session) connect2.get()).isOpen());
            Assert.assertTrue(((Session) connect3.get()).isOpen());
            Thread.sleep(500L);
            ClientUpgradeRequest clientUpgradeRequest3 = new ClientUpgradeRequest();
            webSocketClient4.start();
            Assert.assertTrue(((Session) webSocketClient4.connect(simpleProducerSocket, create3, clientUpgradeRequest3).get()).isOpen());
            int i2 = 0;
            do {
                if ((simpleConsumerSocket.getReceivedMessagesCount() >= 10 || simpleConsumerSocket2.getReceivedMessagesCount() >= 10) && simpleConsumerSocket3.getReceivedMessagesCount() >= 10) {
                    Assert.assertTrue(((Session) connect.get()).isOpen());
                    Assert.assertTrue(((Session) connect2.get()).isOpen());
                    Assert.assertTrue(simpleProducerSocket.getBuffer().size() > 0);
                    if (simpleConsumerSocket.getBuffer().size() > simpleConsumerSocket2.getBuffer().size()) {
                        Assert.assertEquals(simpleProducerSocket.getBuffer(), simpleConsumerSocket.getBuffer());
                    } else {
                        Assert.assertEquals(simpleProducerSocket.getBuffer(), simpleConsumerSocket2.getBuffer());
                    }
                    Assert.assertEquals(simpleProducerSocket.getBuffer(), simpleConsumerSocket3.getBuffer());
                    stopWebSocketClient(webSocketClient, webSocketClient2, webSocketClient3, webSocketClient4);
                    return;
                }
                Thread.sleep(10L);
                i = i2;
                i2++;
            } while (i <= 400);
            String format = String.format("Consumer still has not received the message after %s ms", Integer.valueOf(400 * 10));
            log.warn(format);
            throw new IllegalStateException(format);
        } catch (Throwable th) {
            stopWebSocketClient(webSocketClient, webSocketClient2, webSocketClient3, webSocketClient4);
            throw th;
        }
    }

    @Test(timeOut = 10000)
    public void socketTestEndOfTopic() throws Exception {
        String format = String.format("ws://localhost:%d/ws/v2/consumer/persistent/%s/%s?pullMode=true&subscriptionType=Shared", this.proxyServer.getListenPortHTTP().get(), "my-property/my-ns/my-topic8", "my-sub");
        String format2 = String.format("ws://localhost:%d/ws/v2/producer/persistent/%s", this.proxyServer.getListenPortHTTP().get(), "my-property/my-ns/my-topic8");
        URI create = URI.create(format);
        URI create2 = URI.create(format2);
        WebSocketClient webSocketClient = new WebSocketClient();
        SimpleConsumerSocket simpleConsumerSocket = new SimpleConsumerSocket();
        WebSocketClient webSocketClient2 = new WebSocketClient();
        SimpleProducerSocket simpleProducerSocket = new SimpleProducerSocket();
        try {
            webSocketClient.start();
            Future connect = webSocketClient.connect(simpleConsumerSocket, create, new ClientUpgradeRequest());
            log.info("Connecting to : {}", create);
            Assert.assertTrue(((Session) connect.get()).isOpen());
            ClientUpgradeRequest clientUpgradeRequest = new ClientUpgradeRequest();
            webSocketClient2.start();
            Assert.assertTrue(((Session) webSocketClient2.connect(simpleProducerSocket, create2, clientUpgradeRequest).get()).isOpen());
            simpleProducerSocket.sendMessage(20);
            simpleConsumerSocket.sendPermits(10);
            Awaitility.await().untilAsserted(() -> {
                Assert.assertEquals(simpleConsumerSocket.getReceivedMessagesCount(), 10);
            });
            simpleConsumerSocket.isEndOfTopic();
            Awaitility.await().untilAsserted(() -> {
                Assert.assertEquals(simpleConsumerSocket.getBuffer().size(), 11);
            });
            Assert.assertEquals(simpleConsumerSocket.getBuffer().get(simpleConsumerSocket.getBuffer().size() - 1), "{\"endOfTopic\":false}");
            simpleConsumerSocket.sendPermits(20);
            Awaitility.await().untilAsserted(() -> {
                Assert.assertEquals(simpleConsumerSocket.getReceivedMessagesCount(), 31);
            });
            simpleConsumerSocket.isEndOfTopic();
            Awaitility.await().untilAsserted(() -> {
                Assert.assertEquals(simpleConsumerSocket.getReceivedMessagesCount(), 32);
            });
            Assert.assertEquals(simpleConsumerSocket.getBuffer().get(simpleConsumerSocket.getBuffer().size() - 1), "{\"endOfTopic\":false}");
            this.admin.topics().terminateTopicAsync("my-property/my-ns/my-topic8").get();
            simpleConsumerSocket.isEndOfTopic();
            Awaitility.await().untilAsserted(() -> {
                Assert.assertEquals(simpleConsumerSocket.getReceivedMessagesCount(), 33);
            });
            Assert.assertEquals(simpleConsumerSocket.getBuffer().get(simpleConsumerSocket.getBuffer().size() - 1), "{\"endOfTopic\":true}");
            stopWebSocketClient(webSocketClient, webSocketClient2);
        } catch (Throwable th) {
            stopWebSocketClient(webSocketClient, webSocketClient2);
            throw th;
        }
    }

    @Test
    public void unsubscribeTest() throws Exception {
        this.admin.topics().createPartitionedTopic(BrokerTestUtil.newUniqueName("persistent://my-property/my-ns/my-topic7"), 3);
        URI create = URI.create("ws://localhost:" + this.proxyServer.getListenPortHTTP().get() + "/ws/v2/consumer/persistent/my-property/my-ns/my-topic7/my-sub");
        WebSocketClient webSocketClient = new WebSocketClient();
        SimpleConsumerSocket simpleConsumerSocket = new SimpleConsumerSocket();
        Thread.sleep(500L);
        try {
            webSocketClient.start();
            webSocketClient.connect(simpleConsumerSocket, create, new ClientUpgradeRequest()).get();
            List subscriptions = this.admin.topics().getSubscriptions("my-property/my-ns/my-topic7");
            Assert.assertEquals(subscriptions.size(), 1);
            Assert.assertEquals((String) subscriptions.get(0), "my-sub");
            simpleConsumerSocket.unsubscribe();
            Thread.sleep(1000L);
            Assert.assertEquals(this.admin.topics().getSubscriptions("my-property/my-ns/my-topic7").size(), 0);
            stopWebSocketClient(webSocketClient);
        } catch (Throwable th) {
            stopWebSocketClient(webSocketClient);
            throw th;
        }
    }

    @Test(timeOut = 10000)
    public void emptySubscriptionConsumerTest() {
        URI create = URI.create("ws://localhost:" + this.proxyServer.getListenPortHTTP().get() + "/ws/v2/consumer/persistent/my-property/my-ns/my-topic2/?subscriptionType=Exclusive");
        WebSocketClient webSocketClient = new WebSocketClient();
        SimpleConsumerSocket simpleConsumerSocket = new SimpleConsumerSocket();
        try {
            try {
                webSocketClient.start();
                webSocketClient.connect(simpleConsumerSocket, create, new ClientUpgradeRequest()).get();
                Assert.fail("should fail: empty subscription");
                stopWebSocketClient(webSocketClient);
            } catch (Exception e) {
                Assert.assertTrue(e.getCause() instanceof UpgradeException);
                Assert.assertEquals(e.getCause().getResponseStatusCode(), 400);
                stopWebSocketClient(webSocketClient);
            }
        } catch (Throwable th) {
            stopWebSocketClient(webSocketClient);
            throw th;
        }
    }

    @Test(timeOut = 10000)
    public void conflictingConsumerTest() throws Exception {
        URI create = URI.create("ws://localhost:" + this.proxyServer.getListenPortHTTP().get() + "/ws/v2/consumer/persistent/my-property/my-ns/my-topic3/sub1?subscriptionType=Exclusive");
        WebSocketClient webSocketClient = new WebSocketClient();
        WebSocketClient webSocketClient2 = new WebSocketClient();
        SimpleConsumerSocket simpleConsumerSocket = new SimpleConsumerSocket();
        SimpleConsumerSocket simpleConsumerSocket2 = new SimpleConsumerSocket();
        try {
            webSocketClient.start();
            webSocketClient.connect(simpleConsumerSocket, create, new ClientUpgradeRequest()).get();
            try {
                try {
                    webSocketClient2.start();
                    webSocketClient2.connect(simpleConsumerSocket2, create, new ClientUpgradeRequest()).get();
                    Assert.fail("should fail: conflicting subscription name");
                    stopWebSocketClient(webSocketClient2);
                } catch (Throwable th) {
                    stopWebSocketClient(webSocketClient2);
                    throw th;
                }
            } catch (Exception e) {
                Assert.assertTrue(e.getCause() instanceof UpgradeException);
                Assert.assertEquals(e.getCause().getResponseStatusCode(), 409);
                stopWebSocketClient(webSocketClient2);
            }
            stopWebSocketClient(webSocketClient);
        } catch (Throwable th2) {
            stopWebSocketClient(webSocketClient);
            throw th2;
        }
    }

    @Test(timeOut = 10000)
    public void conflictingProducerTest() throws Exception {
        URI create = URI.create("ws://localhost:" + this.proxyServer.getListenPortHTTP().get() + "/ws/v2/producer/persistent/my-property/my-ns/my-topic4?producerName=my-producer");
        WebSocketClient webSocketClient = new WebSocketClient();
        WebSocketClient webSocketClient2 = new WebSocketClient();
        SimpleProducerSocket simpleProducerSocket = new SimpleProducerSocket();
        SimpleProducerSocket simpleProducerSocket2 = new SimpleProducerSocket();
        try {
            webSocketClient.start();
            webSocketClient.connect(simpleProducerSocket, create, new ClientUpgradeRequest()).get();
            try {
                try {
                    webSocketClient2.start();
                    webSocketClient2.connect(simpleProducerSocket2, create, new ClientUpgradeRequest()).get();
                    Assert.fail("should fail: conflicting producer name");
                    stopWebSocketClient(webSocketClient2);
                } catch (Throwable th) {
                    stopWebSocketClient(webSocketClient2);
                    throw th;
                }
            } catch (Exception e) {
                Assert.assertTrue(e.getCause() instanceof UpgradeException);
                Assert.assertEquals(e.getCause().getResponseStatusCode(), 409);
                stopWebSocketClient(webSocketClient2);
            }
            stopWebSocketClient(webSocketClient);
        } catch (Throwable th2) {
            stopWebSocketClient(webSocketClient);
            throw th2;
        }
    }

    @Test
    public void producerBacklogQuotaExceededTest() throws Exception {
        this.admin.namespaces().createNamespace("my-property/ns-ws-quota");
        this.admin.namespaces().setNamespaceReplicationClusters("my-property/ns-ws-quota", Sets.newHashSet(new String[]{"test"}));
        this.admin.namespaces().setBacklogQuota("my-property/ns-ws-quota", BacklogQuota.builder().limitSize(10L).retentionPolicy(BacklogQuota.RetentionPolicy.producer_request_hold).build());
        String str = "my-property/ns-ws-quota/my-topic5";
        String str2 = "ws://localhost:" + this.proxyServer.getListenPortHTTP().get() + "/ws/v2/consumer/persistent/" + str + "/my-sub";
        String str3 = "ws://localhost:" + this.proxyServer.getListenPortHTTP().get() + "/ws/v2/producer/persistent/" + str;
        URI create = URI.create(str2);
        URI create2 = URI.create(str3);
        WebSocketClient webSocketClient = new WebSocketClient();
        WebSocketClient webSocketClient2 = new WebSocketClient();
        WebSocketClient webSocketClient3 = new WebSocketClient();
        SimpleConsumerSocket simpleConsumerSocket = new SimpleConsumerSocket();
        SimpleProducerSocket simpleProducerSocket = new SimpleProducerSocket();
        SimpleProducerSocket simpleProducerSocket2 = new SimpleProducerSocket();
        try {
            webSocketClient.start();
            webSocketClient.connect(simpleConsumerSocket, create, new ClientUpgradeRequest()).get();
            stopWebSocketClient(webSocketClient);
            try {
                webSocketClient2.start();
                webSocketClient2.connect(simpleProducerSocket, create2, new ClientUpgradeRequest()).get();
                simpleProducerSocket.sendMessage(100);
                stopWebSocketClient(webSocketClient2);
                Thread.sleep(6000L);
                try {
                    try {
                        webSocketClient3.start();
                        webSocketClient3.connect(simpleProducerSocket2, create2, new ClientUpgradeRequest()).get();
                        Assert.fail("should fail: backlog quota exceeded");
                        stopWebSocketClient(webSocketClient3);
                        this.admin.topics().skipAllMessages("persistent://" + str, "my-sub");
                        this.admin.topics().delete("persistent://" + str);
                        this.admin.namespaces().removeBacklogQuota("my-property/ns-ws-quota");
                        this.admin.namespaces().deleteNamespace("my-property/ns-ws-quota");
                    } catch (Exception e) {
                        Assert.assertTrue(e.getCause() instanceof UpgradeException);
                        Assert.assertEquals(e.getCause().getResponseStatusCode(), 503);
                        stopWebSocketClient(webSocketClient3);
                        this.admin.topics().skipAllMessages("persistent://" + str, "my-sub");
                        this.admin.topics().delete("persistent://" + str);
                        this.admin.namespaces().removeBacklogQuota("my-property/ns-ws-quota");
                        this.admin.namespaces().deleteNamespace("my-property/ns-ws-quota");
                    }
                } catch (Throwable th) {
                    stopWebSocketClient(webSocketClient3);
                    this.admin.topics().skipAllMessages("persistent://" + str, "my-sub");
                    this.admin.topics().delete("persistent://" + str);
                    this.admin.namespaces().removeBacklogQuota("my-property/ns-ws-quota");
                    this.admin.namespaces().deleteNamespace("my-property/ns-ws-quota");
                    throw th;
                }
            } catch (Throwable th2) {
                stopWebSocketClient(webSocketClient2);
                throw th2;
            }
        } catch (Throwable th3) {
            stopWebSocketClient(webSocketClient);
            throw th3;
        }
    }

    @Test(timeOut = 10000)
    public void topicDoesNotExistTest() throws Exception {
        this.admin.namespaces().createNamespace("my-property/ns-topic-creation-not-allowed");
        this.admin.namespaces().setNamespaceReplicationClusters("my-property/ns-topic-creation-not-allowed", Sets.newHashSet(new String[]{"test"}));
        this.admin.namespaces().setAutoTopicCreation("my-property/ns-topic-creation-not-allowed", AutoTopicCreationOverride.builder().allowAutoTopicCreation(false).topicType(TopicType.NON_PARTITIONED.toString()).build());
        String str = "ws://localhost:" + this.proxyServer.getListenPortHTTP().get() + "/ws/v2/producer/persistent/my-property/ns-topic-creation-not-allowed/my-topic";
        String str2 = "ws://localhost:" + this.proxyServer.getListenPortHTTP().get() + "/ws/v2/consumer/persistent/my-property/ns-topic-creation-not-allowed/my-topic/my-sub";
        URI create = URI.create(str);
        URI create2 = URI.create(str2);
        WebSocketClient webSocketClient = new WebSocketClient();
        WebSocketClient webSocketClient2 = new WebSocketClient();
        SimpleProducerSocket simpleProducerSocket = new SimpleProducerSocket();
        SimpleConsumerSocket simpleConsumerSocket = new SimpleConsumerSocket();
        try {
            try {
                webSocketClient.start();
                webSocketClient.connect(simpleProducerSocket, create, new ClientUpgradeRequest()).get();
                Assert.fail("should fail: topic does not exist");
                stopWebSocketClient(webSocketClient);
            } catch (Throwable th) {
                stopWebSocketClient(webSocketClient);
                throw th;
            }
        } catch (Exception e) {
            Assert.assertTrue(e.getCause() instanceof UpgradeException);
            Assert.assertEquals(e.getCause().getResponseStatusCode(), 404);
            stopWebSocketClient(webSocketClient);
        }
        try {
            try {
                webSocketClient2.start();
                webSocketClient2.connect(simpleConsumerSocket, create2, new ClientUpgradeRequest()).get();
                Assert.fail("should fail: topic does not exist");
                stopWebSocketClient(webSocketClient2);
            } catch (Exception e2) {
                Assert.assertTrue(e2.getCause() instanceof UpgradeException);
                Assert.assertEquals(e2.getCause().getResponseStatusCode(), 404);
                stopWebSocketClient(webSocketClient2);
            }
            this.admin.namespaces().deleteNamespace("my-property/ns-topic-creation-not-allowed");
        } catch (Throwable th2) {
            stopWebSocketClient(webSocketClient2);
            throw th2;
        }
    }

    @Test(timeOut = 10000)
    public void producerFencedTest() throws Exception {
        Producer create = this.pulsarClient.newProducer().topic("persistent://my-property/my-ns/producer-fenced-test").accessMode(ProducerAccessMode.Exclusive).create();
        URI create2 = URI.create("ws://localhost:" + this.proxyServer.getListenPortHTTP().get() + "/ws/v2/producer/persistent/my-property/my-ns/producer-fenced-test");
        WebSocketClient webSocketClient = new WebSocketClient();
        SimpleProducerSocket simpleProducerSocket = new SimpleProducerSocket();
        try {
            try {
                webSocketClient.start();
                webSocketClient.connect(simpleProducerSocket, create2, new ClientUpgradeRequest()).get();
                Assert.fail("should fail: producer fenced");
                stopWebSocketClient(webSocketClient);
                create.close();
            } catch (Exception e) {
                Assert.assertTrue(e.getCause() instanceof UpgradeException);
                Assert.assertEquals(e.getCause().getResponseStatusCode(), 409);
                stopWebSocketClient(webSocketClient);
                create.close();
            }
        } catch (Throwable th) {
            stopWebSocketClient(webSocketClient);
            create.close();
            throw th;
        }
    }

    @Test(timeOut = 10000)
    public void topicTerminatedTest() throws Exception {
        this.admin.topics().createNonPartitionedTopic("persistent://my-property/my-ns/topic-terminated-test");
        this.admin.topics().terminateTopic("persistent://my-property/my-ns/topic-terminated-test");
        URI create = URI.create("ws://localhost:" + this.proxyServer.getListenPortHTTP().get() + "/ws/v2/producer/persistent/my-property/my-ns/topic-terminated-test");
        WebSocketClient webSocketClient = new WebSocketClient();
        SimpleProducerSocket simpleProducerSocket = new SimpleProducerSocket();
        try {
            try {
                webSocketClient.start();
                webSocketClient.connect(simpleProducerSocket, create, new ClientUpgradeRequest()).get();
                Assert.fail("should fail: topic terminated");
                stopWebSocketClient(webSocketClient);
                this.admin.topics().delete("persistent://my-property/my-ns/topic-terminated-test");
            } catch (Exception e) {
                Assert.assertTrue(e.getCause() instanceof UpgradeException);
                Assert.assertEquals(e.getCause().getResponseStatusCode(), 503);
                stopWebSocketClient(webSocketClient);
                this.admin.topics().delete("persistent://my-property/my-ns/topic-terminated-test");
            }
        } catch (Throwable th) {
            stopWebSocketClient(webSocketClient);
            this.admin.topics().delete("persistent://my-property/my-ns/topic-terminated-test");
            throw th;
        }
    }

    @Test(timeOut = 10000)
    public void testProxyStats() throws Exception {
        String str = "ws://localhost:" + this.proxyServer.getListenPortHTTP().get() + "/ws/v2/consumer/persistent/my-property/my-ns/my-topic6/my-sub?subscriptionType=Failover";
        String str2 = "ws://localhost:" + this.proxyServer.getListenPortHTTP().get() + "/ws/v2/producer/persistent/my-property/my-ns/my-topic6/";
        String str3 = "ws://localhost:" + this.proxyServer.getListenPortHTTP().get() + "/ws/v2/reader/persistent/my-property/my-ns/my-topic6";
        System.out.println(str + ", " + str2);
        URI create = URI.create(str);
        URI create2 = URI.create(str2);
        URI create3 = URI.create(str3);
        WebSocketClient webSocketClient = new WebSocketClient();
        SimpleConsumerSocket simpleConsumerSocket = new SimpleConsumerSocket();
        WebSocketClient webSocketClient2 = new WebSocketClient();
        SimpleProducerSocket simpleProducerSocket = new SimpleProducerSocket();
        WebSocketClient webSocketClient3 = new WebSocketClient();
        SimpleConsumerSocket simpleConsumerSocket2 = new SimpleConsumerSocket();
        try {
            webSocketClient.start();
            Future connect = webSocketClient.connect(simpleConsumerSocket, create, new ClientUpgradeRequest());
            log.info("Connecting to : {}", create);
            webSocketClient3.start();
            Future connect2 = webSocketClient3.connect(simpleConsumerSocket2, create3, new ClientUpgradeRequest());
            log.info("Connecting to : {}", create3);
            Assert.assertTrue(((Session) connect.get()).isOpen());
            Assert.assertTrue(((Session) connect2.get()).isOpen());
            Thread.sleep(500L);
            ClientUpgradeRequest clientUpgradeRequest = new ClientUpgradeRequest();
            webSocketClient2.start();
            Assert.assertTrue(((Session) webSocketClient2.connect(simpleProducerSocket, create2, clientUpgradeRequest).get()).isOpen());
            int i = 0;
            while (true) {
                if (simpleConsumerSocket.getReceivedMessagesCount() >= 2) {
                    break;
                }
                Thread.sleep(10L);
                int i2 = i;
                i++;
                if (i2 > 400) {
                    log.warn(String.format("Consumer still has not received the message after %s ms", Integer.valueOf(400 * 10)));
                    break;
                }
            }
            Client newClient = ClientBuilder.newClient(new ClientConfig().register(LoggingFeature.class));
            String str4 = this.pulsar.getSafeWebServiceAddress().replace(Integer.toString(((Integer) this.pulsar.getConfiguration().getWebServicePort().get()).intValue()), Integer.toString(((Integer) this.proxyServer.getListenPortHTTP().get()).intValue())) + "/admin/v2/proxy-stats/";
            verifyProxyMetrics(newClient, str4);
            verifyProxyStats(newClient, str4, "my-property/my-ns/my-topic6");
            verifyTopicStat(newClient, str4 + "persistent/", "my-property/my-ns/my-topic6");
            stopWebSocketClient(webSocketClient, webSocketClient2, webSocketClient3);
        } catch (Throwable th) {
            stopWebSocketClient(webSocketClient, webSocketClient2, webSocketClient3);
            throw th;
        }
    }

    @Test(timeOut = 10000)
    public void consumeMessagesInPartitionedTopicTest() throws Exception {
        this.admin.topics().createPartitionedTopic("persistent://my-property/my-ns/my-topic7", 3);
        String str = "ws://localhost:" + this.proxyServer.getListenPortHTTP().get() + "/ws/v2/consumer/persistent/my-property/my-ns/my-topic7/my-sub";
        String str2 = "ws://localhost:" + this.proxyServer.getListenPortHTTP().get() + "/ws/v2/producer/persistent/my-property/my-ns/my-topic7";
        URI create = URI.create(str);
        URI create2 = URI.create(str2);
        WebSocketClient webSocketClient = new WebSocketClient();
        WebSocketClient webSocketClient2 = new WebSocketClient();
        SimpleConsumerSocket simpleConsumerSocket = new SimpleConsumerSocket();
        SimpleProducerSocket simpleProducerSocket = new SimpleProducerSocket();
        try {
            webSocketClient2.start();
            webSocketClient2.connect(simpleProducerSocket, create2, new ClientUpgradeRequest()).get();
            simpleProducerSocket.sendMessage(100);
            stopWebSocketClient(webSocketClient2);
            Thread.sleep(500L);
            try {
                webSocketClient.start();
                webSocketClient.connect(simpleConsumerSocket, create, new ClientUpgradeRequest()).get();
                stopWebSocketClient(webSocketClient);
            } catch (Throwable th) {
                stopWebSocketClient(webSocketClient);
                throw th;
            }
        } catch (Throwable th2) {
            stopWebSocketClient(webSocketClient2);
            throw th2;
        }
    }

    @Test(timeOut = 10000)
    public void socketPullModeTest() throws Exception {
        String format = String.format("ws://localhost:%d/ws/v2/consumer/persistent/%s/%s?pullMode=true&subscriptionType=Shared", this.proxyServer.getListenPortHTTP().get(), "my-property/my-ns/my-topic8", "my-sub");
        String format2 = String.format("ws://localhost:%d/ws/v2/producer/persistent/%s", this.proxyServer.getListenPortHTTP().get(), "my-property/my-ns/my-topic8");
        URI create = URI.create(format);
        URI create2 = URI.create(format2);
        WebSocketClient webSocketClient = new WebSocketClient();
        SimpleConsumerSocket simpleConsumerSocket = new SimpleConsumerSocket();
        WebSocketClient webSocketClient2 = new WebSocketClient();
        SimpleConsumerSocket simpleConsumerSocket2 = new SimpleConsumerSocket();
        WebSocketClient webSocketClient3 = new WebSocketClient();
        SimpleProducerSocket simpleProducerSocket = new SimpleProducerSocket();
        try {
            webSocketClient.start();
            webSocketClient2.start();
            ClientUpgradeRequest clientUpgradeRequest = new ClientUpgradeRequest();
            ClientUpgradeRequest clientUpgradeRequest2 = new ClientUpgradeRequest();
            Future connect = webSocketClient.connect(simpleConsumerSocket, create, clientUpgradeRequest);
            Future connect2 = webSocketClient2.connect(simpleConsumerSocket2, create, clientUpgradeRequest2);
            log.info("Connecting to : {}", create);
            Assert.assertTrue(((Session) connect.get()).isOpen());
            Assert.assertTrue(((Session) connect2.get()).isOpen());
            ClientUpgradeRequest clientUpgradeRequest3 = new ClientUpgradeRequest();
            webSocketClient3.start();
            Assert.assertTrue(((Session) webSocketClient3.connect(simpleProducerSocket, create2, clientUpgradeRequest3).get()).isOpen());
            simpleProducerSocket.sendMessage(100);
            Thread.sleep(500L);
            Assert.assertEquals(simpleConsumerSocket.getReceivedMessagesCount(), 0);
            Assert.assertEquals(simpleConsumerSocket2.getReceivedMessagesCount(), 0);
            simpleConsumerSocket.sendPermits(3);
            simpleConsumerSocket2.sendPermits(2);
            simpleConsumerSocket2.sendPermits(2);
            simpleConsumerSocket2.sendPermits(2);
            Thread.sleep(500L);
            Assert.assertEquals(simpleConsumerSocket.getReceivedMessagesCount(), 3);
            Assert.assertEquals(simpleConsumerSocket2.getReceivedMessagesCount(), 6);
            stopWebSocketClient(webSocketClient, webSocketClient2, webSocketClient3);
        } catch (Throwable th) {
            stopWebSocketClient(webSocketClient, webSocketClient2, webSocketClient3);
            throw th;
        }
    }

    @Test(timeOut = 20000)
    public void nackMessageTest() throws Exception {
        String str = "my-property/my-ns/nack-msg-dlq-" + UUID.randomUUID();
        String str2 = "my-property/my-ns/nack-msg-" + UUID.randomUUID();
        String str3 = "ws://localhost:" + this.proxyServer.getListenPortHTTP().get() + "/ws/v2/consumer/persistent/" + str + "/my-sub?subscriptionType=Shared";
        String str4 = "ws://localhost:" + this.proxyServer.getListenPortHTTP().get() + "/ws/v2/consumer/persistent/" + str2 + "/my-sub?deadLetterTopic=" + str + "&maxRedeliverCount=1&subscriptionType=Shared&negativeAckRedeliveryDelay=1000";
        String str5 = "ws://localhost:" + this.proxyServer.getListenPortHTTP().get() + "/ws/v2/producer/persistent/" + str2;
        WebSocketClient webSocketClient = new WebSocketClient();
        SimpleConsumerSocket simpleConsumerSocket = new SimpleConsumerSocket();
        WebSocketClient webSocketClient2 = new WebSocketClient();
        SimpleConsumerSocket simpleConsumerSocket2 = new SimpleConsumerSocket();
        WebSocketClient webSocketClient3 = new WebSocketClient();
        SimpleProducerSocket simpleProducerSocket = new SimpleProducerSocket(0);
        simpleConsumerSocket.setMessageHandler((str6, jsonObject) -> {
            JsonObject jsonObject = new JsonObject();
            jsonObject.add("messageId", new JsonPrimitive(str6));
            jsonObject.add("type", new JsonPrimitive("negativeAcknowledge"));
            return jsonObject.toString();
        });
        try {
            webSocketClient.start();
            webSocketClient2.start();
            ClientUpgradeRequest clientUpgradeRequest = new ClientUpgradeRequest();
            ClientUpgradeRequest clientUpgradeRequest2 = new ClientUpgradeRequest();
            Future connect = webSocketClient.connect(simpleConsumerSocket, URI.create(str4), clientUpgradeRequest);
            Future connect2 = webSocketClient2.connect(simpleConsumerSocket2, URI.create(str3), clientUpgradeRequest2);
            Assert.assertTrue(((Session) connect.get()).isOpen());
            Assert.assertTrue(((Session) connect2.get()).isOpen());
            ClientUpgradeRequest clientUpgradeRequest3 = new ClientUpgradeRequest();
            webSocketClient3.start();
            Assert.assertTrue(((Session) webSocketClient3.connect(simpleProducerSocket, URI.create(str5), clientUpgradeRequest3).get()).isOpen());
            Assert.assertEquals(simpleConsumerSocket.getReceivedMessagesCount(), 0);
            Assert.assertEquals(simpleConsumerSocket2.getReceivedMessagesCount(), 0);
            simpleProducerSocket.sendMessage(1);
            Awaitility.await().atMost(5L, TimeUnit.SECONDS).untilAsserted(() -> {
                Assert.assertEquals(simpleConsumerSocket.getReceivedMessagesCount(), 2);
            });
            Awaitility.await().atMost(5L, TimeUnit.SECONDS).untilAsserted(() -> {
                Assert.assertEquals(simpleConsumerSocket2.getReceivedMessagesCount(), 1);
            });
            stopWebSocketClient(webSocketClient, webSocketClient2, webSocketClient3);
        } catch (Throwable th) {
            stopWebSocketClient(webSocketClient, webSocketClient2, webSocketClient3);
            throw th;
        }
    }

    @Test(timeOut = 20000)
    public void nackRedeliveryDelayTest() throws Exception {
        String str = "ws://localhost:" + this.proxyServer.getListenPortHTTP().get() + "/ws/v2";
        String str2 = "my-property/my-ns/nack-redelivery-delay-" + UUID.randomUUID();
        String format = String.format("%s/consumer/persistent/%s/%s?negativeAckRedeliveryDelay=%d", str, str2, "my-sub", 5000);
        String format2 = String.format("%s/producer/persistent/%s", str, str2);
        WebSocketClient webSocketClient = new WebSocketClient();
        SimpleConsumerSocket simpleConsumerSocket = new SimpleConsumerSocket();
        WebSocketClient webSocketClient2 = new WebSocketClient();
        SimpleProducerSocket simpleProducerSocket = new SimpleProducerSocket(0);
        simpleConsumerSocket.setMessageHandler((str3, jsonObject) -> {
            JsonObject jsonObject = new JsonObject();
            jsonObject.add("type", new JsonPrimitive("negativeAcknowledge"));
            jsonObject.add("messageId", new JsonPrimitive(str3));
            return jsonObject.toString();
        });
        try {
            webSocketClient.start();
            Assert.assertTrue(((Session) webSocketClient.connect(simpleConsumerSocket, URI.create(format), new ClientUpgradeRequest()).get()).isOpen());
            webSocketClient2.start();
            Assert.assertTrue(((Session) webSocketClient2.connect(simpleProducerSocket, URI.create(format2), new ClientUpgradeRequest()).get()).isOpen());
            Assert.assertEquals(simpleConsumerSocket.getReceivedMessagesCount(), 0);
            simpleProducerSocket.sendMessage(1);
            Awaitility.await().atMost(4000L, TimeUnit.MILLISECONDS).untilAsserted(() -> {
                Assert.assertEquals(simpleConsumerSocket.getReceivedMessagesCount(), 1);
            });
            Thread.sleep(5000L);
            Awaitility.await().atMost(4000L, TimeUnit.MILLISECONDS).untilAsserted(() -> {
                Assert.assertEquals(simpleConsumerSocket.getReceivedMessagesCount(), 2);
            });
            stopWebSocketClient(webSocketClient, webSocketClient2);
        } catch (Throwable th) {
            stopWebSocketClient(webSocketClient, webSocketClient2);
            throw th;
        }
    }

    @Test(timeOut = 20000)
    public void ackBatchMessageTest() throws Exception {
        String str = "my-property/my-ns/ack-batch-message" + UUID.randomUUID();
        String str2 = "ws://localhost:" + this.proxyServer.getListenPortHTTP().get() + "/ws/v2/consumer/persistent/" + str + "/my-sub";
        WebSocketClient webSocketClient = new WebSocketClient();
        SimpleConsumerSocket simpleConsumerSocket = new SimpleConsumerSocket();
        Producer create = this.pulsarClient.newProducer().topic(str).batchingMaxPublishDelay(1L, TimeUnit.SECONDS).create();
        try {
            webSocketClient.start();
            Assert.assertTrue(((Session) webSocketClient.connect(simpleConsumerSocket, URI.create(str2), new ClientUpgradeRequest()).get()).isOpen());
            Assert.assertEquals(simpleConsumerSocket.getReceivedMessagesCount(), 0);
            for (int i = 0; i < 10; i++) {
                create.sendAsync(String.valueOf(i).getBytes(StandardCharsets.UTF_8));
            }
            create.flush();
            simpleConsumerSocket.sendPermits(10);
            Awaitility.await().untilAsserted(() -> {
                Assert.assertEquals(simpleConsumerSocket.getReceivedMessagesCount(), 10);
            });
            Awaitility.await().untilAsserted(() -> {
                Assert.assertEquals(((SubscriptionStats) this.admin.topics().getStats(str).getSubscriptions().get("my-sub")).getMsgBacklog(), 0L);
            });
            stopWebSocketClient(webSocketClient);
        } catch (Throwable th) {
            stopWebSocketClient(webSocketClient);
            throw th;
        }
    }

    @Test(timeOut = 20000)
    public void consumeEncryptedMessages() throws Exception {
        String str = "my-property/my-ns/encrypted" + UUID.randomUUID();
        String str2 = "ws://localhost:" + this.proxyServer.getListenPortHTTP().get() + "/ws/v2/consumer/persistent/" + str + "/my-sub?cryptoFailureAction=CONSUME";
        WebSocketClient webSocketClient = new WebSocketClient();
        SimpleConsumerSocket simpleConsumerSocket = new SimpleConsumerSocket();
        Producer create = this.pulsarClient.newProducer().topic(str).enableBatching(false).defaultCryptoKeyReader("data:application/x-pem-file;base64,LS0tLS1CRUdJTiBQVUJMSUMgS0VZLS0tLS0KTUlJQklqQU5CZ2txaGtpRzl3MEJBUUVGQUFPQ0FROEFNSUlCQ2dLQ0FRRUF0S1d3Z3FkblRZck9DditqMU1rVApXZlNIMHdDc0haWmNhOXdBVzNxUDR1dWhsQnZuYjEwSmNGZjVaanpQOUJTWEsrdEhtSTh1b04zNjh2RXY2eWhVClJITTR5dVhxekN4enVBd2tRU28zOXJ6WDhQR0M3cWRqQ043TERKM01ucWlCSXJVc1NhRVAxd3JOc0Ixa0krbzkKRVIxZTVPL3VFUEFvdFA5MzNoSFEwSjJoTUVla0hxTDdzQmxKOThoNk5tc2ljRWFVa2FyZGswVE9YcmxrakMrYwpNZDhaYkdTY1BxSTlNMzhibW4zT0x4RlRuMXZ0aHB2blhMdkNtRzRNKzZ4dFl0RCtucGNWUFp3MWkxUjkwZk1zCjdwcFpuUmJ2OEhjL0RGZE9LVlFJZ2FtNkNEZG5OS2dXN2M3SUJNclAwQUVtMzdIVHUwTFNPalAyT0hYbHZ2bFEKR1FJREFRQUIKLS0tLS1FTkQgUFVCTElDIEtFWS0tLS0tCg==").addEncryptionKey("ws-consumer-a").create();
        try {
            webSocketClient.start();
            Assert.assertTrue(((Session) webSocketClient.connect(simpleConsumerSocket, URI.create(str2), new ClientUpgradeRequest()).get()).isOpen());
            Assert.assertEquals(simpleConsumerSocket.getReceivedMessagesCount(), 0);
            for (int i = 0; i < 10; i++) {
                create.sendAsync(String.valueOf(i).getBytes(StandardCharsets.UTF_8));
            }
            create.flush();
            simpleConsumerSocket.sendPermits(10);
            Awaitility.await().untilAsserted(() -> {
                Assert.assertEquals(simpleConsumerSocket.getReceivedMessagesCount(), 10);
            });
            Iterator<JsonObject> it = simpleConsumerSocket.messages.iterator();
            while (it.hasNext()) {
                JsonObject next = it.next();
                Assert.assertTrue(next.has("encryptionContext"));
                JsonObject asJsonObject = next.getAsJsonObject("encryptionContext").getAsJsonObject("keys");
                Assert.assertTrue(asJsonObject.has("ws-consumer-a"));
                Assert.assertTrue(asJsonObject.getAsJsonObject("ws-consumer-a").has("keyValue"));
            }
            Awaitility.await().untilAsserted(() -> {
                Assert.assertEquals(((SubscriptionStats) this.admin.topics().getStats(str).getSubscriptions().get("my-sub")).getMsgBacklog(), 0L);
            });
            stopWebSocketClient(webSocketClient);
        } catch (Throwable th) {
            stopWebSocketClient(webSocketClient);
            throw th;
        }
    }

    private void verifyTopicStat(Client client, String str, String str2) {
        ProxyTopicStat proxyTopicStat = (ProxyTopicStat) new Gson().fromJson((String) client.target(str + str2 + "/stats").request(new String[]{"application/json"}).get().readEntity(String.class), ProxyTopicStat.class);
        Assert.assertFalse(proxyTopicStat.producerStats.isEmpty());
        Assert.assertFalse(proxyTopicStat.consumerStats.isEmpty());
    }

    /* JADX WARN: Type inference failed for: r2v1, types: [org.apache.pulsar.websocket.proxy.ProxyPublishConsumeTest$1] */
    /* JADX WARN: Type inference failed for: r2v4, types: [org.apache.pulsar.websocket.proxy.ProxyPublishConsumeTest$2] */
    private void verifyProxyMetrics(Client client, String str) {
        this.service.getProxyStats().generate();
        WebTarget target = client.target(str + "metrics");
        String str2 = (String) target.request(new String[]{"application/json"}).get().readEntity(String.class);
        Gson gson = new Gson();
        Assert.assertFalse(((List) gson.fromJson(str2, new TypeToken<List<Metrics>>() { // from class: org.apache.pulsar.websocket.proxy.ProxyPublishConsumeTest.1
        }.getType())).isEmpty());
        this.service.getProxyStats().generate();
        Assert.assertFalse(((List) gson.fromJson((String) target.request(new String[]{"application/json"}).get().readEntity(String.class), new TypeToken<List<Metrics>>() { // from class: org.apache.pulsar.websocket.proxy.ProxyPublishConsumeTest.2
        }.getType())).isEmpty());
    }

    /* JADX WARN: Type inference failed for: r2v1, types: [org.apache.pulsar.websocket.proxy.ProxyPublishConsumeTest$3] */
    private void verifyProxyStats(Client client, String str, String str2) {
        Map map = (Map) new Gson().fromJson((String) client.target(str + "stats").request(new String[]{"application/json"}).get().readEntity(String.class), new TypeToken<Map<String, ProxyTopicStat>>() { // from class: org.apache.pulsar.websocket.proxy.ProxyPublishConsumeTest.3
        }.getType());
        Assert.assertEquals(map.size(), 1);
        Map.Entry entry = (Map.Entry) map.entrySet().iterator().next();
        Assert.assertEquals((String) entry.getKey(), "persistent://" + str2);
        ProxyTopicStat proxyTopicStat = (ProxyTopicStat) entry.getValue();
        Assert.assertEquals(proxyTopicStat.consumerStats.size(), 2);
        ProxyTopicStat.ConsumerStats consumerStats = (ProxyTopicStat.ConsumerStats) proxyTopicStat.consumerStats.iterator().next();
        Assert.assertTrue(consumerStats.numberOfMsgDelivered > 0);
        Assert.assertNotNull(consumerStats.remoteConnection);
        Assert.assertEquals(proxyTopicStat.producerStats.size(), 1);
        ProxyTopicStat.ProducerStats producerStats = (ProxyTopicStat.ProducerStats) proxyTopicStat.producerStats.iterator().next();
        Assert.assertTrue(producerStats.numberOfMsgPublished > 0);
        Assert.assertNotNull(producerStats.remoteConnection);
    }

    private void stopWebSocketClient(WebSocketClient... webSocketClientArr) {
        ExecutorService newFixedThreadPool = Executors.newFixedThreadPool(1);
        try {
            try {
                newFixedThreadPool.submit(() -> {
                    for (WebSocketClient webSocketClient : webSocketClientArr) {
                        try {
                            webSocketClient.stop();
                        } catch (Exception e) {
                            log.error(e.getMessage());
                        }
                    }
                    log.info("proxy clients are stopped successfully");
                }).get(2L, TimeUnit.SECONDS);
            } catch (Exception e) {
                log.error("failed to close proxy clients", e);
            }
        } finally {
            if (Collections.singletonList(newFixedThreadPool).get(0) != null) {
                newFixedThreadPool.shutdownNow();
            }
        }
    }
}
