package com.couchbase.connect.kafka.sink;

import com.couchbase.client.core.logging.RedactableArgument;
import com.couchbase.client.deps.io.netty.util.CharsetUtil;
import com.couchbase.client.java.AsyncBucket;
import com.couchbase.client.java.PersistTo;
import com.couchbase.client.java.ReplicateTo;
import com.couchbase.client.java.document.json.JsonObject;
import com.couchbase.client.java.query.AsyncN1qlQueryResult;
import com.couchbase.client.java.query.N1qlQuery;
import com.couchbase.client.java.query.ParameterizedN1qlQuery;
import com.couchbase.connect.kafka.util.JsonBinaryDocument;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import rx.Completable;
import rx.Notification;
import rx.functions.Action1;

/* loaded from: input_file:com/couchbase/connect/kafka/sink/N1qlWriter.class */
public class N1qlWriter {
    private static final Logger LOGGER = LoggerFactory.getLogger(N1qlWriter.class);
    private N1qlMode mode;
    private String idField = "__id__";
    private boolean createDocuments;

    public N1qlWriter(N1qlMode n1qlMode, boolean z) {
        this.mode = n1qlMode;
        this.createDocuments = z;
    }

    public Completable write(final AsyncBucket asyncBucket, final JsonBinaryDocument jsonBinaryDocument, PersistTo persistTo, ReplicateTo replicateTo) {
        if (jsonBinaryDocument == null || jsonBinaryDocument.content() == null) {
            LOGGER.warn("document or document content is null");
            return Completable.complete();
        }
        final JsonObject fromJson = JsonObject.fromJson(jsonBinaryDocument.content().toString(CharsetUtil.UTF_8));
        ParameterizedN1qlQuery parameterizedN1qlQuery = null;
        if (this.mode == N1qlMode.UPDATE) {
            String parseUpdate = parseUpdate(asyncBucket.name(), fromJson);
            if (parseUpdate == null || parseUpdate.isEmpty()) {
                LOGGER.warn("could not generate statement from node " + RedactableArgument.user(fromJson));
                return Completable.complete();
            }
            fromJson.put(this.idField, jsonBinaryDocument.id());
            parameterizedN1qlQuery = N1qlQuery.parameterized(parseUpdate, fromJson);
        }
        if (this.mode == N1qlMode.UPSERT) {
            String parseUpsert = parseUpsert(asyncBucket.name(), fromJson);
            if (parseUpsert == null || parseUpsert.isEmpty()) {
                LOGGER.warn("could not generate statement from node " + RedactableArgument.user(fromJson));
                return Completable.complete();
            }
            parameterizedN1qlQuery = N1qlQuery.parameterized(parseUpsert, JsonObject.empty().put(this.idField, jsonBinaryDocument.id()));
        }
        return asyncBucket.query(parameterizedN1qlQuery).doOnEach(new Action1<Notification<? super AsyncN1qlQueryResult>>() { // from class: com.couchbase.connect.kafka.sink.N1qlWriter.1
            @Override // rx.functions.Action1
            public void call(Notification<? super AsyncN1qlQueryResult> notification) {
                AsyncN1qlQueryResult value;
                if (N1qlWriter.this.mode == N1qlMode.UPDATE && N1qlWriter.this.createDocuments && (value = notification.getValue()) != null && value.rows().count().toBlocking().single().intValue() == 0) {
                    String parseUpsert2 = N1qlWriter.this.parseUpsert(asyncBucket.name(), fromJson);
                    if (parseUpsert2 == null || parseUpsert2.isEmpty()) {
                        N1qlWriter.LOGGER.warn("could not generate statement from node " + RedactableArgument.user(fromJson));
                    }
                    asyncBucket.query(N1qlQuery.parameterized(parseUpsert2, JsonObject.empty().put(N1qlWriter.this.idField, jsonBinaryDocument.id()))).toBlocking().single();
                }
            }
        }).toCompletable();
    }

    private String parseUpdate(String str, JsonObject jsonObject) {
        if (jsonObject == null || jsonObject.equals(JsonObject.empty())) {
            return null;
        }
        StringBuilder sb = new StringBuilder();
        sb.append(String.format("UPDATE `%s` USE KEYS $%s SET ", str, this.idField));
        for (String str2 : jsonObject.getNames()) {
            sb.append(String.format("`%s` = $%s, ", str2, str2));
        }
        String sb2 = sb.toString();
        return sb2.substring(0, sb2.length() - 2) + " RETURNING meta().id;";
    }

    /* JADX INFO: Access modifiers changed from: private */
    public String parseUpsert(String str, JsonObject jsonObject) {
        if (jsonObject == null || jsonObject.equals(JsonObject.empty())) {
            return null;
        }
        return String.format("UPSERT INTO `%s` (KEY,VALUE) VALUES ($%s, ", str, this.idField) + jsonObject.toString() + ") RETURNING meta().id;";
    }
}
