package com.couchbase.connect.kafka;

import com.couchbase.client.core.error.DocumentNotFoundException;
import com.couchbase.client.core.logging.LogRedaction;
import com.couchbase.client.core.util.CbCollections;
import com.couchbase.client.java.Collection;
import com.couchbase.client.java.codec.RawJsonTranscoder;
import com.couchbase.client.java.kv.PersistTo;
import com.couchbase.client.java.kv.RemoveOptions;
import com.couchbase.client.java.kv.ReplicateTo;
import com.couchbase.client.java.kv.UpsertOptions;
import com.couchbase.connect.kafka.config.sink.CouchbaseSinkConfig;
import com.couchbase.connect.kafka.sink.DocumentMode;
import com.couchbase.connect.kafka.sink.N1qlWriter;
import com.couchbase.connect.kafka.sink.SubDocumentWriter;
import com.couchbase.connect.kafka.util.DocumentIdExtractor;
import com.couchbase.connect.kafka.util.DocumentPathExtractor;
import com.couchbase.connect.kafka.util.JsonBinaryDocument;
import com.couchbase.connect.kafka.util.Version;
import com.couchbase.connect.kafka.util.config.ConfigHelper;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.time.Duration;
import java.util.Map;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.config.ConfigException;
import org.apache.kafka.connect.errors.ConnectException;
import org.apache.kafka.connect.json.JsonConverter;
import org.apache.kafka.connect.sink.SinkRecord;
import org.apache.kafka.connect.sink.SinkTask;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

/* loaded from: input_file:com/couchbase/connect/kafka/CouchbaseSinkTask.class */
/**
 * Kafka Connect sink task that writes incoming records to the default collection
 * of a Couchbase bucket.
 *
 * <p>Records with a {@code null} value are treated as deletions. All other records are
 * converted to JSON and written according to the configured {@link DocumentMode}:
 * via a sub-document operation, via a N1QL statement, or (by default) as a full
 * document upsert.
 */
public class CouchbaseSinkTask extends SinkTask {
    private static final Logger LOGGER = LoggerFactory.getLogger(CouchbaseSinkTask.class);

    // All of the following are initialized by start(); they are unset until then.
    private String bucketName;
    private Collection collection;
    private KafkaCouchbaseClient client;
    private JsonConverter converter;
    // Null when no document ID path is configured; IDs then come from Kafka metadata.
    private DocumentIdExtractor documentIdExtractor;
    private DocumentMode documentMode;
    // Only set when documentMode is SUBDOCUMENT.
    private SubDocumentWriter subDocumentWriter;
    // Only set when documentMode is N1QL.
    private N1qlWriter n1qlWriter;
    private PersistTo persistTo;
    private ReplicateTo replicateTo;
    private Duration documentExpiry;

    /**
     * Returns the connector version as reported by {@link Version#getVersion()}.
     */
    public String version() {
        return Version.getVersion();
    }

    /**
     * Parses the task configuration, connects to Couchbase and prepares the writer
     * that matches the configured document mode.
     *
     * @param map raw task properties
     * @throws ConnectException if a {@link ConfigException} is raised while starting
     */
    public void start(Map<String, String> map) {
        try {
            CouchbaseSinkConfig config = (CouchbaseSinkConfig) ConfigHelper.parse(CouchbaseSinkConfig.class, map);
            LogRedaction.setRedactionLevel(config.logRedaction());

            this.client = new KafkaCouchbaseClient(config);
            this.bucketName = config.bucket();
            this.collection = this.client.cluster().bucket(this.bucketName).defaultCollection();

            // Values are serialized as schemaless JSON.
            this.converter = new JsonConverter();
            this.converter.configure(CbCollections.mapOf("schemas.enable", false), false);

            String documentId = config.documentId();
            if (documentId != null && !documentId.isEmpty()) {
                this.documentIdExtractor = new DocumentIdExtractor(documentId, config.removeDocumentId());
            }

            this.documentMode = config.documentMode();
            this.persistTo = config.persistTo();
            this.replicateTo = config.replicateTo();
            this.documentExpiry = config.documentExpiration();

            // Only the writer required by the selected mode is created.
            switch (this.documentMode) {
                case SUBDOCUMENT:
                    this.subDocumentWriter = new SubDocumentWriter(
                            config.subdocumentOperation(),
                            config.subdocumentPath(),
                            config.subdocumentCreatePath(),
                            config.createDocument(),
                            this.documentExpiry);
                    break;
                case N1QL:
                    this.n1qlWriter = new N1qlWriter(
                            config.n1qlOperation(),
                            config.n1qlWhereFields(),
                            config.createDocument());
                    break;
                default:
                    // Plain upserts need no dedicated writer.
                    break;
            }
        } catch (ConfigException e) {
            throw new ConnectException("Couldn't start CouchbaseSinkTask due to configuration error", e);
        }
    }

    /**
     * Writes a batch of records to Couchbase and blocks until the reactive pipeline
     * has finished processing the whole batch.
     *
     * <p>A record whose value is {@code null} removes the document identified by the
     * record's Kafka metadata; any other record is converted and written according to
     * the configured document mode.
     *
     * @param collection the records to write; an empty batch is a no-op
     */
    public void put(java.util.Collection<SinkRecord> collection) {
        if (collection.isEmpty()) {
            return;
        }

        SinkRecord first = collection.iterator().next();
        LOGGER.trace("Received {} records. First record kafka coordinates:({}-{}-{}). Writing them to the Couchbase...",
                collection.size(), first.topic(), first.kafkaPartition(), first.kafkaOffset());

        // NOTE: the parameter shadows the Couchbase collection field, hence the explicit "this." below.
        Flux.fromIterable(collection).flatMap(sinkRecord -> {
            if (sinkRecord.value() == null) {
                // Null value (tombstone): delete the corresponding document.
                return removeIfExists(documentIdFromKafkaMetadata(sinkRecord));
            }
            JsonBinaryDocument document = convert(sinkRecord);
            switch (this.documentMode) {
                case SUBDOCUMENT:
                    return this.subDocumentWriter.write(this.collection.reactive(), document, this.persistTo, this.replicateTo);
                case N1QL:
                    return this.n1qlWriter.write(this.client.cluster(), this.bucketName, document);
                default:
                    return this.collection.reactive()
                            .upsert(document.id(), document.content(), UpsertOptions.upsertOptions()
                                    .durability(this.persistTo, this.replicateTo)
                                    .expiry(this.documentExpiry)
                                    .transcoder(RawJsonTranscoder.INSTANCE))
                            .then();
            }
        }).blockLast();
    }

    /**
     * Removes the document with the given ID, treating "document not found" as success.
     *
     * @param str the ID of the document to remove
     * @return a Mono that completes once the removal has been attempted
     */
    private Mono<Void> removeIfExists(String str) {
        return this.collection.reactive()
                .remove(str, RemoveOptions.removeOptions().durability(this.persistTo, this.replicateTo))
                // A missing document already satisfies the goal of the delete.
                .onErrorResume(DocumentNotFoundException.class, ignored -> Mono.empty())
                .then();
    }

    /**
     * Decodes the remaining bytes of the buffer as UTF-8.
     *
     * @param byteBuffer the buffer to decode
     * @return the decoded text
     */
    private static String toString(ByteBuffer byteBuffer) {
        // Read through a slice so the position of the caller's buffer is left untouched.
        ByteBuffer slice = byteBuffer.slice();
        byte[] bytes = new byte[slice.remaining()];
        slice.get(bytes);
        return new String(bytes, StandardCharsets.UTF_8);
    }

    /**
     * Derives a document ID from the record's Kafka metadata.
     *
     * <p>String, numeric and boolean keys are used via {@code toString()}; byte array
     * and {@link ByteBuffer} keys are decoded as UTF-8. For any other key (including
     * {@code null}) the ID is {@code topic/partition/offset}.
     *
     * @param sinkRecord the record to derive an ID for
     * @return the document ID
     */
    private static String documentIdFromKafkaMetadata(SinkRecord sinkRecord) {
        Object key = sinkRecord.key();
        if (key instanceof String || key instanceof Number || key instanceof Boolean) {
            return key.toString();
        }
        if (key instanceof byte[]) {
            return new String((byte[]) key, StandardCharsets.UTF_8);
        }
        if (key instanceof ByteBuffer) {
            return toString((ByteBuffer) key);
        }
        return sinkRecord.topic() + "/" + sinkRecord.kafkaPartition() + "/" + sinkRecord.kafkaOffset();
    }

    /**
     * Converts a record's value to JSON and pairs it with a document ID.
     *
     * <p>When a document ID extractor is configured, the ID is taken from the JSON
     * content. If the configured path is not present in the document, or no extractor
     * is configured, the ID is derived from the record's Kafka metadata instead.
     *
     * @param sinkRecord the record to convert
     * @return the JSON document to write
     * @throws RuntimeException if the extractor fails with an {@link IOException}
     */
    private JsonBinaryDocument convert(SinkRecord sinkRecord) {
        byte[] valueAsJson = this.converter.fromConnectData(sinkRecord.topic(), sinkRecord.valueSchema(), sinkRecord.value());

        if (this.documentIdExtractor != null) {
            try {
                return this.documentIdExtractor.extractDocumentId(valueAsJson);
            } catch (DocumentPathExtractor.DocumentPathNotFoundException e) {
                String fallbackId = documentIdFromKafkaMetadata(sinkRecord);
                // Pass the exception message as an argument: concatenating it into the format
                // string would let any "{}" in the message swallow the fallback ID.
                LOGGER.warn("{}; using fallback ID '{}'", e.getMessage(), fallbackId);
                return new JsonBinaryDocument(fallbackId, valueAsJson);
            } catch (IOException e) {
                throw new RuntimeException("Failed to extract document ID from record "
                        + sinkRecord.topic() + "/" + sinkRecord.kafkaPartition() + "/" + sinkRecord.kafkaOffset(), e);
            }
        }

        return new JsonBinaryDocument(documentIdFromKafkaMetadata(sinkRecord), valueAsJson);
    }

    /**
     * Intentionally empty: {@link #put} blocks until its batch has been processed,
     * so this task buffers nothing that would need flushing.
     *
     * @param map the current offsets (unused)
     */
    public void flush(Map<TopicPartition, OffsetAndMetadata> map) {
    }

    /**
     * Closes the Couchbase client, if one was created.
     */
    public void stop() {
        // start() can fail before the client is assigned; don't turn that into a NullPointerException here.
        if (this.client != null) {
            this.client.close();
        }
    }
}
