package com.couchbase.connect.kafka.handler.sink;

import com.couchbase.client.core.annotation.Stability;
import com.couchbase.client.java.analytics.AnalyticsOptions;
import com.couchbase.client.java.json.JsonArray;
import com.couchbase.client.java.json.JsonObject;
import com.couchbase.connect.kafka.config.sink.CouchbaseSinkConfig;
import com.couchbase.connect.kafka.util.config.ConfigHelper;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.stream.Collectors;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.kafka.common.config.ConfigException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.publisher.Mono;

@Stability.Volatile
/* loaded from: input_file:com/couchbase/connect/kafka/handler/sink/AnalyticsSinkHandler.class */
public class AnalyticsSinkHandler implements SinkHandler {
    private static final Logger log = LoggerFactory.getLogger(AnalyticsSinkHandler.class);
    protected String bucketName;

    @Override // com.couchbase.connect.kafka.handler.sink.SinkHandler
    public void init(SinkHandlerContext sinkHandlerContext) {
        this.bucketName = ((CouchbaseSinkConfig) ConfigHelper.parse(CouchbaseSinkConfig.class, sinkHandlerContext.configProperties())).bucket();
    }

    private static Pair<String, JsonArray> prepareWhereClauseForDelete(JsonObject jsonObject) {
        ArrayList arrayList = new ArrayList();
        return Pair.of((String) jsonObject.getNames().stream().map(str -> {
            arrayList.add(jsonObject.get(str));
            return "`" + str + "`=?";
        }).collect(Collectors.joining(" AND ")), JsonArray.from(arrayList));
    }

    protected static Pair<String, JsonArray> deleteQuery(String str, JsonObject jsonObject) {
        Pair<String, JsonArray> prepareWhereClauseForDelete = prepareWhereClauseForDelete(jsonObject);
        return Pair.of("DELETE FROM " + str + " WHERE " + ((String) prepareWhereClauseForDelete.getLeft()) + ";", (JsonArray) prepareWhereClauseForDelete.getRight());
    }

    protected static JsonObject getJsonObject(String str) {
        JsonObject jsonObject = null;
        try {
            jsonObject = JsonObject.fromJson(str);
        } catch (Exception e) {
            log.warn("could not generate analytics statement from node (not json)", e);
        }
        if (jsonObject != null && jsonObject.isEmpty()) {
            jsonObject = null;
            log.warn("could not generate analytics statement from empty node");
        }
        return jsonObject;
    }

    private String upsertStatement(String str, JsonObject jsonObject) {
        return "UPSERT INTO " + str + " ([" + jsonObject + "]);";
    }

    @Override // com.couchbase.connect.kafka.handler.sink.SinkHandler
    public SinkAction handle(SinkHandlerParams sinkHandlerParams) {
        String documentId = getDocumentId(sinkHandlerParams);
        SinkDocument orElse = sinkHandlerParams.document().orElse(null);
        String keyspace = keyspace(this.bucketName, sinkHandlerParams.getScopeAndCollection().getScope(), sinkHandlerParams.getScopeAndCollection().getCollection());
        if (orElse == null) {
            if (documentId.contains("`")) {
                log.warn("Could not generate Analytics N1QL DELETE statement with backtick (`) in field name");
                return SinkAction.ignore();
            }
            JsonObject jsonObject = getJsonObject(documentId);
            if (jsonObject == null) {
                return SinkAction.ignore();
            }
            Pair<String, JsonArray> deleteQuery = deleteQuery(keyspace, jsonObject);
            return new SinkAction(Mono.defer(() -> {
                return sinkHandlerParams.cluster().analyticsQuery((String) deleteQuery.getLeft(), AnalyticsOptions.analyticsOptions().parameters((JsonArray) deleteQuery.getRight())).map((v0) -> {
                    return v0.metaData();
                });
            }), ConcurrencyHint.of(documentId));
        }
        String str = new String(orElse.content(), StandardCharsets.UTF_8);
        if (str.contains("`")) {
            log.warn("Could not generate Analytics N1QL UPSERT statement with backtick (`) in document content");
            return SinkAction.ignore();
        }
        JsonObject jsonObject2 = getJsonObject(str);
        if (jsonObject2 == null) {
            return SinkAction.ignore();
        }
        String upsertStatement = upsertStatement(keyspace, jsonObject2);
        return new SinkAction(Mono.defer(() -> {
            return sinkHandlerParams.cluster().analyticsQuery(upsertStatement, AnalyticsOptions.analyticsOptions().parameters(jsonObject2)).map((v0) -> {
                return v0.metaData();
            });
        }), ConcurrencyHint.of(documentId));
    }

    public String toString() {
        return "AnalyticsSinkHandler{, bucketName='" + this.bucketName + "'}";
    }

    protected static String keyspace(String str, String str2, String str3) {
        if (str2.equals("") || str3.equals("")) {
            throw new ConfigException("Missing required configuration for scope and collection.");
        }
        String str4 = "";
        if (str != null && !str.isEmpty()) {
            str4 = str4 + "`" + str + "`.";
        }
        return str4 + "`" + str2 + "`.`" + str3 + "`";
    }
}
