package me.tfeng.toolbox.mongodb;

import com.mongodb.CursorType;
import com.mongodb.async.client.MongoClient;
import com.mongodb.async.client.MongoCollection;
import me.tfeng.toolbox.spring.Startable;
import org.bson.BsonTimestamp;
import org.bson.Document;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:me/tfeng/toolbox/mongodb/OplogListener.class */
public class OplogListener implements Startable {
    public static final String COLLECTION_NAME = "oplog.rs";
    public static final String DB_NAME = "local";
    private static final Logger LOG = LoggerFactory.getLogger(OplogListener.class);
    private MongoCollection<Document> collection;
    private OplogItemHandler handler;
    private MongoClient mongoClient;
    private String namespace;
    private BsonTimestamp startTimestamp;

    public void onStart() throws Throwable {
        if (this.mongoClient == null || this.handler == null) {
            throw new Exception("mongoClient and handler must both be provided");
        }
        LOG.info("Connecting to local.oplog.rs in MongoDB");
        this.collection = this.mongoClient.getDatabase(DB_NAME).getCollection(COLLECTION_NAME);
        this.collection.find(getQuery()).sort(getSort()).cursorType(getCursorType()).forEach(this::process, (r4, th) -> {
            if (th != null) {
                LOG.info("Oplog listener existed with exception", th);
            }
        });
    }

    public void onStop() throws Throwable {
    }

    public void setHandler(OplogItemHandler oplogItemHandler) {
        this.handler = oplogItemHandler;
    }

    public void setMongoClient(MongoClient mongoClient) {
        this.mongoClient = mongoClient;
    }

    public void setNamespace(String str) {
        this.namespace = str;
    }

    public void setStartTimestamp(BsonTimestamp bsonTimestamp) {
        this.startTimestamp = bsonTimestamp;
    }

    protected CursorType getCursorType() {
        return CursorType.TailableAwait;
    }

    protected Document getQuery() {
        Document document = new Document();
        if (this.startTimestamp != null) {
            document.put("ts", new Document("$gt", this.startTimestamp));
        }
        if (this.namespace != null) {
            document.put("ns", this.namespace);
        }
        return document;
    }

    protected Document getSort() {
        return new Document("$natural", 1);
    }

    protected void process(Document document) {
        if (LOG.isDebugEnabled()) {
            LOG.debug("Received oplog item " + document);
        }
        this.handler.handle(RecordConverter.toRecord(OplogItem.class, document));
    }
}
