package org.apache.kafka.connect.util.clusters;

import java.util.Arrays;
import java.util.Collection;
import java.util.HashSet;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.BiFunction;
import java.util.function.Predicate;
import java.util.stream.Collectors;
import javax.ws.rs.core.Response;
import org.apache.kafka.clients.admin.TopicDescription;
import org.apache.kafka.connect.runtime.AbstractStatus;
import org.apache.kafka.connect.runtime.rest.entities.ActiveTopicsInfo;
import org.apache.kafka.connect.runtime.rest.entities.ConnectorStateInfo;
import org.apache.kafka.connect.runtime.rest.errors.ConnectRestException;
import org.apache.kafka.test.TestUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/kafka/connect/util/clusters/ConnectAssertions.class */
public class ConnectAssertions {
    private static final Logger log = LoggerFactory.getLogger(ConnectAssertions.class);
    public static final long WORKER_SETUP_DURATION_MS = TimeUnit.MINUTES.toMillis(5);
    public static final long VALIDATION_DURATION_MS = TimeUnit.SECONDS.toMillis(30);
    public static final long CONNECTOR_SETUP_DURATION_MS = TimeUnit.MINUTES.toMillis(2);
    public static final long CONNECTOR_SHUTDOWN_DURATION_MS = TimeUnit.MINUTES.toMillis(1);
    private static final long CONNECT_INTERNAL_TOPIC_UPDATES_DURATION_MS = TimeUnit.SECONDS.toMillis(60);
    private final EmbeddedConnect connect;

    /* JADX INFO: Access modifiers changed from: package-private */
    public ConnectAssertions(EmbeddedConnect embeddedConnect) {
        this.connect = embeddedConnect;
    }

    public void assertAtLeastNumWorkersAreUp(int i, String str) throws InterruptedException {
        try {
            TestUtils.waitForCondition(() -> {
                return checkWorkersUp(i, (num, num2) -> {
                    return Boolean.valueOf(num.intValue() >= num2.intValue());
                }).orElse(false).booleanValue();
            }, WORKER_SETUP_DURATION_MS, "Didn't meet the minimum requested number of online workers: " + i);
        } catch (AssertionError e) {
            throw new AssertionError(str, e);
        }
    }

    public void assertExactlyNumWorkersAreUp(int i, String str) throws InterruptedException {
        try {
            TestUtils.waitForCondition(() -> {
                return checkWorkersUp(i, (num, num2) -> {
                    return Boolean.valueOf(num == num2);
                }).orElse(false).booleanValue();
            }, WORKER_SETUP_DURATION_MS, "Didn't meet the exact requested number of online workers: " + i);
        } catch (AssertionError e) {
            throw new AssertionError(str, e);
        }
    }

    protected Optional<Boolean> checkWorkersUp(int i, BiFunction<Integer, Integer, Boolean> biFunction) {
        try {
            return Optional.of(biFunction.apply(Integer.valueOf(this.connect.healthyWorkers().size()), Integer.valueOf(i)));
        } catch (Exception e) {
            log.error("Could not check active workers.", e);
            return Optional.empty();
        }
    }

    public void assertExactlyNumBrokersAreUp(int i, String str) throws InterruptedException {
        try {
            TestUtils.waitForCondition(() -> {
                return checkBrokersUp(i, (num, num2) -> {
                    return Boolean.valueOf(num == num2);
                }).orElse(false).booleanValue();
            }, WORKER_SETUP_DURATION_MS, "Didn't meet the exact requested number of online brokers: " + i);
        } catch (AssertionError e) {
            throw new AssertionError(str, e);
        }
    }

    protected Optional<Boolean> checkBrokersUp(int i, BiFunction<Integer, Integer, Boolean> biFunction) {
        try {
            return Optional.of(biFunction.apply(Integer.valueOf(this.connect.kafka().runningBrokers().size()), Integer.valueOf(i)));
        } catch (Exception e) {
            log.error("Could not check running brokers.", e);
            return Optional.empty();
        }
    }

    public void assertTopicsDoNotExist(String... strArr) throws InterruptedException {
        HashSet hashSet = new HashSet(Arrays.asList(strArr));
        AtomicReference atomicReference = new AtomicReference(hashSet);
        TestUtils.waitForCondition(() -> {
            return checkTopicsExist(hashSet, (set, set2) -> {
                atomicReference.set(set);
                return Boolean.valueOf(set.isEmpty());
            }).orElse(false).booleanValue();
        }, CONNECTOR_SETUP_DURATION_MS, "Unexpectedly found topics " + atomicReference.get());
    }

    public void assertTopicsExist(String... strArr) throws InterruptedException {
        HashSet hashSet = new HashSet(Arrays.asList(strArr));
        AtomicReference atomicReference = new AtomicReference(hashSet);
        TestUtils.waitForCondition(() -> {
            return checkTopicsExist(hashSet, (set, set2) -> {
                HashSet hashSet2 = new HashSet(set2);
                hashSet2.removeAll(set);
                atomicReference.set(hashSet2);
                return Boolean.valueOf(hashSet2.isEmpty());
            }).orElse(false).booleanValue();
        }, CONNECTOR_SETUP_DURATION_MS, "Didn't find the topics " + atomicReference.get());
    }

    protected Optional<Boolean> checkTopicsExist(Set<String> set, BiFunction<Set<String>, Set<String>, Boolean> biFunction) {
        try {
            return Optional.of(biFunction.apply((Set) this.connect.kafka().describeTopics(set).entrySet().stream().filter(entry -> {
                return ((Optional) entry.getValue()).isPresent();
            }).map((v0) -> {
                return v0.getKey();
            }).collect(Collectors.toSet()), set));
        } catch (Exception e) {
            log.error("Failed to describe the topic(s): {}.", set, e);
            return Optional.empty();
        }
    }

    public void assertTopicSettings(String str, int i, int i2, String str2) throws InterruptedException {
        try {
            TestUtils.waitForCondition(() -> {
                return checkTopicSettings(str, i, i2).orElse(false).booleanValue();
            }, VALIDATION_DURATION_MS, "Topic " + str + " does not exist or does not have exactly " + i2 + " partitions or at least " + i + " per partition");
        } catch (AssertionError e) {
            throw new AssertionError(str2, e);
        }
    }

    protected Optional<Boolean> checkTopicSettings(String str, int i, int i2) {
        try {
            TopicDescription orElse = this.connect.kafka().describeTopics(str).get(str).orElse(null);
            return Optional.of(Boolean.valueOf(orElse != null && orElse.name().equals(str) && orElse.partitions().size() == i2 && orElse.partitions().stream().allMatch(topicPartitionInfo -> {
                return topicPartitionInfo.replicas().size() >= i;
            })));
        } catch (Exception e) {
            log.error("Failed to describe the topic: {}.", str, e);
            return Optional.empty();
        }
    }

    public void assertExactlyNumErrorsOnConnectorConfigValidation(String str, Map<String, String> map, int i, String str2) throws InterruptedException {
        assertExactlyNumErrorsOnConnectorConfigValidation(str, map, i, str2, VALIDATION_DURATION_MS);
    }

    public void assertExactlyNumErrorsOnConnectorConfigValidation(String str, Map<String, String> map, int i, String str2, long j) throws InterruptedException {
        try {
            TestUtils.waitForCondition(() -> {
                return checkValidationErrors(str, map, i, (num, num2) -> {
                    return Boolean.valueOf(num == num2);
                }).orElse(false).booleanValue();
            }, j, "Didn't meet the exact requested number of validation errors: " + i);
        } catch (AssertionError e) {
            throw new AssertionError(str2, e);
        }
    }

    protected Optional<Boolean> checkValidationErrors(String str, Map<String, String> map, int i, BiFunction<Integer, Integer, Boolean> biFunction) {
        try {
            return Optional.of(biFunction.apply(Integer.valueOf(this.connect.validateConnectorConfig(str, map).errorCount()), Integer.valueOf(i)));
        } catch (Exception e) {
            log.error("Could not check config validation error count.", e);
            return Optional.empty();
        }
    }

    public void assertConnectorAndAtLeastNumTasksAreRunning(String str, int i, String str2) throws InterruptedException {
        waitForConnectorState(str, AbstractStatus.State.RUNNING, atLeast(i), null, AbstractStatus.State.RUNNING, "The connector or at least " + i + " of tasks are not running.", str2, CONNECTOR_SETUP_DURATION_MS);
    }

    public void assertConnectorAndExactlyNumTasksAreRunning(String str, int i, String str2) throws InterruptedException {
        waitForConnectorState(str, AbstractStatus.State.RUNNING, exactly(i), null, AbstractStatus.State.RUNNING, "The connector or exactly " + i + " tasks are not running.", str2, CONNECTOR_SETUP_DURATION_MS);
    }

    public void assertConnectorAndExactlyNumTasksArePaused(String str, int i, String str2) throws InterruptedException {
        waitForConnectorState(str, AbstractStatus.State.PAUSED, exactly(i), null, AbstractStatus.State.PAUSED, "The connector or exactly " + i + " tasks are not paused.", str2, CONNECTOR_SHUTDOWN_DURATION_MS);
    }

    public void assertConnectorIsRunningAndTasksHaveFailed(String str, int i, String str2) throws InterruptedException {
        waitForConnectorState(str, AbstractStatus.State.RUNNING, exactly(i), null, AbstractStatus.State.FAILED, "Either the connector is not running or not all the " + i + " tasks have failed.", str2, CONNECTOR_SETUP_DURATION_MS);
    }

    public void assertConnectorIsRunningAndNumTasksHaveFailed(String str, int i, int i2, String str2) throws InterruptedException {
        waitForConnectorState(str, AbstractStatus.State.RUNNING, exactly(i), Integer.valueOf(i2), AbstractStatus.State.FAILED, "Either the connector is not running or not all the " + i + " tasks have failed.", str2, CONNECTOR_SETUP_DURATION_MS);
    }

    public void assertConnectorIsFailedAndTasksHaveFailed(String str, int i, String str2) throws InterruptedException {
        waitForConnectorState(str, AbstractStatus.State.FAILED, exactly(i), null, AbstractStatus.State.FAILED, "Either the connector is running or not all the " + i + " tasks have failed.", str2, CONNECTOR_SETUP_DURATION_MS);
    }

    public void assertConnectorIsFailedAndNumTasksAreRunning(String str, int i, String str2) throws InterruptedException {
        waitForConnectorState(str, AbstractStatus.State.FAILED, exactly(i), null, AbstractStatus.State.RUNNING, "Either the connector is running or not all the " + i + " tasks are running.", str2, CONNECTOR_SETUP_DURATION_MS);
    }

    public void assertConnectorDoesNotExist(String str, String str2) throws InterruptedException {
        try {
            TestUtils.waitForCondition(() -> {
                return checkConnectorDoesNotExist(str);
            }, CONNECTOR_SETUP_DURATION_MS, "The connector should not exist.");
        } catch (AssertionError e) {
            throw new AssertionError(str2, e);
        }
    }

    protected boolean checkConnectorDoesNotExist(String str) {
        try {
            this.connect.connectorStatus(str);
            return false;
        } catch (Exception e) {
            log.error("Could not check connector state info.", e);
            return false;
        } catch (ConnectRestException e2) {
            return e2.statusCode() == Response.Status.NOT_FOUND.getStatusCode();
        }
    }

    public void assertConnectorIsStopped(String str, String str2) throws InterruptedException {
        waitForConnectorState(str, AbstractStatus.State.STOPPED, exactly(0), null, null, "At least the connector or one of its tasks is still running", str2, CONNECTOR_SHUTDOWN_DURATION_MS);
    }

    protected void waitForConnectorState(String str, AbstractStatus.State state, Predicate<Integer> predicate, Integer num, AbstractStatus.State state2, String str2, String str3, long j) throws InterruptedException {
        AtomicReference atomicReference = new AtomicReference();
        AtomicReference atomicReference2 = new AtomicReference();
        try {
            TestUtils.waitForCondition(() -> {
                try {
                    ConnectorStateInfo connectorStatus = this.connect.connectorStatus(str);
                    atomicReference.set(connectorStatus);
                    atomicReference2.set(null);
                    if (connectorStatus == null) {
                        return false;
                    }
                    int intValue = ((Integer) Optional.ofNullable(num).orElse(Integer.valueOf(connectorStatus.tasks().size()))).intValue();
                    if (predicate.test(Integer.valueOf(connectorStatus.tasks().size())) && connectorStatus.connector().state().equals(state.toString())) {
                        if (connectorStatus.tasks().stream().filter(taskState -> {
                            return taskState.state().equals(state2.toString());
                        }).count() == intValue) {
                            return true;
                        }
                    }
                    return false;
                } catch (Exception e) {
                    log.error("Could not check connector state info.", e);
                    atomicReference.set(null);
                    atomicReference2.set(e);
                    return false;
                }
            }, j, () -> {
                if (atomicReference.get() != null) {
                    return str2 + " When last checked, " + stateSummary((ConnectorStateInfo) atomicReference.get());
                }
                if (atomicReference2.get() == null) {
                    return str2;
                }
                String str4 = str2 + " The last attempt to check the connector state failed: " + ((Exception) atomicReference2.get()).getClass();
                String message = ((Exception) atomicReference2.get()).getMessage();
                if (message != null) {
                    str4 = str4 + ": " + message;
                }
                return str4;
            });
        } catch (AssertionError e) {
            throw new AssertionError(str3, e);
        }
    }

    public void assertConnectorActiveTopics(String str, Collection<String> collection, String str2) throws InterruptedException {
        try {
            TestUtils.waitForCondition(() -> {
                return checkConnectorActiveTopics(str, collection).orElse(false).booleanValue();
            }, CONNECT_INTERNAL_TOPIC_UPDATES_DURATION_MS, "Connector active topics don't match the expected collection");
        } catch (AssertionError e) {
            throw new AssertionError(str2, e);
        }
    }

    protected Optional<Boolean> checkConnectorActiveTopics(String str, Collection<String> collection) {
        try {
            ActiveTopicsInfo connectorTopics = this.connect.connectorTopics(str);
            boolean z = connectorTopics != null && collection.size() == connectorTopics.topics().size() && collection.containsAll(connectorTopics.topics());
            log.debug("Found connector {} using topics: {}", str, connectorTopics.topics());
            return Optional.of(Boolean.valueOf(z));
        } catch (Exception e) {
            log.error("Could not check connector {} state info.", str, e);
            return Optional.empty();
        }
    }

    private static String stateSummary(ConnectorStateInfo connectorStateInfo) {
        String str = "the connector was " + connectorStateInfo.connector().state();
        if (connectorStateInfo.tasks().isEmpty()) {
            return str + " and no tasks were running";
        }
        return str + " and its tasks were in these states: " + ((String) connectorStateInfo.tasks().stream().map((v0) -> {
            return v0.state();
        }).collect(Collectors.joining(", ")));
    }

    private static Predicate<Integer> exactly(int i) {
        return num -> {
            return num.intValue() == i;
        };
    }

    private static Predicate<Integer> atLeast(int i) {
        return num -> {
            return num.intValue() >= i;
        };
    }
}
