package org.apache.camel.component.mongodb;

import com.mongodb.BasicDBObject;
import com.mongodb.DBCollection;
import com.mongodb.DBCursor;
import com.mongodb.DBObject;
import com.mongodb.MongoCursorNotFoundException;
import com.mongodb.QueryOperators;
import org.apache.camel.Exchange;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:WEB-INF/lib/camel-mongodb-2.14.1.jar:org/apache/camel/component/mongodb/MongoDbTailingProcess.class */
/**
 * Runnable that reads documents from a tailable cursor on a MongoDB collection and hands each
 * one to the consumer's processor as an {@link Exchange}.
 *
 * <p>Lifecycle, as implemented below: {@link #initializeProcess()} verifies the collection is
 * capped and opens the first cursor; {@link #run()} loops, draining the cursor and regenerating
 * it whenever it is exhausted or lost; {@link #stop()} clears {@link #keepRunning}, closes the
 * current cursor and waits until {@link #run()} has set {@link #stopped}.
 *
 * <p>{@link #run()} and {@link #stop()} are expected to be invoked from different threads; the
 * fields they share ({@code keepRunning}, {@code stopped}, {@code cursor}) are volatile.
 */
public class MongoDbTailingProcess implements Runnable {
    private static final Logger LOG = LoggerFactory.getLogger(MongoDbTailingProcess.class);
    private static final String CAPPED_KEY = "capped";

    /** Pause, in milliseconds, between checks of {@link #stopped} while {@link #stop()} waits. */
    private static final long STOP_POLL_INTERVAL_MILLIS = 10L;

    /** Cleared by {@link #stop()} to ask the tailing loop to finish. */
    public volatile boolean keepRunning = true;

    /** Set by {@link #run()} when it returns, whether normally or because of an exception. */
    public volatile boolean stopped;

    private final DBCollection dbCol;
    private final MongoDbEndpoint endpoint;
    private final MongoDbTailableCursorConsumer consumer;
    private final long cursorRegenerationDelay;
    private final boolean cursorRegenerationDelayEnabled;
    private final MongoDbTailTrackingManager tailTracking;

    // Written by the tailing thread (run) and read by stop() on another thread, hence volatile.
    private volatile DBCursor cursor;

    /**
     * Creates a tailing process bound to the endpoint's collection.
     *
     * @param mongoDbEndpoint endpoint supplying the collection, the cursor regeneration delay
     *        and the exchange factory
     * @param mongoDbTailableCursorConsumer consumer whose processor receives each exchange
     * @param mongoDbTailTrackingManager tracker holding the last value read from the collection
     */
    public MongoDbTailingProcess(MongoDbEndpoint mongoDbEndpoint, MongoDbTailableCursorConsumer mongoDbTailableCursorConsumer, MongoDbTailTrackingManager mongoDbTailTrackingManager) {
        this.endpoint = mongoDbEndpoint;
        this.consumer = mongoDbTailableCursorConsumer;
        this.dbCol = mongoDbEndpoint.getDbCollection();
        this.tailTracking = mongoDbTailTrackingManager;
        this.cursorRegenerationDelay = mongoDbEndpoint.getCursorRegenerationDelay();
        // A delay of exactly 0 disables the pause before a cursor is regenerated.
        this.cursorRegenerationDelayEnabled = this.cursorRegenerationDelay != 0;
    }

    /**
     * Returns the cursor currently in use.
     *
     * @return the current cursor, or {@code null} if none has been created yet
     */
    public DBCursor getCursor() {
        return this.cursor;
    }

    /**
     * Checks that the collection is capped, recovers the tail tracking state from its store and
     * opens the initial cursor.
     *
     * @throws CamelMongoDbException if the collection is not capped, or wrapping any exception
     *         raised while recovering the tracking state or creating the cursor
     * @throws Exception declared for callers; nothing else is thrown explicitly here
     */
    public void initializeProcess() throws Exception {
        if (LOG.isInfoEnabled()) {
            LOG.info("Starting MongoDB Tailable Cursor consumer, binding to collection: {}", describeCollection());
        }
        // Use the defaulting lookup so that stats lacking a "capped" entry are reported as
        // "not capped" below, rather than depending on how the one-argument lookup treats a
        // missing key.
        if (this.dbCol.getStats().getInt(CAPPED_KEY, 0) != 1) {
            throw new CamelMongoDbException("Tailable cursors are only compatible with capped collections, and collection " + this.dbCol.getName() + " is not capped");
        }
        try {
            this.tailTracking.recoverFromStore();
            this.cursor = initializeCursor();
            if (this.cursor == null) {
                throw new CamelMongoDbException("Tailable cursor was not initialized, or cursor returned is dead on arrival");
            }
        } catch (Exception e) {
            throw new CamelMongoDbException("Exception ocurred while initializing tailable cursor", e);
        }
    }

    /**
     * Tails the collection until {@link #keepRunning} is cleared or the thread is interrupted.
     *
     * <p>Each pass drains the current cursor; if the process is still meant to run, the cursor
     * is closed, the optional regeneration delay is observed and a fresh cursor is opened.
     * {@link #stopped} is set in a {@code finally} block so that {@link #stop()} is released
     * even when this method exits with an exception.
     */
    @Override // java.lang.Runnable
    public void run() {
        try {
            while (this.keepRunning) {
                doRun();
                if (!this.keepRunning) {
                    break;
                }
                this.cursor.close();
                if (LOG.isDebugEnabled()) {
                    LOG.debug("Regenerating cursor with lastVal: {}, waiting {}ms first", this.tailTracking.lastVal, this.cursorRegenerationDelay);
                }
                if (this.cursorRegenerationDelayEnabled && !sleepBeforeRegeneration()) {
                    // Interrupted: treat it as a request to end the loop.
                    break;
                }
                this.cursor = initializeCursor();
            }
        } finally {
            this.stopped = true;
        }
    }

    /**
     * Requests the tailing loop to finish and waits until {@link #run()} has returned.
     *
     * <p>The current cursor is closed from the calling thread before waiting. The wait polls
     * {@link #stopped} with a short sleep instead of spinning.
     *
     * <p>NOTE(review): {@link #stopped} is only ever set by {@link #run()}, so calling this
     * method when {@link #run()} was never started waits indefinitely.
     *
     * <p>The decompiler recorded that this method's access modifier was changed from
     * {@code protected}.
     *
     * @throws InterruptedException if the calling thread is interrupted while waiting
     * @throws Exception declared for callers
     */
    public void stop() throws Exception {
        if (LOG.isInfoEnabled()) {
            LOG.info("Stopping MongoDB Tailable Cursor consumer, bound to collection: {}", describeCollection());
        }
        this.keepRunning = false;
        // Read the field once: the tailing thread may replace it concurrently.
        DBCursor current = this.cursor;
        if (current != null) {
            current.close();
        }
        while (!this.stopped) {
            Thread.sleep(STOP_POLL_INTERVAL_MILLIS);
        }
        if (LOG.isInfoEnabled()) {
            LOG.info("Stopped MongoDB Tailable Cursor consumer, bound to collection: {}", describeCollection());
        }
    }

    /**
     * Drains the current cursor, sending one exchange per document, then persists the tail
     * tracking state.
     *
     * <p>The whole loop, including the {@code hasNext()} call in its condition, is guarded so
     * that a {@link MongoCursorNotFoundException} ends the loop and lets {@link #run()}
     * regenerate the cursor.
     */
    private void doRun() {
        try {
            // Stop when the cursor has no more data, its id is 0, or a stop was requested.
            while (this.cursor.hasNext() && this.cursor.getCursorId() != 0 && this.keepRunning) {
                DBObject document = this.cursor.next();
                Exchange exchange = this.endpoint.createMongoDbExchange(document);
                try {
                    if (LOG.isTraceEnabled()) {
                        LOG.trace("Sending exchange: {}, ObjectId: {}", exchange, document.get("_id"));
                    }
                    this.consumer.getProcessor().process(exchange);
                } catch (Exception e) {
                    // Best effort: a failing processor must not end the tailing loop, but the
                    // failure should be visible rather than silently dropped.
                    LOG.warn("Exception while processing exchange: " + exchange, e);
                }
                // Recorded even when processing failed, so tailing always moves forward.
                this.tailTracking.setLastVal(document);
            }
        } catch (MongoCursorNotFoundException e) {
            // Only worth reporting when not stopping.
            if (this.keepRunning) {
                LOG.debug("Cursor not found exception from MongoDB, will regenerate cursor. This is normal behaviour with tailable cursors.", e);
            }
        }
        this.tailTracking.persistToStore();
    }

    /**
     * Builds a new cursor over the collection. When a last value has been tracked, only
     * documents whose increasing field is greater than that value are selected.
     *
     * @return the newly created cursor
     */
    private DBCursor initializeCursor() {
        Object lastVal = this.tailTracking.lastVal;
        DBCursor answer;
        if (lastVal == null) {
            answer = this.dbCol.find();
        } else {
            BasicDBObject greaterThanLast = new BasicDBObject(QueryOperators.GT, lastVal);
            answer = this.dbCol.find(new BasicDBObject(this.tailTracking.getIncreasingFieldName(), greaterThanLast));
        }
        // NOTE(review): 2 and 32 look like the driver's "tailable" and "await data" query
        // option flags (com.mongodb.Bytes.QUERYOPTION_*) - confirm against the driver in use.
        return answer.addOption(2).addOption(32);
    }

    /**
     * Sleeps for the configured cursor regeneration delay.
     *
     * @return {@code true} if the full delay elapsed, {@code false} if the thread was
     *         interrupted (in which case the interrupt status is restored)
     */
    private boolean sleepBeforeRegeneration() {
        try {
            Thread.sleep(this.cursorRegenerationDelay);
            return true;
        } catch (InterruptedException e) {
            LOG.error("Thread was interrupted", e);
            Thread.currentThread().interrupt();
            return false;
        }
    }

    /** Returns the database/collection description used in the start and stop log lines. */
    private String describeCollection() {
        return "db: " + this.dbCol.getDB() + ", col: " + this.dbCol.getName();
    }
}
