package org.apache.camel.component.mongodb3;

import com.mongodb.CursorType;
import com.mongodb.MongoCursorNotFoundException;
import com.mongodb.client.MongoCursor;
import com.mongodb.client.model.Filters;
import org.apache.camel.Exchange;
import org.bson.Document;

/* loaded from: input_file:org/apache/camel/component/mongodb3/MongoDbTailingThread.class */
/**
 * Consumer thread that reads documents from a MongoDB collection through a
 * tailable-await cursor and hands each one to the consumer's processor.
 *
 * <p>The position in the collection is tracked via a
 * {@link MongoDbTailTrackingManager}: the last processed document is recorded
 * after every exchange and the tracking state is persisted whenever
 * {@link #doRun()} returns or fails.
 */
class MongoDbTailingThread extends MongoAbstractConsumerThread {
    /** Key of the boolean flag read from the {@code collStats} command result. */
    private static final String CAPPED_KEY = "capped";

    /** Tracks (and persists) the last value consumed from the tailed collection. */
    private final MongoDbTailTrackingManager tailTracking;

    /**
     * Creates the tailing thread.
     *
     * @param mongoDbEndpoint               endpoint providing the database, collection and exchange factory
     * @param mongoDbTailableCursorConsumer consumer whose processor receives the exchanges
     * @param mongoDbTailTrackingManager    tail tracking state used to resume and persist the position
     */
    public MongoDbTailingThread(MongoDbEndpoint mongoDbEndpoint, MongoDbTailableCursorConsumer mongoDbTailableCursorConsumer, MongoDbTailTrackingManager mongoDbTailTrackingManager) {
        super(mongoDbEndpoint, mongoDbTailableCursorConsumer);
        this.tailTracking = mongoDbTailTrackingManager;
    }

    /**
     * Verifies that the target collection is capped, recovers the tail tracking
     * state from its store and opens the initial cursor.
     *
     * @throws CamelMongoDbException if the collection is not capped, if recovering
     *                               the tracking state or creating the cursor fails,
     *                               or if no cursor could be obtained
     */
    @Override
    public void init() {
        if (this.log.isInfoEnabled()) {
            this.log.info("Starting MongoDB Tailable Cursor consumer, binding to collection: {}", String.format("db: %s, col: %s", this.endpoint.getMongoDatabase(), this.endpoint.getCollection()));
        }
        if (!isCollectionCapped()) {
            throw new CamelMongoDbException(String.format("Tailable cursors are only compatible with capped collections, and collection %s is not capped", this.endpoint.getCollection()));
        }
        try {
            this.tailTracking.recoverFromStore();
            this.cursor = initializeCursor();
        } catch (Exception e) {
            throw new CamelMongoDbException("Exception occurred while initializing tailable cursor", e);
        }
        // Checked outside the try block so this specific message is not
        // re-wrapped by the generic "Exception occurred while initializing" handler.
        if (this.cursor == null) {
            throw new CamelMongoDbException("Tailable cursor was not initialized, or cursor returned is dead on arrival");
        }
    }

    /**
     * Runs {@code collStats} for the endpoint's collection and reads its
     * {@code capped} flag.
     *
     * @return {@code true} only if the command result contains {@code capped: true};
     *         a missing flag is treated as "not capped" instead of causing a
     *         {@link NullPointerException}
     */
    private boolean isCollectionCapped() {
        Document stats = this.endpoint.getMongoDatabase().runCommand(createCollStatsCommand());
        return Boolean.TRUE.equals(stats.getBoolean(CAPPED_KEY));
    }

    /**
     * Builds the {@code collStats} command document for the endpoint's collection.
     *
     * @return a document of the form {@code { collStats: <collection> }}
     */
    private Document createCollStatsCommand() {
        return new Document("collStats", this.endpoint.getCollection());
    }

    /**
     * Opens a {@link CursorType#TailableAwait} cursor on the collection.
     *
     * <p>When the tail tracking manager holds no last value the cursor covers the
     * whole collection; otherwise it is restricted to documents whose increasing
     * field is strictly greater than the last tracked value.
     *
     * @return the newly opened cursor
     */
    @Override
    protected MongoCursor<Document> initializeCursor() {
        Object lastVal = this.tailTracking.lastVal;
        if (lastVal == null) {
            return this.dbCol.find().cursorType(CursorType.TailableAwait).iterator();
        }
        return this.dbCol.find(Filters.gt(this.tailTracking.getIncreasingFieldName(), lastVal)).cursorType(CursorType.TailableAwait).iterator();
    }

    /**
     * Logs (at debug level) that the cursor is about to be regenerated, together
     * with the last tracked value and the configured regeneration delay. This
     * method itself neither waits nor recreates the cursor.
     */
    @Override
    protected void regeneratingCursor() {
        if (this.log.isDebugEnabled()) {
            this.log.debug("Regenerating cursor with lastVal: {}, waiting {} ms first", this.tailTracking.lastVal, Long.valueOf(this.cursorRegenerationDelay));
        }
    }

    /**
     * Drains the cursor while it has documents and {@code keepRunning} is set,
     * sending one exchange per document to the consumer's processor and recording
     * the document as the last tracked value.
     *
     * <p>A failure in the processor is logged and does not stop the loop. A
     * {@link MongoCursorNotFoundException} ends this run quietly. An
     * {@link IllegalStateException} closes the cursor and clears
     * {@code keepRunning}. In every case, including unexpected errors, the tail
     * tracking state is persisted exactly once before the method exits.
     */
    @Override
    protected void doRun() {
        try {
            // hasNext() is deliberately inside the try block: it can itself raise
            // the cursor exceptions handled below.
            while (this.cursor.hasNext() && this.keepRunning) {
                Document document = (Document) this.cursor.next();
                Exchange exchange = this.endpoint.createMongoDbExchange(document);
                try {
                    if (this.log.isTraceEnabled()) {
                        this.log.trace("Sending exchange: {}, ObjectId: {}", exchange, document.get(MongoDbConstants.MONGO_ID));
                    }
                    this.consumer.getProcessor().process(exchange);
                } catch (Exception e) {
                    // Best effort: a failing exchange must not stop the tailing loop,
                    // but the failure should at least be visible in the logs.
                    this.log.warn("Exception occurred while processing exchange from tailable cursor, continuing with next document", e);
                }
                this.tailTracking.setLastVal(document);
            }
        } catch (MongoCursorNotFoundException e) {
            // Only reported while still running; the message itself notes this is
            // normal behaviour with tailable cursors.
            if (this.keepRunning) {
                this.log.debug("Cursor not found exception from MongoDB, will regenerate cursor. This is normal behaviour with tailable cursors.", e);
            }
        } catch (IllegalStateException e) {
            this.log.info("Cursor was closed, likely the consumer was stopped and closed the cursor on purpose.", e);
            if (this.cursor != null) {
                this.cursor.close();
            }
            this.keepRunning = false;
        } finally {
            // Persist the last tracked value once, whatever the exit path.
            this.tailTracking.persistToStore();
        }
    }
}
