package com.couchbase.connect.kafka;

import com.couchbase.client.core.logging.CouchbaseLoggerFactory;
import com.couchbase.client.core.logging.RedactionLevel;
import com.couchbase.client.core.time.Delay;
import com.couchbase.client.deps.io.netty.util.CharsetUtil;
import com.couchbase.client.java.Bucket;
import com.couchbase.client.java.CouchbaseCluster;
import com.couchbase.client.java.PersistTo;
import com.couchbase.client.java.ReplicateTo;
import com.couchbase.client.java.document.JsonDocument;
import com.couchbase.client.java.env.DefaultCouchbaseEnvironment;
import com.couchbase.client.java.error.DocumentDoesNotExistException;
import com.couchbase.client.java.util.retry.RetryBuilder;
import com.couchbase.connect.kafka.sink.DocumentMode;
import com.couchbase.connect.kafka.sink.N1qlMode;
import com.couchbase.connect.kafka.sink.N1qlWriter;
import com.couchbase.connect.kafka.sink.SubDocumentMode;
import com.couchbase.connect.kafka.sink.SubDocumentWriter;
import com.couchbase.connect.kafka.util.DocumentIdExtractor;
import com.couchbase.connect.kafka.util.JsonBinaryDocument;
import com.couchbase.connect.kafka.util.JsonBinaryTranscoder;
import com.couchbase.connect.kafka.util.Version;
import com.couchbase.connect.kafka.util.config.Password;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.config.ConfigException;
import org.apache.kafka.connect.errors.ConnectException;
import org.apache.kafka.connect.json.JsonConverter;
import org.apache.kafka.connect.sink.SinkRecord;
import org.apache.kafka.connect.sink.SinkTask;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import rx.Completable;
import rx.Observable;
import rx.functions.Func1;

/* loaded from: input_file:com/couchbase/connect/kafka/CouchbaseSinkTask.class */
public class CouchbaseSinkTask extends SinkTask {
    private static final Logger LOGGER = LoggerFactory.getLogger(CouchbaseSinkTask.class);
    private Map<String, String> configProperties;
    private CouchbaseSinkTaskConfig config;
    private Bucket bucket;
    private CouchbaseCluster cluster;
    private JsonConverter converter;
    private DocumentIdExtractor documentIdExtractor;
    private String path;
    private DocumentMode documentMode;
    private SubDocumentWriter subDocumentWriter;
    private SubDocumentMode subDocumentMode;
    private N1qlWriter n1qlWriter;
    private N1qlMode n1qlMode;
    private boolean createPaths;
    private boolean createDocuments;
    private PersistTo persistTo;
    private ReplicateTo replicateTo;

    public String version() {
        return Version.getVersion();
    }

    public void start(Map<String, String> map) {
        try {
            this.configProperties = map;
            this.config = new CouchbaseSinkTaskConfig(this.configProperties);
            CouchbaseSourceConnector.setForceIpv4(this.config.getBoolean(CouchbaseSourceConnectorConfig.FORCE_IPV4_CONFIG).booleanValue());
            CouchbaseLoggerFactory.setRedactionLevel((RedactionLevel) this.config.getEnum(RedactionLevel.class, CouchbaseSourceConnectorConfig.LOG_REDACTION_CONFIG));
            List list = this.config.getList(CouchbaseSourceConnectorConfig.CONNECTION_CLUSTER_ADDRESS_CONFIG);
            String string = this.config.getString(CouchbaseSourceConnectorConfig.CONNECTION_BUCKET_CONFIG);
            String username = this.config.getUsername();
            String str = Password.CONNECTION.get(this.config);
            boolean booleanValue = this.config.getBoolean(CouchbaseSourceConnectorConfig.CONNECTION_SSL_ENABLED_CONFIG).booleanValue();
            this.cluster = CouchbaseCluster.create(DefaultCouchbaseEnvironment.builder().sslEnabled(booleanValue).sslKeystoreFile(this.config.getString(CouchbaseSourceConnectorConfig.CONNECTION_SSL_KEYSTORE_LOCATION_CONFIG)).sslKeystorePassword(Password.SSL_KEYSTORE.get(this.config)).connectTimeout(this.config.getLong(CouchbaseSourceConnectorConfig.CONNECTION_TIMEOUT_MS_CONFIG).longValue()).build(), (List<String>) list);
            this.cluster.authenticate(username, str);
            this.bucket = this.cluster.openBucket(string, Collections.singletonList(new JsonBinaryTranscoder()));
            this.converter = new JsonConverter();
            this.converter.configure(Collections.singletonMap("schemas.enable", false), false);
            String string2 = this.config.getString(CouchbaseSinkConnectorConfig.DOCUMENT_ID_POINTER_CONFIG);
            if (string2 != null && !string2.isEmpty()) {
                this.documentIdExtractor = new DocumentIdExtractor(string2, this.config.getBoolean(CouchbaseSinkConnectorConfig.REMOVE_DOCUMENT_ID_CONFIG).booleanValue());
            }
            this.documentMode = (DocumentMode) this.config.getEnum(DocumentMode.class, CouchbaseSinkConnectorConfig.DOCUMENT_MODE_CONFIG);
            this.persistTo = (PersistTo) this.config.getEnum(PersistTo.class, CouchbaseSinkConnectorConfig.PERSIST_TO_CONFIG);
            this.replicateTo = (ReplicateTo) this.config.getEnum(ReplicateTo.class, CouchbaseSinkConnectorConfig.REPLICATE_TO_CONFIG);
            switch (this.documentMode) {
                case SUBDOCUMENT:
                    this.subDocumentMode = (SubDocumentMode) this.config.getEnum(SubDocumentMode.class, CouchbaseSinkConnectorConfig.SUBDOCUMENT_MODE_CONFIG);
                    this.path = this.config.getString(CouchbaseSinkConnectorConfig.SUBDOCUMENT_PATH_CONFIG);
                    this.createPaths = this.config.getBoolean(CouchbaseSinkConnectorConfig.SUBDOCUMENT_CREATEPATH_CONFIG).booleanValue();
                    this.createDocuments = this.config.getBoolean(CouchbaseSinkConnectorConfig.SUBDOCUMENT_CREATEDOCUMENT_CONFIG).booleanValue();
                    this.subDocumentWriter = new SubDocumentWriter(this.subDocumentMode, this.path, this.createPaths, this.createDocuments);
                    return;
                case N1QL:
                    this.n1qlMode = (N1qlMode) this.config.getEnum(N1qlMode.class, CouchbaseSinkConnectorConfig.N1QL_MODE_CONFIG);
                    this.createDocuments = this.config.getBoolean(CouchbaseSinkConnectorConfig.SUBDOCUMENT_CREATEDOCUMENT_CONFIG).booleanValue();
                    this.n1qlWriter = new N1qlWriter(this.n1qlMode, this.createDocuments);
                    return;
                default:
                    return;
            }
        } catch (ConfigException e) {
            throw new ConnectException("Couldn't start CouchbaseSinkTask due to configuration error", e);
        }
    }

    public void put(Collection<SinkRecord> collection) {
        if (collection.isEmpty()) {
            return;
        }
        SinkRecord next = collection.iterator().next();
        LOGGER.trace("Received {} records. First record kafka coordinates:({}-{}-{}). Writing them to the Couchbase...", new Object[]{Integer.valueOf(collection.size()), next.topic(), next.kafkaPartition(), Long.valueOf(next.kafkaOffset())});
        Observable.from(collection).flatMapCompletable(new Func1<SinkRecord, Completable>() { // from class: com.couchbase.connect.kafka.CouchbaseSinkTask.1
            @Override // rx.functions.Func1
            public Completable call(SinkRecord sinkRecord) {
                if (sinkRecord.value() == null) {
                    return CouchbaseSinkTask.this.removeIfExists(CouchbaseSinkTask.documentIdFromKafkaMetadata(sinkRecord));
                }
                JsonBinaryDocument convert = CouchbaseSinkTask.this.convert(sinkRecord);
                switch (AnonymousClass3.$SwitchMap$com$couchbase$connect$kafka$sink$DocumentMode[CouchbaseSinkTask.this.documentMode.ordinal()]) {
                    case 1:
                        return CouchbaseSinkTask.this.subDocumentWriter.write(CouchbaseSinkTask.this.bucket.async(), convert, CouchbaseSinkTask.this.persistTo, CouchbaseSinkTask.this.replicateTo);
                    case 2:
                        return CouchbaseSinkTask.this.n1qlWriter.write(CouchbaseSinkTask.this.bucket.async(), convert, CouchbaseSinkTask.this.persistTo, CouchbaseSinkTask.this.replicateTo);
                    default:
                        return CouchbaseSinkTask.this.bucket.async().upsert(convert, CouchbaseSinkTask.this.persistTo, CouchbaseSinkTask.this.replicateTo).toCompletable();
                }
            }
        }).retryWhen(RetryBuilder.anyOf(RuntimeException.class).delay(Delay.exponential(TimeUnit.SECONDS, 5L)).max(5).build()).toCompletable().await();
    }

    /* JADX INFO: Access modifiers changed from: private */
    public Completable removeIfExists(String str) {
        return this.bucket.async().remove(str, this.persistTo, this.replicateTo).onErrorResumeNext(new Func1<Throwable, Observable<JsonDocument>>() { // from class: com.couchbase.connect.kafka.CouchbaseSinkTask.2
            @Override // rx.functions.Func1
            public Observable<JsonDocument> call(Throwable th) {
                return th instanceof DocumentDoesNotExistException ? Observable.empty() : Observable.error(th);
            }
        }).toCompletable();
    }

    private static String toString(ByteBuffer byteBuffer) {
        ByteBuffer slice = byteBuffer.slice();
        byte[] bArr = new byte[slice.remaining()];
        slice.get(bArr);
        return new String(bArr, CharsetUtil.UTF_8);
    }

    /* JADX INFO: Access modifiers changed from: private */
    public static String documentIdFromKafkaMetadata(SinkRecord sinkRecord) {
        Object key = sinkRecord.key();
        return ((key instanceof String) || (key instanceof Number) || (key instanceof Boolean)) ? key.toString() : key instanceof byte[] ? new String((byte[]) key, CharsetUtil.UTF_8) : key instanceof ByteBuffer ? toString((ByteBuffer) key) : sinkRecord.topic() + "/" + sinkRecord.kafkaPartition() + "/" + sinkRecord.kafkaOffset();
    }

    /* JADX INFO: Access modifiers changed from: private */
    public JsonBinaryDocument convert(SinkRecord sinkRecord) {
        byte[] fromConnectData = this.converter.fromConnectData(sinkRecord.topic(), sinkRecord.valueSchema(), sinkRecord.value());
        String str = null;
        try {
            if (this.documentIdExtractor != null) {
                return this.documentIdExtractor.extractDocumentId(fromConnectData);
            }
        } catch (DocumentIdExtractor.DocumentIdNotFoundException e) {
            str = documentIdFromKafkaMetadata(sinkRecord);
            LOGGER.warn(e.getMessage() + "; using fallback ID '{}'", str);
        } catch (IOException e2) {
            throw new RuntimeException(e2);
        }
        if (str == null) {
            str = documentIdFromKafkaMetadata(sinkRecord);
        }
        return JsonBinaryDocument.create(str, fromConnectData);
    }

    public void flush(Map<TopicPartition, OffsetAndMetadata> map) {
    }

    public void stop() {
        this.cluster.disconnect();
    }
}
