package kafka.server;

import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.AlterConfigOp;
import org.apache.kafka.clients.admin.Config;
import org.apache.kafka.clients.admin.ConfigEntry;
import org.apache.kafka.clients.admin.DescribeClusterResult;
import org.apache.kafka.clients.admin.DescribeFeaturesResult;
import org.apache.kafka.clients.admin.FeatureMetadata;
import org.apache.kafka.clients.admin.FeatureUpdate;
import org.apache.kafka.clients.admin.FinalizedVersionRange;
import org.apache.kafka.clients.admin.ListOffsetsResult;
import org.apache.kafka.clients.admin.NewPartitionReassignment;
import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.clients.admin.OffsetSpec;
import org.apache.kafka.clients.admin.QuorumInfo;
import org.apache.kafka.clients.admin.TopicDescription;
import org.apache.kafka.clients.admin.UpdateFeaturesOptions;
import org.apache.kafka.clients.admin.UpdateFeaturesResult;
import org.apache.kafka.common.Node;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.TopicPartitionInfo;
import org.apache.kafka.common.acl.AccessControlEntry;
import org.apache.kafka.common.acl.AccessControlEntryFilter;
import org.apache.kafka.common.acl.AclBinding;
import org.apache.kafka.common.acl.AclBindingFilter;
import org.apache.kafka.common.acl.AclOperation;
import org.apache.kafka.common.acl.AclPermissionType;
import org.apache.kafka.common.config.ConfigResource;
import org.apache.kafka.common.errors.InvalidUpdateVersionException;
import org.apache.kafka.common.errors.MismatchedEndpointTypeException;
import org.apache.kafka.common.errors.UnsupportedEndpointTypeException;
import org.apache.kafka.common.errors.UnsupportedVersionException;
import org.apache.kafka.common.resource.PatternType;
import org.apache.kafka.common.resource.ResourcePattern;
import org.apache.kafka.common.resource.ResourceType;
import org.apache.kafka.common.test.ClusterInstance;
import org.apache.kafka.common.test.api.ClusterConfigProperty;
import org.apache.kafka.common.test.api.ClusterTest;
import org.apache.kafka.common.test.api.ClusterTestDefaults;
import org.apache.kafka.common.test.api.Type;
import org.apache.kafka.test.TestUtils;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Timeout;

/**
 * Integration tests for {@link Admin} clients that are bootstrapped either through
 * {@code bootstrap.controllers} or through {@code bootstrap.servers}.
 *
 * <p>Most scenarios come in pairs: one public test runs the shared private helper with a
 * client configured via {@code bootstrap.controllers}, the other runs the same helper with a
 * client configured via {@code bootstrap.servers}.
 */
@Timeout(120)
@ClusterTestDefaults(types = {Type.KRAFT})
public class BootstrapControllersIntegrationTest {
    private static final String BOOTSTRAP_CONTROLLERS_CONFIG = "bootstrap.controllers";
    private static final String BOOTSTRAP_SERVERS_CONFIG = "bootstrap.servers";
    private static final String CUSTOM_CONFIG_NAME = "my.custom.config";

    /**
     * Builds the minimal Admin client configuration for the given cluster.
     *
     * @param clusterInstance           the cluster under test
     * @param usingBootstrapControllers {@code true} to set {@code bootstrap.controllers} from
     *                                  {@code clusterInstance.bootstrapControllers()};
     *                                  {@code false} to set {@code bootstrap.servers} from
     *                                  {@code clusterInstance.bootstrapServers()}
     * @return a single-entry, immutable configuration map
     */
    private Map<String, Object> adminConfig(ClusterInstance clusterInstance, boolean usingBootstrapControllers) {
        if (usingBootstrapControllers) {
            return Collections.singletonMap(BOOTSTRAP_CONTROLLERS_CONFIG, clusterInstance.bootstrapControllers());
        }
        return Collections.singletonMap(BOOTSTRAP_SERVERS_CONFIG, clusterInstance.bootstrapServers());
    }

    /**
     * Supplies the bootstrap-servers value under {@code bootstrap.controllers} and expects
     * describeCluster to fail with a {@link MismatchedEndpointTypeException}.
     */
    @ClusterTest
    public void testPutBrokersInBootstrapControllersConfig(ClusterInstance clusterInstance) {
        Map<String, Object> config =
            Collections.singletonMap(BOOTSTRAP_CONTROLLERS_CONFIG, clusterInstance.bootstrapServers());
        try (Admin admin = Admin.create(config)) {
            ExecutionException exception = Assertions.assertThrows(ExecutionException.class,
                () -> admin.describeCluster().clusterId().get(1, TimeUnit.MINUTES));
            Throwable cause = exception.getCause();
            Assertions.assertNotNull(cause);
            Assertions.assertEquals(MismatchedEndpointTypeException.class, cause.getClass());
            Assertions.assertEquals("The request was sent to an endpoint of type BROKER, but we wanted an endpoint of type CONTROLLER", cause.getMessage());
        }
    }

    /**
     * Supplies the bootstrap-controllers value under {@code bootstrap.servers} and expects
     * describeCluster to fail with an {@link UnsupportedVersionException}.
     */
    @ClusterTest
    public void testPutControllersInBootstrapBrokersConfig(ClusterInstance clusterInstance) {
        Map<String, Object> config =
            Collections.singletonMap(BOOTSTRAP_SERVERS_CONFIG, clusterInstance.bootstrapControllers());
        try (Admin admin = Admin.create(config)) {
            ExecutionException exception = Assertions.assertThrows(ExecutionException.class,
                () -> admin.describeCluster().clusterId().get(1, TimeUnit.MINUTES));
            Throwable cause = exception.getCause();
            Assertions.assertNotNull(cause);
            Assertions.assertEquals(UnsupportedVersionException.class, cause.getClass());
            Assertions.assertEquals("The node does not support METADATA", cause.getMessage());
        }
    }

    @ClusterTest
    public void testDescribeClusterByControllers(ClusterInstance clusterInstance) throws Exception {
        testDescribeCluster(clusterInstance, true);
    }

    @ClusterTest
    public void testDescribeCluster(ClusterInstance clusterInstance) throws Exception {
        testDescribeCluster(clusterInstance, false);
    }

    /**
     * Checks that describeCluster reports the cluster id of {@code clusterInstance}. When the
     * client uses {@code bootstrap.controllers}, additionally checks that the reported
     * controller node is one of the cluster's controller ids.
     */
    private void testDescribeCluster(ClusterInstance clusterInstance, boolean usingBootstrapControllers) throws Exception {
        try (Admin admin = Admin.create(adminConfig(clusterInstance, usingBootstrapControllers))) {
            DescribeClusterResult result = admin.describeCluster();
            Assertions.assertEquals(clusterInstance.clusterId(), result.clusterId().get(1, TimeUnit.MINUTES));
            if (usingBootstrapControllers) {
                Node controller = result.controller().get();
                Assertions.assertTrue(clusterInstance.controllerIds().contains(controller.id()));
            }
        }
    }

    @ClusterTest
    public void testDescribeFeaturesByControllers(ClusterInstance clusterInstance) throws Exception {
        testDescribeFeatures(clusterInstance, true);
    }

    @ClusterTest
    public void testDescribeFeatures(ClusterInstance clusterInstance) throws Exception {
        testDescribeFeatures(clusterInstance, false);
    }

    /**
     * Checks that the finalized {@code metadata.version} feature returned by describeFeatures
     * has both bounds equal to the feature level of the cluster's configured metadata version.
     */
    private void testDescribeFeatures(ClusterInstance clusterInstance, boolean usingBootstrapControllers) throws Exception {
        try (Admin admin = Admin.create(adminConfig(clusterInstance, usingBootstrapControllers))) {
            DescribeFeaturesResult result = admin.describeFeatures();
            short metadataVersion = clusterInstance.config().metadataVersion().featureLevel();
            FeatureMetadata featureMetadata = result.featureMetadata().get(1, TimeUnit.MINUTES);
            Assertions.assertEquals(new FinalizedVersionRange(metadataVersion, metadataVersion),
                featureMetadata.finalizedFeatures().get("metadata.version"));
        }
    }

    @ClusterTest
    public void testUpdateFeaturesByControllers(ClusterInstance clusterInstance) {
        testUpdateFeatures(clusterInstance, true);
    }

    @ClusterTest
    public void testUpdateFeatures(ClusterInstance clusterInstance) {
        testUpdateFeatures(clusterInstance, false);
    }

    /**
     * Requests an upgrade of the feature {@code foo.bar.feature} and expects the call to fail
     * with an {@link InvalidUpdateVersionException} whose message ends with
     * "does not support this feature.".
     */
    private void testUpdateFeatures(ClusterInstance clusterInstance, boolean usingBootstrapControllers) {
        try (Admin admin = Admin.create(adminConfig(clusterInstance, usingBootstrapControllers))) {
            UpdateFeaturesResult result = admin.updateFeatures(
                Collections.singletonMap("foo.bar.feature",
                    new FeatureUpdate((short) 1, FeatureUpdate.UpgradeType.UPGRADE)),
                new UpdateFeaturesOptions());
            ExecutionException exception = Assertions.assertThrows(ExecutionException.class,
                () -> result.all().get(1, TimeUnit.MINUTES));
            Throwable cause = exception.getCause();
            Assertions.assertNotNull(cause);
            Assertions.assertEquals(InvalidUpdateVersionException.class, cause.getClass());
            Assertions.assertTrue(cause.getMessage().endsWith("does not support this feature."),
                "expected message to end with 'does not support this feature', but it was: " + cause.getMessage());
        }
    }

    @ClusterTest
    public void testDescribeMetadataQuorumByControllers(ClusterInstance clusterInstance) throws Exception {
        testDescribeMetadataQuorum(clusterInstance, true);
    }

    @ClusterTest
    public void testDescribeMetadataQuorum(ClusterInstance clusterInstance) throws Exception {
        testDescribeMetadataQuorum(clusterInstance, false);
    }

    /**
     * Checks that the leader id reported by describeMetadataQuorum is one of the cluster's
     * controller ids.
     */
    private void testDescribeMetadataQuorum(ClusterInstance clusterInstance, boolean usingBootstrapControllers) throws Exception {
        try (Admin admin = Admin.create(adminConfig(clusterInstance, usingBootstrapControllers))) {
            QuorumInfo quorumInfo = admin.describeMetadataQuorum().quorumInfo().get(1, TimeUnit.MINUTES);
            Assertions.assertTrue(clusterInstance.controllerIds().contains(quorumInfo.leaderId()));
        }
    }

    /**
     * Calls listOffsets through a {@code bootstrap.controllers} client and expects an
     * {@link UnsupportedEndpointTypeException}.
     */
    @ClusterTest
    public void testUsingBootstrapControllersOnUnsupportedAdminApi(ClusterInstance clusterInstance) {
        try (Admin admin = Admin.create(adminConfig(clusterInstance, true))) {
            ListOffsetsResult result = admin.listOffsets(
                Collections.singletonMap(new TopicPartition("foo", 0), OffsetSpec.earliest()));
            ExecutionException exception = Assertions.assertThrows(ExecutionException.class,
                () -> result.all().get(1, TimeUnit.MINUTES));
            Throwable cause = exception.getCause();
            Assertions.assertNotNull(cause);
            Assertions.assertEquals(UnsupportedEndpointTypeException.class, cause.getClass());
            Assertions.assertEquals("This Admin API is not yet supported when communicating directly with the controller quorum.", cause.getMessage());
        }
    }

    @ClusterTest
    public void testIncrementalAlterConfigsByControllers(ClusterInstance clusterInstance) throws Exception {
        testIncrementalAlterConfigs(clusterInstance, true);
    }

    @ClusterTest
    public void testIncrementalAlterConfigs(ClusterInstance clusterInstance) throws Exception {
        testIncrementalAlterConfigs(clusterInstance, false);
    }

    /**
     * Sets {@code my.custom.config} on one specific node resource and on the default
     * (empty-name) BROKER resource, then polls describeConfigs until the entry on the specific
     * node is reported with source {@code DYNAMIC_BROKER_CONFIG}.
     *
     * <p>The specific node is the first controller when {@code usingBootstrapControllers} is
     * set, otherwise the first broker.
     */
    private void testIncrementalAlterConfigs(ClusterInstance clusterInstance, boolean usingBootstrapControllers) throws Exception {
        try (Admin admin = Admin.create(adminConfig(clusterInstance, usingBootstrapControllers))) {
            // ConfigResource names are strings, so the node id has to be converted explicitly.
            String nodeId = String.valueOf(usingBootstrapControllers
                ? clusterInstance.controllers().values().iterator().next().config().nodeId()
                : clusterInstance.brokers().values().iterator().next().config().nodeId());
            ConfigResource nodeResource = new ConfigResource(ConfigResource.Type.BROKER, nodeId);
            ConfigResource defaultResource = new ConfigResource(ConfigResource.Type.BROKER, "");

            Map<ConfigResource, Collection<AlterConfigOp>> alterations = new HashMap<>();
            alterations.put(nodeResource, Collections.singletonList(
                new AlterConfigOp(new ConfigEntry(CUSTOM_CONFIG_NAME, "foo"), AlterConfigOp.OpType.SET)));
            alterations.put(defaultResource, Collections.singletonList(
                new AlterConfigOp(new ConfigEntry(CUSTOM_CONFIG_NAME, "bar"), AlterConfigOp.OpType.SET)));
            admin.incrementalAlterConfigs(alterations).all().get(1, TimeUnit.MINUTES);

            TestUtils.retryOnExceptionWithTimeout(30_000L, () -> {
                Config config = admin.describeConfigs(Collections.singletonList(nodeResource))
                    .all().get(1, TimeUnit.MINUTES).get(nodeResource);
                ConfigEntry entry = config.entries().stream()
                    .filter(candidate -> candidate.name().equals(CUSTOM_CONFIG_NAME))
                    .findFirst()
                    .orElseThrow(() -> new AssertionError(
                        "No entry named " + CUSTOM_CONFIG_NAME + " was returned for " + nodeResource));
                Assertions.assertEquals(ConfigEntry.ConfigSource.DYNAMIC_BROKER_CONFIG, entry.source(),
                    "Expected entry for my.custom.config to come from DYNAMIC_BROKER_CONFIG. Instead, the entry was: " + entry);
            });
        }
    }

    /**
     * Creates a three-partition topic through a {@code bootstrap.servers} client, alters the
     * replica assignment of every partition through a {@code bootstrap.controllers} client,
     * and then waits until describeTopics reports the new assignments.
     */
    @ClusterTest(brokers = 3)
    public void testAlterReassignmentsWithBootstrapControllers(ClusterInstance clusterInstance) throws ExecutionException, InterruptedException {
        String topicName = "foo";
        try (Admin admin = Admin.create(adminConfig(clusterInstance, false))) {
            Map<Integer, List<Integer>> initialAssignments = new HashMap<>();
            initialAssignments.put(0, Arrays.asList(0, 1, 2));
            initialAssignments.put(1, Arrays.asList(1, 2, 0));
            initialAssignments.put(2, Arrays.asList(2, 1, 0));
            admin.createTopics(Collections.singletonList(new NewTopic(topicName, initialAssignments))).all().get();
            waitForTopics(admin, Collections.singleton(topicName));

            List<Integer> partition0Replicas = Arrays.asList(2, 1, 0);
            List<Integer> partition1Replicas = Arrays.asList(0, 1, 2);
            List<Integer> partition2Replicas = Arrays.asList(1, 2);
            Map<TopicPartition, Optional<NewPartitionReassignment>> reassignments = new HashMap<>();
            reassignments.put(new TopicPartition(topicName, 0), Optional.of(new NewPartitionReassignment(partition0Replicas)));
            reassignments.put(new TopicPartition(topicName, 1), Optional.of(new NewPartitionReassignment(partition1Replicas)));
            reassignments.put(new TopicPartition(topicName, 2), Optional.of(new NewPartitionReassignment(partition2Replicas)));

            // The controller-bootstrapped client is scoped to the reassignment only, so it is
            // closed exactly once before the final verification runs.
            try (Admin controllerAdmin = Admin.create(adminConfig(clusterInstance, true))) {
                controllerAdmin.alterPartitionReassignments(reassignments).all().get();
                TestUtils.waitForCondition(
                    () -> controllerAdmin.listPartitionReassignments().reassignments().get().isEmpty(),
                    "The reassignment never completed.");
            }

            List<List<Integer>> expectedAssignments =
                Arrays.asList(partition0Replicas, partition1Replicas, partition2Replicas);
            TestUtils.waitForCondition(() -> {
                Map<String, TopicDescription> descriptions =
                    admin.describeTopics(Collections.singleton(topicName)).allTopicNames().get();
                TopicDescription description = descriptions.get(topicName);
                if (!descriptions.containsKey(topicName)) {
                    return false;
                }
                return expectedAssignments.equals(translatePartitionInfoToNodeIdList(description.partitions()));
            }, "Timed out waiting for replica assignments for topic " + topicName);
        }
    }

    /**
     * Blocks until listTopics on {@code admin} contains every name in {@code topics}.
     *
     * @throws InterruptedException if interrupted while waiting
     */
    private static void waitForTopics(Admin admin, Set<String> topics) throws InterruptedException {
        TestUtils.waitForCondition(
            () -> admin.listTopics().names().get().containsAll(topics),
            "timed out waiting for topics");
    }

    /**
     * Converts partition metadata into, for each partition in order, the list of its replica
     * node ids in order.
     */
    private static List<List<Integer>> translatePartitionInfoToNodeIdList(List<TopicPartitionInfo> partitions) {
        return partitions.stream()
            .map(partition -> partition.replicas().stream()
                .map(Node::id)
                .collect(Collectors.toList()))
            .collect(Collectors.toList());
    }

    @ClusterTest(serverProperties = {@ClusterConfigProperty(key = "super.users", value = "User:ANONYMOUS"), @ClusterConfigProperty(key = "authorizer.class.name", value = "org.apache.kafka.metadata.authorizer.StandardAuthorizer")})
    public void testAclsByControllers(ClusterInstance clusterInstance) throws Exception {
        testAcls(clusterInstance, true);
    }

    @ClusterTest(serverProperties = {@ClusterConfigProperty(key = "super.users", value = "User:ANONYMOUS"), @ClusterConfigProperty(key = "authorizer.class.name", value = "org.apache.kafka.metadata.authorizer.StandardAuthorizer")})
    public void testAcls(ClusterInstance clusterInstance) throws Exception {
        testAcls(clusterInstance, false);
    }

    /**
     * Creates one ACL binding, waits for the cluster to observe it, and then checks that
     * describeAcls and deleteAcls (both with the ANY filter) each return exactly that binding.
     */
    private void testAcls(ClusterInstance clusterInstance, boolean usingBootstrapControllers) throws Exception {
        try (Admin admin = Admin.create(adminConfig(clusterInstance, usingBootstrapControllers))) {
            ResourcePattern resourcePattern = new ResourcePattern(ResourceType.TOPIC, "mytopic3", PatternType.LITERAL);
            AccessControlEntry accessControlEntry =
                new AccessControlEntry("User:ANONYMOUS", "*", AclOperation.DESCRIBE, AclPermissionType.ALLOW);
            AclBinding aclBinding = new AclBinding(resourcePattern, accessControlEntry);

            Assertions.assertDoesNotThrow(
                () -> admin.createAcls(Collections.singleton(aclBinding)).all().get(1, TimeUnit.MINUTES));
            clusterInstance.waitAcls(
                new AclBindingFilter(resourcePattern.toFilter(), AccessControlEntryFilter.ANY),
                Collections.singleton(accessControlEntry));

            Collection<AclBinding> described = admin.describeAcls(AclBindingFilter.ANY).values().get(1, TimeUnit.MINUTES);
            Assertions.assertEquals(1, described.size());
            Assertions.assertEquals(aclBinding, described.iterator().next());

            Collection<AclBinding> deleted =
                admin.deleteAcls(Collections.singleton(AclBindingFilter.ANY)).all().get(1, TimeUnit.MINUTES);
            Assertions.assertEquals(1, deleted.size());
            Assertions.assertEquals(aclBinding, deleted.iterator().next());
        }
    }
}
