package com.couchbase.connect.kafka;

import com.couchbase.client.dcp.Client;
import com.couchbase.client.dcp.StreamTo;
import com.couchbase.client.dcp.core.env.NetworkResolution;
import com.couchbase.client.dcp.highlevel.DatabaseChangeListener;
import com.couchbase.client.dcp.highlevel.Deletion;
import com.couchbase.client.dcp.highlevel.DocumentChange;
import com.couchbase.client.dcp.highlevel.Mutation;
import com.couchbase.client.dcp.highlevel.SnapshotMarker;
import com.couchbase.client.dcp.highlevel.StreamFailure;
import com.couchbase.client.dcp.message.DcpFailoverLogResponse;
import com.couchbase.client.dcp.state.FailoverLogEntry;
import com.couchbase.client.dcp.state.PartitionState;
import com.couchbase.connect.kafka.config.source.CouchbaseSourceTaskConfig;
import com.couchbase.connect.kafka.util.Version;
import java.util.Collections;
import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:com/couchbase/connect/kafka/CouchbaseReader.class */
/**
 * Thread that owns a DCP {@link Client} and feeds document changes into a queue.
 *
 * <p>When started, the thread connects the client, initializes the stream state,
 * optionally restores previously saved offsets, and then starts streaming the
 * configured partitions. Mutations and deletions received by the client are put
 * on the change queue passed to the constructor; failures (during startup, while
 * enqueueing, or reported by the stream) are offered to the error queue.
 */
public class CouchbaseReader extends Thread {
    private static final Logger LOGGER = LoggerFactory.getLogger(CouchbaseReader.class);
    private final Client client;
    private final Short[] partitions;
    private final Map<Short, SeqnoAndVbucketUuid> partitionToSavedSeqno;
    private final StreamFrom streamFrom;
    private final BlockingQueue<Throwable> errorQueue;

    /**
     * Builds the DCP client from the task configuration and registers the listener
     * that forwards changes and failures to the given queues. No connection is
     * attempted here; that happens in {@link #run()}.
     *
     * @param couchbaseSourceTaskConfig source of all client settings (seed nodes, bucket,
     *     credentials, scope/collections, compression, flow control, TLS) and of the
     *     stream starting position
     * @param str extra token passed as the single user-agent comment
     * @param blockingQueue receives every mutation and deletion; the listener blocks on
     *     {@code put} when the queue is full
     * @param blockingQueue2 receives any {@link Throwable} raised while enqueueing a
     *     change, reported by the stream, or thrown during startup in {@link #run()}
     * @param shArr partitions to stream
     * @param map saved sequence number (and optional vbucket UUID) per partition, applied
     *     only when the configured stream position is a saved offset
     */
    public CouchbaseReader(CouchbaseSourceTaskConfig couchbaseSourceTaskConfig, String str, final BlockingQueue<DocumentChange> blockingQueue, final BlockingQueue<Throwable> blockingQueue2, Short[] shArr, Map<Short, SeqnoAndVbucketUuid> map) {
        this.partitions = shArr;
        this.partitionToSavedSeqno = map;
        this.streamFrom = couchbaseSourceTaskConfig.streamFrom();
        this.errorQueue = blockingQueue2;
        this.client = Client.builder()
                .userAgent("kafka-connector", Version.getVersion(), new String[]{str})
                .connectTimeout(couchbaseSourceTaskConfig.bootstrapTimeout().toMillis())
                .seedNodes(couchbaseSourceTaskConfig.dcpSeedNodes())
                .networkResolution(NetworkResolution.valueOf(couchbaseSourceTaskConfig.network()))
                .bucket(couchbaseSourceTaskConfig.bucket())
                .credentials(couchbaseSourceTaskConfig.username(), couchbaseSourceTaskConfig.password().value())
                .collectionsAware(true)
                .scopeName(couchbaseSourceTaskConfig.scope())
                .collectionNames(couchbaseSourceTaskConfig.collections())
                .compression(couchbaseSourceTaskConfig.compression())
                .mitigateRollbacks(couchbaseSourceTaskConfig.persistencePollingInterval().toMillis(), TimeUnit.MILLISECONDS)
                .flowControl(couchbaseSourceTaskConfig.flowControlBufferSize().getByteCountAsSaturatedInt())
                .bufferAckWatermark(60)
                .sslEnabled(couchbaseSourceTaskConfig.enableTls())
                .sslKeystoreFile(couchbaseSourceTaskConfig.trustStorePath())
                .sslKeystorePassword(couchbaseSourceTaskConfig.trustStorePassword().value())
                .build();
        this.client.nonBlockingListener(new DatabaseChangeListener() {
            public void onMutation(Mutation mutation) {
                onChange(mutation);
            }

            public void onDeletion(Deletion deletion) {
                onChange(deletion);
            }

            /**
             * Hands the change to the consumer queue. If that fails for any reason the
             * change is flow-control acknowledged here (it will not reach the consumer),
             * the failure is logged, and it is reported through the error queue.
             */
            private void onChange(DocumentChange documentChange) {
                try {
                    blockingQueue.put(documentChange);
                } catch (Throwable th) {
                    try {
                        documentChange.flowControlAck();
                        LOGGER.error("Unable to put DCP request into the queue", th);
                        blockingQueue2.offer(th);
                    } finally {
                        // put() clears the interrupt status when it throws
                        // InterruptedException; restore it (after the cleanup above)
                        // so the owner of this thread can still observe the interrupt.
                        if (th instanceof InterruptedException) {
                            Thread.currentThread().interrupt();
                        }
                    }
                }
            }

            public void onFailure(StreamFailure streamFailure) {
                blockingQueue2.offer(streamFailure.getCause());
            }
        });
    }

    /**
     * Connects the client, initializes the session state, restores saved offsets when
     * the configured stream position asks for them, and starts streaming the configured
     * partitions. Anything thrown along the way is offered to the error queue instead of
     * propagating out of the thread.
     */
    @Override // java.lang.Thread, java.lang.Runnable
    public void run() {
        try {
            this.client.connect().await();
            // Initialize from the non-saved-offset variant of the configured position;
            // saved offsets, if requested, are layered on top just below.
            this.client.initializeState(this.streamFrom.withoutSavedOffset().asDcpStreamFrom(), StreamTo.INFINITY).await();
            if (this.streamFrom.isSavedOffset()) {
                // Order matters: the failover logs are loaded first, then
                // restoreSavedOffsets() replaces the log of every partition that has a
                // stored vbucket UUID.
                initFailoverLogs();
                restoreSavedOffsets();
            }
            this.client.startStreaming(this.partitions).await();
        } catch (Throwable th) {
            this.errorQueue.offer(th);
        }
    }

    /**
     * Applies each saved offset to the client's session state: the start sequence number
     * and the snapshot marker are both set to the saved sequence number, and, when a
     * vbucket UUID was stored with the offset, the partition's failover log is replaced
     * by a single entry carrying that UUID.
     */
    private void restoreSavedOffsets() {
        LOGGER.info("Resuming from saved offsets for {} of {} partitions", this.partitionToSavedSeqno.size(), this.partitions.length);
        for (Map.Entry<Short, SeqnoAndVbucketUuid> entry : this.partitionToSavedSeqno.entrySet()) {
            short partition = entry.getKey();
            SeqnoAndVbucketUuid savedOffset = entry.getValue();
            long seqno = savedOffset.seqno();
            PartitionState partitionState = this.client.sessionState().get(partition);
            partitionState.setStartSeqno(seqno);
            partitionState.setSnapshot(new SnapshotMarker(seqno, seqno));
            if (savedOffset.vbucketUuid().isPresent()) {
                long vbucketUuid = savedOffset.vbucketUuid().getAsLong();
                LOGGER.debug("Initializing failover log for partition {} using stored vbuuid {} ", partition, vbucketUuid);
                partitionState.setFailoverLog(Collections.singletonList(new FailoverLogEntry(-1L, vbucketUuid)));
            } else {
                // Without a stored UUID the failover log set by initFailoverLogs() is kept.
                LOGGER.warn("No vBucket UUID is associated with stream offset for partition {}. This is normal if you're upgrading from connector version 3.4.5 or earlier, and should stop happening once the Kafka Connect framework asks the connector to save its offsets (see connector worker config property 'offset.flush.interval.ms').", partition);
            }
            this.client.sessionState().set(partition, partitionState);
        }
    }

    /**
     * Requests the failover logs for the configured partitions and, blocking until all
     * responses have been consumed, stores each response's entries in the session state
     * of the partition it belongs to.
     */
    private void initFailoverLogs() {
        this.client.failoverLogs(this.partitions).toBlocking().forEach(byteBuf -> {
            short vbucket = DcpFailoverLogResponse.vbucket(byteBuf);
            PartitionState partitionState = this.client.sessionState().get(vbucket);
            partitionState.setFailoverLog(DcpFailoverLogResponse.entries(byteBuf));
            this.client.sessionState().set(vbucket, partitionState);
        });
    }

    /**
     * Disconnects the client, blocking until the disconnect completes.
     */
    public void shutdown() {
        this.client.disconnect().await();
    }
}
