package com.couchbase.kafka;

import com.couchbase.client.core.ClusterFacade;
import com.couchbase.client.core.dcp.BucketStreamAggregator;
import com.couchbase.client.core.dcp.BucketStreamAggregatorState;
import com.couchbase.client.core.dcp.BucketStreamState;
import com.couchbase.client.core.dcp.BucketStreamStateUpdatedEvent;
import com.couchbase.client.core.message.CouchbaseMessage;
import com.couchbase.client.core.message.cluster.GetClusterConfigRequest;
import com.couchbase.client.core.message.cluster.GetClusterConfigResponse;
import com.couchbase.client.core.message.cluster.OpenBucketRequest;
import com.couchbase.client.core.message.cluster.SeedNodesRequest;
import com.couchbase.client.core.message.dcp.DCPRequest;
import com.couchbase.client.core.message.dcp.SnapshotMarkerMessage;
import com.couchbase.client.deps.com.lmax.disruptor.EventTranslatorOneArg;
import com.couchbase.client.deps.com.lmax.disruptor.RingBuffer;
import com.couchbase.kafka.state.RunMode;
import com.couchbase.kafka.state.StateSerializer;
import java.util.List;
import java.util.concurrent.TimeUnit;
import rx.functions.Action1;
import rx.functions.Func1;

/* loaded from: input_file:com/couchbase/kafka/CouchbaseReader.class */
public class CouchbaseReader {
    private static final EventTranslatorOneArg<DCPEvent, CouchbaseMessage> TRANSLATOR = new EventTranslatorOneArg<DCPEvent, CouchbaseMessage>() { // from class: com.couchbase.kafka.CouchbaseReader.1
        public void translateTo(DCPEvent dCPEvent, long j, CouchbaseMessage couchbaseMessage) {
            dCPEvent.setMessage(couchbaseMessage);
        }
    };
    private final ClusterFacade core;
    private final RingBuffer<DCPEvent> dcpRingBuffer;
    private final List<String> nodes;
    private final String bucket;
    private final String streamName;
    private final String password;
    private final BucketStreamAggregator streamAggregator;
    private final StateSerializer stateSerializer;
    private int numberOfPartitions;

    public CouchbaseReader(ClusterFacade clusterFacade, CouchbaseKafkaEnvironment couchbaseKafkaEnvironment, RingBuffer<DCPEvent> ringBuffer, StateSerializer stateSerializer) {
        this(couchbaseKafkaEnvironment.couchbaseNodes(), couchbaseKafkaEnvironment.couchbaseBucket(), couchbaseKafkaEnvironment.couchbasePassword(), clusterFacade, couchbaseKafkaEnvironment, ringBuffer, stateSerializer);
    }

    public CouchbaseReader(List<String> list, String str, String str2, ClusterFacade clusterFacade, CouchbaseKafkaEnvironment couchbaseKafkaEnvironment, RingBuffer<DCPEvent> ringBuffer, StateSerializer stateSerializer) {
        this.core = clusterFacade;
        this.dcpRingBuffer = ringBuffer;
        this.nodes = list;
        this.bucket = str;
        this.password = str2;
        this.streamAggregator = new BucketStreamAggregator(clusterFacade, this.bucket);
        this.stateSerializer = stateSerializer;
        this.streamName = "CouchbaseKafka(" + hashCode() + ")";
    }

    public void connect() {
        connect(2L, TimeUnit.SECONDS);
    }

    public void connect(long j, TimeUnit timeUnit) {
        this.core.send(new SeedNodesRequest(this.nodes)).timeout(j, timeUnit).toBlocking().single();
        this.core.send(new OpenBucketRequest(this.bucket, this.password)).timeout(j, timeUnit).toBlocking().single();
        this.numberOfPartitions = ((Integer) this.core.send(new GetClusterConfigRequest()).map(new Func1<GetClusterConfigResponse, Integer>() { // from class: com.couchbase.kafka.CouchbaseReader.2
            public Integer call(GetClusterConfigResponse getClusterConfigResponse) {
                return Integer.valueOf(getClusterConfigResponse.config().bucketConfig(CouchbaseReader.this.bucket).numberOfPartitions());
            }
        }).timeout(j, timeUnit).toBlocking().single()).intValue();
    }

    public void run() {
        run(RunMode.LOAD_AND_RESUME);
    }

    public void run(RunMode runMode) {
        BucketStreamAggregatorState bucketStreamAggregatorState = new BucketStreamAggregatorState(this.streamName);
        for (int i = 0; i < this.numberOfPartitions; i++) {
            bucketStreamAggregatorState.put(new BucketStreamState((short) i, 0L, 0L, -1L, 0L, -1L));
        }
        run(bucketStreamAggregatorState, runMode);
    }

    public void run(final BucketStreamAggregatorState bucketStreamAggregatorState, RunMode runMode) {
        if (runMode == RunMode.LOAD_AND_RESUME) {
            this.stateSerializer.load(bucketStreamAggregatorState);
        }
        bucketStreamAggregatorState.updates().subscribe(new Action1<BucketStreamStateUpdatedEvent>() { // from class: com.couchbase.kafka.CouchbaseReader.3
            public void call(BucketStreamStateUpdatedEvent bucketStreamStateUpdatedEvent) {
                if (bucketStreamStateUpdatedEvent.partialUpdate()) {
                    CouchbaseReader.this.stateSerializer.dump(bucketStreamStateUpdatedEvent.aggregatorState());
                } else {
                    CouchbaseReader.this.stateSerializer.dump(bucketStreamStateUpdatedEvent.aggregatorState(), bucketStreamStateUpdatedEvent.partitionState().partition());
                }
            }
        });
        this.streamAggregator.feed(bucketStreamAggregatorState).toBlocking().forEach(new Action1<DCPRequest>() { // from class: com.couchbase.kafka.CouchbaseReader.4
            public void call(DCPRequest dCPRequest) {
                if (!(dCPRequest instanceof SnapshotMarkerMessage)) {
                    CouchbaseReader.this.dcpRingBuffer.tryPublishEvent(CouchbaseReader.TRANSLATOR, dCPRequest);
                    return;
                }
                SnapshotMarkerMessage snapshotMarkerMessage = (SnapshotMarkerMessage) dCPRequest;
                BucketStreamState bucketStreamState = bucketStreamAggregatorState.get(snapshotMarkerMessage.partition());
                bucketStreamAggregatorState.put(new BucketStreamState(snapshotMarkerMessage.partition(), bucketStreamState.vbucketUUID(), snapshotMarkerMessage.endSequenceNumber(), bucketStreamState.endSequenceNumber(), snapshotMarkerMessage.endSequenceNumber(), bucketStreamState.snapshotEndSequenceNumber()));
            }
        });
    }
}
