package com.couchbase.connect.kafka;

import com.couchbase.client.core.env.SecurityConfig;
import com.couchbase.client.core.util.CbStrings;
import com.couchbase.client.dcp.Authenticator;
import com.couchbase.client.dcp.CertificateAuthenticator;
import com.couchbase.client.dcp.Client;
import com.couchbase.client.dcp.PasswordAuthenticator;
import com.couchbase.client.dcp.StaticCredentialsProvider;
import com.couchbase.client.dcp.StreamTo;
import com.couchbase.client.dcp.core.env.NetworkResolution;
import com.couchbase.client.dcp.highlevel.DatabaseChangeListener;
import com.couchbase.client.dcp.highlevel.Deletion;
import com.couchbase.client.dcp.highlevel.DocumentChange;
import com.couchbase.client.dcp.highlevel.Mutation;
import com.couchbase.client.dcp.highlevel.StreamFailure;
import com.couchbase.client.dcp.highlevel.StreamOffset;
import com.couchbase.client.dcp.message.DcpFailoverLogResponse;
import com.couchbase.client.dcp.metrics.LogLevel;
import com.couchbase.client.dcp.state.PartitionState;
import com.couchbase.client.dcp.util.PartitionSet;
import com.couchbase.connect.kafka.config.source.CouchbaseSourceTaskConfig;
import com.couchbase.connect.kafka.util.ConnectHelper;
import com.couchbase.connect.kafka.util.JmxHelper;
import com.couchbase.connect.kafka.util.Version;
import io.micrometer.core.instrument.MeterRegistry;
import java.nio.file.Paths;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.TreeMap;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
import java.util.regex.Pattern;
import javax.management.ObjectName;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:com/couchbase/connect/kafka/CouchbaseReader.class */
public class CouchbaseReader extends Thread {
    private static final Logger LOGGER = LoggerFactory.getLogger(CouchbaseReader.class);
    private final Client client;
    private final List<Integer> partitions;
    private final Map<Integer, SourceOffset> partitionToSavedOffset;
    private final StreamFrom streamFrom;
    private final BlockingQueue<Throwable> errorQueue;
    private final MeterRegistry meterRegistry;

    public CouchbaseReader(CouchbaseSourceTaskConfig couchbaseSourceTaskConfig, String str, final BlockingQueue<DocumentChange> blockingQueue, final BlockingQueue<Throwable> blockingQueue2, List<Integer> list, Map<Integer, SourceOffset> map, final SourceDocumentLifecycle sourceDocumentLifecycle) {
        Objects.requireNonNull(str);
        Objects.requireNonNull(sourceDocumentLifecycle);
        Objects.requireNonNull(blockingQueue);
        this.partitions = (List) Objects.requireNonNull(list);
        this.partitionToSavedOffset = (Map) Objects.requireNonNull(map);
        this.streamFrom = couchbaseSourceTaskConfig.streamFrom();
        this.errorQueue = (BlockingQueue) Objects.requireNonNull(blockingQueue2);
        PasswordAuthenticator passwordAuthenticator = CbStrings.isNullOrEmpty(couchbaseSourceTaskConfig.clientCertificatePath()) ? new PasswordAuthenticator(new StaticCredentialsProvider(couchbaseSourceTaskConfig.username(), couchbaseSourceTaskConfig.password().value())) : CertificateAuthenticator.fromKeyStore(Paths.get(couchbaseSourceTaskConfig.clientCertificatePath(), new String[0]), couchbaseSourceTaskConfig.clientCertificatePassword().value());
        Consumer consumer = builder -> {
            builder.enableTls(couchbaseSourceTaskConfig.enableTls()).enableHostnameVerification(couchbaseSourceTaskConfig.enableHostnameVerification());
            if (!CbStrings.isNullOrEmpty(couchbaseSourceTaskConfig.trustStorePath())) {
                builder.trustStore(Paths.get(couchbaseSourceTaskConfig.trustStorePath(), new String[0]), couchbaseSourceTaskConfig.trustStorePassword().value());
            }
            if (!CbStrings.isNullOrEmpty(couchbaseSourceTaskConfig.trustCertificatePath())) {
                builder.trustCertificate(Paths.get(couchbaseSourceTaskConfig.trustCertificatePath(), new String[0]));
            }
            if (CbStrings.isNullOrEmpty(couchbaseSourceTaskConfig.trustStorePath()) && CbStrings.isNullOrEmpty(couchbaseSourceTaskConfig.trustCertificatePath())) {
                builder.trustCertificates(SecurityConfig.defaultCaCertificates());
            }
        };
        this.meterRegistry = newMeterRegistry(str, couchbaseSourceTaskConfig);
        Client.Builder meterRegistry = Client.builder().userAgent("kafka-connector", Version.getVersion(), new String[]{str}).bootstrapTimeout(couchbaseSourceTaskConfig.bootstrapTimeout()).socketConnectTimeout(couchbaseSourceTaskConfig.bootstrapTimeout().toMillis()).seedNodes(couchbaseSourceTaskConfig.dcpSeedNodes()).networkResolution(NetworkResolution.valueOf(couchbaseSourceTaskConfig.network())).bucket(couchbaseSourceTaskConfig.bucket()).authenticator(passwordAuthenticator).collectionsAware(true).scopeName(couchbaseSourceTaskConfig.scope()).collectionNames(couchbaseSourceTaskConfig.collections()).noValue(couchbaseSourceTaskConfig.noValue()).xattrs(couchbaseSourceTaskConfig.xattrs()).compression(couchbaseSourceTaskConfig.compression()).mitigateRollbacks(couchbaseSourceTaskConfig.persistencePollingInterval().toMillis(), TimeUnit.MILLISECONDS).flowControl(couchbaseSourceTaskConfig.flowControlBuffer().getByteCountAsSaturatedInt()).bufferAckWatermark(60).securityConfig(consumer).meterRegistry(this.meterRegistry);
        if (couchbaseSourceTaskConfig.enableDcpTrace()) {
            Pattern compile = Pattern.compile(couchbaseSourceTaskConfig.dcpTraceDocumentIdRegex());
            meterRegistry.trace(LogLevel.INFO, compile.pattern().equals(".*") ? str2 -> {
                return true;
            } : str3 -> {
                return compile.matcher(str3).matches();
            });
        }
        this.client = meterRegistry.build();
        this.client.nonBlockingListener(new DatabaseChangeListener() { // from class: com.couchbase.connect.kafka.CouchbaseReader.1
            public void onMutation(Mutation mutation) {
                onChange(mutation);
            }

            public void onDeletion(Deletion deletion) {
                onChange(deletion);
            }

            private void onChange(DocumentChange documentChange) {
                try {
                    sourceDocumentLifecycle.logReceivedFromCouchbase(documentChange);
                    blockingQueue.put(documentChange);
                } catch (Throwable th) {
                    documentChange.flowControlAck();
                    CouchbaseReader.LOGGER.error("Unable to put DCP request into the queue", th);
                    blockingQueue2.offer(th);
                }
            }

            public void onFailure(StreamFailure streamFailure) {
                blockingQueue2.offer(streamFailure.getCause());
            }
        });
    }

    private static MeterRegistry newMeterRegistry(String str, CouchbaseSourceTaskConfig couchbaseSourceTaskConfig) {
        String orElse = ConnectHelper.getTaskIdFromLoggingContext().orElse(couchbaseSourceTaskConfig.maybeTaskId());
        LinkedHashMap linkedHashMap = new LinkedHashMap();
        linkedHashMap.put("connector", ObjectName.quote(str));
        linkedHashMap.put("task", orElse);
        return JmxHelper.newJmxMeterRegistry("kafka.connect.couchbase", linkedHashMap);
    }

    @Override // java.lang.Thread, java.lang.Runnable
    public void run() {
        try {
            this.client.connect().block();
            this.client.initializeState(this.streamFrom.withoutSavedOffset().asDcpStreamFrom(), StreamTo.INFINITY).block();
            if (this.streamFrom.isSavedOffset()) {
                initFailoverLogs();
                restoreSavedOffsets();
            }
            this.client.startStreaming(this.partitions).block();
        } catch (Throwable th) {
            this.errorQueue.offer(th);
        }
    }

    private void restoreSavedOffsets() {
        LOGGER.info("Resuming from saved offsets for {} of {} partitions: {}", new Object[]{Integer.valueOf(this.partitionToSavedOffset.size()), Integer.valueOf(this.partitions.size()), PartitionSet.from(this.partitionToSavedOffset.keySet())});
        TreeMap treeMap = new TreeMap();
        for (Map.Entry<Integer, SourceOffset> entry : this.partitionToSavedOffset.entrySet()) {
            int intValue = entry.getKey().intValue();
            SourceOffset value = entry.getValue();
            StreamOffset asStreamOffset = value.asStreamOffset();
            if (asStreamOffset.getVbuuid() == 0) {
                long lastUuid = this.client.sessionState().get(intValue).getLastUuid();
                asStreamOffset = value.withVbucketUuid(lastUuid).asStreamOffset();
                treeMap.put(Integer.valueOf(intValue), Long.valueOf(lastUuid));
            }
            this.client.sessionState().set(intValue, PartitionState.fromOffset(asStreamOffset));
        }
        if (treeMap.isEmpty()) {
            return;
        }
        LOGGER.info("Some source offsets are missing a partition UUID. This is normal if you're upgrading from connector version 3.4.5 or earlier. This message should go away after a document from each partition is published to Kafka. Here is the map from partition number to the latest partition UUID used as a fallback: {}", treeMap);
    }

    private void initFailoverLogs() {
        this.client.failoverLogs(this.partitions).doOnNext(byteBuf -> {
            int vbucket = DcpFailoverLogResponse.vbucket(byteBuf);
            PartitionState partitionState = this.client.sessionState().get(vbucket);
            partitionState.setFailoverLog(DcpFailoverLogResponse.entries(byteBuf));
            this.client.sessionState().set(vbucket, partitionState);
        }).blockLast();
    }

    public void shutdown() {
        try {
            this.client.disconnect().block();
        } finally {
            this.meterRegistry.close();
        }
    }
}
