package com.couchbase.connect.kafka.handler.sink;

import com.couchbase.client.core.annotation.Stability;
import com.couchbase.client.java.analytics.AnalyticsOptions;
import com.couchbase.client.java.json.JsonObject;
import com.couchbase.connect.kafka.config.sink.CouchbaseSinkConfig;
import com.couchbase.connect.kafka.util.config.ConfigHelper;
import org.apache.kafka.common.config.ConfigException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.publisher.Mono;

/**
 * Sink handler that translates each record into a Couchbase Analytics statement.
 *
 * <p>A record carrying a document becomes an {@code UPSERT INTO <keyspace>} of the
 * document's JSON content; a record without a document becomes a
 * {@code DELETE FROM <keyspace> WHERE _id = "<document id>"}. Records whose content
 * cannot be turned into a non-empty JSON object are logged and ignored.
 */
@Stability.Volatile
public class AnalyticsSinkHandler implements SinkHandler {
    private static final Logger log = LoggerFactory.getLogger(AnalyticsSinkHandler.class);

    /** Bucket name taken from the sink configuration in {@link #init}; first component of the keyspace. */
    protected String bucketName;

    /**
     * Reads the bucket name from the connector configuration.
     *
     * @param sinkHandlerContext supplies the raw configuration properties
     */
    @Override
    public void init(SinkHandlerContext sinkHandlerContext) {
        CouchbaseSinkConfig config =
                (CouchbaseSinkConfig) ConfigHelper.parse(CouchbaseSinkConfig.class, sinkHandlerContext.configProperties());
        this.bucketName = config.bucket();
    }

    /**
     * Builds the deferred Analytics action for one record.
     *
     * @param sinkHandlerParams the record's document (possibly absent), target collection and cluster
     * @return a DELETE action when the document is absent, an UPSERT action when the content is a
     *         non-empty JSON object, otherwise {@link SinkAction#ignore()}
     * @throws ConfigException if the target scope or collection name is missing
     */
    @Override
    public SinkAction handle(SinkHandlerParams sinkHandlerParams) {
        // getDocumentId is not declared in this class; presumably inherited from SinkHandler.
        String documentId = getDocumentId(sinkHandlerParams);
        SinkDocument document = sinkHandlerParams.document().orElse(null);
        String keyspace = keyspace(
                this.bucketName,
                sinkHandlerParams.collection().scopeName(),
                sinkHandlerParams.collection().name());

        // No document means the record represents a deletion.
        if (document == null) {
            String statement = deleteStatement(keyspace, documentId);
            return new SinkAction(
                    Mono.defer(() -> sinkHandlerParams.cluster()
                            .analyticsQuery(statement)
                            .map(result -> result.metaData())),
                    ConcurrencyHint.of(documentId));
        }

        try {
            JsonObject node = JsonObject.fromJson(document.content());
            if (node.isEmpty()) {
                log.warn("could not generate analytics statement from empty node");
                return SinkAction.ignore();
            }

            String statement = upsertStatement(keyspace, node);
            // NOTE(review): the document is both inlined into the statement text and passed as
            // query parameters; confirm whether the parameters are actually required.
            return new SinkAction(
                    Mono.defer(() -> sinkHandlerParams.cluster()
                            .analyticsQuery(statement, AnalyticsOptions.analyticsOptions().parameters(node))
                            .map(result -> result.metaData())),
                    ConcurrencyHint.of(documentId));
        } catch (Exception e) {
            // Best effort: a record that cannot be converted is skipped instead of failing the task.
            log.warn("could not generate analytics statement from node (not json)", e);
            return SinkAction.ignore();
        }
    }

    /** Returns an UPSERT statement that inserts {@code document} (rendered via its string form) into {@code keyspace}. */
    private static String upsertStatement(String keyspace, JsonObject document) {
        return "UPSERT INTO " + keyspace + " ([" + document + "]);";
    }

    /** Returns a DELETE statement that removes the entry whose {@code _id} equals {@code documentId}. */
    private static String deleteStatement(String keyspace, String documentId) {
        return "DELETE FROM " + keyspace + " WHERE _id= " + quoteStringLiteral(documentId) + " ;";
    }

    /**
     * Wraps {@code value} in double quotes, escaping backslashes and double quotes so the value
     * cannot terminate the literal early or be altered by escape processing.
     */
    private static String quoteStringLiteral(String value) {
        // String.valueOf keeps the previous concatenation behaviour ("null") for a null id.
        String escaped = String.valueOf(value)
                .replace("\\", "\\\\")
                .replace("\"", "\\\"");
        return "\"" + escaped + "\"";
    }

    // The leading ", " inside the braces is kept as-is so the textual output does not change.
    @Override
    public String toString() {
        return "AnalyticsSinkHandler{, bucketName='" + this.bucketName + "'}";
    }

    /**
     * Joins bucket, scope and collection into a backtick-quoted, dot-separated keyspace.
     *
     * @param bucket bucket name
     * @param scope scope name; must not be null or empty
     * @param collection collection name; must not be null or empty
     * @return the keyspace in the form {@code `bucket`.`scope`.`collection`}
     * @throws ConfigException if the scope or collection name is null or empty
     */
    protected static String keyspace(String bucket, String scope, String collection) {
        boolean scopeMissing = scope == null || scope.isEmpty();
        boolean collectionMissing = collection == null || collection.isEmpty();
        if (scopeMissing || collectionMissing) {
            throw new ConfigException("Missing required configuration for scope and collection.");
        }
        return "`" + bucket + "`.`" + scope + "`.`" + collection + "`";
    }
}
