package org.apache.kafka.connect.integration;

import jakarta.ws.rs.core.Response;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import org.apache.kafka.connect.runtime.distributed.DistributedConfig;
import org.apache.kafka.connect.util.clusters.EmbeddedConnectCluster;
import org.apache.kafka.connect.util.clusters.WorkerHandle;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Tag;
import org.junit.jupiter.api.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@Tag("integration")
/* loaded from: input_file:org/apache/kafka/connect/integration/InternalTopicsIntegrationTest.class */
public class InternalTopicsIntegrationTest {
    private static final Logger log = LoggerFactory.getLogger(InternalTopicsIntegrationTest.class);
    private EmbeddedConnectCluster connect;
    Map<String, String> workerProps = new HashMap();
    Properties brokerProps = new Properties();

    @BeforeEach
    public void setup() {
        this.brokerProps.put("auto.create.topics.enable", String.valueOf(false));
    }

    @AfterEach
    public void close() {
        this.connect.stop();
    }

    @Test
    public void testCreateInternalTopicsWithDefaultSettings() throws InterruptedException {
        this.connect = (EmbeddedConnectCluster) ((EmbeddedConnectCluster.Builder) ((EmbeddedConnectCluster.Builder) ((EmbeddedConnectCluster.Builder) new EmbeddedConnectCluster.Builder().name("connect-cluster-1").workerProps(this.workerProps)).numWorkers(1).numBrokers(3)).brokerProps(this.brokerProps)).build();
        this.connect.start();
        log.info("Completed startup of {} Kafka brokers and {} Connect workers", 3, 1);
        log.info("Verifying the internal topics for Connect");
        this.connect.assertions().assertTopicsExist(configTopic(), offsetTopic(), statusTopic());
        assertInternalTopicSettings();
        log.info("Stopping the Connect worker");
        this.connect.removeWorker();
        log.info("Starting the Connect worker");
        this.connect.startConnect();
        log.info("Verifying the internal topics for Connect");
        this.connect.assertions().assertTopicsExist(configTopic(), offsetTopic(), statusTopic());
        assertInternalTopicSettings();
    }

    @Test
    public void testCreateInternalTopicsWithFewerReplicasThanBrokers() throws InterruptedException {
        this.workerProps.put("config.storage.replication.factor", "1");
        this.workerProps.put("offset.storage.replication.factor", "2");
        this.workerProps.put("status.storage.replication.factor", "1");
        this.connect = (EmbeddedConnectCluster) ((EmbeddedConnectCluster.Builder) ((EmbeddedConnectCluster.Builder) ((EmbeddedConnectCluster.Builder) new EmbeddedConnectCluster.Builder().name("connect-cluster-1").workerProps(this.workerProps)).numWorkers(1).numBrokers(2)).brokerProps(this.brokerProps)).build();
        this.connect.start();
        log.info("Verifying the internal topics for Connect");
        this.connect.assertions().assertTopicsExist(configTopic(), offsetTopic(), statusTopic());
        assertInternalTopicSettings();
    }

    @Test
    public void testFailToCreateInternalTopicsWithMoreReplicasThanBrokers() throws InterruptedException {
        this.workerProps.put("config.storage.replication.factor", "3");
        this.workerProps.put("offset.storage.replication.factor", "2");
        this.workerProps.put("status.storage.replication.factor", "1");
        this.connect = (EmbeddedConnectCluster) ((EmbeddedConnectCluster.Builder) ((EmbeddedConnectCluster.Builder) ((EmbeddedConnectCluster.Builder) new EmbeddedConnectCluster.Builder().name("connect-cluster-1").workerProps(this.workerProps)).numWorkers(0).numBrokers(1)).brokerProps(this.brokerProps)).build();
        this.connect.start();
        log.info("Completed startup of {} Kafka broker. Expected Connect worker to fail", 1);
        WorkerHandle addWorker = this.connect.addWorker();
        this.connect.requestTimeout(1000L);
        Response healthCheck = this.connect.healthCheck(addWorker);
        try {
            Assertions.assertEquals(Response.Status.SERVICE_UNAVAILABLE.getStatusCode(), healthCheck.getStatus());
            Assertions.assertNotNull(healthCheck.getEntity());
            String obj = healthCheck.getEntity().toString();
            Assertions.assertTrue(obj.contains("The worker is currently initializing and reading to the end of internal topics"), "Body did not contain expected message detailing the worker's in-progress operation: " + obj);
            if (healthCheck != null) {
                healthCheck.close();
            }
            this.connect.resetRequestTimeout();
            Future<?> herderTask = addWorker.herderTask();
            Assertions.assertThrows(ExecutionException.class, () -> {
                herderTask.get(1L, TimeUnit.MINUTES);
            });
            log.info("Verifying the internal topics for Connect");
            this.connect.assertions().assertTopicsDoNotExist(configTopic(), offsetTopic());
        } catch (Throwable th) {
            if (healthCheck != null) {
                try {
                    healthCheck.close();
                } catch (Throwable th2) {
                    th.addSuppressed(th2);
                }
            }
            throw th;
        }
    }

    @Test
    public void testFailToStartWhenInternalTopicsAreNotCompacted() throws InterruptedException {
        this.brokerProps.put("log.cleanup.policy", "delete");
        this.workerProps.put("config.storage.topic", "bad-config");
        this.workerProps.put("offset.storage.topic", "bad-offset");
        this.workerProps.put("status.storage.topic", "bad-status");
        this.workerProps.put("config.storage.replication.factor", "1");
        this.workerProps.put("offset.storage.replication.factor", "1");
        this.workerProps.put("status.storage.replication.factor", "1");
        this.connect = (EmbeddedConnectCluster) ((EmbeddedConnectCluster.Builder) ((EmbeddedConnectCluster.Builder) ((EmbeddedConnectCluster.Builder) new EmbeddedConnectCluster.Builder().name("connect-cluster-1").workerProps(this.workerProps)).numWorkers(0).numBrokers(1)).brokerProps(this.brokerProps)).build();
        log.info("Starting {} Kafka brokers, but no Connect workers yet", 1);
        this.connect.start();
        log.info("Completed startup of {} Kafka broker. Expected Connect worker to fail", 1);
        this.connect.kafka().createTopic("good-config", 1, 1, compactCleanupPolicy());
        this.connect.kafka().createTopic("good-offset", 1, 1, compactCleanupPolicy());
        this.connect.kafka().createTopic("good-status", 1, 1, compactCleanupPolicy());
        this.connect.kafka().createTopic("bad-config", 1, 1, deleteCleanupPolicy());
        this.connect.kafka().createTopic("bad-offset", 1, 1, compactAndDeleteCleanupPolicy());
        this.connect.kafka().createTopic("bad-status", 1, 1, noTopicSettings());
        log.info("Verifying the internal topics for Connect were manually created");
        this.connect.assertions().assertTopicsExist("good-config", "good-offset", "good-status", "bad-config", "bad-offset", "bad-status");
        WorkerHandle addWorker = this.connect.addWorker();
        Assertions.assertFalse(this.connect.isHealthy(addWorker));
        Assertions.assertFalse(this.connect.allWorkersHealthy());
        Assertions.assertFalse(this.connect.anyWorkersHealthy());
        this.connect.removeWorker(addWorker);
        this.workerProps.put("config.storage.topic", "good-config");
        WorkerHandle addWorker2 = this.connect.addWorker();
        Assertions.assertFalse(this.connect.isHealthy(addWorker2));
        Assertions.assertFalse(this.connect.allWorkersHealthy());
        Assertions.assertFalse(this.connect.anyWorkersHealthy());
        this.connect.removeWorker(addWorker2);
        this.workerProps.put("offset.storage.topic", "good-offset");
        WorkerHandle addWorker3 = this.connect.addWorker();
        Assertions.assertFalse(this.connect.isHealthy(addWorker3));
        Assertions.assertFalse(this.connect.allWorkersHealthy());
        Assertions.assertFalse(this.connect.anyWorkersHealthy());
        this.connect.removeWorker(addWorker3);
        this.workerProps.put("status.storage.topic", "good-status");
        this.connect.addWorker();
        this.connect.assertions().assertAtLeastNumWorkersAreUp(1, "Worker did not start in time.");
    }

    @Test
    public void testStartWhenInternalTopicsCreatedManuallyWithCompactForBrokersDefaultCleanupPolicy() throws InterruptedException {
        this.brokerProps.put("log.cleanup.policy", "compact");
        this.workerProps.put("config.storage.topic", "config-topic");
        this.workerProps.put("offset.storage.topic", "offset-topic");
        this.workerProps.put("status.storage.topic", "status-topic");
        this.workerProps.put("config.storage.replication.factor", "1");
        this.workerProps.put("offset.storage.replication.factor", "1");
        this.workerProps.put("status.storage.replication.factor", "1");
        this.connect = (EmbeddedConnectCluster) ((EmbeddedConnectCluster.Builder) ((EmbeddedConnectCluster.Builder) ((EmbeddedConnectCluster.Builder) new EmbeddedConnectCluster.Builder().name("connect-cluster-1").workerProps(this.workerProps)).numWorkers(0).numBrokers(1)).brokerProps(this.brokerProps)).build();
        log.info("Starting {} Kafka brokers, but no Connect workers yet", 1);
        this.connect.start();
        log.info("Completed startup of {} Kafka broker. Expected Connect worker to fail", 1);
        this.connect.kafka().createTopic("config-topic", 1, 1, noTopicSettings());
        this.connect.kafka().createTopic("offset-topic", 1, 1, noTopicSettings());
        this.connect.kafka().createTopic("status-topic", 1, 1, noTopicSettings());
        log.info("Verifying the internal topics for Connect were manually created");
        this.connect.assertions().assertTopicsExist("config-topic", "offset-topic", "status-topic");
        this.connect.addWorker();
        this.connect.assertions().assertAtLeastNumWorkersAreUp(1, "Worker did not start in time.");
    }

    protected Map<String, String> compactCleanupPolicy() {
        return Collections.singletonMap("cleanup.policy", "compact");
    }

    protected Map<String, String> deleteCleanupPolicy() {
        return Collections.singletonMap("cleanup.policy", "delete");
    }

    protected Map<String, String> noTopicSettings() {
        return Collections.emptyMap();
    }

    protected Map<String, String> compactAndDeleteCleanupPolicy() {
        HashMap hashMap = new HashMap();
        hashMap.put("cleanup.policy", "delete,compact");
        return hashMap;
    }

    protected void assertInternalTopicSettings() throws InterruptedException {
        DistributedConfig distributedConfig = new DistributedConfig(this.workerProps);
        this.connect.assertions().assertTopicSettings(configTopic(), distributedConfig.getShort("config.storage.replication.factor").shortValue(), 1, "Config topic does not have the expected settings");
        this.connect.assertions().assertTopicSettings(statusTopic(), distributedConfig.getShort("status.storage.replication.factor").shortValue(), distributedConfig.getInt("status.storage.partitions").intValue(), "Status topic does not have the expected settings");
        this.connect.assertions().assertTopicSettings(offsetTopic(), distributedConfig.getShort("offset.storage.replication.factor").shortValue(), distributedConfig.getInt("offset.storage.partitions").intValue(), "Offset topic does not have the expected settings");
    }

    protected String configTopic() {
        return this.workerProps.get("config.storage.topic");
    }

    protected String offsetTopic() {
        return this.workerProps.get("offset.storage.topic");
    }

    protected String statusTopic() {
        return this.workerProps.get("status.storage.topic");
    }
}
