package org.apache.kafka.common.test.junit;

import java.nio.charset.StandardCharsets;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.CancellationException;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.Config;
import org.apache.kafka.clients.admin.DescribeAclsOptions;
import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.clients.admin.QuorumInfo;
import org.apache.kafka.clients.admin.TopicDescription;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.GroupProtocol;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.acl.AclBindingFilter;
import org.apache.kafka.common.config.ConfigResource;
import org.apache.kafka.common.errors.ClusterAuthorizationException;
import org.apache.kafka.common.errors.SaslAuthenticationException;
import org.apache.kafka.common.errors.TimeoutException;
import org.apache.kafka.common.errors.TopicAuthorizationException;
import org.apache.kafka.common.network.ListenerName;
import org.apache.kafka.common.security.auth.SecurityProtocol;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.common.test.ClusterInstance;
import org.apache.kafka.common.test.TestUtils;
import org.apache.kafka.common.test.api.AutoStart;
import org.apache.kafka.common.test.api.ClusterConfig;
import org.apache.kafka.common.test.api.ClusterConfigProperty;
import org.apache.kafka.common.test.api.ClusterTemplate;
import org.apache.kafka.common.test.api.ClusterTest;
import org.apache.kafka.common.test.api.ClusterTestDefaults;
import org.apache.kafka.common.test.api.ClusterTests;
import org.apache.kafka.common.test.api.Type;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.server.common.MetadataVersion;
import org.junit.jupiter.api.Assertions;

@ClusterTestDefaults(types = {Type.KRAFT}, serverProperties = {@ClusterConfigProperty(key = "default.key", value = "default.value"), @ClusterConfigProperty(id = 0, key = "queued.max.requests", value = "100")})
/* loaded from: input_file:org/apache/kafka/common/test/junit/ClusterTestExtensionsTest.class */
public class ClusterTestExtensionsTest {
    private final ClusterInstance clusterInstance;

    /**
     * Stores the {@link ClusterInstance} handed to the constructor so that test
     * methods can use it without declaring their own parameter.
     *
     * @param clusterInstance the cluster handle kept in the {@code clusterInstance} field
     */
    ClusterTestExtensionsTest(ClusterInstance clusterInstance) {
        this.clusterInstance = clusterInstance;
    }

    /**
     * Cluster-config source referenced by name from {@code @ClusterTemplate("generate1")}.
     *
     * @return a one-element list holding a KRAFT config with the server property
     *         {@code foo=bar} and the single tag {@code "Generated Test"}
     */
    static List<ClusterConfig> generate1() {
        Map<String, String> serverProperties = new HashMap<>();
        serverProperties.put("foo", "bar");

        ClusterConfig generated = ClusterConfig.defaultBuilder()
            .setTypes(Collections.singleton(Type.KRAFT))
            .setServerProperties(serverProperties)
            .setTags(Collections.singletonList("Generated Test"))
            .build();
        return Collections.singletonList(generated);
    }

    /**
     * Checks that the instance injected into the method is the very same object
     * that was injected into the constructor, and that the class-level defaults
     * (KRAFT type, {@code default.key=default.value}) are in effect.
     */
    @ClusterTest
    public void testClusterTest(ClusterInstance clusterInstance) {
        Assertions.assertSame(this.clusterInstance, clusterInstance, "Injected objects should be the same");
        Assertions.assertEquals(Type.KRAFT, clusterInstance.type());

        Map<String, String> serverProperties = clusterInstance.config().serverProperties();
        Assertions.assertEquals("default.value", serverProperties.get("default.key"));
    }

    /**
     * Checks that the config produced by {@code generate1} (type, server
     * property and tags) is what the injected cluster instance reports.
     */
    @ClusterTemplate("generate1")
    public void testClusterTemplate() {
        Assertions.assertEquals(
            Type.KRAFT,
            this.clusterInstance.type(),
            "generate1 provided a KRAFT cluster, so we should see that here");

        String fooValue = this.clusterInstance.config().serverProperties().get("foo");
        Assertions.assertEquals("bar", fooValue);

        List<String> expectedTags = Collections.singletonList("Generated Test");
        Assertions.assertEquals(expectedTags, this.clusterInstance.config().tags());
    }

    /**
     * Checks how annotation-level server properties are merged: later entries
     * for the same key win ({@code spam} ends up as {@code eggs}), method-level
     * values override the class-level default for {@code default.key}, and
     * properties carrying an {@code id} are visible on that specific node.
     *
     * <p>Each admin client lives in its own try-with-resources block so it is
     * closed exactly once, whether or not an assertion fails.
     */
    @ClusterTests({
        @ClusterTest(
            types = {Type.KRAFT},
            serverProperties = {
                @ClusterConfigProperty(key = "foo", value = "baz"),
                @ClusterConfigProperty(key = "spam", value = "eggz"),
                @ClusterConfigProperty(key = "default.key", value = "overwrite.value"),
                @ClusterConfigProperty(id = 0, key = "queued.max.requests", value = "200"),
                @ClusterConfigProperty(id = 3000, key = "queued.max.requests", value = "300"),
                @ClusterConfigProperty(key = "spam", value = "eggs"),
                @ClusterConfigProperty(key = "default.key", value = "overwrite.value")
            },
            tags = {"default.display.key1", "default.display.key2"}),
        @ClusterTest(
            types = {Type.CO_KRAFT},
            serverProperties = {
                @ClusterConfigProperty(key = "foo", value = "baz"),
                @ClusterConfigProperty(key = "spam", value = "eggz"),
                @ClusterConfigProperty(key = "default.key", value = "overwrite.value"),
                @ClusterConfigProperty(id = 0, key = "queued.max.requests", value = "200"),
                @ClusterConfigProperty(key = "spam", value = "eggs"),
                @ClusterConfigProperty(key = "default.key", value = "overwrite.value")
            },
            tags = {"default.display.key1", "default.display.key2"})
    })
    public void testClusterTests() throws ExecutionException, InterruptedException {
        Map<String, String> serverProperties = this.clusterInstance.config().serverProperties();
        Assertions.assertEquals("baz", serverProperties.get("foo"));
        Assertions.assertEquals("eggs", serverProperties.get("spam"));
        Assertions.assertEquals("overwrite.value", serverProperties.get("default.key"));
        Assertions.assertEquals(
            Arrays.asList("default.display.key1", "default.display.key2"),
            this.clusterInstance.config().tags());

        // Node 0 is queried through the regular admin client.
        try (Admin admin = this.clusterInstance.admin()) {
            assertQueuedMaxRequests(admin, "0", "200");
        }

        // Node 3000 is only configured for the KRAFT variant and is queried
        // through an admin client bootstrapped against the controllers.
        if (this.clusterInstance.type() == Type.KRAFT) {
            try (Admin controllerAdmin = Admin.create(
                    Collections.singletonMap("bootstrap.controllers", this.clusterInstance.bootstrapControllers()))) {
                assertQueuedMaxRequests(controllerAdmin, "3000", "300");
            }
        }
    }

    /**
     * Describes the BROKER config resource {@code nodeId} and asserts that it is
     * the only resource returned and that its {@code queued.max.requests}
     * value equals {@code expected}.
     */
    private static void assertQueuedMaxRequests(Admin admin, String nodeId, String expected)
            throws ExecutionException, InterruptedException {
        ConfigResource resource = new ConfigResource(ConfigResource.Type.BROKER, nodeId);
        Map<ConfigResource, Config> described =
            admin.describeConfigs(Collections.singletonList(resource)).all().get();
        Assertions.assertEquals(1, described.size());
        Assertions.assertEquals(expected, described.get(resource).get("queued.max.requests").value());
    }

    /**
     * Checks that every broker reports as many log directories as the cluster
     * config's {@code numDisksPerBroker()}, both with the annotation's default
     * and with {@code disksPerBroker = 2}.
     */
    @ClusterTests({
        @ClusterTest(types = {Type.KRAFT, Type.CO_KRAFT}),
        @ClusterTest(types = {Type.KRAFT, Type.CO_KRAFT}, disksPerBroker = 2)
    })
    public void testClusterTestWithDisksPerBroker() throws ExecutionException, InterruptedException {
        try (Admin admin = this.clusterInstance.admin()) {
            // No raw cast here: the typed result lets the lambda call size() on the per-broker map.
            admin.describeLogDirs(this.clusterInstance.brokerIds())
                .allDescriptions()
                .get()
                .forEach((brokerId, logDirs) ->
                    Assertions.assertEquals(this.clusterInstance.config().numDisksPerBroker(), logDirs.size()));
        }
    }

    /**
     * With {@code AutoStart.NO}, asking for a broker socket server is expected to
     * fail with a {@link RuntimeException} until {@code start()} has been called.
     */
    @ClusterTest(autoStart = AutoStart.NO)
    public void testNoAutoStart() {
        Assertions.assertThrows(RuntimeException.class, this.clusterInstance::anyBrokerSocketServer);

        this.clusterInstance.start();
        Assertions.assertNotNull(this.clusterInstance.anyBrokerSocketServer());
    }

    /**
     * Checks that a cluster created without an explicit metadata version uses
     * {@code MetadataVersion.latestTesting()}.
     */
    @ClusterTest
    public void testDefaults(ClusterInstance clusterInstance) {
        MetadataVersion expected = MetadataVersion.latestTesting();
        MetadataVersion actual = clusterInstance.config().metadataVersion();
        Assertions.assertEquals(expected, actual);
    }

    /**
     * Checks that, without extra server properties, the cluster reports both the
     * CLASSIC and the CONSUMER group protocols as supported.
     */
    @ClusterTest(types = {Type.KRAFT, Type.CO_KRAFT})
    public void testSupportedNewGroupProtocols(ClusterInstance clusterInstance) {
        HashSet<GroupProtocol> expected =
            new HashSet<>(Arrays.asList(GroupProtocol.CLASSIC, GroupProtocol.CONSUMER));
        Assertions.assertEquals(expected, clusterInstance.supportedGroupProtocols());
    }

    /**
     * Checks that only the CLASSIC group protocol is reported as supported when
     * the rebalance protocols are restricted to {@code classic} or the new group
     * coordinator is disabled.
     */
    @ClusterTests({
        @ClusterTest(
            types = {Type.KRAFT, Type.CO_KRAFT},
            serverProperties = {
                @ClusterConfigProperty(key = "group.coordinator.rebalance.protocols", value = "classic")
            }),
        @ClusterTest(
            types = {Type.KRAFT, Type.CO_KRAFT},
            serverProperties = {
                @ClusterConfigProperty(key = "group.coordinator.new.enable", value = "false")
            })
    })
    public void testNotSupportedNewGroupProtocols(ClusterInstance clusterInstance) {
        Assertions.assertEquals(
            Collections.singleton(GroupProtocol.CLASSIC),
            clusterInstance.supportedGroupProtocols());
    }

    /**
     * Creates a topic through {@code ClusterInstance#createTopic} and verifies via
     * the admin client that it is listed, has the requested number of partitions,
     * and that every partition has the requested number of replicas.
     */
    @ClusterTest(types = {Type.CO_KRAFT, Type.KRAFT}, brokers = 3)
    public void testCreateTopic(ClusterInstance clusterInstance) throws Exception {
        String topicName = "test";
        int numPartitions = 3;
        short numReplicas = 3;
        clusterInstance.createTopic(topicName, numPartitions, numReplicas);

        try (Admin admin = clusterInstance.admin()) {
            // Typed results (no raw casts) so the lambdas can call name() / replicas().
            Assertions.assertTrue(admin.listTopics().listings().get().stream()
                .anyMatch(listing -> listing.name().equals(topicName)));

            TopicDescription description =
                admin.describeTopics(Collections.singleton(topicName)).allTopicNames().get().get(topicName);
            Assertions.assertEquals(numPartitions, description.partitions().size());
            Assertions.assertTrue(description.partitions().stream()
                .allMatch(partition -> partition.replicas().size() == numReplicas));
        }
    }

    /**
     * Creates a three-partition topic, shuts down broker 0, and then waits for
     * the topic with its three partitions via {@code waitForTopic}.
     */
    @ClusterTest(types = {Type.CO_KRAFT, Type.KRAFT}, brokers = 4)
    public void testShutdownAndSyncMetadata(ClusterInstance clusterInstance) throws Exception {
        final String topicName = "test";
        final int numPartitions = 3;
        final short numReplicas = 3;

        clusterInstance.createTopic(topicName, numPartitions, numReplicas);
        clusterInstance.shutdownBroker(0);
        clusterInstance.waitForTopic(topicName, numPartitions);
    }

    /**
     * Checks that a shut-down broker disappears from {@code aliveBrokers()} but
     * stays in {@code brokers()}, and that it is alive again after a restart.
     */
    @ClusterTest(types = {Type.CO_KRAFT, Type.KRAFT}, brokers = 4)
    public void testClusterAliveBrokers(ClusterInstance clusterInstance) throws Exception {
        final int brokerId = 0;
        clusterInstance.waitForReadyBrokers();

        // Stopped: no longer alive, but still a known broker.
        clusterInstance.shutdownBroker(brokerId);
        Assertions.assertFalse(clusterInstance.aliveBrokers().containsKey(brokerId));
        Assertions.assertTrue(clusterInstance.brokers().containsKey(brokerId));

        // Restarted: alive and known.
        clusterInstance.startBroker(brokerId);
        Assertions.assertTrue(clusterInstance.aliveBrokers().containsKey(brokerId));
        Assertions.assertTrue(clusterInstance.brokers().containsKey(brokerId));
    }

    /**
     * Creates a topic, deletes it, waits for the deletion via
     * {@code waitTopicDeletion}, and verifies that the admin client no longer
     * lists it.
     */
    @ClusterTest(
        types = {Type.CO_KRAFT, Type.KRAFT},
        brokers = 4,
        serverProperties = {
            @ClusterConfigProperty(key = "log.initial.task.delay.ms", value = "100"),
            @ClusterConfigProperty(key = "log.segment.delete.delay.ms", value = "1000")
        })
    public void testVerifyTopicDeletion(ClusterInstance clusterInstance) throws Exception {
        String topicName = "testTopic";
        try (Admin admin = clusterInstance.admin()) {
            admin.createTopics(Collections.singletonList(new NewTopic(topicName, 1, (short) 1)));
            clusterInstance.waitForTopic(topicName, 1);

            admin.deleteTopics(Collections.singletonList(topicName));
            clusterInstance.waitTopicDeletion(topicName);

            // Typed result (no raw cast) so the lambda can call name().
            Assertions.assertTrue(admin.listTopics().listings().get().stream()
                .noneMatch(listing -> listing.name().equals(topicName)));
        }
    }

    /**
     * Round-trips one record through a producer and a consumer that are both
     * created with explicit String (de)serializer configs.
     *
     * <p>All three clients are managed by a single try-with-resources statement,
     * so each of them is closed even when an assertion or the wait fails.
     */
    @ClusterTest(types = {Type.CO_KRAFT, Type.KRAFT}, brokers = 3)
    public void testCreateProducerAndConsumer(ClusterInstance clusterInstance) throws InterruptedException {
        String topicName = "topic";
        String key = "key";
        String value = "value";

        try (Admin admin = clusterInstance.admin();
             Producer<String, String> producer = clusterInstance.producer(Map.of(
                 "acks", "all",
                 "key.serializer", StringSerializer.class.getName(),
                 "value.serializer", StringSerializer.class.getName()));
             Consumer<String, String> consumer = clusterInstance.consumer(Map.of(
                 "key.deserializer", StringDeserializer.class.getName(),
                 "value.deserializer", StringDeserializer.class.getName()))) {
            admin.createTopics(Collections.singleton(new NewTopic(topicName, 1, (short) 1)));
            Assertions.assertNotNull(producer);
            Assertions.assertNotNull(consumer);

            producer.send(new ProducerRecord<>(topicName, key, value));
            producer.flush();

            consumer.subscribe(Collections.singletonList(topicName));
            List<ConsumerRecord<String, String>> received = new ArrayList<>();
            // Keep polling and accumulating until exactly one record has arrived.
            TestUtils.waitForCondition(() -> {
                ConsumerRecords<String, String> polled = consumer.poll(Duration.ofMillis(100L));
                polled.forEach(received::add);
                return received.size() == 1;
            }, "Failed to receive message");

            Assertions.assertEquals(key, received.get(0).key());
            Assertions.assertEquals(value, received.get(0).value());
        }
    }

    /**
     * Round-trips one record through the no-argument {@code producer()} and
     * {@code consumer()} helpers, using raw byte arrays for key and value.
     *
     * <p>All three clients are managed by a single try-with-resources statement,
     * so each of them is closed even when an assertion or the wait fails.
     */
    @ClusterTest(
        types = {Type.CO_KRAFT, Type.KRAFT},
        serverProperties = {
            @ClusterConfigProperty(key = "offsets.topic.num.partitions", value = "1"),
            @ClusterConfigProperty(key = "offsets.topic.replication.factor", value = "1")
        })
    public void testCreateDefaultProducerAndConsumer(ClusterInstance clusterInstance) throws InterruptedException {
        String topicName = "topic";
        byte[] key = "key".getBytes(StandardCharsets.UTF_8);
        byte[] value = "value".getBytes(StandardCharsets.UTF_8);

        try (Admin admin = clusterInstance.admin();
             Producer<byte[], byte[]> producer = clusterInstance.producer();
             Consumer<byte[], byte[]> consumer = clusterInstance.consumer()) {
            admin.createTopics(Collections.singleton(new NewTopic(topicName, 1, (short) 1)));
            Assertions.assertNotNull(producer);
            Assertions.assertNotNull(consumer);

            producer.send(new ProducerRecord<>(topicName, key, value));
            producer.flush();

            consumer.subscribe(Collections.singletonList(topicName));
            List<ConsumerRecord<byte[], byte[]>> received = new ArrayList<>();
            // Keep polling and accumulating until exactly one record has arrived.
            TestUtils.waitForCondition(() -> {
                ConsumerRecords<byte[], byte[]> polled = consumer.poll(Duration.ofMillis(100L));
                polled.forEach(received::add);
                return received.size() == 1;
            }, "Failed to receive message");

            Assertions.assertArrayEquals(key, received.get(0).key());
            Assertions.assertArrayEquals(value, received.get(0).value());
        }
    }

    /**
     * Checks that the {@code controllerListener} annotation attribute is reflected
     * by {@code controllerListenerName()} and that an admin client created with
     * the boolean flag set to {@code true} sees a metadata quorum of one node.
     */
    @ClusterTest(types = {Type.CO_KRAFT, Type.KRAFT}, controllerListener = "FOO")
    public void testControllerListenerName(ClusterInstance clusterInstance) throws ExecutionException, InterruptedException {
        ListenerName listenerName = clusterInstance.controllerListenerName().get();
        Assertions.assertEquals("FOO", listenerName.value());

        try (Admin admin = clusterInstance.admin(Map.of(), true)) {
            QuorumInfo quorum = admin.describeMetadataQuorum().quorumInfo().get();
            Assertions.assertEquals(1, quorum.nodes().size());
        }
    }

    /**
     * Exercises admin, producer and consumer clients against a SASL_PLAINTEXT
     * cluster in three phases:
     * <ol>
     *   <li>clients created without overrides must be able to describe ACLs,
     *       produce and consume;</li>
     *   <li>clients using the {@code plain-user1} credentials must fail with
     *       authorization errors;</li>
     *   <li>clients using unknown credentials must fail with
     *       {@link SaslAuthenticationException}.</li>
     * </ol>
     *
     * <p>Every client has its own try-with-resources block, so it is closed
     * exactly once regardless of which assertion fails.
     */
    @ClusterTest(
        types = {Type.KRAFT, Type.CO_KRAFT},
        brokerSecurityProtocol = SecurityProtocol.SASL_PLAINTEXT,
        controllerSecurityProtocol = SecurityProtocol.SASL_PLAINTEXT,
        serverProperties = {
            @ClusterConfigProperty(key = "offsets.topic.num.partitions", value = "1"),
            @ClusterConfigProperty(key = "offsets.topic.replication.factor", value = "1")
        })
    public void testSaslPlaintext(ClusterInstance clusterInstance) throws CancellationException, ExecutionException, InterruptedException {
        Assertions.assertEquals(SecurityProtocol.SASL_PLAINTEXT, clusterInstance.config().brokerSecurityProtocol());

        // Phase 1: clients created without config overrides are expected to succeed.
        try (Admin admin = clusterInstance.admin()) {
            admin.describeAcls(AclBindingFilter.ANY).values().get();
        }
        String topicName = "sasl-plaintext-topic";
        clusterInstance.createTopic(topicName, 1, (short) 1);
        try (Producer<byte[], byte[]> producer = clusterInstance.producer()) {
            producer.send(new ProducerRecord<>(topicName, Utils.utf8("key"), Utils.utf8("value"))).get();
            producer.flush();
        }
        try (Consumer<byte[], byte[]> consumer = clusterInstance.consumer()) {
            consumer.subscribe(List.of(topicName));
            TestUtils.waitForCondition(
                () -> consumer.poll(Duration.ofMillis(100L)).count() == 1,
                "Failed to receive message");
        }

        // Phase 2: "plain-user1" authenticates but is expected to be denied.
        Map<String, Object> userConfig = plainLoginConfig("plain-user1", "plain-user1-secret");
        try (Admin admin = clusterInstance.admin(userConfig)) {
            ExecutionException failure = Assertions.assertThrows(
                ExecutionException.class,
                () -> admin.describeAcls(AclBindingFilter.ANY).values().get());
            Assertions.assertInstanceOf(ClusterAuthorizationException.class, failure.getCause());
        }
        try (Producer<byte[], byte[]> producer = clusterInstance.producer(userConfig)) {
            ExecutionException failure = Assertions.assertThrows(
                ExecutionException.class,
                () -> producer.send(new ProducerRecord<>(topicName, Utils.utf8("key"), Utils.utf8("value"))).get());
            Assertions.assertInstanceOf(TopicAuthorizationException.class, failure.getCause());
        }
        try (Consumer<byte[], byte[]> consumer = clusterInstance.consumer(userConfig)) {
            consumer.subscribe(List.of(topicName));
            awaitPollFailure(consumer, TopicAuthorizationException.class);
        }

        // Phase 3: unknown credentials are expected to fail authentication itself.
        Map<String, Object> unknownConfig = plainLoginConfig("unknown", "unknown");
        try (Admin admin = clusterInstance.admin(unknownConfig)) {
            ExecutionException failure = Assertions.assertThrows(
                ExecutionException.class,
                () -> admin.describeAcls(AclBindingFilter.ANY).values().get());
            Assertions.assertInstanceOf(SaslAuthenticationException.class, failure.getCause());
        }
        try (Producer<byte[], byte[]> producer = clusterInstance.producer(unknownConfig)) {
            ExecutionException failure = Assertions.assertThrows(
                ExecutionException.class,
                () -> producer.send(new ProducerRecord<>(topicName, Utils.utf8("key"), Utils.utf8("value"))).get());
            Assertions.assertInstanceOf(SaslAuthenticationException.class, failure.getCause());
        }
        try (Consumer<byte[], byte[]> consumer = clusterInstance.consumer(unknownConfig)) {
            consumer.subscribe(List.of(topicName));
            awaitPollFailure(consumer, SaslAuthenticationException.class);
        }
    }

    /** Builds a client config whose {@code sasl.jaas.config} uses PlainLoginModule with the given credentials. */
    private static Map<String, Object> plainLoginConfig(String username, String password) {
        return Map.of(
            "sasl.jaas.config",
            String.format(
                "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"%s\" password=\"%s\";",
                username, password));
    }

    /**
     * Polls {@code consumer} until a poll has thrown {@code expectedFailure}; any
     * other runtime exception is rethrown unchanged. The condition reports
     * success on the evaluation after the one that observed the failure.
     */
    private static void awaitPollFailure(
            Consumer<byte[], byte[]> consumer,
            Class<? extends RuntimeException> expectedFailure) throws InterruptedException {
        AtomicBoolean failureSeen = new AtomicBoolean(false);
        TestUtils.waitForCondition(() -> {
            if (failureSeen.get()) {
                return true;
            }
            try {
                consumer.poll(Duration.ofMillis(100L));
            } catch (RuntimeException e) {
                if (!expectedFailure.isInstance(e)) {
                    throw e;
                }
                failureSeen.set(true);
            }
            return false;
        }, "Failed to get exception");
    }

    /**
     * Exercises admin clients created with the boolean flag of
     * {@code admin(Map, boolean)} set to {@code true} against a SASL_PLAINTEXT
     * cluster: the client without overrides must be able to describe ACLs, while
     * the {@code plain-user1} client's request (5 s timeout) must fail with a
     * {@link TimeoutException} cause.
     *
     * <p>Each admin client has its own try-with-resources block so it is always
     * closed, and the lambda captures an effectively final variable.
     */
    @ClusterTest(
        types = {Type.KRAFT, Type.CO_KRAFT},
        brokerSecurityProtocol = SecurityProtocol.SASL_PLAINTEXT,
        controllerSecurityProtocol = SecurityProtocol.SASL_PLAINTEXT,
        serverProperties = {
            @ClusterConfigProperty(key = "offsets.topic.num.partitions", value = "1"),
            @ClusterConfigProperty(key = "offsets.topic.replication.factor", value = "1")
        })
    public void testSaslPlaintextWithController(ClusterInstance clusterInstance) throws CancellationException, ExecutionException, InterruptedException {
        // Client without config overrides: describing ACLs is expected to succeed.
        try (Admin admin = clusterInstance.admin(Map.of(), true)) {
            admin.describeAcls(AclBindingFilter.ANY).values().get();
        }

        // Client with the "plain-user1" credentials: the request is expected to time out.
        Map<String, Object> userConfig = Map.of(
            "sasl.jaas.config",
            String.format(
                "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"%s\" password=\"%s\";",
                "plain-user1", "plain-user1-secret"));
        try (Admin admin = clusterInstance.admin(userConfig, true)) {
            ExecutionException failure = Assertions.assertThrows(
                ExecutionException.class,
                () -> admin.describeAcls(AclBindingFilter.ANY, new DescribeAclsOptions().timeoutMs(5000)).values().get());
            Assertions.assertInstanceOf(TimeoutException.class, failure.getCause());
        }
    }
}
