package com.mongodb.spark.sql.connector.read;

import com.mongodb.MongoException;
import com.mongodb.MongoInterruptedException;
import com.mongodb.client.MongoChangeStreamCursor;
import com.mongodb.client.MongoClient;
import com.mongodb.client.model.Aggregates;
import com.mongodb.client.model.Filters;
import com.mongodb.spark.sql.connector.assertions.Assertions;
import com.mongodb.spark.sql.connector.config.ReadConfig;
import com.mongodb.spark.sql.connector.exceptions.MongoSparkException;
import com.mongodb.spark.sql.connector.schema.BsonDocumentToRowConverter;
import java.util.ArrayList;
import java.util.function.Function;
import org.apache.spark.sql.catalyst.InternalRow;
import org.apache.spark.sql.connector.read.streaming.ContinuousPartitionReader;
import org.apache.spark.sql.connector.read.streaming.PartitionOffset;
import org.bson.BsonDocument;
import org.jetbrains.annotations.Nullable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:com/mongodb/spark/sql/connector/read/MongoContinuousPartitionReader.class */
final class MongoContinuousPartitionReader implements ContinuousPartitionReader<InternalRow> {
    private static final Logger LOGGER = LoggerFactory.getLogger(MongoContinuousPartitionReader.class);
    private static final String FULL_DOCUMENT = "fullDocument";
    private final MongoInputPartition partition;
    private final BsonDocumentToRowConverter bsonDocumentToRowConverter;
    private final ReadConfig readConfig;
    private MongoContinuousInputPartitionOffset lastOffset;
    private MongoClient mongoClient;
    private MongoChangeStreamCursor<BsonDocument> changeStreamCursor;
    private boolean closed = false;
    private InternalRow currentRow = null;

    /* JADX INFO: Access modifiers changed from: package-private */
    public MongoContinuousPartitionReader(MongoContinuousInputPartition mongoContinuousInputPartition, BsonDocumentToRowConverter bsonDocumentToRowConverter, ReadConfig readConfig) {
        this.partition = mongoContinuousInputPartition;
        this.bsonDocumentToRowConverter = bsonDocumentToRowConverter;
        this.readConfig = readConfig;
        this.lastOffset = mongoContinuousInputPartition.getPartitionOffset();
        LOGGER.debug("Creating partition reader for: Partition: {} with Schema: {}", mongoContinuousInputPartition, bsonDocumentToRowConverter);
    }

    public PartitionOffset getOffset() {
        Assertions.ensureState(() -> {
            return Boolean.valueOf(!this.closed);
        }, () -> {
            return "Cannot call getOffset() on a closed PartitionReader.";
        });
        LOGGER.trace("getOffset called, returning: {}", this.lastOffset);
        return this.lastOffset;
    }

    public boolean next() {
        Assertions.ensureState(() -> {
            return Boolean.valueOf(!this.closed);
        }, () -> {
            return "Cannot call next() on a closed PartitionReader.";
        });
        boolean z = false;
        while (!z) {
            z = tryNext();
        }
        return true;
    }

    /* renamed from: get, reason: merged with bridge method [inline-methods] */
    public InternalRow m16get() {
        Assertions.ensureState(() -> {
            return Boolean.valueOf(!this.closed);
        }, () -> {
            return "Cannot call get() on a closed PartitionReader.";
        });
        Assertions.ensureState(() -> {
            return Boolean.valueOf(this.currentRow != null);
        }, () -> {
            return "Current row is null, this should not happen if `next()` returns true.";
        });
        LOGGER.trace("Get called: {}", this.currentRow);
        return this.currentRow;
    }

    public void close() {
        LOGGER.info("Closing the stream partition reader.");
        if (this.closed) {
            return;
        }
        this.closed = true;
        releaseCursorAndClient();
    }

    private boolean tryNext() {
        return ((Boolean) withCursor(mongoChangeStreamCursor -> {
            try {
            } catch (MongoException e) {
                LOGGER.info("Trying to get more data from the change stream failed, releasing cursor.", e);
            }
            if (!mongoChangeStreamCursor.hasNext()) {
                setLastOffset(mongoChangeStreamCursor.getResumeToken());
                releaseCursorAndClient();
                this.currentRow = null;
                return false;
            }
            BsonDocument bsonDocument = (BsonDocument) mongoChangeStreamCursor.next();
            if (bsonDocument.containsKey("_id") && bsonDocument.isDocument("_id")) {
                setLastOffset(bsonDocument.getDocument("_id"));
            }
            if (this.readConfig.streamPublishFullDocumentOnly()) {
                bsonDocument = bsonDocument.getDocument(FULL_DOCUMENT, new BsonDocument());
            }
            this.currentRow = this.bsonDocumentToRowConverter.toInternalRow(bsonDocument);
            return true;
        })).booleanValue();
    }

    private void setLastOffset(@Nullable BsonDocument bsonDocument) {
        if (bsonDocument != null) {
            this.lastOffset = new MongoContinuousInputPartitionOffset(new ResumeTokenBasedOffset(bsonDocument));
        }
    }

    private MongoChangeStreamCursor<BsonDocument> getCursor() {
        if (this.mongoClient == null) {
            this.mongoClient = this.readConfig.getMongoClient();
        }
        if (this.changeStreamCursor == null) {
            ArrayList arrayList = new ArrayList();
            if (this.readConfig.streamPublishFullDocumentOnly()) {
                arrayList.add(Aggregates.match(Filters.exists(FULL_DOCUMENT)).toBsonDocument());
            }
            arrayList.addAll(this.partition.getPipeline());
            try {
                this.changeStreamCursor = this.lastOffset.applyToChangeStreamIterable(this.mongoClient.getDatabase(this.readConfig.getDatabaseName()).getCollection(this.readConfig.getCollectionName()).watch(arrayList).fullDocument(this.readConfig.getStreamFullDocument()).comment(this.readConfig.getComment())).withDocumentClass(BsonDocument.class).cursor();
                LOGGER.debug("Opened change stream cursor for partition: {}", this.partition);
            } catch (RuntimeException e) {
                throw new MongoSparkException("Could not create the change stream cursor.", e);
            }
        }
        return this.changeStreamCursor;
    }

    private <T> T withCursor(Function<MongoChangeStreamCursor<BsonDocument>, T> function) {
        try {
            return function.apply(getCursor());
        } catch (MongoException e) {
            releaseCursor();
            throw new MongoSparkException("Change stream cursor failure.", e);
        } catch (MongoInterruptedException e2) {
            releaseCursor();
            throw new MongoSparkException("Change stream cursor interrupted.");
        }
    }

    private void releaseCursorAndClient() {
        try {
            releaseCursor();
        } catch (RuntimeException e) {
        }
        try {
            releaseClient();
        } catch (RuntimeException e2) {
        }
    }

    private void releaseCursor() {
        if (this.changeStreamCursor != null) {
            LOGGER.debug("Releasing change stream cursor for partition: {}", this.partition);
            try {
                this.changeStreamCursor.close();
            } finally {
                this.changeStreamCursor = null;
            }
        }
    }

    private void releaseClient() {
        if (this.changeStreamCursor != null) {
            LOGGER.debug("Releasing mongoClient for partition: {}", this.partition);
            try {
                this.mongoClient.close();
            } finally {
                this.mongoClient = null;
            }
        }
    }
}
