package io.floodplain.kotlindsl;

import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.node.ArrayNode;
import com.fasterxml.jackson.databind.node.ObjectNode;
import io.floodplain.streams.api.Topic;
import io.floodplain.streams.api.TopologyContext;
import java.io.IOException;
import java.io.InputStream;
import java.net.URL;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Consumer;
import java.util.function.Function;
import javax.ws.rs.core.MediaType;
import kotlin.Metadata;
import kotlin.Unit;
import kotlin.collections.CollectionsKt;
import kotlin.jvm.functions.Function0;
import kotlin.jvm.internal.Intrinsics;
import mu.KLogger;
import mu.KotlinLogging;
import org.apache.kafka.connect.connector.Task;
import org.apache.kafka.connect.runtime.distributed.ConnectProtocol;
import org.apache.kafka.connect.sink.SinkConnector;
import org.apache.kafka.connect.sink.SinkTask;
import org.jetbrains.annotations.NotNull;

/* compiled from: FloodplainConnector.kt */
@Metadata(mv = {1, 4, 0}, bv = {1, 0, 3}, k = 2, d1 = {"��r\n��\n\u0002\u0018\u0002\n\u0002\b\u0003\n\u0002\u0018\u0002\n��\n\u0002\u0018\u0002\n��\n\u0002\u0010\u000e\n��\n\u0002\u0018\u0002\n\u0002\b\u0002\n\u0002\u0010$\n\u0002\u0010��\n��\n\u0002\u0010\u0002\n\u0002\b\u0002\n\u0002\u0018\u0002\n��\n\u0002\u0010 \n\u0002\b\u0002\n\u0002\u0018\u0002\n��\n\u0002\u0018\u0002\n��\n\u0002\u0018\u0002\n��\n\u0002\u0018\u0002\n\u0002\u0010!\n��\n\u0002\u0018\u0002\n\u0002\u0018\u0002\n\u0002\b\u0004\n\u0002\u0010\u000b\n��\u001a*\u0010\b\u001a\u00020\t2\u0006\u0010\n\u001a\u00020\u000b2\u0006\u0010\f\u001a\u00020\t2\u0012\u0010\r\u001a\u000e\u0012\u0004\u0012\u00020\t\u0012\u0004\u0012\u00020\u000f0\u000e\u001a\u0018\u0010\u0010\u001a\u00020\u00112\u0006\u0010\u0012\u001a\u00020\t2\u0006\u0010\u0013\u001a\u00020\u0014H\u0002\u001a\u0016\u0010\u0015\u001a\b\u0012\u0004\u0012\u00020\t0\u00162\u0006\u0010\u0017\u001a\u00020\u0014H\u0002\u001a\u0016\u0010\u0018\u001a\u00020\u00192\u0006\u0010\u001a\u001a\u00020\u001b2\u0006\u0010\u001c\u001a\u00020\u001d\u001a6\u0010\u001e\u001a\u0014\u0012\u0004\u0012\u00020\u001f\u0012\n\u0012\b\u0012\u0004\u0012\u00020\u00190 0\u000e2\u0006\u0010\n\u001a\u00020\u000b2\u0006\u0010\u001c\u001a\u00020\u001d2\f\u0010!\u001a\b\u0012\u0004\u0012\u00020#0\"\u001a\u0018\u0010$\u001a\u00020\u00112\u0006\u0010\u0017\u001a\u00020\u00142\u0006\u0010%\u001a\u00020\tH\u0002\u001a.\u0010&\u001a\u00020\u00112\u0006\u0010\f\u001a\u00020\t2\u0006\u0010\n\u001a\u00020\u000b2\u0006\u0010\u0013\u001a\u00020\u00142\u0006\u0010%\u001a\u00020\t2\u0006\u0010'\u001a\u00020(\"\u0011\u0010��\u001a\u00020\u0001¢\u0006\b\n��\u001a\u0004\b\u0002\u0010\u0003\"\u000e\u0010\u0004\u001a\u00020\u0005X\u0082\u0004¢\u0006\u0002\n��\"\u000e\u0010\u0006\u001a\u00020\u0007X\u0082\u0004¢\u0006\u0002\n��¨\u0006)"}, d2 = {"httpClient", "Ljava/net/http/HttpClient;", "getHttpClient", "()Ljava/net/http/HttpClient;", "logger", "Lmu/KLogger;", "objectMapper", "Lcom/fasterxml/jackson/databind/ObjectMapper;", "constructConnectorJson", "", "topologyContext", "Lio/floodplain/streams/api/TopologyContext;", "connectorName", "parameters", "", "", "deleteConnector", "", "name", "connectURL", "Ljava/net/URL;", "existingConnectors", "", ConnectProtocol.URL_KEY_NAME, "floodplainSinkFromTask", "Lio/floodplain/kotlindsl/FloodplainSink;", "task", "Lorg/apache/kafka/connect/sink/SinkTask;", "config", "Lio/floodplain/kotlindsl/SinkConfig;", "instantiateSinkConfig", "Lio/floodplain/streams/api/Topic;", "", "connector", "Lkotlin/Function0;", "Lorg/apache/kafka/connect/sink/SinkConnector;", "postToHttpJava11", "jsonString", "startConstructor", "force", "", "floodplain-dsl"})
/* loaded from: input_file:io/floodplain/kotlindsl/FloodplainConnectorKt.class */
public final class FloodplainConnectorKt {
    private static final KLogger logger = KotlinLogging.INSTANCE.logger(new Function0<Unit>() { // from class: io.floodplain.kotlindsl.FloodplainConnectorKt$logger$1
        @Override // kotlin.jvm.functions.Function0
        public /* bridge */ /* synthetic */ Unit invoke() {
            invoke2();
            return Unit.INSTANCE;
        }

        /* renamed from: invoke, reason: avoid collision after fix types in other method */
        public final void invoke2() {
        }
    });
    private static final ObjectMapper objectMapper = new ObjectMapper();

    @NotNull
    private static final HttpClient httpClient;

    @NotNull
    public static final HttpClient getHttpClient() {
        return httpClient;
    }

    @NotNull
    public static final String constructConnectorJson(@NotNull TopologyContext topologyContext, @NotNull String connectorName, @NotNull Map<String, ? extends Object> parameters) {
        Intrinsics.checkNotNullParameter(topologyContext, "topologyContext");
        Intrinsics.checkNotNullParameter(connectorName, "connectorName");
        Intrinsics.checkNotNullParameter(parameters, "parameters");
        String str = topologyContext.topicName(connectorName);
        ObjectNode createObjectNode = objectMapper.createObjectNode();
        createObjectNode.put("name", str);
        ObjectNode createObjectNode2 = objectMapper.createObjectNode();
        createObjectNode.set("config", createObjectNode2);
        for (Map.Entry<String, ? extends Object> entry : parameters.entrySet()) {
            String key = entry.getKey();
            Object value = entry.getValue();
            if (value instanceof String) {
                createObjectNode2.put(key, (String) value);
            } else if (value instanceof Integer) {
                createObjectNode2.put(key, (Integer) value);
            } else if (value instanceof Long) {
                createObjectNode2.put(key, (Long) value);
            } else if (value instanceof Float) {
                createObjectNode2.put(key, (Float) value);
            } else if (value instanceof Double) {
                createObjectNode2.put(key, (Double) value);
            } else if (value instanceof Boolean) {
                createObjectNode2.put(key, (Boolean) value);
            }
        }
        createObjectNode2.put("name", str);
        createObjectNode2.put("database.server.name", str);
        String writeValueAsString = objectMapper.writerWithDefaultPrettyPrinter().writeValueAsString(createObjectNode);
        Intrinsics.checkNotNullExpressionValue(writeValueAsString, "objectMapper.writerWithD….writeValueAsString(node)");
        return writeValueAsString;
    }

    public static final void startConstructor(@NotNull String connectorName, @NotNull TopologyContext topologyContext, @NotNull URL connectURL, @NotNull String jsonString, boolean z) {
        Intrinsics.checkNotNullParameter(connectorName, "connectorName");
        Intrinsics.checkNotNullParameter(topologyContext, "topologyContext");
        Intrinsics.checkNotNullParameter(connectURL, "connectURL");
        Intrinsics.checkNotNullParameter(jsonString, "jsonString");
        String generatedName = topologyContext.topicName(connectorName);
        if (existingConnectors(connectURL).contains(generatedName)) {
            if (z) {
                logger.warn("Force enabled, deleting old");
                Intrinsics.checkNotNullExpressionValue(generatedName, "generatedName");
                deleteConnector(generatedName, connectURL);
            } else {
                logger.warn("Connector: {} already present, ignoring", generatedName);
            }
        }
        postToHttpJava11(connectURL, jsonString);
    }

    private static final List<String> existingConnectors(URL url) {
        JsonNode readTree = objectMapper.readTree((InputStream) httpClient.send(HttpRequest.newBuilder().uri(url.toURI()).build(), HttpResponse.BodyHandlers.ofInputStream()).body());
        if (readTree == null) {
            throw new NullPointerException("null cannot be cast to non-null type com.fasterxml.jackson.databind.node.ArrayNode");
        }
        ArrayNode arrayNode = (ArrayNode) readTree;
        final ArrayList arrayList = new ArrayList();
        arrayNode.forEach(new Consumer<JsonNode>() { // from class: io.floodplain.kotlindsl.FloodplainConnectorKt$existingConnectors$1
            @Override // java.util.function.Consumer
            public final void accept(@NotNull JsonNode j) {
                Intrinsics.checkNotNullParameter(j, "j");
                List list = arrayList;
                String asText = j.asText();
                Intrinsics.checkNotNullExpressionValue(asText, "j.asText()");
                list.add(asText);
            }
        });
        List<String> unmodifiableList = Collections.unmodifiableList(arrayList);
        Intrinsics.checkNotNullExpressionValue(unmodifiableList, "Collections.unmodifiableList(result)");
        return unmodifiableList;
    }

    private static final void deleteConnector(String str, URL url) throws IOException {
        HttpRequest build = HttpRequest.newBuilder().uri(new URL(url + '/' + str).toURI()).DELETE().build();
        Intrinsics.checkNotNullExpressionValue(build, "HttpRequest.newBuilder()…DELETE()\n        .build()");
        HttpResponse send = httpClient.send(build, HttpResponse.BodyHandlers.ofString());
        if (send.statusCode() >= 400) {
            throw new IOException("Error deleting connector: " + send.uri());
        }
    }

    private static final void postToHttpJava11(URL url, String str) {
        HttpRequest build = HttpRequest.newBuilder().uri(url.toURI()).header("Content-Type", MediaType.APPLICATION_JSON).header("Accept", MediaType.APPLICATION_JSON).POST(HttpRequest.BodyPublishers.ofString(str)).build();
        Intrinsics.checkNotNullExpressionValue(build, "HttpRequest.newBuilder()…String))\n        .build()");
        HttpResponse send = httpClient.send(build, HttpResponse.BodyHandlers.ofString());
        Intrinsics.checkNotNullExpressionValue(send, "httpClient.send(request, BodyHandlers.ofString())");
        if (send.statusCode() >= 400) {
            logger.error("Scheduling connector failed. Request: " + str);
            throw new IOException("Error calling connector: " + send.uri() + " code: " + send.statusCode() + " body: " + ((String) send.body()));
        }
    }

    @NotNull
    public static final FloodplainSink floodplainSinkFromTask(@NotNull SinkTask task, @NotNull SinkConfig config) {
        Intrinsics.checkNotNullParameter(task, "task");
        Intrinsics.checkNotNullParameter(config, "config");
        return new LocalConnectorSink(task, config);
    }

    @NotNull
    public static final Map<Topic, List<FloodplainSink>> instantiateSinkConfig(@NotNull TopologyContext topologyContext, @NotNull SinkConfig config, @NotNull Function0<? extends SinkConnector> connector) {
        Intrinsics.checkNotNullParameter(topologyContext, "topologyContext");
        Intrinsics.checkNotNullParameter(config, "config");
        Intrinsics.checkNotNullParameter(connector, "connector");
        LinkedHashMap linkedHashMap = new LinkedHashMap();
        List<MaterializedConfig> materializeConnectorConfig = config.materializeConnectorConfig(topologyContext);
        ArrayList arrayList = new ArrayList(CollectionsKt.collectionSizeOrDefault(materializeConnectorConfig, 10));
        for (MaterializedConfig materializedConfig : materializeConnectorConfig) {
            SinkConnector invoke = connector.invoke();
            connector.invoke().start(materializedConfig.getSettings());
            Task newInstance = invoke.taskClass().getDeclaredConstructor(new Class[0]).newInstance(new Object[0]);
            if (newInstance == null) {
                throw new NullPointerException("null cannot be cast to non-null type org.apache.kafka.connect.sink.SinkTask");
            }
            SinkTask sinkTask = (SinkTask) newInstance;
            sinkTask.start(materializedConfig.getSettings());
            FloodplainSink floodplainSinkFromTask = floodplainSinkFromTask(sinkTask, config);
            Iterator<T> it = materializedConfig.getTopics().iterator();
            while (it.hasNext()) {
                Object computeIfAbsent = linkedHashMap.computeIfAbsent((Topic) it.next(), new Function<Topic, List<FloodplainSink>>() { // from class: io.floodplain.kotlindsl.FloodplainConnectorKt$instantiateSinkConfig$1$1$list$1
                    @Override // java.util.function.Function
                    @NotNull
                    public final List<FloodplainSink> apply(@NotNull Topic it2) {
                        Intrinsics.checkNotNullParameter(it2, "it");
                        return new ArrayList();
                    }
                });
                Intrinsics.checkNotNullExpressionValue(computeIfAbsent, "result.computeIfAbsent(topic) { mutableListOf() }");
                ((List) computeIfAbsent).add(floodplainSinkFromTask);
            }
            arrayList.add(Unit.INSTANCE);
        }
        return linkedHashMap;
    }

    static {
        HttpClient build = HttpClient.newBuilder().version(HttpClient.Version.HTTP_1_1).followRedirects(HttpClient.Redirect.NORMAL).connectTimeout(Duration.ofSeconds(10L)).build();
        Intrinsics.checkNotNullExpressionValue(build, "HttpClient.newBuilder()\n…Seconds(10))\n    .build()");
        httpClient = build;
    }
}
