package com.couchbase.connect.kafka;

import com.couchbase.client.dcp.Client;
import com.couchbase.client.dcp.ControlEventHandler;
import com.couchbase.client.dcp.DataEventHandler;
import com.couchbase.client.dcp.StreamTo;
import com.couchbase.client.dcp.config.CompressionMode;
import com.couchbase.client.dcp.config.DcpControl;
import com.couchbase.client.dcp.message.DcpFailoverLogResponse;
import com.couchbase.client.dcp.message.DcpMutationMessage;
import com.couchbase.client.dcp.message.DcpSnapshotMarkerRequest;
import com.couchbase.client.dcp.state.PartitionState;
import com.couchbase.client.dcp.transport.netty.ChannelFlowController;
import com.couchbase.client.deps.io.netty.buffer.ByteBuf;
import com.couchbase.connect.kafka.dcp.Event;
import com.couchbase.connect.kafka.dcp.Message;
import com.couchbase.connect.kafka.dcp.Snapshot;
import java.util.List;
import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import rx.functions.Action1;

/* loaded from: input_file:com/couchbase/connect/kafka/CouchbaseReader.class */
/**
 * Background thread that owns a Couchbase DCP {@link Client} and forwards what it
 * receives to the caller-supplied queues.
 *
 * <p>The constructor builds the client and registers its control and data event
 * handlers; {@link #run()} connects, initializes the session state (optionally
 * restoring saved offsets and failover logs) and starts streaming the configured
 * partitions. Any failure raised by {@link #run()} is added to the error queue
 * instead of being thrown.
 *
 * <p>Two delivery modes exist, selected by the constructor's {@code z} flag:
 * <ul>
 *   <li>{@code false}: every data event is wrapped in a {@link Message} and put on
 *       the event queue as-is;</li>
 *   <li>{@code true}: data events are collected into the {@link Snapshot} announced
 *       by the preceding snapshot marker, and the snapshot is queued once
 *       {@link Snapshot#add} reports it complete.</li>
 * </ul>
 */
public class CouchbaseReader extends Thread {
    private static final Logger LOGGER = LoggerFactory.getLogger(CouchbaseReader.class);
    private final Client client;
    private final Short[] partitions;
    private final Map<Short, Long> partitionToSavedSeqno;
    private final StreamFrom streamFrom;
    /** Snapshots currently being filled, keyed by partition; only used in snapshot mode. */
    private final Map<Short, Snapshot> snapshots;
    private final BlockingQueue<Throwable> errorQueue;

    /**
     * Builds the DCP client and installs the event handlers. Nothing is connected
     * until the thread is started.
     *
     * @param list            hostnames handed to the client builder
     * @param str             bucket name
     * @param str2            username
     * @param str3            password
     * @param j               connect timeout passed to the client builder
     * @param blockingQueue   queue receiving {@link Message}s or completed {@link Snapshot}s
     * @param blockingQueue2  queue receiving any {@link Throwable} raised by {@link #run()}
     * @param shArr           partitions to stream
     * @param map             saved sequence number per partition, applied when
     *                        {@code streamFrom} reports a saved offset
     * @param streamFrom      where streaming should begin
     * @param z               {@code true} to group data events into snapshots,
     *                        {@code false} to forward each event individually
     * @param z2              whether SSL is enabled
     * @param str4            SSL keystore file
     * @param str5            SSL keystore password
     * @param compressionMode compression mode passed to the client builder
     */
    public CouchbaseReader(List<String> list, String str, String str2, String str3, long j, final BlockingQueue<Event> blockingQueue, BlockingQueue<Throwable> blockingQueue2, Short[] shArr, Map<Short, Long> map, StreamFrom streamFrom, final boolean z, boolean z2, String str4, String str5, CompressionMode compressionMode) {
        this.snapshots = new ConcurrentHashMap<Short, Snapshot>(shArr.length);
        this.partitions = shArr;
        this.partitionToSavedSeqno = map;
        this.streamFrom = streamFrom;
        this.errorQueue = blockingQueue2;
        this.client = Client.configure()
                .connectTimeout(j)
                .hostnames(list)
                .bucket(str)
                .username(str2)
                .password(str3)
                .controlParam(DcpControl.Names.CONNECTION_BUFFER_SIZE, 20480)
                .controlParam(DcpControl.Names.ENABLE_NOOP, "true")
                .compression(compressionMode)
                .bufferAckWatermark(60)
                .sslEnabled(z2)
                .sslKeystoreFile(str4)
                .sslKeystorePassword(str5)
                .build();

        this.client.controlEventHandler(new ControlEventHandler() {
            @Override
            public void onEvent(ChannelFlowController channelFlowController, ByteBuf byteBuf) {
                // In snapshot mode, each snapshot marker opens a new Snapshot for its partition.
                if (z && DcpSnapshotMarkerRequest.is(byteBuf)) {
                    Snapshot opened = new Snapshot(
                            DcpSnapshotMarkerRequest.partition(byteBuf),
                            DcpSnapshotMarkerRequest.startSeqno(byteBuf),
                            DcpSnapshotMarkerRequest.endSeqno(byteBuf));
                    Snapshot replaced = snapshots.put(opened.partition(), opened);
                    if (replaced != null) {
                        // A previous snapshot for this partition was never completed.
                        LOGGER.warn("Incomplete snapshot detected: {}", replaced);
                    }
                }
                // Control events are always acknowledged and released here.
                channelFlowController.ack(byteBuf);
                byteBuf.release();
            }
        });

        this.client.dataEventHandler(new DataEventHandler() {
            @Override
            public void onEvent(ChannelFlowController channelFlowController, ByteBuf byteBuf) {
                if (!z) {
                    // Per-event mode: the buffer and flow controller travel with the Message,
                    // so this handler neither acks nor releases the buffer itself.
                    try {
                        blockingQueue.put(new Message(byteBuf, channelFlowController));
                    } catch (InterruptedException e) {
                        LOGGER.error("Unable to put DCP request into the queue", e);
                        // Restore the interrupt flag so callers up the stack can observe it.
                        Thread.currentThread().interrupt();
                        // NOTE(review): the event is dropped here without ack/release; whether
                        // that leaks the buffer depends on Message's ownership rules - confirm.
                    }
                    return;
                }

                short partition = DcpMutationMessage.partition(byteBuf);
                long bySeqno = DcpMutationMessage.bySeqno(byteBuf);
                Snapshot current = snapshots.get(partition);
                if (current == null) {
                    LOGGER.warn("Event with seqno {} for partition {} ignored, because missing snapshot", bySeqno, partition);
                } else if (bySeqno < current.startSeqno()) {
                    LOGGER.warn("Event with seqno {} for partition {} ignored, because current snapshot has higher seqno {}", bySeqno, partition, current.startSeqno());
                } else {
                    // Take an extra reference for the snapshot; this handler drops its own below.
                    byteBuf.retain();
                    if (current.add(byteBuf)) {
                        // add() returned true: treat the snapshot as complete and hand it over.
                        Snapshot removed = snapshots.remove(partition);
                        if (current != removed) {
                            LOGGER.warn("Conflict of snapshots detected, expected to remove {}, but removed {}", current, removed);
                        }
                        // NOTE(review): add() throws if a bounded queue is full, which would
                        // skip the ack/release below - confirm the queue is unbounded.
                        blockingQueue.add(current);
                    }
                }
                channelFlowController.ack(byteBuf);
                byteBuf.release();
            }
        });
    }

    /**
     * Connects the client, initializes session state, optionally restores saved
     * offsets and failover logs, then starts streaming the configured partitions.
     * Any {@link Throwable} raised along the way is added to the error queue.
     */
    @Override
    public void run() {
        try {
            client.connect().await();
            client.initializeState(streamFrom.withoutSavedOffset().asDcpStreamFrom(), StreamTo.INFINITY).await();
            if (streamFrom.isSavedOffset()) {
                restoreSavedOffsets();
                initFailoverLogs();
            }
            client.startStreaming(partitions).await();
        } catch (Throwable th) {
            // Report the failure through the queue rather than killing the thread silently.
            errorQueue.add(th);
        }
    }

    /**
     * Overwrites the session state of every partition that has a saved sequence
     * number, setting the start seqno and both snapshot bounds to that value.
     */
    private void restoreSavedOffsets() {
        LOGGER.info("Resuming from saved offsets for {} of {} partitions", partitionToSavedSeqno.size(), partitions.length);
        for (Map.Entry<Short, Long> saved : partitionToSavedSeqno.entrySet()) {
            short partition = saved.getKey().shortValue();
            long seqno = saved.getValue().longValue();
            PartitionState state = client.sessionState().get(partition);
            state.setStartSeqno(seqno);
            state.setSnapshotStartSeqno(seqno);
            state.setSnapshotEndSeqno(seqno);
            client.sessionState().set(partition, state);
        }
    }

    /**
     * Fetches the failover logs for the configured partitions (blocking until the
     * stream of responses ends) and replaces each partition's failover log in the
     * session state with the entries from its response.
     */
    private void initFailoverLogs() {
        client.failoverLogs(partitions).toBlocking().forEach(new Action1<ByteBuf>() {
            @Override
            public void call(ByteBuf byteBuf) {
                short vbucket = DcpFailoverLogResponse.vbucket(byteBuf);
                int entryCount = DcpFailoverLogResponse.numLogEntries(byteBuf);
                PartitionState state = client.sessionState().get(vbucket);
                // Start from an empty log so only entries from this response remain.
                state.getFailoverLog().clear();
                for (int entry = 0; entry < entryCount; entry++) {
                    state.addToFailoverLog(
                            DcpFailoverLogResponse.seqnoEntry(byteBuf, entry),
                            DcpFailoverLogResponse.vbuuidEntry(byteBuf, entry));
                }
                client.sessionState().set(vbucket, state);
            }
        });
    }

    /**
     * Returns the last UUID recorded in the session state for the given partition.
     *
     * <p>NOTE(review): the decompiler reports this method was originally
     * package-private; it is left public here so existing callers keep compiling.
     *
     * @param i partition (vbucket) index
     * @return the value of {@code getLastUuid()} for that partition's state
     */
    public long getVBucketUuid(int i) {
        return client.sessionState().get(i).getLastUuid();
    }

    /** Disconnects the DCP client and waits for the disconnect to complete. */
    public void shutdown() {
        client.disconnect().await();
    }
}
