package com.couchbase.connector.flink;

import com.couchbase.client.dcp.Client;
import com.couchbase.client.dcp.highlevel.DatabaseChangeListener;
import com.couchbase.client.dcp.highlevel.Deletion;
import com.couchbase.client.dcp.highlevel.DocumentChange;
import com.couchbase.client.dcp.highlevel.FlowControlMode;
import com.couchbase.client.dcp.highlevel.Mutation;
import com.couchbase.client.dcp.highlevel.StreamFailure;
import com.couchbase.client.dcp.highlevel.StreamOffset;
import com.couchbase.connector.flink.CouchbaseDocumentChange;
import com.couchbase.connector.flink.internal.dcp.DcpVbucketAndOffset;
import com.couchbase.connector.flink.internal.dcp.PartitionHelper;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.TreeMap;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.metrics.Counter;
import org.apache.flink.metrics.MetricGroup;
import org.apache.flink.runtime.state.FunctionInitializationContext;
import org.apache.flink.runtime.state.FunctionSnapshotContext;
import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:com/couchbase/connector/flink/CouchbaseSource.class */
public class CouchbaseSource extends RichParallelSourceFunction<CouchbaseDocumentChange> implements CheckpointedFunction {
    private static final Logger log = LoggerFactory.getLogger(CouchbaseSource.class);
    private static final Exception poisonPill = new Exception();
    private volatile boolean running;
    private volatile String taskDescription;
    private transient ListState<DcpVbucketAndOffset> state;
    private transient Counter mutations;
    private transient Counter deletions;
    private transient Counter expirations;
    private Client client;
    private transient Object checkpointLock;
    private static final int MAX_VBUCKETS = 1024;
    private final String seedNodes;
    private final String username;
    private final String password;
    private final String bucketName;
    private final BlockingQueue<Throwable> fatalErrorQueue = new LinkedBlockingQueue();
    private final StreamOffset[] vbucketToStreamOffset = new StreamOffset[MAX_VBUCKETS];

    public CouchbaseSource(String str, String str2, String str3, String str4) {
        this.seedNodes = (String) Objects.requireNonNull(str);
        this.username = (String) Objects.requireNonNull(str2);
        this.password = (String) Objects.requireNonNull(str3);
        this.bucketName = (String) Objects.requireNonNull(str4);
    }

    public void open(Configuration configuration) throws Exception {
        MetricGroup metricGroup = getRuntimeContext().getMetricGroup();
        this.mutations = metricGroup.counter("dcpMutations");
        this.deletions = metricGroup.counter("dcpDeletions");
        this.expirations = metricGroup.counter("dcpExpirations");
        this.client = Client.builder().userAgent("flink-connector", "0.1.0", new String[]{getRuntimeContext().getTaskNameWithSubtasks()}).seedNodes(new String[]{this.seedNodes}).credentials(this.username, this.password).bucket(this.bucketName).flowControl(134217728).build();
    }

    public void run(final SourceFunction.SourceContext<CouchbaseDocumentChange> sourceContext) throws Exception {
        Throwable take;
        this.running = true;
        this.taskDescription = getRuntimeContext().getTaskNameWithSubtasks();
        this.checkpointLock = sourceContext.getCheckpointLock();
        this.client.listener(new DatabaseChangeListener() { // from class: com.couchbase.connector.flink.CouchbaseSource.1
            public void onFailure(StreamFailure streamFailure) {
                CouchbaseSource.this.fatalErrorQueue.add(streamFailure.getCause());
            }

            public void onMutation(Mutation mutation) {
                CouchbaseSource.this.mutations.inc();
                onDocumentChange(mutation);
            }

            public void onDeletion(Deletion deletion) {
                (deletion.isDueToExpiration() ? CouchbaseSource.this.expirations : CouchbaseSource.this.deletions).inc();
                onDocumentChange(deletion);
            }

            private void onDocumentChange(DocumentChange documentChange) {
                CouchbaseDocumentChange couchbaseDocumentChange = new CouchbaseDocumentChange(documentChange.isMutation() ? CouchbaseDocumentChange.Type.MUTATION : CouchbaseDocumentChange.Type.DELETION, documentChange.getKey(), documentChange.getVbucket(), documentChange.getContent());
                synchronized (sourceContext.getCheckpointLock()) {
                    sourceContext.collect(couchbaseDocumentChange);
                    CouchbaseSource.this.vbucketToStreamOffset[documentChange.getVbucket()] = documentChange.getOffset();
                }
            }
        }, FlowControlMode.AUTOMATIC);
        this.client.connect().await();
        List<Integer> assignedPartitions = PartitionHelper.getAssignedPartitions(this.client.numPartitions(), getRuntimeContext());
        log.info("{} handling partitions: {}", this.taskDescription, assignedPartitions);
        Map map = (Map) assignedPartitions.stream().collect(Collectors.toMap(num -> {
            return num;
        }, num2 -> {
            return (StreamOffset) Optional.ofNullable(this.vbucketToStreamOffset[num2.intValue()]).orElse(StreamOffset.ZERO);
        }));
        if (map.isEmpty()) {
            log.warn("No work for {}", this.taskDescription);
            return;
        }
        this.client.resumeStreaming(map).await();
        try {
            if (this.running && (take = this.fatalErrorQueue.take()) != poisonPill) {
                if (take instanceof Exception) {
                    throw ((Exception) take);
                }
                if (!(take instanceof Error)) {
                    throw new RuntimeException(take);
                }
                throw ((Error) take);
            }
        } finally {
            disconnectDcpClient();
        }
    }

    public void cancel() {
        this.running = false;
        this.fatalErrorQueue.offer(poisonPill);
    }

    private void disconnectDcpClient() {
        try {
            log.info("Disconnecting Couchbase DCP connection...");
            this.client.disconnect().await(5L, TimeUnit.SECONDS);
            log.info("DCP disconnection complete.");
        } catch (Exception e) {
            log.warn("DCP client disconnection failed or timed out.", e);
        }
    }

    public void snapshotState(FunctionSnapshotContext functionSnapshotContext) throws Exception {
        if (!this.running) {
            log.debug("snapshotState() called on closed source");
            return;
        }
        if (!Thread.holdsLock(this.checkpointLock)) {
            throw new AssertionError("Thread didn't hold checkpoint lock!");
        }
        this.state.clear();
        for (int i = 0; i < this.vbucketToStreamOffset.length; i++) {
            StreamOffset streamOffset = this.vbucketToStreamOffset[i];
            if (streamOffset != null) {
                this.state.add(new DcpVbucketAndOffset(i, streamOffset));
                log.debug("snapshot state for vbucket {}: {}", Integer.valueOf(i), streamOffset);
            }
        }
    }

    public void initializeState(FunctionInitializationContext functionInitializationContext) throws Exception {
        this.state = functionInitializationContext.getOperatorStateStore().getUnionListState(new ListStateDescriptor("couchbase-stream-offsets", TypeInformation.of(DcpVbucketAndOffset.class)));
        if (!functionInitializationContext.isRestored()) {
            log.info("No restore state for CouchbaseSource.");
            return;
        }
        if (this.checkpointLock == null) {
            throw new AssertionError("oops, checkpoint lock not initialized yet");
        }
        synchronized (this.checkpointLock) {
            ((Iterable) this.state.get()).forEach(dcpVbucketAndOffset -> {
                this.vbucketToStreamOffset[dcpVbucketAndOffset.getVbucket()] = dcpVbucketAndOffset.getOffset();
            });
        }
        TreeMap treeMap = new TreeMap();
        for (int i = 0; i < this.vbucketToStreamOffset.length; i++) {
            if (this.vbucketToStreamOffset[i] != null) {
                treeMap.put(Integer.valueOf(i), this.vbucketToStreamOffset[i]);
            }
            log.debug("Restore state for CouchbaseSource: {}", treeMap);
        }
    }
}
