package org.apache.kafka.connect.util.clusters;

import com.fasterxml.jackson.databind.ObjectMapper;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStreamWriter;
import java.net.HttpURLConnection;
import java.net.URI;
import java.net.URL;
import java.nio.charset.StandardCharsets;
import java.util.Collection;
import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedHashSet;
import java.util.Map;
import java.util.Objects;
import java.util.Properties;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
import javax.ws.rs.core.Response;
import org.apache.kafka.common.utils.Exit;
import org.apache.kafka.connect.errors.ConnectException;
import org.apache.kafka.connect.runtime.rest.entities.ConnectorStateInfo;
import org.apache.kafka.connect.runtime.rest.entities.ServerInfo;
import org.apache.kafka.connect.runtime.rest.errors.ConnectRestException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/kafka/connect/util/clusters/EmbeddedConnectCluster.class */
public class EmbeddedConnectCluster {
    private static final int DEFAULT_NUM_BROKERS = 1;
    private static final int DEFAULT_NUM_WORKERS = 1;
    private static final String REST_HOST_NAME = "localhost";
    private static final String DEFAULT_WORKER_NAME_PREFIX = "connect-worker-";
    private final Set<WorkerHandle> connectCluster;
    private final EmbeddedKafkaCluster kafkaCluster;
    private final Map<String, String> workerProps;
    private final String connectClusterName;
    private final int numBrokers;
    private final int numInitialWorkers;
    private final boolean maskExitProcedures;
    private final String workerNamePrefix;
    private final AtomicInteger nextWorkerId;
    public Exit.Procedure exitProcedure;
    public Exit.Procedure haltProcedure;
    private static final Logger log = LoggerFactory.getLogger(EmbeddedConnectCluster.class);
    private static final Properties DEFAULT_BROKER_CONFIG = new Properties();

    /* loaded from: input_file:org/apache/kafka/connect/util/clusters/EmbeddedConnectCluster$Builder.class */
    public static class Builder {
        private String name = UUID.randomUUID().toString();
        private Map<String, String> workerProps = new HashMap();
        private int numWorkers = 1;
        private int numBrokers = 1;
        private Properties brokerProps = EmbeddedConnectCluster.DEFAULT_BROKER_CONFIG;
        private boolean maskExitProcedures = false;

        public Builder name(String str) {
            this.name = str;
            return this;
        }

        public Builder workerProps(Map<String, String> map) {
            this.workerProps = map;
            return this;
        }

        public Builder numWorkers(int i) {
            this.numWorkers = i;
            return this;
        }

        public Builder numBrokers(int i) {
            this.numBrokers = i;
            return this;
        }

        public Builder brokerProps(Properties properties) {
            this.brokerProps = properties;
            return this;
        }

        public Builder maskExitProcedures(boolean z) {
            this.maskExitProcedures = z;
            return this;
        }

        public EmbeddedConnectCluster build() {
            return new EmbeddedConnectCluster(this.name, this.workerProps, this.numWorkers, this.numBrokers, this.brokerProps, this.maskExitProcedures);
        }
    }

    private EmbeddedConnectCluster(String str, Map<String, String> map, int i, int i2, Properties properties, boolean z) {
        this.nextWorkerId = new AtomicInteger(0);
        this.exitProcedure = (i3, str2) -> {
            if (i3 == 0) {
                Exit.exit(0, str2);
            } else {
                String str2 = "Abrupt service exit with code " + i3 + " and message " + str2;
                log.warn(str2);
                throw new UngracefulShutdownException(str2);
            }
        };
        this.haltProcedure = (i4, str3) -> {
            if (i4 == 0) {
                Exit.halt(0, str3);
            } else {
                String str3 = "Abrupt service halt with code " + i4 + " and message " + str3;
                log.warn(str3);
                throw new UngracefulShutdownException(str3);
            }
        };
        this.workerProps = map;
        this.connectClusterName = str;
        this.numBrokers = i2;
        this.kafkaCluster = new EmbeddedKafkaCluster(i2, properties);
        this.connectCluster = new LinkedHashSet();
        this.numInitialWorkers = i;
        this.maskExitProcedures = z;
        this.workerNamePrefix = DEFAULT_WORKER_NAME_PREFIX;
    }

    public void start() throws IOException {
        if (this.maskExitProcedures) {
            Exit.setExitProcedure(this.exitProcedure);
            Exit.setHaltProcedure(this.haltProcedure);
        }
        this.kafkaCluster.before();
        startConnect();
    }

    public void stop() {
        this.connectCluster.forEach(this::stopWorker);
        try {
            try {
                this.kafkaCluster.after();
                Exit.resetExitProcedure();
                Exit.resetHaltProcedure();
            } catch (Exception e) {
                log.error("Could not stop kafka", e);
                throw new RuntimeException("Could not stop brokers", e);
            } catch (UngracefulShutdownException e2) {
                log.warn("Kafka did not shutdown gracefully");
                Exit.resetExitProcedure();
                Exit.resetHaltProcedure();
            }
        } catch (Throwable th) {
            Exit.resetExitProcedure();
            Exit.resetHaltProcedure();
            throw th;
        }
    }

    public WorkerHandle addWorker() {
        WorkerHandle start = WorkerHandle.start(this.workerNamePrefix + this.nextWorkerId.getAndIncrement(), this.workerProps);
        this.connectCluster.add(start);
        log.info("Started worker {}", start);
        return start;
    }

    public void removeWorker() {
        WorkerHandle workerHandle = null;
        Iterator<WorkerHandle> it = this.connectCluster.iterator();
        while (it.hasNext()) {
            workerHandle = it.next();
        }
        removeWorker(workerHandle);
    }

    public void removeWorker(WorkerHandle workerHandle) {
        if (this.connectCluster.isEmpty()) {
            throw new IllegalStateException("Cannot remove worker. Cluster is empty");
        }
        stopWorker(workerHandle);
        this.connectCluster.remove(workerHandle);
    }

    private void stopWorker(WorkerHandle workerHandle) {
        try {
            log.info("Stopping worker {}", workerHandle);
            workerHandle.stop();
        } catch (Exception e) {
            log.error("Could not stop connect", e);
            throw new RuntimeException("Could not stop worker", e);
        } catch (UngracefulShutdownException e2) {
            log.warn("Worker {} did not shutdown gracefully", workerHandle);
        }
    }

    public void startConnect() {
        log.info("Starting Connect cluster '{}' with {} workers", this.connectClusterName, Integer.valueOf(this.numInitialWorkers));
        this.workerProps.put("bootstrap.servers", kafka().bootstrapServers());
        this.workerProps.put("rest.host.name", REST_HOST_NAME);
        this.workerProps.put("rest.port", "0");
        String valueOf = String.valueOf(this.numBrokers);
        putIfAbsent(this.workerProps, "group.id", "connect-integration-test-" + this.connectClusterName);
        putIfAbsent(this.workerProps, "offset.storage.topic", "connect-offset-topic-" + this.connectClusterName);
        putIfAbsent(this.workerProps, "offset.storage.replication.factor", valueOf);
        putIfAbsent(this.workerProps, "config.storage.topic", "connect-config-topic-" + this.connectClusterName);
        putIfAbsent(this.workerProps, "config.storage.replication.factor", valueOf);
        putIfAbsent(this.workerProps, "status.storage.topic", "connect-storage-topic-" + this.connectClusterName);
        putIfAbsent(this.workerProps, "status.storage.replication.factor", valueOf);
        putIfAbsent(this.workerProps, "key.converter", "org.apache.kafka.connect.storage.StringConverter");
        putIfAbsent(this.workerProps, "value.converter", "org.apache.kafka.connect.storage.StringConverter");
        for (int i = 0; i < this.numInitialWorkers; i++) {
            addWorker();
        }
    }

    public Set<WorkerHandle> activeWorkers() {
        ObjectMapper objectMapper = new ObjectMapper();
        return (Set) this.connectCluster.stream().filter(workerHandle -> {
            try {
                objectMapper.readerFor(ServerInfo.class).readValue(executeGet(workerHandle.url().toString()));
                return true;
            } catch (IOException e) {
                return false;
            }
        }).collect(Collectors.toSet());
    }

    public Set<WorkerHandle> workers() {
        return new LinkedHashSet(this.connectCluster);
    }

    public void configureConnector(String str, Map<String, String> map) throws IOException {
        String endpointForResource = endpointForResource(String.format("connectors/%s/config", str));
        try {
            int executePut = executePut(endpointForResource, new ObjectMapper().writeValueAsString(map));
            if (executePut >= 400) {
                throw new ConnectRestException(executePut, "Could not execute PUT request");
            }
        } catch (IOException e) {
            log.error("Could not execute PUT request to " + endpointForResource, e);
            throw e;
        }
    }

    public void deleteConnector(String str) throws IOException {
        int executeDelete = executeDelete(endpointForResource(String.format("connectors/%s", str)));
        if (executeDelete >= 400) {
            throw new ConnectRestException(executeDelete, "Could not execute DELETE request.");
        }
    }

    public Collection<String> connectors() {
        try {
            return (Collection) new ObjectMapper().readerFor(Collection.class).readValue(executeGet(endpointForResource("connectors")));
        } catch (IOException e) {
            log.error("Could not read connector list", e);
            throw new ConnectException("Could not read connector list", e);
        }
    }

    public ConnectorStateInfo connectorStatus(String str) {
        try {
            return (ConnectorStateInfo) new ObjectMapper().readerFor(ConnectorStateInfo.class).readValue(executeGet(endpointForResource(String.format("connectors/%s/status", str))));
        } catch (IOException e) {
            log.error("Could not read connector state", e);
            throw new ConnectException("Could not read connector state", e);
        }
    }

    public String endpointForResource(String str) throws IOException {
        return ((URI) this.connectCluster.stream().map((v0) -> {
            return v0.url();
        }).filter((v0) -> {
            return Objects.nonNull(v0);
        }).findFirst().orElseThrow(() -> {
            return new IOException("Connect workers have not been provisioned");
        })).toString() + str;
    }

    private static void putIfAbsent(Map<String, String> map, String str, String str2) {
        if (map.containsKey(str)) {
            return;
        }
        map.put(str, str2);
    }

    public EmbeddedKafkaCluster kafka() {
        return this.kafkaCluster;
    }

    public int executePut(String str, String str2) throws IOException {
        log.debug("Executing PUT request to URL={}. Payload={}", str, str2);
        HttpURLConnection httpURLConnection = (HttpURLConnection) new URL(str).openConnection();
        httpURLConnection.setDoOutput(true);
        httpURLConnection.setRequestProperty("Content-Type", "application/json");
        httpURLConnection.setRequestMethod("PUT");
        OutputStreamWriter outputStreamWriter = new OutputStreamWriter(httpURLConnection.getOutputStream());
        Throwable th = null;
        try {
            try {
                outputStreamWriter.write(str2);
                if (outputStreamWriter != null) {
                    if (0 != 0) {
                        try {
                            outputStreamWriter.close();
                        } catch (Throwable th2) {
                            th.addSuppressed(th2);
                        }
                    } else {
                        outputStreamWriter.close();
                    }
                }
                if (httpURLConnection.getResponseCode() < 400) {
                    InputStream inputStream = httpURLConnection.getInputStream();
                    Throwable th3 = null;
                    try {
                        try {
                            log.info("PUT response for URL={} is {}", str, responseToString(inputStream));
                            if (inputStream != null) {
                                if (0 != 0) {
                                    try {
                                        inputStream.close();
                                    } catch (Throwable th4) {
                                        th3.addSuppressed(th4);
                                    }
                                } else {
                                    inputStream.close();
                                }
                            }
                        } finally {
                        }
                    } catch (Throwable th5) {
                        if (inputStream != null) {
                            if (th3 != null) {
                                try {
                                    inputStream.close();
                                } catch (Throwable th6) {
                                    th3.addSuppressed(th6);
                                }
                            } else {
                                inputStream.close();
                            }
                        }
                        throw th5;
                    }
                } else {
                    InputStream errorStream = httpURLConnection.getErrorStream();
                    Throwable th7 = null;
                    try {
                        log.info("PUT error response for URL={} is {}", str, responseToString(errorStream));
                        if (errorStream != null) {
                            if (0 != 0) {
                                try {
                                    errorStream.close();
                                } catch (Throwable th8) {
                                    th7.addSuppressed(th8);
                                }
                            } else {
                                errorStream.close();
                            }
                        }
                    } catch (Throwable th9) {
                        if (errorStream != null) {
                            if (0 != 0) {
                                try {
                                    errorStream.close();
                                } catch (Throwable th10) {
                                    th7.addSuppressed(th10);
                                }
                            } else {
                                errorStream.close();
                            }
                        }
                        throw th9;
                    }
                }
                return httpURLConnection.getResponseCode();
            } finally {
            }
        } catch (Throwable th11) {
            if (outputStreamWriter != null) {
                if (th != null) {
                    try {
                        outputStreamWriter.close();
                    } catch (Throwable th12) {
                        th.addSuppressed(th12);
                    }
                } else {
                    outputStreamWriter.close();
                }
            }
            throw th11;
        }
    }

    public String executeGet(String str) throws IOException {
        log.debug("Executing GET request to URL={}.", str);
        HttpURLConnection httpURLConnection = (HttpURLConnection) new URL(str).openConnection();
        httpURLConnection.setDoOutput(true);
        httpURLConnection.setRequestMethod("GET");
        try {
            InputStream inputStream = httpURLConnection.getInputStream();
            Throwable th = null;
            try {
                try {
                    StringBuilder sb = new StringBuilder();
                    while (true) {
                        int read = inputStream.read();
                        if (read == -1) {
                            break;
                        }
                        sb.append((char) read);
                    }
                    log.debug("GET response for URL={} is {}", str, sb);
                    String sb2 = sb.toString();
                    if (inputStream != null) {
                        if (0 != 0) {
                            try {
                                inputStream.close();
                            } catch (Throwable th2) {
                                th.addSuppressed(th2);
                            }
                        } else {
                            inputStream.close();
                        }
                    }
                    return sb2;
                } finally {
                }
            } finally {
            }
        } catch (IOException e) {
            Response.Status fromStatusCode = Response.Status.fromStatusCode(httpURLConnection.getResponseCode());
            if (fromStatusCode != null) {
                throw new ConnectRestException(fromStatusCode, "Invalid endpoint: " + str, e);
            }
            throw e;
        }
    }

    public int executeDelete(String str) throws IOException {
        log.debug("Executing DELETE request to URL={}", str);
        HttpURLConnection httpURLConnection = (HttpURLConnection) new URL(str).openConnection();
        httpURLConnection.setDoOutput(true);
        httpURLConnection.setRequestMethod("DELETE");
        httpURLConnection.connect();
        return httpURLConnection.getResponseCode();
    }

    private String responseToString(InputStream inputStream) throws IOException {
        StringBuilder sb = new StringBuilder();
        while (true) {
            int read = inputStream.read();
            if (read == -1) {
                return sb.toString();
            }
            sb.append((char) read);
        }
    }
}
