package org.apache.kafka.common.test;

import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import kafka.log.UnifiedLog;
import kafka.network.SocketServer;
import kafka.server.BrokerServer;
import kafka.server.ControllerServer;
import kafka.server.KafkaBroker;
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.GroupProtocol;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.acl.AccessControlEntry;
import org.apache.kafka.common.acl.AclBindingFilter;
import org.apache.kafka.common.network.ListenerName;
import org.apache.kafka.common.security.auth.SecurityProtocol;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;
import org.apache.kafka.common.serialization.ByteArraySerializer;
import org.apache.kafka.common.test.api.ClusterConfig;
import org.apache.kafka.common.test.api.Type;
import org.apache.kafka.server.authorizer.Authorizer;
import org.apache.kafka.server.fault.FaultHandlerException;
import org.apache.kafka.storage.internals.checkpoint.OffsetCheckpointFile;
import org.apache.kafka.storage.internals.log.LogDirFailureChannel;
import scala.jdk.javaapi.CollectionConverters;

/* loaded from: input_file:org/apache/kafka/common/test/ClusterInstance.class */
public interface ClusterInstance {
    Type type();

    Map<Integer, KafkaBroker> brokers();

    default Map<Integer, KafkaBroker> aliveBrokers() {
        return (Map) brokers().entrySet().stream().filter(entry -> {
            return !((KafkaBroker) entry.getValue()).isShutdown();
        }).collect(Collectors.toMap((v0) -> {
            return v0.getKey();
        }, (v0) -> {
            return v0.getValue();
        }));
    }

    Map<Integer, ControllerServer> controllers();

    ClusterConfig config();

    Set<Integer> controllerIds();

    default Set<Integer> brokerIds() {
        return brokers().keySet();
    }

    ListenerName clientListener();

    default Optional<ListenerName> controllerListenerName() {
        return Optional.empty();
    }

    String bootstrapServers();

    String bootstrapControllers();

    default Collection<SocketServer> brokerSocketServers() {
        return (Collection) brokers().values().stream().map((v0) -> {
            return v0.socketServer();
        }).collect(Collectors.toList());
    }

    Collection<SocketServer> controllerSocketServers();

    default SocketServer anyBrokerSocketServer() {
        return brokerSocketServers().stream().findFirst().orElseThrow(() -> {
            return new RuntimeException("No broker SocketServers found");
        });
    }

    default SocketServer anyControllerSocketServer() {
        return controllerSocketServers().stream().findFirst().orElseThrow(() -> {
            return new RuntimeException("No controller SocketServers found");
        });
    }

    String clusterId();

    default <K, V> Producer<K, V> producer(Map<String, Object> map) {
        Map<String, Object> hashMap = new HashMap<>(map);
        hashMap.putIfAbsent("key.serializer", ByteArraySerializer.class.getName());
        hashMap.putIfAbsent("value.serializer", ByteArraySerializer.class.getName());
        hashMap.putIfAbsent("bootstrap.servers", bootstrapServers());
        return new KafkaProducer(setClientSaslConfig(hashMap));
    }

    default <K, V> Producer<K, V> producer() {
        return producer(Map.of());
    }

    default <K, V> Consumer<K, V> consumer(Map<String, Object> map) {
        Map<String, Object> hashMap = new HashMap<>(map);
        hashMap.putIfAbsent("key.deserializer", ByteArrayDeserializer.class.getName());
        hashMap.putIfAbsent("value.deserializer", ByteArrayDeserializer.class.getName());
        hashMap.putIfAbsent("auto.offset.reset", "earliest");
        hashMap.putIfAbsent("group.id", "group_" + TestUtils.randomString(5));
        hashMap.putIfAbsent("bootstrap.servers", bootstrapServers());
        return new KafkaConsumer(setClientSaslConfig(hashMap));
    }

    default <K, V> Consumer<K, V> consumer() {
        return consumer(Map.of());
    }

    default Admin admin(Map<String, Object> map, boolean z) {
        HashMap hashMap = new HashMap(map);
        if (z) {
            hashMap.putIfAbsent("bootstrap.controllers", bootstrapControllers());
            hashMap.remove("bootstrap.servers");
        } else {
            hashMap.putIfAbsent("bootstrap.servers", bootstrapServers());
            hashMap.remove("bootstrap.controllers");
        }
        return Admin.create(setClientSaslConfig(hashMap));
    }

    default Map<String, Object> setClientSaslConfig(Map<String, Object> map) {
        HashMap hashMap = new HashMap(map);
        if (config().brokerSecurityProtocol() == SecurityProtocol.SASL_PLAINTEXT) {
            hashMap.putIfAbsent("security.protocol", SecurityProtocol.SASL_PLAINTEXT.name);
            hashMap.putIfAbsent("sasl.mechanism", "PLAIN");
            hashMap.putIfAbsent("sasl.jaas.config", String.format("org.apache.kafka.common.security.plain.PlainLoginModule required username=\"%s\" password=\"%s\";", JaasUtils.KAFKA_PLAIN_ADMIN, JaasUtils.KAFKA_PLAIN_ADMIN_PASSWORD));
        }
        return hashMap;
    }

    default Admin admin(Map<String, Object> map) {
        return admin(map, false);
    }

    default Admin admin() {
        return admin(Map.of(), false);
    }

    default Set<GroupProtocol> supportedGroupProtocols() {
        return brokers().values().stream().allMatch(kafkaBroker -> {
            return kafkaBroker.dataPlaneRequestProcessor().isConsumerGroupProtocolEnabled();
        }) ? Set.of(GroupProtocol.CLASSIC, GroupProtocol.CONSUMER) : Collections.singleton(GroupProtocol.CLASSIC);
    }

    Optional<FaultHandlerException> firstFatalException();

    Optional<FaultHandlerException> firstNonFatalException();

    void start();

    void stop();

    void shutdownBroker(int i);

    void startBroker(int i);

    default void waitTopicDeletion(String str) throws InterruptedException {
        waitForTopic(str, 0);
    }

    default void createTopic(String str, int i, short s) throws InterruptedException {
        Admin admin = admin();
        try {
            admin.createTopics(Collections.singletonList(new NewTopic(str, i, s)));
            waitForTopic(str, i);
            if (admin != null) {
                admin.close();
            }
        } catch (Throwable th) {
            if (admin != null) {
                try {
                    admin.close();
                } catch (Throwable th2) {
                    th.addSuppressed(th2);
                }
            }
            throw th;
        }
    }

    void waitForReadyBrokers() throws InterruptedException;

    default void waitForTopic(String str, int i) throws InterruptedException {
        Collection<KafkaBroker> values = aliveBrokers().values();
        TestUtils.waitForCondition(() -> {
            return Boolean.valueOf(values.stream().allMatch(kafkaBroker -> {
                return i == 0 ? kafkaBroker.metadataCache().numPartitions(str).isEmpty() : kafkaBroker.metadataCache().numPartitions(str).contains(Integer.valueOf(i));
            }));
        }, 60000L, str + " metadata not propagated after 60000 ms");
        Iterator<ControllerServer> it = controllers().values().iterator();
        while (it.hasNext()) {
            long offset = it.next().raftManager().replicatedLog().endOffset().offset() - 1;
            TestUtils.waitForCondition(() -> {
                return Boolean.valueOf(values.stream().allMatch(kafkaBroker -> {
                    return ((BrokerServer) kafkaBroker).sharedServer().loader().lastAppliedOffset() >= offset;
                }));
            }, 60000L, "Timeout waiting for controller metadata propagating to brokers");
        }
        if (i == 0) {
            List list = (List) IntStream.range(0, 1).mapToObj(i2 -> {
                return new TopicPartition(str, i2);
            }).collect(Collectors.toList());
            TestUtils.waitForCondition(() -> {
                return Boolean.valueOf(values.stream().allMatch(kafkaBroker -> {
                    return list.stream().allMatch(topicPartition -> {
                        return kafkaBroker.replicaManager().onlinePartition(topicPartition).isEmpty();
                    });
                }));
            }, "Replica manager's should have deleted all of this topic's partitions");
            TestUtils.waitForCondition(() -> {
                return Boolean.valueOf(values.stream().allMatch(kafkaBroker -> {
                    return list.stream().allMatch(topicPartition -> {
                        return kafkaBroker.logManager().getLog(topicPartition, false).isEmpty();
                    });
                }));
            }, "Replica logs not deleted after delete topic is complete");
            TestUtils.waitForCondition(() -> {
                return Boolean.valueOf(values.stream().allMatch(kafkaBroker -> {
                    return list.stream().allMatch(topicPartition -> {
                        return CollectionConverters.asJava(kafkaBroker.logManager().liveLogDirs()).stream().allMatch(file -> {
                            try {
                                return !new OffsetCheckpointFile(new File(file, "cleaner-offset-checkpoint"), (LogDirFailureChannel) null).read().containsKey(topicPartition);
                            } catch (IOException e) {
                                throw new RuntimeException(e);
                            }
                        });
                    });
                }));
            }, "Cleaner offset for deleted partition should have been removed");
            TestUtils.waitForCondition(() -> {
                return Boolean.valueOf(values.stream().allMatch(kafkaBroker -> {
                    return CollectionConverters.asJava(kafkaBroker.config().logDirs()).stream().allMatch(str2 -> {
                        return list.stream().noneMatch(topicPartition -> {
                            return new File(str2, topicPartition.topic() + "-" + topicPartition.partition()).exists();
                        });
                    });
                }));
            }, "Failed to soft-delete the data to a delete directory");
            TestUtils.waitForCondition(() -> {
                return Boolean.valueOf(values.stream().allMatch(kafkaBroker -> {
                    return CollectionConverters.asJava(kafkaBroker.config().logDirs()).stream().allMatch(str2 -> {
                        return list.stream().allMatch(topicPartition -> {
                            return Arrays.stream((String[]) Objects.requireNonNull(new File(str2).list())).noneMatch(str2 -> {
                                return str2.startsWith(topicPartition.topic() + "-" + topicPartition.partition()) && str2.endsWith(UnifiedLog.DeleteDirSuffix());
                            });
                        });
                    });
                }));
            }, "Failed to hard-delete the delete directory");
        }
    }

    default List<Authorizer> authorizers() {
        ArrayList arrayList = new ArrayList();
        arrayList.addAll((Collection) brokers().values().stream().filter(kafkaBroker -> {
            return kafkaBroker.authorizer().isDefined();
        }).map(kafkaBroker2 -> {
            return (Authorizer) kafkaBroker2.authorizer().get();
        }).collect(Collectors.toList()));
        arrayList.addAll((Collection) controllers().values().stream().filter(controllerServer -> {
            return controllerServer.authorizer().isDefined();
        }).map(controllerServer2 -> {
            return (Authorizer) controllerServer2.authorizer().get();
        }).collect(Collectors.toList()));
        return arrayList;
    }

    default void waitAcls(AclBindingFilter aclBindingFilter, Collection<AccessControlEntry> collection) throws InterruptedException {
        for (Authorizer authorizer : authorizers()) {
            AtomicReference atomicReference = new AtomicReference(new HashSet());
            TestUtils.waitForCondition(() -> {
                HashSet hashSet = new HashSet();
                authorizer.acls(aclBindingFilter).forEach(aclBinding -> {
                    hashSet.add(aclBinding.entry());
                });
                atomicReference.set(hashSet);
                return Boolean.valueOf(hashSet.containsAll(collection) && collection.containsAll(hashSet));
            }, "expected acls: " + String.valueOf(collection) + ", actual acls: " + String.valueOf(atomicReference.get()));
        }
    }
}
