package com.couchbase.connect.kafka;

import com.couchbase.client.core.env.NetworkResolution;
import com.couchbase.client.dcp.Client;
import com.couchbase.client.dcp.ControlEventHandler;
import com.couchbase.client.dcp.DataEventHandler;
import com.couchbase.client.dcp.DefaultConnectionNameGenerator;
import com.couchbase.client.dcp.StreamTo;
import com.couchbase.client.dcp.config.CompressionMode;
import com.couchbase.client.dcp.config.DcpControl;
import com.couchbase.client.dcp.message.DcpFailoverLogResponse;
import com.couchbase.client.dcp.message.MessageUtil;
import com.couchbase.client.dcp.message.RollbackMessage;
import com.couchbase.client.dcp.state.FailoverLogEntry;
import com.couchbase.client.dcp.state.PartitionState;
import com.couchbase.client.dcp.transport.netty.ChannelFlowController;
import com.couchbase.client.deps.io.netty.buffer.ByteBuf;
import com.couchbase.client.deps.io.netty.util.IllegalReferenceCountException;
import com.couchbase.connect.kafka.dcp.Event;
import com.couchbase.connect.kafka.util.Version;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import rx.CompletableSubscriber;
import rx.Subscription;

/* loaded from: input_file:com/couchbase/connect/kafka/CouchbaseReader.class */
public class CouchbaseReader extends Thread {
    private static final Logger LOGGER = LoggerFactory.getLogger(CouchbaseReader.class);
    private final Client client;
    private final Short[] partitions;
    private final Map<Short, SeqnoAndVbucketUuid> partitionToSavedSeqno;
    private final StreamFrom streamFrom;
    private final BlockingQueue<Throwable> errorQueue;

    public CouchbaseReader(String str, List<String> list, String str2, String str3, String str4, long j, final BlockingQueue<Event> blockingQueue, final BlockingQueue<Throwable> blockingQueue2, Short[] shArr, Map<Short, SeqnoAndVbucketUuid> map, StreamFrom streamFrom, boolean z, String str5, String str6, CompressionMode compressionMode, long j2, int i, NetworkResolution networkResolution) {
        this.partitions = shArr;
        this.partitionToSavedSeqno = map;
        this.streamFrom = streamFrom;
        this.errorQueue = blockingQueue2;
        this.client = Client.configure().connectionNameGenerator(DefaultConnectionNameGenerator.forProduct("kafka-connector", Version.getVersion(), str)).connectTimeout(j).hostnames(list).networkResolution(networkResolution).bucket(str2).username(str3).password(str4).controlParam(DcpControl.Names.ENABLE_NOOP, "true").compression(compressionMode).mitigateRollbacks(j2, TimeUnit.MILLISECONDS).flowControl(i).bufferAckWatermark(60).sslEnabled(z).sslKeystoreFile(str5).sslKeystorePassword(str6).build();
        this.client.controlEventHandler(new ControlEventHandler() { // from class: com.couchbase.connect.kafka.CouchbaseReader.1
            @Override // com.couchbase.client.dcp.ControlEventHandler
            public void onEvent(ChannelFlowController channelFlowController, ByteBuf byteBuf) {
                try {
                    try {
                        if (RollbackMessage.is(byteBuf)) {
                            final short vbucket = RollbackMessage.vbucket(byteBuf);
                            final long seqno = RollbackMessage.seqno(byteBuf);
                            CouchbaseReader.LOGGER.warn("Rolling back partition {} to seqno {}", Short.valueOf(vbucket), Long.valueOf(seqno));
                            CouchbaseReader.this.client.rollbackAndRestartStream(vbucket, seqno).subscribe(new CompletableSubscriber() { // from class: com.couchbase.connect.kafka.CouchbaseReader.1.1
                                @Override // rx.CompletableSubscriber
                                public void onCompleted() {
                                    CouchbaseReader.LOGGER.info("Rollback for partition {} complete", Short.valueOf(vbucket));
                                }

                                @Override // rx.CompletableSubscriber
                                public void onError(Throwable th) {
                                    CouchbaseReader.LOGGER.error("Failed to roll back partition {} to seqno {}", new Object[]{Short.valueOf(vbucket), Long.valueOf(seqno), th});
                                    blockingQueue2.offer(th);
                                }

                                @Override // rx.CompletableSubscriber
                                public void onSubscribe(Subscription subscription) {
                                }
                            });
                        }
                        CouchbaseReader.ackAndRelease(channelFlowController, byteBuf);
                    } catch (Throwable th) {
                        CouchbaseReader.LOGGER.error("Exception in control event handler", th);
                        blockingQueue2.offer(th);
                        CouchbaseReader.ackAndRelease(channelFlowController, byteBuf);
                    }
                } catch (Throwable th2) {
                    CouchbaseReader.ackAndRelease(channelFlowController, byteBuf);
                    throw th2;
                }
            }
        });
        this.client.dataEventHandler(new DataEventHandler() { // from class: com.couchbase.connect.kafka.CouchbaseReader.2
            @Override // com.couchbase.client.dcp.DataEventHandler
            public void onEvent(ChannelFlowController channelFlowController, ByteBuf byteBuf) {
                try {
                    blockingQueue.put(new Event(byteBuf, CouchbaseReader.this.getVBucketUuid(MessageUtil.getVbucket(byteBuf)), channelFlowController));
                } catch (Throwable th) {
                    CouchbaseReader.LOGGER.error("Unable to put DCP request into the queue", th);
                    CouchbaseReader.ackAndRelease(channelFlowController, byteBuf);
                    blockingQueue2.offer(th);
                }
            }
        });
    }

    @Override // java.lang.Thread, java.lang.Runnable
    public void run() {
        try {
            this.client.connect().await();
            this.client.initializeState(this.streamFrom.withoutSavedOffset().asDcpStreamFrom(), StreamTo.INFINITY).await();
            if (this.streamFrom.isSavedOffset()) {
                initFailoverLogs();
                restoreSavedOffsets();
            }
            this.client.startStreaming(this.partitions).await();
        } catch (Throwable th) {
            this.errorQueue.offer(th);
        }
    }

    private void restoreSavedOffsets() {
        LOGGER.info("Resuming from saved offsets for {} of {} partitions", Integer.valueOf(this.partitionToSavedSeqno.size()), Integer.valueOf(this.partitions.length));
        for (Map.Entry<Short, SeqnoAndVbucketUuid> entry : this.partitionToSavedSeqno.entrySet()) {
            short shortValue = entry.getKey().shortValue();
            SeqnoAndVbucketUuid value = entry.getValue();
            long seqno = value.seqno();
            PartitionState partitionState = this.client.sessionState().get(shortValue);
            partitionState.setStartSeqno(seqno);
            partitionState.setSnapshotStartSeqno(seqno);
            partitionState.setSnapshotEndSeqno(seqno);
            if (value.vbucketUuid().isPresent()) {
                long asLong = value.vbucketUuid().getAsLong();
                LOGGER.debug("Initializing failover log for partition {} using stored vbuuid {} ", Short.valueOf(shortValue), Long.valueOf(asLong));
                partitionState.setFailoverLog(Collections.singletonList(new FailoverLogEntry(-1L, asLong)));
            } else {
                LOGGER.warn("No vBucket UUID is associated with stream offset for partition {}. This is normal if you're upgrading from connector version 3.4.5 or earlier, and should stop happening once the Kafka Connect framework asks the connector to save its offsets (see connector worker config property 'offset.flush.interval.ms').", Short.valueOf(shortValue));
            }
            this.client.sessionState().set(shortValue, partitionState);
        }
    }

    private void initFailoverLogs() {
        this.client.failoverLogs(this.partitions).toBlocking().forEach(byteBuf -> {
            short vbucket = DcpFailoverLogResponse.vbucket(byteBuf);
            PartitionState partitionState = this.client.sessionState().get(vbucket);
            partitionState.setFailoverLog(DcpFailoverLogResponse.entries(byteBuf));
            this.client.sessionState().set(vbucket, partitionState);
        });
    }

    /* JADX INFO: Access modifiers changed from: private */
    public long getVBucketUuid(int i) {
        return this.client.sessionState().get(i).getLastUuid();
    }

    public void shutdown() {
        this.client.disconnect().await();
    }

    /* JADX INFO: Access modifiers changed from: private */
    public static void ackAndRelease(ChannelFlowController channelFlowController, ByteBuf byteBuf) throws IllegalReferenceCountException {
        ack(channelFlowController, byteBuf);
        byteBuf.release();
    }

    private static void ack(ChannelFlowController channelFlowController, ByteBuf byteBuf) throws IllegalReferenceCountException {
        try {
            channelFlowController.ack(byteBuf);
        } catch (IllegalReferenceCountException e) {
            throw e;
        } catch (Exception e2) {
            LOGGER.warn("Flow control ack failed (channel already closed?)", e2);
        }
    }
}
