package com.mongodb.spark.sql.connector.read;

import com.mongodb.spark.sql.connector.assertions.Assertions;
import com.mongodb.spark.sql.connector.config.ReadConfig;
import com.mongodb.spark.sql.connector.schema.BsonDocumentToRowConverter;
import com.mongodb.spark.sql.connector.schema.InferSchema;
import org.apache.spark.sql.connector.read.InputPartition;
import org.apache.spark.sql.connector.read.streaming.ContinuousPartitionReaderFactory;
import org.apache.spark.sql.connector.read.streaming.ContinuousStream;
import org.apache.spark.sql.connector.read.streaming.Offset;
import org.apache.spark.sql.connector.read.streaming.PartitionOffset;
import org.apache.spark.sql.types.StructType;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:com/mongodb/spark/sql/connector/read/MongoContinuousStream.class */
final class MongoContinuousStream implements ContinuousStream {
    private static final Logger LOGGER = LoggerFactory.getLogger(MongoContinuousStream.class);
    private final StructType schema;
    private final MongoOffsetStore mongoOffsetStore;
    private final ReadConfig readConfig;
    private final BsonDocumentToRowConverter bsonDocumentToRowConverter;

    /* JADX INFO: Access modifiers changed from: package-private */
    public MongoContinuousStream(StructType structType, String str, ReadConfig readConfig) {
        Assertions.validateConfig(structType, structType2 -> {
            return !structType2.isEmpty() && (!InferSchema.isInferred(structType2) || readConfig.streamPublishFullDocumentOnly());
        }, () -> {
            return "Mongo Continuous streams require a schema to be explicitly defined, unless using publish full document only.";
        });
        this.schema = structType;
        this.mongoOffsetStore = new MongoOffsetStore(str, MongoOffset.getInitialOffset(readConfig));
        this.readConfig = readConfig;
        this.bsonDocumentToRowConverter = new BsonDocumentToRowConverter(structType, readConfig);
    }

    public InputPartition[] planInputPartitions(Offset offset) {
        return new InputPartition[]{new MongoContinuousInputPartition(0, MongoInputPartitionHelper.generatePipeline(this.schema, this.readConfig), new MongoContinuousInputPartitionOffset((MongoOffset) offset))};
    }

    public ContinuousPartitionReaderFactory createContinuousReaderFactory() {
        return new MongoContinuousPartitionReaderFactory(this.bsonDocumentToRowConverter, this.readConfig);
    }

    public Offset mergeOffsets(PartitionOffset[] partitionOffsetArr) {
        Assertions.ensureState(() -> {
            return Boolean.valueOf(partitionOffsetArr.length == 1);
        }, () -> {
            return "Multiple offsets found when there should only be one.";
        });
        Assertions.ensureState(() -> {
            return Boolean.valueOf(partitionOffsetArr[0] instanceof MongoContinuousInputPartitionOffset);
        }, () -> {
            return String.format("Unexpected partition offset type. Expected MongoContinuousInputPartitionOffset` found `%s`", partitionOffsetArr[0].getClass());
        });
        return ((MongoContinuousInputPartitionOffset) partitionOffsetArr[0]).getOffset();
    }

    public Offset initialOffset() {
        return this.mongoOffsetStore.initialOffset();
    }

    public Offset deserializeOffset(String str) {
        return MongoOffset.fromJson(str);
    }

    public void commit(Offset offset) {
        LOGGER.info("ContinuousStream commit: {}", offset);
        this.mongoOffsetStore.updateOffset((MongoOffset) offset);
    }

    public void stop() {
        LOGGER.info("ContinuousStream stopped.");
    }
}
