package io.kroxylicious.testing.kafka.common;

import java.time.Duration;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.stream.Collectors;
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.NewPartitionReassignment;
import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.clients.admin.TopicDescription;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.InvalidReplicationFactorException;
import org.apache.kafka.common.errors.RetriableException;
import org.apache.kafka.common.errors.TopicDeletionDisabledException;
import org.apache.kafka.common.errors.TopicExistsException;
import org.apache.kafka.common.errors.UnknownTopicOrPartitionException;
import org.awaitility.Awaitility;
import org.awaitility.core.ConditionFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:io/kroxylicious/testing/kafka/common/Utils.class */
/**
 * Static helpers that use a Kafka {@link Admin} client and Awaitility polling to wait for a
 * test cluster to reach an expected state.
 */
public class Utils {
    private static final Logger log = LoggerFactory.getLogger(Utils.class);

    /** Name of the throw-away topic created to observe how many brokers host replicas. */
    private static final String CONSISTENCY_TEST = "__org_kroxylicious_testing_consistencyTest";

    private Utils() {
        // static utility holder, never instantiated
    }

    /**
     * Moves single-replica partitions of the Kafka internal topics off one node and waits until
     * no reassignment of those topics is still listed as in progress.
     *
     * <p>Only partitions that have fewer than two replicas and whose replica list contains node
     * {@code i} are reassigned; each of them is reassigned to the single replica {@code i2}.
     * Internal topics that do not exist are skipped.
     *
     * @param map      configuration used to create the {@link Admin} client
     * @param i        id of the node whose partitions should be moved away
     * @param i2       id of the node that receives the reassigned partitions
     * @param i3       maximum time to wait for each of the two polling phases
     * @param timeUnit unit of {@code i3}
     */
    public static void awaitReassignmentOfKafkaInternalTopicsIfNecessary(Map<String, Object> map, int i, int i2, int i3, TimeUnit timeUnit) {
        List<String> internalTopics = List.of("__consumer_offsets", "__transaction_state", "__cluster_metadata");
        try (Admin admin = Admin.create(map)) {
            // Phase 1: request the reassignment. Any exception makes Awaitility poll again.
            awaitCondition(i3, timeUnit).until(() -> {
                Map<String, TopicDescription> descriptions = describeKnownTopics(internalTopics, admin);
                Map<TopicPartition, Optional<NewPartitionReassignment>> reassignments = new HashMap<>();
                Optional<NewPartitionReassignment> target = Optional.of(new NewPartitionReassignment(List.of(i2)));
                descriptions.forEach((topic, description) -> description.partitions().stream()
                        .filter(partition -> partition.replicas().size() < 2
                                && partition.replicas().stream().anyMatch(node -> node.id() == i))
                        .forEach(partition -> reassignments.put(new TopicPartition(topic, partition.partition()), target)));
                if (reassignments.isEmpty()) {
                    log.debug("No kafka internal topic partitions need re-assigning from node {}", i);
                    return true;
                }
                log.debug("Kafka internal topic partitions to re-assign: {}", reassignments);
                admin.alterPartitionReassignments(reassignments).all().get();
                return true;
            });
            // Phase 2: wait until none of the internal topics has a reassignment in flight.
            awaitCondition(i3, timeUnit).until(() -> {
                Set<TopicPartition> inProgress = admin.listPartitionReassignments().reassignments().get().keySet().stream()
                        .filter(topicPartition -> internalTopics.contains(topicPartition.topic()))
                        .collect(Collectors.toSet());
                if (inProgress.isEmpty()) {
                    log.debug("Kafka internal topic partitions re-assigment complete.");
                    return true;
                }
                log.debug("Kafka internal topic partitions re-assigment in-progress: {}", inProgress);
                return false;
            });
        }
    }

    /**
     * Creates a single-partition test topic with replication factor {@code num} and waits until
     * its replicas are spread over exactly {@code num} distinct brokers, then deletes the topic.
     *
     * <p>Polling stops early if the topic creation future has completed exceptionally.
     *
     * @param map      configuration used to create the {@link Admin} client
     * @param i        maximum time to wait
     * @param timeUnit unit of {@code i}
     * @param num      expected number of distinct brokers hosting a replica
     */
    public static void awaitExpectedBrokerCountInClusterViaTopic(Map<String, Object> map, int i, TimeUnit timeUnit, Integer num) {
        try (Admin admin = Admin.create(map)) {
            log.debug("Creating topic: {} via {}", CONSISTENCY_TEST, map.get("bootstrap.servers"));
            CompletionStage<Void> creation = createTopic(num, admin);
            log.debug("Waiting for {} to be replicated to {} brokers", CONSISTENCY_TEST, num);
            awaitCondition(i, timeUnit).failFast(() -> {
                CompletableFuture<Void> creationFuture = creation.toCompletableFuture();
                if (creationFuture.isCompletedExceptionally()) {
                    // get() rethrows the creation failure, which aborts the wait
                    creationFuture.get();
                }
            }).until(() -> {
                log.debug("Calling describe topic");
                CompletableFuture<Boolean> replicated = new CompletableFuture<>();
                admin.describeTopics(Set.of(CONSISTENCY_TEST)).allTopicNames().whenComplete((descriptions, failure) -> {
                    if (failure == null) {
                        log.debug("Current topicDescriptions: {}", descriptions);
                        checkReplicaDistribution(num, replicated, descriptions);
                        return;
                    }
                    if ((failure instanceof CompletionException) && (failure.getCause() instanceof UnknownTopicOrPartitionException)) {
                        log.debug("Cluster quorum test topic ({}) doesn't exist yet", CONSISTENCY_TEST);
                    }
                    else {
                        log.warn("Unexpected failure describing topic: {} due to {}", CONSISTENCY_TEST, failure.getMessage(), failure);
                    }
                    replicated.complete(false);
                }).get(1L, TimeUnit.SECONDS);
                Boolean asExpected = replicated.getNow(false);
                if (asExpected) {
                    deleteTopic(admin);
                }
                return asExpected;
            });
        }
    }

    /**
     * Deletes the consistency test topic, waiting up to ten seconds for the deletion to complete.
     * A {@link TopicDeletionDisabledException}, thrown directly or as the cause of an
     * {@link ExecutionException}, is logged and otherwise ignored.
     *
     * @param admin client used to issue the deletion
     * @throws InterruptedException if interrupted while waiting
     * @throws TimeoutException     if the deletion does not complete within ten seconds
     * @throws ExecutionException   if the deletion fails for any other reason
     */
    private static void deleteTopic(Admin admin) throws InterruptedException, TimeoutException, ExecutionException {
        try {
            admin.deleteTopics(Set.of(CONSISTENCY_TEST)).all().get(10L, TimeUnit.SECONDS);
        }
        catch (TopicDeletionDisabledException e) {
            log.warn("caught {} deleting {}", e.getMessage(), CONSISTENCY_TEST, e);
        }
        catch (ExecutionException e) {
            Throwable cause = e.getCause();
            if (!(cause instanceof TopicDeletionDisabledException deletionDisabled)) {
                throw e;
            }
            log.warn("Failed to delete {}. Caught: {} ", CONSISTENCY_TEST, deletionDisabled.getMessage(), deletionDisabled);
        }
    }

    /**
     * Completes {@code result} with {@code true} when the replicas of the consistency test topic
     * cover exactly {@code expectedBrokers} distinct nodes, and with {@code false} otherwise
     * (including when the topic is missing from {@code descriptions}).
     *
     * @param expectedBrokers number of distinct replica nodes required
     * @param result          future that receives the outcome
     * @param descriptions    topic descriptions keyed by topic name
     */
    private static void checkReplicaDistribution(Integer expectedBrokers, CompletableFuture<Boolean> result, Map<String, TopicDescription> descriptions) {
        TopicDescription description = descriptions.get(CONSISTENCY_TEST);
        if (description == null) {
            result.complete(false);
            return;
        }
        long distinctReplicas = description.partitions().stream()
                .flatMap(partition -> partition.replicas().stream())
                .filter(Objects::nonNull)
                .distinct()
                .count();
        if (distinctReplicas == expectedBrokers.intValue()) {
            log.debug("Expected number of replicas found.");
            result.complete(true);
            return;
        }
        log.debug("Unexpected number of replicas found expected: {} got: {}", expectedBrokers, distinctReplicas);
        result.complete(false);
    }

    /**
     * Creates the consistency test topic with one partition and the given replication factor.
     * When creation fails with an error that {@link #isRetryable(Throwable)} accepts, the
     * creation is attempted again after a 100 ms delay; any other failure completes the returned
     * stage exceptionally with a {@link RuntimeException} wrapping the cause.
     *
     * @param replicationFactor replication factor of the topic
     * @param admin             client used to create the topic
     * @return stage that completes once the topic creation has succeeded or finally failed
     */
    private static CompletionStage<Void> createTopic(Integer replicationFactor, Admin admin) {
        return admin.createTopics(Set.of(new NewTopic(CONSISTENCY_TEST, 1, replicationFactor.shortValue())))
                .all()
                .toCompletionStage()
                .thenRun(() -> log.debug("Create future for topic {} completed.", CONSISTENCY_TEST))
                .exceptionallyComposeAsync(failure -> {
                    log.warn("Failed to create topic: {} due to {}", CONSISTENCY_TEST, failure.getMessage());
                    return isRetryable(failure)
                            ? createTopic(replicationFactor, admin)
                            : CompletableFuture.failedStage(new RuntimeException("Failed to create topic: " + CONSISTENCY_TEST, failure));
                }, CompletableFuture.delayedExecutor(100L, TimeUnit.MILLISECONDS));
    }

    /**
     * Decides whether a topic creation failure is worth retrying. A {@link CompletionException}
     * with a cause is unwrapped first. Retryable failures are:
     * <ul>
     * <li>a {@link RetriableException}, unless its message says the admin client is not
     * accepting new calls;</li>
     * <li>an {@link InvalidReplicationFactorException};</li>
     * <li>a {@link TopicExistsException} whose message says the topic is marked for deletion.</li>
     * </ul>
     *
     * @param failure the failure reported by the creation future
     * @return {@code true} if the creation should be attempted again
     */
    private static boolean isRetryable(Throwable failure) {
        Throwable cause = (failure instanceof CompletionException && failure.getCause() != null) ? failure.getCause() : failure;
        String message = cause.getMessage();
        if (cause instanceof RetriableException
                && (message == null || !message.contains("The AdminClient is not accepting new calls"))) {
            return true;
        }
        if (cause instanceof InvalidReplicationFactorException) {
            return true;
        }
        // a null message cannot indicate "marked for deletion", so treat it as not retryable
        return cause instanceof TopicExistsException && message != null && message.contains("is marked for deletion");
    }

    /**
     * Builds the Awaitility condition used throughout this class: no initial poll delay, a poll
     * every 500 ms, the given overall timeout, and exceptions thrown by the condition ignored.
     *
     * @param i        maximum time to wait
     * @param timeUnit unit of {@code i}
     * @return the configured condition factory
     */
    public static ConditionFactory awaitCondition(int i, TimeUnit timeUnit) {
        return Awaitility.await()
                .pollDelay(Duration.ZERO)
                .pollInterval(500L, TimeUnit.MILLISECONDS)
                .atMost(i, timeUnit)
                .ignoreExceptions();
    }

    /**
     * Describes each of the given topics individually and collects the descriptions of those
     * that exist. Topics reported as unknown are skipped.
     *
     * @param topics names of the topics to describe
     * @param admin  client used to describe the topics
     * @return descriptions of the existing topics, keyed by topic name
     * @throws InterruptedException if interrupted while waiting for a describe call
     * @throws RuntimeException     if a describe call fails for a reason other than the topic
     *                              being unknown; a non-runtime cause is wrapped
     */
    private static Map<String, TopicDescription> describeKnownTopics(List<String> topics, Admin admin) throws InterruptedException {
        Map<String, TopicDescription> known = new HashMap<>();
        for (String topic : topics) {
            try {
                known.putAll(admin.describeTopics(List.of(topic)).allTopicNames().get());
            }
            catch (ExecutionException e) {
                Throwable cause = e.getCause();
                if (cause instanceof UnknownTopicOrPartitionException) {
                    // the topic simply does not exist on this cluster
                    continue;
                }
                if (cause instanceof RuntimeException runtimeCause) {
                    throw runtimeCause;
                }
                throw new RuntimeException(cause);
            }
        }
        return known;
    }
}
