package com.couchbase.connect.kafka.sink;

import com.couchbase.client.deps.io.netty.util.CharsetUtil;
import com.couchbase.client.java.AsyncBucket;
import com.couchbase.client.java.PersistTo;
import com.couchbase.client.java.ReplicateTo;
import com.couchbase.client.java.document.JsonDocument;
import com.couchbase.client.java.document.json.JsonObject;
import com.couchbase.client.java.error.DocumentDoesNotExistException;
import com.couchbase.client.java.subdoc.AsyncMutateInBuilder;
import com.couchbase.client.java.subdoc.SubdocOptionsBuilder;
import com.couchbase.connect.kafka.util.JsonBinaryDocument;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import rx.Completable;
import rx.functions.Action1;

/* loaded from: input_file:com/couchbase/connect/kafka/sink/SubDocumentWriter.class */
/**
 * Applies the content of a {@link JsonBinaryDocument} to an existing Couchbase document as a
 * single sub-document mutation at a configured path.
 *
 * <p>A document with content is written using the configured {@link SubDocumentMode}; a document
 * with a non-empty id but no content causes the configured path to be removed instead.
 */
public class SubDocumentWriter {
    private static final Logger LOGGER = LoggerFactory.getLogger(SubDocumentWriter.class);

    private final SubDocumentMode mode;
    private final String path;
    private final boolean createPaths;
    private final boolean createDocuments;

    /**
     * @param mode            which sub-document operation to apply when the document has content
     * @param path            sub-document path the operation targets
     * @param createPaths     value passed to {@link SubdocOptionsBuilder#createPath(boolean)}
     * @param createDocuments when {@code true}, an empty document is inserted if the mutation
     *                        fails with {@link DocumentDoesNotExistException}
     */
    public SubDocumentWriter(SubDocumentMode mode, String path, boolean createPaths, boolean createDocuments) {
        this.mode = mode;
        this.path = path;
        this.createPaths = createPaths;
        this.createDocuments = createDocuments;
    }

    /**
     * Builds and executes the sub-document mutation for the given document.
     *
     * <p>If the document is {@code null}, or has neither content nor a non-empty id, a warning is
     * logged and an already-completed {@link Completable} is returned.
     *
     * <p>When {@code createDocuments} is enabled and the mutation fails with
     * {@link DocumentDoesNotExistException}, an empty document with the same id is inserted
     * (blocking the thread that delivers the error). The original error is still propagated to
     * the subscriber, since {@code doOnError} only observes it.
     *
     * @param asyncBucket        bucket to run the mutation against
     * @param jsonBinaryDocument document whose id selects the target and whose content is the fragment
     * @param persistTo          persistence constraint passed to {@code execute}
     * @param replicateTo        replication constraint passed to {@code execute}
     * @return a {@link Completable} that completes when the mutation has been executed
     */
    public Completable write(final AsyncBucket asyncBucket, final JsonBinaryDocument jsonBinaryDocument, PersistTo persistTo, ReplicateTo replicateTo) {
        if (jsonBinaryDocument == null) {
            LOGGER.warn("document or document content is null");
            return Completable.complete();
        }

        final String id = jsonBinaryDocument.id();
        final boolean hasContent = jsonBinaryDocument.content() != null;
        final boolean hasId = id != null && !id.isEmpty();
        if (!hasContent && !hasId) {
            LOGGER.warn("document or document content is null");
            return Completable.complete();
        }

        // Parse the content only when it is present: a content-less document with a valid id
        // is a removal request and must not dereference the null content.
        final JsonObject fragment = hasContent
                ? JsonObject.fromJson(jsonBinaryDocument.content().toString(CharsetUtil.UTF_8))
                : null;

        SubdocOptionsBuilder options = new SubdocOptionsBuilder().createPath(this.createPaths);
        AsyncMutateInBuilder mutation = asyncBucket.mutateIn(id);
        if (hasContent) {
            mutation = applyMode(mutation, fragment, options);
        } else {
            mutation = mutation.remove(this.path, options);
        }

        return mutation.execute(persistTo, replicateTo).doOnError(new Action1<Throwable>() {
            @Override
            public void call(Throwable th) {
                if (createDocuments && (th instanceof DocumentDoesNotExistException)) {
                    asyncBucket.insert(JsonDocument.create(jsonBinaryDocument.id())).toBlocking().single();
                }
            }
        }).toCompletable();
    }

    /**
     * Adds the operation selected by {@link #mode} to the builder; a mode without a matching
     * case leaves the builder unchanged.
     */
    private AsyncMutateInBuilder applyMode(AsyncMutateInBuilder builder, JsonObject fragment, SubdocOptionsBuilder options) {
        // NOTE(review): the *_ALL modes pass a single JsonObject plus the options object;
        // confirm which SDK overload this resolves to and that it is the intended one.
        switch (this.mode) {
            case UPSERT:
                return builder.upsert(this.path, fragment, options);
            case ARRAY_INSERT:
                return builder.arrayInsert(this.path, fragment, options);
            case ARRAY_APPEND:
                return builder.arrayAppend(this.path, fragment, options);
            case ARRAY_PREPEND:
                return builder.arrayPrepend(this.path, fragment, options);
            case ARRAY_INSERT_ALL:
                return builder.arrayInsertAll(this.path, fragment, options);
            case ARRAY_APPEND_ALL:
                return builder.arrayAppendAll(this.path, fragment, options);
            case ARRAY_PREPEND_ALL:
                return builder.arrayPrependAll(this.path, fragment, options);
            case ARRAY_ADD_UNIQUE:
                return builder.arrayAddUnique(this.path, fragment, options);
        }
        return builder;
    }
}
