package org.apache.kafka.server;

import java.net.UnknownHostException;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Properties;
import java.util.Set;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.regex.Pattern;
import org.apache.kafka.common.MetricName;
import org.apache.kafka.common.Uuid;
import org.apache.kafka.common.errors.InvalidRequestException;
import org.apache.kafka.common.message.GetTelemetrySubscriptionsRequestData;
import org.apache.kafka.common.message.PushTelemetryRequestData;
import org.apache.kafka.common.metrics.KafkaMetric;
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.record.CompressionType;
import org.apache.kafka.common.requests.GetTelemetrySubscriptionsRequest;
import org.apache.kafka.common.requests.GetTelemetrySubscriptionsResponse;
import org.apache.kafka.common.requests.PushTelemetryRequest;
import org.apache.kafka.common.requests.PushTelemetryResponse;
import org.apache.kafka.common.requests.RequestContext;
import org.apache.kafka.common.utils.MockTime;
import org.apache.kafka.server.ClientMetricsManager;
import org.apache.kafka.server.metrics.ClientMetricsInstance;
import org.apache.kafka.server.metrics.ClientMetricsReceiverPlugin;
import org.apache.kafka.server.metrics.ClientMetricsTestUtils;
import org.apache.kafka.test.TestUtils;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.mockito.Mockito;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/kafka/server/ClientMetricsManagerTest.class */
public class ClientMetricsManagerTest {
    private static final Logger LOG = LoggerFactory.getLogger(ClientMetricsManagerTest.class);
    private MockTime time;
    private Metrics kafkaMetrics;
    private ClientMetricsReceiverPlugin clientMetricsReceiverPlugin;
    private ClientMetricsManager clientMetricsManager;

    @AfterAll
    public static void ensureNoThreadLeak() throws InterruptedException {
        TestUtils.waitForCondition(() -> {
            return Thread.getAllStackTraces().keySet().stream().map((v0) -> {
                return v0.getName();
            }).noneMatch(str -> {
                return str.contains("client-metrics-reaper") || str.contains("executor-");
            });
        }, "Thread leak detected");
    }

    @BeforeEach
    public void setUp() {
        this.time = new MockTime();
        this.kafkaMetrics = new Metrics();
        this.clientMetricsReceiverPlugin = new ClientMetricsReceiverPlugin();
        this.clientMetricsManager = new ClientMetricsManager(this.clientMetricsReceiverPlugin, 100, this.time, 100, this.kafkaMetrics);
    }

    @AfterEach
    public void tearDown() throws Exception {
        this.clientMetricsManager.close();
        this.kafkaMetrics.close();
    }

    @Test
    public void testUpdateSubscription() throws Exception {
        Assertions.assertTrue(this.clientMetricsManager.subscriptions().isEmpty());
        Assertions.assertEquals(0, this.clientMetricsManager.subscriptionUpdateVersion());
        this.clientMetricsManager.updateSubscription("sub-1", ClientMetricsTestUtils.defaultProperties());
        Assertions.assertEquals(1, this.clientMetricsManager.subscriptions().size());
        Assertions.assertNotNull(this.clientMetricsManager.subscriptionInfo("sub-1"));
        ClientMetricsManager.SubscriptionInfo subscriptionInfo = this.clientMetricsManager.subscriptionInfo("sub-1");
        Set metrics = subscriptionInfo.metrics();
        Assertions.assertEquals(ClientMetricsTestUtils.DEFAULT_METRICS.split(",").length, metrics.size());
        Arrays.stream(ClientMetricsTestUtils.DEFAULT_METRICS.split(",")).forEach(str -> {
            Assertions.assertTrue(metrics.contains(str));
        });
        Assertions.assertEquals(ClientMetricsTestUtils.defaultProperties().getProperty("interval.ms"), String.valueOf(subscriptionInfo.intervalMs()));
        Assertions.assertEquals(ClientMetricsTestUtils.DEFAULT_CLIENT_MATCH_PATTERNS.size(), subscriptionInfo.matchPattern().size());
        ClientMetricsTestUtils.DEFAULT_CLIENT_MATCH_PATTERNS.forEach(str2 -> {
            String[] split = str2.split("=");
            Assertions.assertTrue(subscriptionInfo.matchPattern().containsKey(split[0]));
            Assertions.assertEquals(split[1], ((Pattern) subscriptionInfo.matchPattern().get(split[0])).pattern());
        });
        Assertions.assertEquals(1, this.clientMetricsManager.subscriptionUpdateVersion());
        Assertions.assertEquals(4, this.kafkaMetrics.metrics().size());
        Assertions.assertEquals(Double.valueOf(0.0d), getMetric("instance-count").metricValue());
    }

    @Test
    public void testUpdateSubscriptionWithEmptyProperties() {
        Assertions.assertTrue(this.clientMetricsManager.subscriptions().isEmpty());
        Assertions.assertEquals(0, this.clientMetricsManager.subscriptionUpdateVersion());
        this.clientMetricsManager.updateSubscription("sub-1", new Properties());
        Assertions.assertEquals(0, this.clientMetricsManager.subscriptions().size());
        Assertions.assertEquals(0, this.clientMetricsManager.subscriptionUpdateVersion());
    }

    @Test
    public void testUpdateSubscriptionWithNullProperties() {
        Assertions.assertTrue(this.clientMetricsManager.subscriptions().isEmpty());
        Assertions.assertEquals(0, this.clientMetricsManager.subscriptionUpdateVersion());
        Assertions.assertThrows(NullPointerException.class, () -> {
            this.clientMetricsManager.updateSubscription("sub-1", (Properties) null);
        });
        Assertions.assertEquals(0, this.clientMetricsManager.subscriptions().size());
        Assertions.assertEquals(0, this.clientMetricsManager.subscriptionUpdateVersion());
    }

    @Test
    public void testUpdateSubscriptionWithInvalidMetricsProperties() {
        Assertions.assertTrue(this.clientMetricsManager.subscriptions().isEmpty());
        Properties properties = new Properties();
        properties.put("random", "random");
        Assertions.assertThrows(InvalidRequestException.class, () -> {
            this.clientMetricsManager.updateSubscription("sub-1", properties);
        });
    }

    @Test
    public void testUpdateSubscriptionWithPropertiesDeletion() {
        Assertions.assertTrue(this.clientMetricsManager.subscriptions().isEmpty());
        Assertions.assertEquals(0, this.clientMetricsManager.subscriptionUpdateVersion());
        Properties properties = new Properties();
        properties.put("interval.ms", "100");
        this.clientMetricsManager.updateSubscription("sub-1", properties);
        Assertions.assertEquals(1, this.clientMetricsManager.subscriptions().size());
        Assertions.assertNotNull(this.clientMetricsManager.subscriptionInfo("sub-1"));
        Assertions.assertEquals(1, this.clientMetricsManager.subscriptionUpdateVersion());
        this.clientMetricsManager.updateSubscription("sub-1", new Properties());
        Assertions.assertEquals(0, this.clientMetricsManager.subscriptions().size());
        Assertions.assertEquals(2, this.clientMetricsManager.subscriptionUpdateVersion());
    }

    @Test
    public void testGetTelemetry() throws Exception {
        this.clientMetricsManager.updateSubscription("sub-1", ClientMetricsTestUtils.defaultProperties());
        Assertions.assertEquals(1, this.clientMetricsManager.subscriptions().size());
        GetTelemetrySubscriptionsResponse processGetTelemetrySubscriptionRequest = this.clientMetricsManager.processGetTelemetrySubscriptionRequest(new GetTelemetrySubscriptionsRequest.Builder(new GetTelemetrySubscriptionsRequestData(), true).build(), ClientMetricsTestUtils.requestContext());
        Assertions.assertNotNull(processGetTelemetrySubscriptionRequest.data().clientInstanceId());
        Assertions.assertTrue(processGetTelemetrySubscriptionRequest.data().subscriptionId() != 0);
        Assertions.assertEquals(ClientMetricsTestUtils.DEFAULT_METRICS.split(",").length, processGetTelemetrySubscriptionRequest.data().requestedMetrics().size());
        Arrays.stream(ClientMetricsTestUtils.DEFAULT_METRICS.split(",")).forEach(str -> {
            Assertions.assertTrue(processGetTelemetrySubscriptionRequest.data().requestedMetrics().contains(str));
        });
        Assertions.assertEquals(4, processGetTelemetrySubscriptionRequest.data().acceptedCompressionTypes().size());
        Assertions.assertEquals(CompressionType.ZSTD.id, (Byte) processGetTelemetrySubscriptionRequest.data().acceptedCompressionTypes().get(0));
        Assertions.assertEquals(CompressionType.LZ4.id, (Byte) processGetTelemetrySubscriptionRequest.data().acceptedCompressionTypes().get(1));
        Assertions.assertEquals(CompressionType.GZIP.id, (Byte) processGetTelemetrySubscriptionRequest.data().acceptedCompressionTypes().get(2));
        Assertions.assertEquals(CompressionType.SNAPPY.id, (Byte) processGetTelemetrySubscriptionRequest.data().acceptedCompressionTypes().get(3));
        Assertions.assertEquals(ClientMetricsTestUtils.DEFAULT_PUSH_INTERVAL_MS, processGetTelemetrySubscriptionRequest.data().pushIntervalMs());
        Assertions.assertTrue(processGetTelemetrySubscriptionRequest.data().deltaTemporality());
        Assertions.assertEquals(100, processGetTelemetrySubscriptionRequest.data().telemetryMaxBytes());
        Assertions.assertEquals(Errors.NONE, processGetTelemetrySubscriptionRequest.error());
        ClientMetricsInstance clientInstance = this.clientMetricsManager.clientInstance(processGetTelemetrySubscriptionRequest.data().clientInstanceId());
        Assertions.assertNotNull(clientInstance);
        Assertions.assertEquals(Errors.NONE, clientInstance.lastKnownError());
        Assertions.assertEquals(12, this.kafkaMetrics.metrics().size());
        Assertions.assertEquals(Double.valueOf(1.0d), getMetric("instance-count").metricValue());
        Assertions.assertEquals(Double.valueOf(0.0d), getMetric("unknown-subscription-request-count").metricValue());
        Assertions.assertEquals(Double.valueOf(0.0d), getMetric("unknown-subscription-request-rate").metricValue());
        Assertions.assertEquals(Double.valueOf(0.0d), getMetric("throttle-count").metricValue());
        Assertions.assertEquals(Double.valueOf(0.0d), getMetric("throttle-rate").metricValue());
        Assertions.assertEquals(Double.valueOf(0.0d), getMetric("plugin-export-count").metricValue());
        Assertions.assertEquals(Double.valueOf(0.0d), getMetric("plugin-export-rate").metricValue());
        Assertions.assertEquals(Double.valueOf(0.0d), getMetric("plugin-error-count").metricValue());
        Assertions.assertEquals(Double.valueOf(0.0d), getMetric("plugin-error-rate").metricValue());
        Assertions.assertEquals(Double.valueOf(Double.NaN), getMetric("plugin-export-time-avg").metricValue());
        Assertions.assertEquals(Double.valueOf(Double.NaN), getMetric("plugin-export-time-max").metricValue());
    }

    @Test
    public void testGetTelemetryWithoutSubscription() throws UnknownHostException {
        Assertions.assertTrue(this.clientMetricsManager.subscriptions().isEmpty());
        GetTelemetrySubscriptionsResponse processGetTelemetrySubscriptionRequest = this.clientMetricsManager.processGetTelemetrySubscriptionRequest(new GetTelemetrySubscriptionsRequest.Builder(new GetTelemetrySubscriptionsRequestData(), true).build(), ClientMetricsTestUtils.requestContext());
        Assertions.assertNotNull(processGetTelemetrySubscriptionRequest.data().clientInstanceId());
        Assertions.assertTrue(processGetTelemetrySubscriptionRequest.data().subscriptionId() != 0);
        Assertions.assertTrue(processGetTelemetrySubscriptionRequest.data().requestedMetrics().isEmpty());
        Assertions.assertEquals(4, processGetTelemetrySubscriptionRequest.data().acceptedCompressionTypes().size());
        Assertions.assertEquals(300000, processGetTelemetrySubscriptionRequest.data().pushIntervalMs());
        Assertions.assertTrue(processGetTelemetrySubscriptionRequest.data().deltaTemporality());
        Assertions.assertEquals(100, processGetTelemetrySubscriptionRequest.data().telemetryMaxBytes());
        Assertions.assertEquals(Errors.NONE, processGetTelemetrySubscriptionRequest.error());
        ClientMetricsInstance clientInstance = this.clientMetricsManager.clientInstance(processGetTelemetrySubscriptionRequest.data().clientInstanceId());
        Assertions.assertNotNull(clientInstance);
        Assertions.assertEquals(Errors.NONE, clientInstance.lastKnownError());
    }

    @Test
    public void testGetTelemetryAfterPushIntervalTime() throws UnknownHostException {
        GetTelemetrySubscriptionsResponse processGetTelemetrySubscriptionRequest = this.clientMetricsManager.processGetTelemetrySubscriptionRequest(new GetTelemetrySubscriptionsRequest.Builder(new GetTelemetrySubscriptionsRequestData(), true).build(), ClientMetricsTestUtils.requestContext());
        Assertions.assertNotNull(processGetTelemetrySubscriptionRequest.data().clientInstanceId());
        Assertions.assertEquals(Errors.NONE, processGetTelemetrySubscriptionRequest.error());
        this.time.sleep(300000L);
        GetTelemetrySubscriptionsResponse processGetTelemetrySubscriptionRequest2 = this.clientMetricsManager.processGetTelemetrySubscriptionRequest(new GetTelemetrySubscriptionsRequest.Builder(new GetTelemetrySubscriptionsRequestData().setClientInstanceId(processGetTelemetrySubscriptionRequest.data().clientInstanceId()), true).build(), ClientMetricsTestUtils.requestContext());
        Assertions.assertNotNull(processGetTelemetrySubscriptionRequest2.data().clientInstanceId());
        Assertions.assertEquals(Errors.NONE, processGetTelemetrySubscriptionRequest2.error());
    }

    @Test
    public void testGetTelemetryAllMetricSubscribedSubscription() throws UnknownHostException {
        this.clientMetricsManager.updateSubscription("sub-1", ClientMetricsTestUtils.defaultProperties());
        Properties properties = new Properties();
        properties.put("metrics", "*");
        this.clientMetricsManager.updateSubscription("sub-2", properties);
        Assertions.assertEquals(2, this.clientMetricsManager.subscriptions().size());
        GetTelemetrySubscriptionsResponse processGetTelemetrySubscriptionRequest = this.clientMetricsManager.processGetTelemetrySubscriptionRequest(new GetTelemetrySubscriptionsRequest.Builder(new GetTelemetrySubscriptionsRequestData(), true).build(), ClientMetricsTestUtils.requestContext());
        Assertions.assertNotNull(processGetTelemetrySubscriptionRequest.data().clientInstanceId());
        Assertions.assertTrue(processGetTelemetrySubscriptionRequest.data().subscriptionId() != 0);
        Assertions.assertEquals(1, processGetTelemetrySubscriptionRequest.data().requestedMetrics().size());
        Assertions.assertTrue(processGetTelemetrySubscriptionRequest.data().requestedMetrics().contains("*"));
        Assertions.assertEquals(4, processGetTelemetrySubscriptionRequest.data().acceptedCompressionTypes().size());
        Assertions.assertEquals(ClientMetricsTestUtils.DEFAULT_PUSH_INTERVAL_MS, processGetTelemetrySubscriptionRequest.data().pushIntervalMs());
        Assertions.assertTrue(processGetTelemetrySubscriptionRequest.data().deltaTemporality());
        Assertions.assertEquals(100, processGetTelemetrySubscriptionRequest.data().telemetryMaxBytes());
        Assertions.assertEquals(Errors.NONE, processGetTelemetrySubscriptionRequest.error());
        ClientMetricsInstance clientInstance = this.clientMetricsManager.clientInstance(processGetTelemetrySubscriptionRequest.data().clientInstanceId());
        Assertions.assertNotNull(clientInstance);
        Assertions.assertEquals(Errors.NONE, clientInstance.lastKnownError());
    }

    @Test
    public void testGetTelemetrySameClientImmediateRetryFail() throws Exception {
        GetTelemetrySubscriptionsResponse processGetTelemetrySubscriptionRequest = this.clientMetricsManager.processGetTelemetrySubscriptionRequest(new GetTelemetrySubscriptionsRequest.Builder(new GetTelemetrySubscriptionsRequestData(), true).build(), ClientMetricsTestUtils.requestContext());
        Uuid clientInstanceId = processGetTelemetrySubscriptionRequest.data().clientInstanceId();
        Assertions.assertNotNull(clientInstanceId);
        Assertions.assertEquals(Errors.NONE, processGetTelemetrySubscriptionRequest.error());
        Assertions.assertEquals(Errors.THROTTLING_QUOTA_EXCEEDED, this.clientMetricsManager.processGetTelemetrySubscriptionRequest(new GetTelemetrySubscriptionsRequest.Builder(new GetTelemetrySubscriptionsRequestData().setClientInstanceId(clientInstanceId), true).build(), ClientMetricsTestUtils.requestContext()).error());
        Assertions.assertEquals(Double.valueOf(1.0d), getMetric("instance-count").metricValue());
        Assertions.assertEquals(Double.valueOf(1.0d), getMetric("throttle-count").metricValue());
        Assertions.assertTrue(((Double) getMetric("throttle-rate").metricValue()).doubleValue() > 0.0d);
    }

    @Test
    public void testGetTelemetrySameClientImmediateRetryAfterPushFail() throws Exception {
        GetTelemetrySubscriptionsResponse processGetTelemetrySubscriptionRequest = this.clientMetricsManager.processGetTelemetrySubscriptionRequest(new GetTelemetrySubscriptionsRequest.Builder(new GetTelemetrySubscriptionsRequestData(), true).build(), ClientMetricsTestUtils.requestContext());
        Uuid clientInstanceId = processGetTelemetrySubscriptionRequest.data().clientInstanceId();
        Assertions.assertNotNull(clientInstanceId);
        Assertions.assertEquals(Errors.NONE, processGetTelemetrySubscriptionRequest.error());
        Metrics metrics = new Metrics();
        try {
            ClientMetricsManager clientMetricsManager = new ClientMetricsManager(this.clientMetricsReceiverPlugin, 100, this.time, metrics);
            try {
                Assertions.assertEquals(Errors.NONE, clientMetricsManager.processPushTelemetryRequest(new PushTelemetryRequest.Builder(new PushTelemetryRequestData().setClientInstanceId(processGetTelemetrySubscriptionRequest.data().clientInstanceId()).setSubscriptionId(processGetTelemetrySubscriptionRequest.data().subscriptionId()).setCompressionType(CompressionType.NONE.id).setMetrics(ByteBuffer.wrap("test-data".getBytes(StandardCharsets.UTF_8))), true).build(), ClientMetricsTestUtils.requestContext()).error());
                Assertions.assertEquals(Errors.THROTTLING_QUOTA_EXCEEDED, clientMetricsManager.processGetTelemetrySubscriptionRequest(new GetTelemetrySubscriptionsRequest.Builder(new GetTelemetrySubscriptionsRequestData().setClientInstanceId(clientInstanceId), true).build(), ClientMetricsTestUtils.requestContext()).error());
                Assertions.assertEquals(Double.valueOf(1.0d), getMetric(metrics, "throttle-count").metricValue());
                Assertions.assertTrue(((Double) getMetric(metrics, "throttle-rate").metricValue()).doubleValue() > 0.0d);
                clientMetricsManager.close();
                metrics.close();
            } finally {
            }
        } catch (Throwable th) {
            try {
                metrics.close();
            } catch (Throwable th2) {
                th.addSuppressed(th2);
            }
            throw th;
        }
    }

    @Test
    public void testGetTelemetryUpdateSubscription() throws UnknownHostException {
        this.clientMetricsManager.updateSubscription("sub-1", ClientMetricsTestUtils.defaultProperties());
        Assertions.assertEquals(1, this.clientMetricsManager.subscriptions().size());
        GetTelemetrySubscriptionsResponse processGetTelemetrySubscriptionRequest = this.clientMetricsManager.processGetTelemetrySubscriptionRequest(new GetTelemetrySubscriptionsRequest.Builder(new GetTelemetrySubscriptionsRequestData(), true).build(), ClientMetricsTestUtils.requestContext());
        Uuid clientInstanceId = processGetTelemetrySubscriptionRequest.data().clientInstanceId();
        int subscriptionId = processGetTelemetrySubscriptionRequest.data().subscriptionId();
        Assertions.assertNotNull(clientInstanceId);
        Assertions.assertTrue(subscriptionId != 0);
        Assertions.assertEquals(Errors.NONE, processGetTelemetrySubscriptionRequest.error());
        Properties properties = new Properties();
        properties.put("metrics", "*");
        this.clientMetricsManager.updateSubscription("sub-2", properties);
        Assertions.assertEquals(2, this.clientMetricsManager.subscriptions().size());
        GetTelemetrySubscriptionsResponse processGetTelemetrySubscriptionRequest2 = this.clientMetricsManager.processGetTelemetrySubscriptionRequest(new GetTelemetrySubscriptionsRequest.Builder(new GetTelemetrySubscriptionsRequestData().setClientInstanceId(clientInstanceId), true).build(), ClientMetricsTestUtils.requestContext());
        Assertions.assertEquals(Errors.NONE, processGetTelemetrySubscriptionRequest2.error());
        Assertions.assertTrue(subscriptionId != processGetTelemetrySubscriptionRequest2.data().subscriptionId());
    }

    @Test
    public void testGetTelemetryConcurrentRequestNewClientInstance() throws Exception {
        GetTelemetrySubscriptionsRequest build = new GetTelemetrySubscriptionsRequest.Builder(new GetTelemetrySubscriptionsRequestData().setClientInstanceId(Uuid.randomUuid()), true).build();
        CountDownLatch countDownLatch = new CountDownLatch(2);
        List<GetTelemetrySubscriptionsResponse> synchronizedList = Collections.synchronizedList(new ArrayList());
        Thread thread = new Thread(() -> {
            try {
                try {
                    synchronizedList.add(this.clientMetricsManager.processGetTelemetrySubscriptionRequest(build, ClientMetricsTestUtils.requestContext()));
                    countDownLatch.countDown();
                } catch (UnknownHostException e) {
                    LOG.error("Error processing request", e);
                    countDownLatch.countDown();
                }
            } catch (Throwable th) {
                countDownLatch.countDown();
                throw th;
            }
        });
        Thread thread2 = new Thread(() -> {
            try {
                try {
                    synchronizedList.add(this.clientMetricsManager.processGetTelemetrySubscriptionRequest(build, ClientMetricsTestUtils.requestContext()));
                    countDownLatch.countDown();
                } catch (UnknownHostException e) {
                    LOG.error("Error processing request", e);
                    countDownLatch.countDown();
                }
            } catch (Throwable th) {
                countDownLatch.countDown();
                throw th;
            }
        });
        thread.start();
        thread2.start();
        Assertions.assertTrue(countDownLatch.await(2000L, TimeUnit.MILLISECONDS));
        Assertions.assertEquals(2, synchronizedList.size());
        int i = 0;
        for (GetTelemetrySubscriptionsResponse getTelemetrySubscriptionsResponse : synchronizedList) {
            if (getTelemetrySubscriptionsResponse.error() == Errors.THROTTLING_QUOTA_EXCEEDED) {
                i++;
            } else {
                Assertions.assertEquals(Errors.NONE, getTelemetrySubscriptionsResponse.error());
            }
        }
        Assertions.assertEquals(1, i);
        Assertions.assertEquals(Double.valueOf(1.0d), getMetric("throttle-count").metricValue());
        Assertions.assertTrue(((Double) getMetric("throttle-rate").metricValue()).doubleValue() > 0.0d);
    }

    @Test
    public void testGetTelemetryConcurrentRequestAfterSubscriptionUpdate() throws Exception {
        GetTelemetrySubscriptionsRequest build = new GetTelemetrySubscriptionsRequest.Builder(new GetTelemetrySubscriptionsRequestData().setClientInstanceId(Uuid.randomUuid()), true).build();
        Assertions.assertNotNull(this.clientMetricsManager.clientInstance(this.clientMetricsManager.processGetTelemetrySubscriptionRequest(build, ClientMetricsTestUtils.requestContext()).data().clientInstanceId()));
        CountDownLatch countDownLatch = new CountDownLatch(2);
        List<GetTelemetrySubscriptionsResponse> synchronizedList = Collections.synchronizedList(new ArrayList());
        this.clientMetricsManager.updateSubscription("sub-1", ClientMetricsTestUtils.defaultProperties());
        Assertions.assertEquals(1, this.clientMetricsManager.subscriptions().size());
        Thread thread = new Thread(() -> {
            try {
                try {
                    synchronizedList.add(this.clientMetricsManager.processGetTelemetrySubscriptionRequest(build, ClientMetricsTestUtils.requestContext()));
                    countDownLatch.countDown();
                } catch (UnknownHostException e) {
                    LOG.error("Error processing request", e);
                    countDownLatch.countDown();
                }
            } catch (Throwable th) {
                countDownLatch.countDown();
                throw th;
            }
        });
        Thread thread2 = new Thread(() -> {
            try {
                try {
                    synchronizedList.add(this.clientMetricsManager.processGetTelemetrySubscriptionRequest(build, ClientMetricsTestUtils.requestContext()));
                    countDownLatch.countDown();
                } catch (UnknownHostException e) {
                    LOG.error("Error processing request", e);
                    countDownLatch.countDown();
                }
            } catch (Throwable th) {
                countDownLatch.countDown();
                throw th;
            }
        });
        thread.start();
        thread2.start();
        Assertions.assertTrue(countDownLatch.await(2000L, TimeUnit.MILLISECONDS));
        Assertions.assertEquals(2, synchronizedList.size());
        int i = 0;
        for (GetTelemetrySubscriptionsResponse getTelemetrySubscriptionsResponse : synchronizedList) {
            if (getTelemetrySubscriptionsResponse.error() == Errors.THROTTLING_QUOTA_EXCEEDED) {
                i++;
            } else {
                Assertions.assertEquals(Errors.NONE, getTelemetrySubscriptionsResponse.error());
            }
        }
        Assertions.assertEquals(1, i);
        Assertions.assertEquals(Double.valueOf(1.0d), getMetric("throttle-count").metricValue());
        Assertions.assertTrue(((Double) getMetric("throttle-rate").metricValue()).doubleValue() > 0.0d);
    }

    @Test
    public void testPushTelemetry() throws Exception {
        this.clientMetricsManager.updateSubscription("sub-1", ClientMetricsTestUtils.defaultProperties());
        Assertions.assertEquals(1, this.clientMetricsManager.subscriptions().size());
        GetTelemetrySubscriptionsResponse processGetTelemetrySubscriptionRequest = this.clientMetricsManager.processGetTelemetrySubscriptionRequest(new GetTelemetrySubscriptionsRequest.Builder(new GetTelemetrySubscriptionsRequestData(), true).build(), ClientMetricsTestUtils.requestContext());
        ClientMetricsInstance clientInstance = this.clientMetricsManager.clientInstance(processGetTelemetrySubscriptionRequest.data().clientInstanceId());
        Assertions.assertNotNull(clientInstance);
        Assertions.assertEquals(Errors.NONE, this.clientMetricsManager.processPushTelemetryRequest(new PushTelemetryRequest.Builder(new PushTelemetryRequestData().setClientInstanceId(processGetTelemetrySubscriptionRequest.data().clientInstanceId()).setSubscriptionId(processGetTelemetrySubscriptionRequest.data().subscriptionId()).setCompressionType(CompressionType.NONE.id).setMetrics(ByteBuffer.wrap("test-data".getBytes(StandardCharsets.UTF_8))), true).build(), ClientMetricsTestUtils.requestContext()).error());
        Assertions.assertFalse(clientInstance.terminating());
        Assertions.assertEquals(Errors.NONE, clientInstance.lastKnownError());
        Assertions.assertEquals(12, this.kafkaMetrics.metrics().size());
        Assertions.assertEquals(Double.valueOf(1.0d), getMetric("instance-count").metricValue());
        Assertions.assertEquals(Double.valueOf(0.0d), getMetric("unknown-subscription-request-count").metricValue());
        Assertions.assertEquals(Double.valueOf(0.0d), getMetric("throttle-count").metricValue());
        Assertions.assertEquals(Double.valueOf(1.0d), getMetric("plugin-export-count").metricValue());
        Assertions.assertTrue(((Double) getMetric("plugin-export-rate").metricValue()).doubleValue() > 0.0d);
        Assertions.assertEquals(Double.valueOf(0.0d), getMetric("plugin-error-count").metricValue());
        Assertions.assertEquals(Double.valueOf(0.0d), getMetric("plugin-error-rate").metricValue());
        Assertions.assertNotEquals(Double.valueOf(Double.NaN), getMetric("plugin-export-time-avg").metricValue());
        Assertions.assertNotEquals(Double.valueOf(Double.NaN), getMetric("plugin-export-time-max").metricValue());
    }

    @Test
    public void testPushTelemetryOnNewServer() throws Exception {
        GetTelemetrySubscriptionsResponse processGetTelemetrySubscriptionRequest = this.clientMetricsManager.processGetTelemetrySubscriptionRequest(new GetTelemetrySubscriptionsRequest.Builder(new GetTelemetrySubscriptionsRequestData(), true).build(), ClientMetricsTestUtils.requestContext());
        Metrics metrics = new Metrics();
        try {
            ClientMetricsManager clientMetricsManager = new ClientMetricsManager(this.clientMetricsReceiverPlugin, 100, this.time, metrics);
            try {
                Assertions.assertEquals(Errors.NONE, clientMetricsManager.processPushTelemetryRequest(new PushTelemetryRequest.Builder(new PushTelemetryRequestData().setClientInstanceId(processGetTelemetrySubscriptionRequest.data().clientInstanceId()).setSubscriptionId(processGetTelemetrySubscriptionRequest.data().subscriptionId()).setMetrics(ByteBuffer.wrap("test-data".getBytes(StandardCharsets.UTF_8))), true).build(), ClientMetricsTestUtils.requestContext()).error());
                Assertions.assertEquals(12, metrics.metrics().size());
                Assertions.assertEquals(Double.valueOf(1.0d), getMetric(metrics, "instance-count").metricValue());
                Assertions.assertEquals(Double.valueOf(0.0d), getMetric(metrics, "unknown-subscription-request-count").metricValue());
                Assertions.assertEquals(Double.valueOf(0.0d), getMetric(metrics, "throttle-count").metricValue());
                Assertions.assertEquals(Double.valueOf(1.0d), getMetric(metrics, "plugin-export-count").metricValue());
                Assertions.assertEquals(Double.valueOf(0.0d), getMetric(metrics, "plugin-error-count").metricValue());
                Assertions.assertNotEquals(Double.valueOf(Double.NaN), getMetric(metrics, "plugin-export-time-avg").metricValue());
                Assertions.assertNotEquals(Double.valueOf(Double.NaN), getMetric(metrics, "plugin-export-time-max").metricValue());
                clientMetricsManager.close();
                metrics.close();
            } finally {
            }
        } catch (Throwable th) {
            try {
                metrics.close();
            } catch (Throwable th2) {
                th.addSuppressed(th2);
            }
            throw th;
        }
    }

    @Test
    public void testPushTelemetryAfterPushIntervalTime() throws UnknownHostException {
        this.clientMetricsManager.updateSubscription("sub-1", ClientMetricsTestUtils.defaultProperties());
        Assertions.assertEquals(1, this.clientMetricsManager.subscriptions().size());
        GetTelemetrySubscriptionsResponse processGetTelemetrySubscriptionRequest = this.clientMetricsManager.processGetTelemetrySubscriptionRequest(new GetTelemetrySubscriptionsRequest.Builder(new GetTelemetrySubscriptionsRequestData(), true).build(), ClientMetricsTestUtils.requestContext());
        PushTelemetryRequest build = new PushTelemetryRequest.Builder(new PushTelemetryRequestData().setClientInstanceId(processGetTelemetrySubscriptionRequest.data().clientInstanceId()).setSubscriptionId(processGetTelemetrySubscriptionRequest.data().subscriptionId()).setCompressionType(CompressionType.NONE.id).setMetrics(ByteBuffer.wrap("test-data".getBytes(StandardCharsets.UTF_8))), true).build();
        Assertions.assertEquals(Errors.NONE, this.clientMetricsManager.processPushTelemetryRequest(build, ClientMetricsTestUtils.requestContext()).error());
        this.time.sleep(30000L);
        Assertions.assertEquals(Errors.NONE, this.clientMetricsManager.processPushTelemetryRequest(build, ClientMetricsTestUtils.requestContext()).error());
    }

    @Test
    public void testPushTelemetryClientInstanceIdInvalid() throws UnknownHostException {
        Assertions.assertEquals(Errors.INVALID_REQUEST, this.clientMetricsManager.processPushTelemetryRequest(new PushTelemetryRequest.Builder(new PushTelemetryRequestData().setClientInstanceId((Uuid) null), true).build(), ClientMetricsTestUtils.requestContext()).error());
        Assertions.assertEquals(Errors.INVALID_REQUEST, this.clientMetricsManager.processPushTelemetryRequest(new PushTelemetryRequest.Builder(new PushTelemetryRequestData().setClientInstanceId(Uuid.ZERO_UUID), true).build(), ClientMetricsTestUtils.requestContext()).error());
    }

    @Test
    public void testPushTelemetryThrottleError() throws Exception {
        GetTelemetrySubscriptionsResponse processGetTelemetrySubscriptionRequest = this.clientMetricsManager.processGetTelemetrySubscriptionRequest(new GetTelemetrySubscriptionsRequest.Builder(new GetTelemetrySubscriptionsRequestData(), true).build(), ClientMetricsTestUtils.requestContext());
        PushTelemetryRequest build = new PushTelemetryRequest.Builder(new PushTelemetryRequestData().setClientInstanceId(processGetTelemetrySubscriptionRequest.data().clientInstanceId()).setSubscriptionId(processGetTelemetrySubscriptionRequest.data().subscriptionId()).setMetrics(ByteBuffer.wrap("test-data".getBytes(StandardCharsets.UTF_8))), true).build();
        Assertions.assertEquals(Errors.NONE, this.clientMetricsManager.processPushTelemetryRequest(build, ClientMetricsTestUtils.requestContext()).error());
        Assertions.assertEquals(Errors.THROTTLING_QUOTA_EXCEEDED, this.clientMetricsManager.processPushTelemetryRequest(build, ClientMetricsTestUtils.requestContext()).error());
        ClientMetricsInstance clientInstance = this.clientMetricsManager.clientInstance(processGetTelemetrySubscriptionRequest.data().clientInstanceId());
        Assertions.assertNotNull(clientInstance);
        Assertions.assertFalse(clientInstance.terminating());
        Assertions.assertEquals(Errors.THROTTLING_QUOTA_EXCEEDED, clientInstance.lastKnownError());
        Assertions.assertEquals(Double.valueOf(1.0d), getMetric("throttle-count").metricValue());
        Assertions.assertTrue(((Double) getMetric("throttle-rate").metricValue()).doubleValue() > 0.0d);
        Assertions.assertEquals(Double.valueOf(1.0d), getMetric("plugin-export-count").metricValue());
        Assertions.assertEquals(Double.valueOf(0.0d), getMetric("plugin-error-count").metricValue());
        Assertions.assertNotEquals(Double.valueOf(Double.NaN), getMetric("plugin-export-time-avg").metricValue());
        Assertions.assertNotEquals(Double.valueOf(Double.NaN), getMetric("plugin-export-time-max").metricValue());
    }

    @Test
    public void testPushTelemetryTerminatingFlag() throws Exception {
        GetTelemetrySubscriptionsResponse processGetTelemetrySubscriptionRequest = this.clientMetricsManager.processGetTelemetrySubscriptionRequest(new GetTelemetrySubscriptionsRequest.Builder(new GetTelemetrySubscriptionsRequestData(), true).build(), ClientMetricsTestUtils.requestContext());
        Assertions.assertEquals(Errors.NONE, this.clientMetricsManager.processPushTelemetryRequest(new PushTelemetryRequest.Builder(new PushTelemetryRequestData().setClientInstanceId(processGetTelemetrySubscriptionRequest.data().clientInstanceId()).setSubscriptionId(processGetTelemetrySubscriptionRequest.data().subscriptionId()).setMetrics(ByteBuffer.wrap("test-data".getBytes(StandardCharsets.UTF_8))), true).build(), ClientMetricsTestUtils.requestContext()).error());
        Assertions.assertEquals(Errors.NONE, this.clientMetricsManager.processPushTelemetryRequest(new PushTelemetryRequest.Builder(new PushTelemetryRequestData().setClientInstanceId(processGetTelemetrySubscriptionRequest.data().clientInstanceId()).setSubscriptionId(processGetTelemetrySubscriptionRequest.data().subscriptionId()).setMetrics(ByteBuffer.wrap("test-data".getBytes(StandardCharsets.UTF_8))).setTerminating(true), true).build(), ClientMetricsTestUtils.requestContext()).error());
        ClientMetricsInstance clientInstance = this.clientMetricsManager.clientInstance(processGetTelemetrySubscriptionRequest.data().clientInstanceId());
        Assertions.assertNotNull(clientInstance);
        Assertions.assertTrue(clientInstance.terminating());
        Assertions.assertEquals(Errors.NONE, clientInstance.lastKnownError());
        Assertions.assertEquals(Double.valueOf(2.0d), getMetric("plugin-export-count").metricValue());
        Assertions.assertTrue(((Double) getMetric("plugin-export-rate").metricValue()).doubleValue() > 0.0d);
        Assertions.assertEquals(Double.valueOf(0.0d), getMetric("plugin-error-count").metricValue());
        Assertions.assertNotEquals(Double.valueOf(Double.NaN), getMetric("plugin-export-time-avg").metricValue());
        Assertions.assertNotEquals(Double.valueOf(Double.NaN), getMetric("plugin-export-time-max").metricValue());
    }

    @Test
    public void testPushTelemetryNextRequestPostTerminatingFlag() throws UnknownHostException {
        GetTelemetrySubscriptionsResponse processGetTelemetrySubscriptionRequest = this.clientMetricsManager.processGetTelemetrySubscriptionRequest(new GetTelemetrySubscriptionsRequest.Builder(new GetTelemetrySubscriptionsRequestData(), true).build(), ClientMetricsTestUtils.requestContext());
        ClientMetricsInstance clientInstance = this.clientMetricsManager.clientInstance(processGetTelemetrySubscriptionRequest.data().clientInstanceId());
        Assertions.assertNotNull(clientInstance);
        Assertions.assertEquals(Errors.NONE, this.clientMetricsManager.processPushTelemetryRequest(new PushTelemetryRequest.Builder(new PushTelemetryRequestData().setClientInstanceId(processGetTelemetrySubscriptionRequest.data().clientInstanceId()).setSubscriptionId(processGetTelemetrySubscriptionRequest.data().subscriptionId()).setTerminating(true), true).build(), ClientMetricsTestUtils.requestContext()).error());
        Assertions.assertTrue(clientInstance.terminating());
        Assertions.assertEquals(Errors.NONE, clientInstance.lastKnownError());
        Assertions.assertEquals(Errors.INVALID_REQUEST, this.clientMetricsManager.processPushTelemetryRequest(new PushTelemetryRequest.Builder(new PushTelemetryRequestData().setClientInstanceId(processGetTelemetrySubscriptionRequest.data().clientInstanceId()).setSubscriptionId(processGetTelemetrySubscriptionRequest.data().subscriptionId()).setTerminating(true), true).build(), ClientMetricsTestUtils.requestContext()).error());
        Assertions.assertTrue(clientInstance.terminating());
        Assertions.assertEquals(Errors.INVALID_REQUEST, clientInstance.lastKnownError());
    }

    @Test
    public void testPushTelemetrySubscriptionIdInvalid() throws Exception {
        GetTelemetrySubscriptionsResponse processGetTelemetrySubscriptionRequest = this.clientMetricsManager.processGetTelemetrySubscriptionRequest(new GetTelemetrySubscriptionsRequest.Builder(new GetTelemetrySubscriptionsRequestData(), true).build(), ClientMetricsTestUtils.requestContext());
        ClientMetricsInstance clientInstance = this.clientMetricsManager.clientInstance(processGetTelemetrySubscriptionRequest.data().clientInstanceId());
        Assertions.assertNotNull(clientInstance);
        Assertions.assertEquals(Errors.UNKNOWN_SUBSCRIPTION_ID, this.clientMetricsManager.processPushTelemetryRequest(new PushTelemetryRequest.Builder(new PushTelemetryRequestData().setClientInstanceId(processGetTelemetrySubscriptionRequest.data().clientInstanceId()).setMetrics(ByteBuffer.wrap("test-data".getBytes(StandardCharsets.UTF_8))).setSubscriptionId(1234), true).build(), ClientMetricsTestUtils.requestContext()).error());
        Assertions.assertFalse(clientInstance.terminating());
        Assertions.assertEquals(Errors.UNKNOWN_SUBSCRIPTION_ID, clientInstance.lastKnownError());
        Assertions.assertEquals(Double.valueOf(1.0d), getMetric("unknown-subscription-request-count").metricValue());
        Assertions.assertEquals(Double.valueOf(0.0d), getMetric("plugin-export-count").metricValue());
        Assertions.assertEquals(Double.valueOf(0.0d), getMetric("plugin-error-count").metricValue());
        Assertions.assertEquals(Double.valueOf(Double.NaN), getMetric("plugin-export-time-avg").metricValue());
        Assertions.assertEquals(Double.valueOf(Double.NaN), getMetric("plugin-export-time-max").metricValue());
    }

    @Test
    public void testPushTelemetryCompressionTypeInvalid() throws UnknownHostException {
        GetTelemetrySubscriptionsResponse processGetTelemetrySubscriptionRequest = this.clientMetricsManager.processGetTelemetrySubscriptionRequest(new GetTelemetrySubscriptionsRequest.Builder(new GetTelemetrySubscriptionsRequestData(), true).build(), ClientMetricsTestUtils.requestContext());
        ClientMetricsInstance clientInstance = this.clientMetricsManager.clientInstance(processGetTelemetrySubscriptionRequest.data().clientInstanceId());
        Assertions.assertNotNull(clientInstance);
        Assertions.assertEquals(Errors.UNSUPPORTED_COMPRESSION_TYPE, this.clientMetricsManager.processPushTelemetryRequest(new PushTelemetryRequest.Builder(new PushTelemetryRequestData().setClientInstanceId(processGetTelemetrySubscriptionRequest.data().clientInstanceId()).setSubscriptionId(processGetTelemetrySubscriptionRequest.data().subscriptionId()).setCompressionType((byte) 100), true).build(), ClientMetricsTestUtils.requestContext()).error());
        Assertions.assertFalse(clientInstance.terminating());
        Assertions.assertEquals(Errors.UNSUPPORTED_COMPRESSION_TYPE, clientInstance.lastKnownError());
    }

    @Test
    public void testPushTelemetryNullMetricsData() throws Exception {
        GetTelemetrySubscriptionsResponse processGetTelemetrySubscriptionRequest = this.clientMetricsManager.processGetTelemetrySubscriptionRequest(new GetTelemetrySubscriptionsRequest.Builder(new GetTelemetrySubscriptionsRequestData(), true).build(), ClientMetricsTestUtils.requestContext());
        ClientMetricsInstance clientInstance = this.clientMetricsManager.clientInstance(processGetTelemetrySubscriptionRequest.data().clientInstanceId());
        Assertions.assertNotNull(clientInstance);
        Assertions.assertEquals(Errors.NONE, this.clientMetricsManager.processPushTelemetryRequest(new PushTelemetryRequest.Builder(new PushTelemetryRequestData().setClientInstanceId(processGetTelemetrySubscriptionRequest.data().clientInstanceId()).setSubscriptionId(processGetTelemetrySubscriptionRequest.data().subscriptionId()).setMetrics((ByteBuffer) null), true).build(), ClientMetricsTestUtils.requestContext()).error());
        Assertions.assertFalse(clientInstance.terminating());
        Assertions.assertEquals(Errors.NONE, clientInstance.lastKnownError());
        Assertions.assertEquals(Double.valueOf(1.0d), getMetric("instance-count").metricValue());
        Assertions.assertEquals(Double.valueOf(0.0d), getMetric("plugin-export-count").metricValue());
        Assertions.assertEquals(Double.valueOf(0.0d), getMetric("plugin-error-count").metricValue());
        Assertions.assertEquals(Double.valueOf(Double.NaN), getMetric("plugin-export-time-avg").metricValue());
        Assertions.assertEquals(Double.valueOf(Double.NaN), getMetric("plugin-export-time-max").metricValue());
    }

    @Test
    public void testPushTelemetryMetricsTooLarge() throws Exception {
        Metrics metrics = new Metrics();
        try {
            ClientMetricsManager clientMetricsManager = new ClientMetricsManager(this.clientMetricsReceiverPlugin, 1, this.time, metrics);
            try {
                GetTelemetrySubscriptionsResponse processGetTelemetrySubscriptionRequest = clientMetricsManager.processGetTelemetrySubscriptionRequest(new GetTelemetrySubscriptionsRequest.Builder(new GetTelemetrySubscriptionsRequestData(), true).build(), ClientMetricsTestUtils.requestContext());
                ClientMetricsInstance clientInstance = clientMetricsManager.clientInstance(processGetTelemetrySubscriptionRequest.data().clientInstanceId());
                Assertions.assertNotNull(clientInstance);
                byte[] bytes = "ab".getBytes(StandardCharsets.UTF_8);
                Assertions.assertEquals(2, bytes.length);
                Assertions.assertEquals(Errors.TELEMETRY_TOO_LARGE, clientMetricsManager.processPushTelemetryRequest(new PushTelemetryRequest.Builder(new PushTelemetryRequestData().setClientInstanceId(processGetTelemetrySubscriptionRequest.data().clientInstanceId()).setSubscriptionId(processGetTelemetrySubscriptionRequest.data().subscriptionId()).setMetrics(ByteBuffer.wrap(bytes)), true).build(), ClientMetricsTestUtils.requestContext()).error());
                Assertions.assertFalse(clientInstance.terminating());
                Assertions.assertEquals(Errors.TELEMETRY_TOO_LARGE, clientInstance.lastKnownError());
                clientMetricsManager.close();
                metrics.close();
            } finally {
            }
        } catch (Throwable th) {
            try {
                metrics.close();
            } catch (Throwable th2) {
                th.addSuppressed(th2);
            }
            throw th;
        }
    }

    @Test
    public void testPushTelemetryConcurrentRequestNewClientInstance() throws Exception {
        GetTelemetrySubscriptionsResponse processGetTelemetrySubscriptionRequest = this.clientMetricsManager.processGetTelemetrySubscriptionRequest(new GetTelemetrySubscriptionsRequest.Builder(new GetTelemetrySubscriptionsRequestData(), true).build(), ClientMetricsTestUtils.requestContext());
        Assertions.assertNotNull(this.clientMetricsManager.clientInstance(processGetTelemetrySubscriptionRequest.data().clientInstanceId()));
        PushTelemetryRequest build = new PushTelemetryRequest.Builder(new PushTelemetryRequestData().setClientInstanceId(processGetTelemetrySubscriptionRequest.data().clientInstanceId()).setSubscriptionId(processGetTelemetrySubscriptionRequest.data().subscriptionId()).setCompressionType(CompressionType.NONE.id).setMetrics(ByteBuffer.wrap("test-data".getBytes(StandardCharsets.UTF_8))), true).build();
        CountDownLatch countDownLatch = new CountDownLatch(2);
        List<PushTelemetryResponse> synchronizedList = Collections.synchronizedList(new ArrayList());
        Metrics metrics = new Metrics();
        try {
            ClientMetricsManager clientMetricsManager = new ClientMetricsManager(this.clientMetricsReceiverPlugin, 100, this.time, metrics);
            try {
                Thread thread = new Thread(() -> {
                    try {
                        try {
                            synchronizedList.add(clientMetricsManager.processPushTelemetryRequest(build, ClientMetricsTestUtils.requestContext()));
                            countDownLatch.countDown();
                        } catch (UnknownHostException e) {
                            LOG.error("Error processing request", e);
                            countDownLatch.countDown();
                        }
                    } catch (Throwable th) {
                        countDownLatch.countDown();
                        throw th;
                    }
                });
                Thread thread2 = new Thread(() -> {
                    try {
                        try {
                            synchronizedList.add(clientMetricsManager.processPushTelemetryRequest(build, ClientMetricsTestUtils.requestContext()));
                            countDownLatch.countDown();
                        } catch (UnknownHostException e) {
                            LOG.error("Error processing request", e);
                            countDownLatch.countDown();
                        }
                    } catch (Throwable th) {
                        countDownLatch.countDown();
                        throw th;
                    }
                });
                thread.start();
                thread2.start();
                Assertions.assertTrue(countDownLatch.await(2000L, TimeUnit.MILLISECONDS));
                Assertions.assertEquals(2, synchronizedList.size());
                int i = 0;
                for (PushTelemetryResponse pushTelemetryResponse : synchronizedList) {
                    if (pushTelemetryResponse.error() == Errors.THROTTLING_QUOTA_EXCEEDED) {
                        i++;
                    } else {
                        Assertions.assertEquals(Errors.NONE, pushTelemetryResponse.error());
                    }
                }
                Assertions.assertEquals(1, i);
                Assertions.assertEquals(Double.valueOf(1.0d), getMetric(metrics, "throttle-count").metricValue());
                Assertions.assertTrue(((Double) getMetric(metrics, "throttle-rate").metricValue()).doubleValue() > 0.0d);
                Assertions.assertEquals(Double.valueOf(1.0d), getMetric(metrics, "plugin-export-count").metricValue());
                Assertions.assertEquals(Double.valueOf(0.0d), getMetric(metrics, "plugin-error-count").metricValue());
                Assertions.assertNotEquals(Double.valueOf(Double.NaN), getMetric(metrics, "plugin-export-time-avg").metricValue());
                Assertions.assertNotEquals(Double.valueOf(Double.NaN), getMetric(metrics, "plugin-export-time-max").metricValue());
                clientMetricsManager.close();
                metrics.close();
            } finally {
            }
        } catch (Throwable th) {
            try {
                metrics.close();
            } catch (Throwable th2) {
                th.addSuppressed(th2);
            }
            throw th;
        }
    }

    @Test
    public void testPushTelemetryConcurrentRequestAfterSubscriptionUpdate() throws Exception {
        GetTelemetrySubscriptionsResponse processGetTelemetrySubscriptionRequest = this.clientMetricsManager.processGetTelemetrySubscriptionRequest(new GetTelemetrySubscriptionsRequest.Builder(new GetTelemetrySubscriptionsRequestData(), true).build(), ClientMetricsTestUtils.requestContext());
        Assertions.assertNotNull(this.clientMetricsManager.clientInstance(processGetTelemetrySubscriptionRequest.data().clientInstanceId()));
        PushTelemetryRequest build = new PushTelemetryRequest.Builder(new PushTelemetryRequestData().setClientInstanceId(processGetTelemetrySubscriptionRequest.data().clientInstanceId()).setSubscriptionId(processGetTelemetrySubscriptionRequest.data().subscriptionId()).setCompressionType(CompressionType.NONE.id).setMetrics(ByteBuffer.wrap("test-data".getBytes(StandardCharsets.UTF_8))), true).build();
        this.clientMetricsManager.updateSubscription("sub-1", ClientMetricsTestUtils.defaultProperties());
        Assertions.assertEquals(1, this.clientMetricsManager.subscriptions().size());
        CountDownLatch countDownLatch = new CountDownLatch(2);
        List<PushTelemetryResponse> synchronizedList = Collections.synchronizedList(new ArrayList());
        Thread thread = new Thread(() -> {
            try {
                try {
                    synchronizedList.add(this.clientMetricsManager.processPushTelemetryRequest(build, ClientMetricsTestUtils.requestContext()));
                    countDownLatch.countDown();
                } catch (UnknownHostException e) {
                    LOG.error("Error processing request", e);
                    countDownLatch.countDown();
                }
            } catch (Throwable th) {
                countDownLatch.countDown();
                throw th;
            }
        });
        Thread thread2 = new Thread(() -> {
            try {
                try {
                    synchronizedList.add(this.clientMetricsManager.processPushTelemetryRequest(build, ClientMetricsTestUtils.requestContext()));
                    countDownLatch.countDown();
                } catch (UnknownHostException e) {
                    LOG.error("Error processing request", e);
                    countDownLatch.countDown();
                }
            } catch (Throwable th) {
                countDownLatch.countDown();
                throw th;
            }
        });
        thread.start();
        thread2.start();
        Assertions.assertTrue(countDownLatch.await(2000L, TimeUnit.MILLISECONDS));
        Assertions.assertEquals(2, synchronizedList.size());
        int i = 0;
        for (PushTelemetryResponse pushTelemetryResponse : synchronizedList) {
            if (pushTelemetryResponse.error() == Errors.THROTTLING_QUOTA_EXCEEDED) {
                i++;
            } else {
                Assertions.assertEquals(Errors.UNKNOWN_SUBSCRIPTION_ID, pushTelemetryResponse.error());
            }
        }
        Assertions.assertEquals(1, i);
        Assertions.assertEquals(Double.valueOf(1.0d), getMetric("unknown-subscription-request-count").metricValue());
        Assertions.assertTrue(((Double) getMetric("unknown-subscription-request-rate").metricValue()).doubleValue() > 0.0d);
        Assertions.assertEquals(Double.valueOf(1.0d), getMetric("throttle-count").metricValue());
        Assertions.assertTrue(((Double) getMetric("throttle-rate").metricValue()).doubleValue() > 0.0d);
        Assertions.assertEquals(Double.valueOf(0.0d), getMetric("plugin-export-count").metricValue());
        Assertions.assertEquals(Double.valueOf(0.0d), getMetric("plugin-error-count").metricValue());
        Assertions.assertEquals(Double.valueOf(Double.NaN), getMetric("plugin-export-time-avg").metricValue());
        Assertions.assertEquals(Double.valueOf(Double.NaN), getMetric("plugin-export-time-max").metricValue());
        Assertions.assertEquals(2, this.clientMetricsManager.clientConnectionIdMap().size());
    }

    @Test
    public void testPushTelemetryPluginException() throws Exception {
        ClientMetricsReceiverPlugin clientMetricsReceiverPlugin = (ClientMetricsReceiverPlugin) Mockito.mock(ClientMetricsReceiverPlugin.class);
        ((ClientMetricsReceiverPlugin) Mockito.doThrow(new Throwable[]{new RuntimeException("test exception")}).when(clientMetricsReceiverPlugin)).exportMetrics((RequestContext) Mockito.any(), (PushTelemetryRequest) Mockito.any());
        Metrics metrics = new Metrics();
        try {
            ClientMetricsManager clientMetricsManager = new ClientMetricsManager(clientMetricsReceiverPlugin, 100, this.time, 100, metrics);
            try {
                clientMetricsManager.updateSubscription("sub-1", ClientMetricsTestUtils.defaultProperties());
                Assertions.assertEquals(1, clientMetricsManager.subscriptions().size());
                GetTelemetrySubscriptionsResponse processGetTelemetrySubscriptionRequest = clientMetricsManager.processGetTelemetrySubscriptionRequest(new GetTelemetrySubscriptionsRequest.Builder(new GetTelemetrySubscriptionsRequestData(), true).build(), ClientMetricsTestUtils.requestContext());
                ClientMetricsInstance clientInstance = clientMetricsManager.clientInstance(processGetTelemetrySubscriptionRequest.data().clientInstanceId());
                Assertions.assertNotNull(clientInstance);
                Assertions.assertEquals(Errors.INVALID_RECORD, clientMetricsManager.processPushTelemetryRequest(new PushTelemetryRequest.Builder(new PushTelemetryRequestData().setClientInstanceId(processGetTelemetrySubscriptionRequest.data().clientInstanceId()).setSubscriptionId(processGetTelemetrySubscriptionRequest.data().subscriptionId()).setCompressionType(CompressionType.NONE.id).setMetrics(ByteBuffer.wrap("test-data".getBytes(StandardCharsets.UTF_8))), true).build(), ClientMetricsTestUtils.requestContext()).error());
                Assertions.assertFalse(clientInstance.terminating());
                Assertions.assertEquals(Errors.INVALID_RECORD, clientInstance.lastKnownError());
                Assertions.assertEquals(Double.valueOf(0.0d), getMetric(metrics, "unknown-subscription-request-count").metricValue());
                Assertions.assertEquals(Double.valueOf(0.0d), getMetric(metrics, "throttle-count").metricValue());
                Assertions.assertEquals(Double.valueOf(0.0d), getMetric(metrics, "plugin-export-count").metricValue());
                Assertions.assertEquals(Double.valueOf(1.0d), getMetric(metrics, "plugin-error-count").metricValue());
                Assertions.assertTrue(((Double) getMetric(metrics, "plugin-error-rate").metricValue()).doubleValue() > 0.0d);
                Assertions.assertEquals(Double.valueOf(Double.NaN), getMetric(metrics, "plugin-export-time-avg").metricValue());
                Assertions.assertEquals(Double.valueOf(Double.NaN), getMetric(metrics, "plugin-export-time-max").metricValue());
                clientMetricsManager.close();
                metrics.close();
            } finally {
            }
        } catch (Throwable th) {
            try {
                metrics.close();
            } catch (Throwable th2) {
                th.addSuppressed(th2);
            }
            throw th;
        }
    }

    @Test
    public void testCacheEviction() throws Exception {
        Properties properties = new Properties();
        properties.put("metrics", "*");
        properties.put("interval.ms", "100");
        this.clientMetricsManager.updateSubscription("sub-1", properties);
        GetTelemetrySubscriptionsResponse processGetTelemetrySubscriptionRequest = this.clientMetricsManager.processGetTelemetrySubscriptionRequest(new GetTelemetrySubscriptionsRequest.Builder(new GetTelemetrySubscriptionsRequestData(), true).build(), ClientMetricsTestUtils.requestContext());
        Assertions.assertEquals(Errors.NONE, processGetTelemetrySubscriptionRequest.error());
        Assertions.assertEquals(12, this.kafkaMetrics.metrics().size());
        Assertions.assertEquals(Double.valueOf(1.0d), getMetric("instance-count").metricValue());
        Assertions.assertNotNull(this.clientMetricsManager.clientInstance(processGetTelemetrySubscriptionRequest.data().clientInstanceId()));
        Assertions.assertEquals(1, this.clientMetricsManager.clientConnectionIdMap().size());
        Assertions.assertEquals(1, this.clientMetricsManager.expirationTimer().size());
        this.clientMetricsManager.expirationTimer().advanceClock(300L);
        Assertions.assertTimeoutPreemptively(Duration.ofMillis(300L), () -> {
            while (true) {
                if (this.clientMetricsManager.expirationTimer().size() == 0 && this.clientMetricsManager.clientInstance(processGetTelemetrySubscriptionRequest.data().clientInstanceId()) == null) {
                    return;
                } else {
                    Thread.sleep(50L);
                }
            }
        });
        Assertions.assertEquals(4, this.kafkaMetrics.metrics().size());
        Assertions.assertEquals(Double.valueOf(0.0d), getMetric("instance-count").metricValue());
        Assertions.assertTrue(this.clientMetricsManager.clientConnectionIdMap().isEmpty());
    }

    @Test
    public void testCacheEvictionWithMultipleClients() throws Exception {
        Properties properties = new Properties();
        properties.put("metrics", "*");
        properties.put("interval.ms", "100");
        this.clientMetricsManager.updateSubscription("sub-1", properties);
        GetTelemetrySubscriptionsRequest build = new GetTelemetrySubscriptionsRequest.Builder(new GetTelemetrySubscriptionsRequestData(), true).build();
        GetTelemetrySubscriptionsResponse processGetTelemetrySubscriptionRequest = this.clientMetricsManager.processGetTelemetrySubscriptionRequest(build, ClientMetricsTestUtils.requestContext());
        Assertions.assertEquals(Errors.NONE, processGetTelemetrySubscriptionRequest.error());
        GetTelemetrySubscriptionsResponse processGetTelemetrySubscriptionRequest2 = this.clientMetricsManager.processGetTelemetrySubscriptionRequest(build, ClientMetricsTestUtils.requestContext());
        Assertions.assertEquals(Errors.NONE, processGetTelemetrySubscriptionRequest2.error());
        Assertions.assertEquals(20, this.kafkaMetrics.metrics().size());
        Assertions.assertEquals(Double.valueOf(2.0d), getMetric("instance-count").metricValue());
        Assertions.assertNotNull(this.clientMetricsManager.clientInstance(processGetTelemetrySubscriptionRequest.data().clientInstanceId()));
        Assertions.assertNotNull(this.clientMetricsManager.clientInstance(processGetTelemetrySubscriptionRequest2.data().clientInstanceId()));
        Assertions.assertEquals(2, this.clientMetricsManager.clientConnectionIdMap().size());
        Assertions.assertEquals(2, this.clientMetricsManager.expirationTimer().size());
        this.clientMetricsManager.expirationTimer().advanceClock(300L);
        Assertions.assertTimeoutPreemptively(Duration.ofMillis(300L), () -> {
            while (true) {
                if (this.clientMetricsManager.expirationTimer().size() == 0 && this.clientMetricsManager.clientInstance(processGetTelemetrySubscriptionRequest.data().clientInstanceId()) == null && this.clientMetricsManager.clientInstance(processGetTelemetrySubscriptionRequest2.data().clientInstanceId()) == null) {
                    return;
                } else {
                    Thread.sleep(50L);
                }
            }
        });
        Assertions.assertEquals(4, this.kafkaMetrics.metrics().size());
        Assertions.assertEquals(Double.valueOf(0.0d), getMetric("instance-count").metricValue());
        Assertions.assertTrue(this.clientMetricsManager.clientConnectionIdMap().isEmpty());
    }

    @Test
    public void testCacheExpirationTaskCancelledOnInstanceUpdate() throws Exception {
        GetTelemetrySubscriptionsResponse processGetTelemetrySubscriptionRequest = this.clientMetricsManager.processGetTelemetrySubscriptionRequest(new GetTelemetrySubscriptionsRequest.Builder(new GetTelemetrySubscriptionsRequestData(), true).build(), ClientMetricsTestUtils.requestContextWithConnectionId("conn-1"));
        Assertions.assertEquals(Errors.NONE, processGetTelemetrySubscriptionRequest.error());
        Uuid clientInstanceId = processGetTelemetrySubscriptionRequest.data().clientInstanceId();
        int subscriptionId = processGetTelemetrySubscriptionRequest.data().subscriptionId();
        ClientMetricsInstance clientInstance = this.clientMetricsManager.clientInstance(processGetTelemetrySubscriptionRequest.data().clientInstanceId());
        Assertions.assertNotNull(clientInstance);
        Assertions.assertNotNull(clientInstance.expirationTimerTask());
        this.clientMetricsManager.updateSubscription("sub-1", ClientMetricsTestUtils.defaultProperties());
        Assertions.assertEquals(1, this.clientMetricsManager.subscriptions().size());
        GetTelemetrySubscriptionsResponse processGetTelemetrySubscriptionRequest2 = this.clientMetricsManager.processGetTelemetrySubscriptionRequest(new GetTelemetrySubscriptionsRequest.Builder(new GetTelemetrySubscriptionsRequestData().setClientInstanceId(clientInstanceId), true).build(), ClientMetricsTestUtils.requestContextWithConnectionId("conn-1"));
        Assertions.assertEquals(Errors.NONE, processGetTelemetrySubscriptionRequest2.error());
        Assertions.assertTrue(subscriptionId != processGetTelemetrySubscriptionRequest2.data().subscriptionId());
        Assertions.assertNull(clientInstance.expirationTimerTask());
        ClientMetricsInstance clientInstance2 = this.clientMetricsManager.clientInstance(processGetTelemetrySubscriptionRequest2.data().clientInstanceId());
        Assertions.assertNotNull(clientInstance2);
        Assertions.assertNotNull(clientInstance2.expirationTimerTask());
        Assertions.assertEquals(1, this.clientMetricsManager.expirationTimer().size());
        Assertions.assertEquals(12, this.kafkaMetrics.metrics().size());
        Assertions.assertEquals(Double.valueOf(1.0d), getMetric("instance-count").metricValue());
        Assertions.assertEquals(1, this.clientMetricsManager.clientConnectionIdMap().size());
    }

    @Test
    public void testRemoveConnection() throws Exception {
        GetTelemetrySubscriptionsResponse processGetTelemetrySubscriptionRequest = this.clientMetricsManager.processGetTelemetrySubscriptionRequest(new GetTelemetrySubscriptionsRequest.Builder(new GetTelemetrySubscriptionsRequestData(), true).build(), ClientMetricsTestUtils.requestContextWithConnectionId("conn-1"));
        Assertions.assertEquals(Errors.NONE, processGetTelemetrySubscriptionRequest.error());
        Uuid clientInstanceId = processGetTelemetrySubscriptionRequest.data().clientInstanceId();
        Assertions.assertNotNull(this.clientMetricsManager.clientInstance(clientInstanceId));
        Assertions.assertEquals(1, this.clientMetricsManager.clientConnectionIdMap().size());
        Assertions.assertEquals(clientInstanceId, this.clientMetricsManager.clientConnectionIdMap().get("conn-1"));
        Assertions.assertEquals(12, this.kafkaMetrics.metrics().size());
        Assertions.assertEquals(Double.valueOf(1.0d), getMetric("instance-count").metricValue());
        this.clientMetricsManager.connectionDisconnectListener().onDisconnect("conn-1");
        Assertions.assertNull(this.clientMetricsManager.clientInstance(clientInstanceId));
        Assertions.assertTrue(this.clientMetricsManager.clientConnectionIdMap().isEmpty());
        Assertions.assertEquals(4, this.kafkaMetrics.metrics().size());
        Assertions.assertEquals(Double.valueOf(0.0d), getMetric("instance-count").metricValue());
    }

    @Test
    public void testRemoveConnectionUnknownConnectionId() throws Exception {
        GetTelemetrySubscriptionsResponse processGetTelemetrySubscriptionRequest = this.clientMetricsManager.processGetTelemetrySubscriptionRequest(new GetTelemetrySubscriptionsRequest.Builder(new GetTelemetrySubscriptionsRequestData(), true).build(), ClientMetricsTestUtils.requestContextWithConnectionId("conn-1"));
        Assertions.assertEquals(Errors.NONE, processGetTelemetrySubscriptionRequest.error());
        Uuid clientInstanceId = processGetTelemetrySubscriptionRequest.data().clientInstanceId();
        Assertions.assertNotNull(this.clientMetricsManager.clientInstance(clientInstanceId));
        Assertions.assertEquals(1, this.clientMetricsManager.clientConnectionIdMap().size());
        Assertions.assertEquals(clientInstanceId, this.clientMetricsManager.clientConnectionIdMap().get("conn-1"));
        Assertions.assertEquals(12, this.kafkaMetrics.metrics().size());
        Assertions.assertEquals(Double.valueOf(1.0d), getMetric("instance-count").metricValue());
        this.clientMetricsManager.connectionDisconnectListener().onDisconnect("conn-2");
        Assertions.assertNotNull(this.clientMetricsManager.clientInstance(clientInstanceId));
        Assertions.assertEquals(1, this.clientMetricsManager.clientConnectionIdMap().size());
        Assertions.assertEquals(12, this.kafkaMetrics.metrics().size());
        Assertions.assertEquals(Double.valueOf(1.0d), getMetric("instance-count").metricValue());
    }

    private KafkaMetric getMetric(String str) throws Exception {
        return getMetric(this.kafkaMetrics, str);
    }

    private KafkaMetric getMetric(Metrics metrics, String str) throws Exception {
        Optional findFirst = metrics.metrics().entrySet().stream().filter(entry -> {
            return ((MetricName) entry.getKey()).name().equals(str);
        }).findFirst();
        if (findFirst.isPresent()) {
            return (KafkaMetric) ((Map.Entry) findFirst.get()).getValue();
        }
        throw new Exception(String.format("Could not find metric called %s", str));
    }
}
