package org.apache.kafka.connect.integration;

import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Collectors;
import javax.ws.rs.core.Response;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.connect.integration.BlockingConnectorTest;
import org.apache.kafka.connect.runtime.ErrorHandlingTaskTest;
import org.apache.kafka.connect.runtime.rest.entities.CreateConnectorRequest;
import org.apache.kafka.connect.runtime.rest.entities.LoggerLevel;
import org.apache.kafka.connect.runtime.rest.errors.ConnectRestException;
import org.apache.kafka.connect.storage.StringConverter;
import org.apache.kafka.connect.util.clusters.EmbeddedConnectStandalone;
import org.apache.kafka.test.TestUtils;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Tag;
import org.junit.jupiter.api.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@Tag("integration")
/* loaded from: input_file:org/apache/kafka/connect/integration/StandaloneWorkerIntegrationTest.class */
public class StandaloneWorkerIntegrationTest {
    private static final Logger log = LoggerFactory.getLogger(StandaloneWorkerIntegrationTest.class);
    private static final String CONNECTOR_NAME = "test-connector";
    private static final int NUM_TASKS = 4;
    private static final String TOPIC_NAME = "test-topic";
    private EmbeddedConnectStandalone.Builder connectBuilder;
    private EmbeddedConnectStandalone connect;

    @BeforeEach
    public void setup() {
        this.connectBuilder = new EmbeddedConnectStandalone.Builder();
    }

    @AfterEach
    public void cleanup() {
        BlockingConnectorTest.Block.reset();
        if (this.connect != null) {
            this.connect.stop();
        }
        BlockingConnectorTest.Block.join();
    }

    @Test
    public void testDynamicLogging() {
        this.connect = this.connectBuilder.build();
        this.connect.start();
        Map allLogLevels = this.connect.allLogLevels();
        Assertions.assertFalse(allLogLevels.isEmpty(), "Connect REST API did not list any known loggers");
        Assertions.assertEquals(Collections.emptyMap(), Utils.filterMap(allLogLevels, StandaloneWorkerIntegrationTest::isModified), "No loggers should have a non-null last-modified timestamp");
        this.connect.setLogLevel("org.apache.kafka.connect", "ERROR", null);
        Map<String, LoggerLevel> testSetLoggingLevel = testSetLoggingLevel("org.apache.kafka.connect", "DEBUG", null, allLogLevels);
        this.connect.setLogLevel("org.apache.kafka.clients", "WARN", "worker");
        Map<String, LoggerLevel> testSetLoggingLevel2 = testSetLoggingLevel("org.apache.kafka.clients", "INFO", "worker", testSetLoggingLevel);
        LoggerLevel logLevel = this.connect.getLogLevel("org.apache.kafka.clients");
        this.connect.setLogLevel("org.apache.kafka.clients", "INFO", "worker");
        Assertions.assertEquals(logLevel, this.connect.getLogLevel("org.apache.kafka.clients"), "Log level and last-modified timestamp should not be affected by consecutive identical requests");
        this.connect.setLogLevel("org.apache.kafka.streams", "DEBUG", "cluster");
        testSetLoggingLevel("org.apache.kafka.streams", "TRACE", "cluster", testSetLoggingLevel2);
    }

    private Map<String, LoggerLevel> testSetLoggingLevel(String str, String str2, String str3, Map<String, LoggerLevel> map) {
        long currentTimeMillis = System.currentTimeMillis();
        List logLevel = this.connect.setLogLevel(str, str2, str3);
        if ("cluster".equals(str3)) {
            Assertions.assertNull(logLevel, "Modifying log levels with scope=cluster should result in an empty response");
        } else {
            Assertions.assertTrue(logLevel.contains(str));
            Assertions.assertEquals(Collections.emptyList(), (List) logLevel.stream().filter(str4 -> {
                return !str4.startsWith(str);
            }).collect(Collectors.toList()), "No loggers outside the namespace '" + str + "' should have been included in the response for a request to modify that namespace");
        }
        LoggerLevel logLevel2 = this.connect.getLogLevel(str);
        Assertions.assertNotNull(logLevel2);
        Assertions.assertEquals(str2, logLevel2.level());
        Assertions.assertNotNull(logLevel2.lastModified());
        Assertions.assertTrue(logLevel2.lastModified().longValue() >= currentTimeMillis, "Last-modified timestamp for logger level is " + logLevel2.lastModified() + ", which is before " + currentTimeMillis + ", the most-recent time the level was adjusted");
        Map<String, LoggerLevel> allLogLevels = this.connect.allLogLevels();
        Assertions.assertEquals(Collections.emptyMap(), Utils.filterMap(allLogLevels, entry -> {
            return hasNamespace(entry, str) && !(level(entry).equals(str2) && isModified(entry) && lastModified(entry).longValue() >= currentTimeMillis);
        }), "At least one logger in the affected namespace '" + str + "' does not have the expected level of '" + str2 + "', has a null last-modified timestamp, or has a last-modified timestamp that is less recent than " + currentTimeMillis + ", which is when the namespace was last adjusted");
        Assertions.assertEquals(Collections.emptySet(), Utils.diff(HashSet::new, map.keySet(), allLogLevels.keySet()), "At least one logger was present in the listing of all loggers before the logging level for namespace '" + str + "' was set to '" + str2 + "' that is no longer present");
        Assertions.assertEquals(Collections.emptyMap(), Utils.filterMap(allLogLevels, entry2 -> {
            return (hasNamespace(entry2, str) || ((LoggerLevel) entry2.getValue()).equals(map.get(entry2.getKey()))) ? false : true;
        }), "At least one logger outside of the affected namespace '" + str + "' has a different logging level or last-modified timestamp than it did before the namespace was set to level '" + str2 + "'; none of these loggers should have been affected");
        return allLogLevels;
    }

    private static boolean hasNamespace(Map.Entry<String, ?> entry, String str) {
        return entry.getKey().startsWith(str);
    }

    private static boolean isModified(Map.Entry<?, LoggerLevel> entry) {
        return lastModified(entry) != null;
    }

    private static Long lastModified(Map.Entry<?, LoggerLevel> entry) {
        return entry.getValue().lastModified();
    }

    private static String level(Map.Entry<?, LoggerLevel> entry) {
        return entry.getValue().level();
    }

    @Test
    public void testCreateConnectorWithStoppedInitialState() throws Exception {
        this.connect = this.connectBuilder.build();
        this.connect.start();
        this.connect.configureConnector(new CreateConnectorRequest(CONNECTOR_NAME, defaultSourceConnectorProps(TOPIC_NAME), CreateConnectorRequest.InitialState.STOPPED));
        this.connect.assertions().assertConnectorIsStopped(CONNECTOR_NAME, "Connector was not created in a stopped state");
        Assertions.assertEquals(Collections.emptyList(), this.connect.connectorInfo(CONNECTOR_NAME).tasks());
        Assertions.assertEquals(Collections.emptyList(), this.connect.taskConfigs(CONNECTOR_NAME));
        this.connect.resumeConnector(CONNECTOR_NAME);
        this.connect.assertions().assertConnectorAndExactlyNumTasksAreRunning(CONNECTOR_NAME, NUM_TASKS, "Connector or tasks did not start running healthily in time");
    }

    @Test
    public void testHealthCheck() throws Exception {
        Response healthCheck;
        Throwable th;
        HashMap hashMap = new HashMap();
        hashMap.put("connector.class", BlockingConnectorTest.BlockingConnector.class.getName());
        hashMap.put("name", CONNECTOR_NAME);
        hashMap.put("tasks.max", Integer.toString(1));
        hashMap.put(BlockingConnectorTest.Block.BLOCK_CONFIG, "Connector::start");
        this.connect = this.connectBuilder.withCommandLineConnector(hashMap).build();
        EmbeddedConnectStandalone embeddedConnectStandalone = this.connect;
        embeddedConnectStandalone.getClass();
        Thread thread = new Thread(() -> {
            embeddedConnectStandalone.start();
        });
        thread.setName("integration-test-standalone-connect-worker");
        try {
            thread.start();
            AtomicReference atomicReference = new AtomicReference();
            this.connect.requestTimeout(1000L);
            TestUtils.waitForCondition(() -> {
                atomicReference.set(this.connect.healthCheck());
                return true;
            }, ErrorHandlingTaskTest.OPERATOR_RETRY_TIMEOUT_MILLIS, "Health check endpoint for standalone worker was not available in time");
            this.connect.resetRequestTimeout();
            Response response = (Response) atomicReference.get();
            Throwable th2 = null;
            try {
                try {
                    Assertions.assertEquals(Response.Status.SERVICE_UNAVAILABLE.getStatusCode(), response.getStatus());
                    Assertions.assertNotNull(response.getEntity());
                    String obj = response.getEntity().toString();
                    Assertions.assertTrue(obj.contains("Worker is still starting up"), "Body did not contain expected message: " + obj);
                    if (response != null) {
                        if (0 != 0) {
                            try {
                                response.close();
                            } catch (Throwable th3) {
                                th2.addSuppressed(th3);
                            }
                        } else {
                            response.close();
                        }
                    }
                    BlockingConnectorTest.Block.reset();
                    healthCheck = this.connect.healthCheck();
                    th = null;
                } catch (Throwable th4) {
                    th2 = th4;
                    throw th4;
                }
                try {
                    try {
                        Assertions.assertEquals(Response.Status.OK.getStatusCode(), healthCheck.getStatus());
                        Assertions.assertNotNull(healthCheck.getEntity());
                        String obj2 = healthCheck.getEntity().toString();
                        Assertions.assertTrue(obj2.contains("Worker has completed startup and is ready to handle requests."), "Body did not contain expected message: " + obj2);
                        if (healthCheck != null) {
                            if (0 != 0) {
                                try {
                                    healthCheck.close();
                                } catch (Throwable th5) {
                                    th.addSuppressed(th5);
                                }
                            } else {
                                healthCheck.close();
                            }
                        }
                        this.connect.assertions().assertConnectorAndExactlyNumTasksAreRunning(CONNECTOR_NAME, 1, "Connector or tasks did not start running healthily in time");
                        hashMap.put(BlockingConnectorTest.Block.BLOCK_CONFIG, "Connector::taskConfigs");
                        this.connect.requestTimeout(1000L);
                        Assertions.assertThrows(ConnectRestException.class, () -> {
                            this.connect.configureConnector(CONNECTOR_NAME, hashMap);
                        });
                        healthCheck = this.connect.healthCheck();
                        Throwable th6 = null;
                        try {
                            try {
                                Assertions.assertEquals(Response.Status.INTERNAL_SERVER_ERROR.getStatusCode(), healthCheck.getStatus());
                                Assertions.assertNotNull(healthCheck.getEntity());
                                String obj3 = healthCheck.getEntity().toString();
                                Assertions.assertTrue(obj3.contains("Worker was unable to handle this request and may be unable to handle other requests."), "Body did not contain expected message: " + obj3);
                                if (healthCheck != null) {
                                    if (0 != 0) {
                                        try {
                                            healthCheck.close();
                                        } catch (Throwable th7) {
                                            th6.addSuppressed(th7);
                                        }
                                    } else {
                                        healthCheck.close();
                                    }
                                }
                                this.connect.resetRequestTimeout();
                                BlockingConnectorTest.Block.reset();
                                this.connect.deleteConnector(CONNECTOR_NAME);
                                if (thread.isAlive()) {
                                    log.debug("Standalone worker startup not completed yet; interrupting and waiting for startup to finish");
                                    thread.interrupt();
                                    thread.join(TimeUnit.MINUTES.toMillis(1L));
                                    if (thread.isAlive()) {
                                        log.warn("Standalone worker startup never completed; abandoning thread");
                                    }
                                }
                            } catch (Throwable th8) {
                                th6 = th8;
                                throw th8;
                            }
                        } finally {
                        }
                    } catch (Throwable th9) {
                        th = th9;
                        throw th9;
                    }
                } finally {
                }
            } finally {
            }
        } catch (Throwable th10) {
            if (thread.isAlive()) {
                log.debug("Standalone worker startup not completed yet; interrupting and waiting for startup to finish");
                thread.interrupt();
                thread.join(TimeUnit.MINUTES.toMillis(1L));
                if (thread.isAlive()) {
                    log.warn("Standalone worker startup never completed; abandoning thread");
                }
            }
            throw th10;
        }
    }

    private Map<String, String> defaultSourceConnectorProps(String str) {
        HashMap hashMap = new HashMap();
        hashMap.put("name", CONNECTOR_NAME);
        hashMap.put("connector.class", MonitorableSourceConnector.class.getSimpleName());
        hashMap.put("tasks.max", String.valueOf(NUM_TASKS));
        hashMap.put(MonitorableSourceConnector.TOPIC_CONFIG, str);
        hashMap.put("key.converter", StringConverter.class.getName());
        hashMap.put("value.converter", StringConverter.class.getName());
        hashMap.put("topic.creation.default.replication.factor", String.valueOf(1));
        hashMap.put("topic.creation.default.partitions", String.valueOf(1));
        return hashMap;
    }
}
