package com.couchbase.connect.kafka.sink;

import com.couchbase.client.java.ReactiveCollection;
import com.couchbase.client.java.json.JsonObject;
import com.couchbase.client.java.kv.ArrayAppend;
import com.couchbase.client.java.kv.ArrayPrepend;
import com.couchbase.client.java.kv.MutateInOptions;
import com.couchbase.client.java.kv.MutateInSpec;
import com.couchbase.client.java.kv.StoreSemantics;
import com.couchbase.client.java.kv.Upsert;
import com.couchbase.connect.kafka.util.DocumentPathExtractor;
import com.couchbase.connect.kafka.util.DurabilitySetter;
import com.couchbase.connect.kafka.util.JsonBinaryDocument;
import java.io.IOException;
import java.time.Duration;
import java.util.Collections;
import java.util.Objects;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.publisher.Mono;

/* loaded from: input_file:com/couchbase/connect/kafka/sink/SubDocumentWriter.class */
/**
 * Writes the content of a {@link JsonBinaryDocument} into a Couchbase document as a
 * single sub-document mutation (upsert, array append or array prepend, depending on
 * the configured {@link SubDocumentMode}).
 *
 * <p>The target sub-document path is either static (used verbatim for every document)
 * or dynamic. A configured path that starts with {@code "/"} is treated as dynamic:
 * it is wrapped as <code>${path}</code> and handed to {@link DocumentPathExtractor}
 * for each document to obtain the actual path and the remaining payload.
 */
public class SubDocumentWriter {
    private static final Logger LOGGER = LoggerFactory.getLogger(SubDocumentWriter.class);
    private final SubDocumentMode mode;
    private final String path;
    private final boolean createPaths;
    private final boolean pathIsDynamic;
    private final boolean createDocuments;
    private final Duration documentExpiry;

    /**
     * Immutable description of one sub-document mutation: the document id, the
     * path inside the document, and the JSON value to write at that path.
     */
    public static class SubdocOperation {
        private final String id;
        private final String path;
        private final JsonObject data;

        /**
         * @param id   id of the target document
         * @param path sub-document path the value is written to
         * @param data JSON bytes of the value; parsed eagerly with
         *             {@link JsonObject#fromJson(byte[])}
         */
        public SubdocOperation(String id, String path, byte[] data) {
            this.id = id;
            this.path = path;
            this.data = JsonObject.fromJson(data);
        }

        /** @return id of the target document */
        public String getId() {
            return this.id;
        }

        /** @return sub-document path the value is written to */
        public String getPath() {
            return this.path;
        }

        /** @return the parsed JSON value to write */
        public JsonObject getData() {
            return this.data;
        }
    }

    /**
     * @param mode            which sub-document mutation to perform; must not be null
     * @param path            target path; a value starting with {@code "/"} is resolved
     *                        per document (dynamic), anything else is used as-is;
     *                        must not be null
     * @param createPaths     whether the mutation spec is built with {@code createPath()}
     * @param createDocuments {@code true} selects {@link StoreSemantics#UPSERT},
     *                        {@code false} selects {@link StoreSemantics#REPLACE}
     * @param documentExpiry  expiry passed to the mutation options; must not be null
     * @throws NullPointerException if {@code mode}, {@code path} or
     *                              {@code documentExpiry} is null
     */
    public SubDocumentWriter(SubDocumentMode mode, String path, boolean createPaths, boolean createDocuments, Duration documentExpiry) {
        this.mode = Objects.requireNonNull(mode, "mode");
        // Fail with a named NPE instead of an anonymous one from startsWith().
        Objects.requireNonNull(path, "path");
        this.pathIsDynamic = path.startsWith("/");
        // Dynamic paths are stored in the ${...} placeholder form given to DocumentPathExtractor.
        this.path = this.pathIsDynamic ? "${" + path + "}" : path;
        this.createPaths = createPaths;
        this.createDocuments = createDocuments;
        this.documentExpiry = Objects.requireNonNull(documentExpiry, "documentExpiry");
    }

    /**
     * Applies the configured sub-document mutation for the given document.
     *
     * @param reactiveCollection collection the mutation is sent to
     * @param jsonBinaryDocument source of the document id and JSON content
     * @param durabilitySetter   invoked with the mutation options before the request
     *                           is issued (presumably to apply durability settings;
     *                           see {@link DurabilitySetter})
     * @return a {@link Mono} derived from the {@code mutateIn} call via {@code then()}
     * @throws IllegalArgumentException if a dynamic path cannot be resolved from the
     *                                  document content
     * @throws IllegalStateException    if the configured mode is not supported
     */
    public Mono<Void> write(ReactiveCollection reactiveCollection, JsonBinaryDocument jsonBinaryDocument, DurabilitySetter durabilitySetter) {
        MutateInSpec spec = buildSpec(getOperation(jsonBinaryDocument));
        MutateInOptions options = MutateInOptions.mutateInOptions()
                .expiry(this.documentExpiry)
                .storeSemantics(this.createDocuments ? StoreSemantics.UPSERT : StoreSemantics.REPLACE);
        durabilitySetter.accept(options);
        return reactiveCollection.mutateIn(jsonBinaryDocument.id(), Collections.singletonList(spec), options).then();
    }

    /** Builds the mutation spec for the configured mode, adding createPath() when enabled. */
    private MutateInSpec buildSpec(SubdocOperation operation) {
        final String targetPath = operation.getPath();
        final JsonObject value = operation.getData();
        switch (this.mode) {
            case UPSERT: {
                Upsert upsert = MutateInSpec.upsert(targetPath, value);
                return this.createPaths ? upsert.createPath() : upsert;
            }
            case ARRAY_APPEND: {
                ArrayAppend append = MutateInSpec.arrayAppend(targetPath, Collections.singletonList(value));
                return this.createPaths ? append.createPath() : append;
            }
            case ARRAY_PREPEND: {
                ArrayPrepend prepend = MutateInSpec.arrayPrepend(targetPath, Collections.singletonList(value));
                return this.createPaths ? prepend.createPath() : prepend;
            }
            default:
                throw new IllegalStateException("Unsupported subdoc mode: " + this.mode);
        }
    }

    /**
     * Resolves the path and payload for a document: static paths use the whole
     * document content, dynamic paths are extracted from the content.
     */
    private SubdocOperation getOperation(JsonBinaryDocument jsonBinaryDocument) {
        if (!this.pathIsDynamic) {
            return new SubdocOperation(jsonBinaryDocument.id(), this.path, jsonBinaryDocument.content());
        }
        try {
            // NOTE(review): the meaning of the `true` flag is defined by DocumentPathExtractor — confirm.
            DocumentPathExtractor.DocumentExtraction extraction = new DocumentPathExtractor(this.path, true).extractDocumentPath(jsonBinaryDocument.content());
            return new SubdocOperation(jsonBinaryDocument.id(), extraction.getPathValue(), extraction.getData());
        } catch (DocumentPathExtractor.DocumentPathNotFoundException | IOException e) {
            LOGGER.error(e.getMessage(), e);
            // Previously this fell through to new SubdocOperation(id, null, null), which parses
            // null JSON bytes and so fails with an unrelated conversion error that hides the
            // real cause. Fail here instead, keeping the cause attached.
            throw new IllegalArgumentException("Failed to resolve sub-document path from document content using " + this.path, e);
        }
    }
}
