package com.mongodb.spark.sql.connector.read;

import com.mongodb.spark.sql.connector.assertions.Assertions;
import com.mongodb.spark.sql.connector.config.ReadConfig;
import com.mongodb.spark.sql.connector.schema.BsonDocumentToRowConverter;
import org.apache.spark.sql.connector.read.InputPartition;
import org.apache.spark.sql.connector.read.streaming.ContinuousPartitionReaderFactory;
import org.apache.spark.sql.connector.read.streaming.ContinuousStream;
import org.apache.spark.sql.connector.read.streaming.Offset;
import org.apache.spark.sql.connector.read.streaming.PartitionOffset;
import org.apache.spark.sql.types.StructType;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:com/mongodb/spark/sql/connector/read/MongoContinuousStream.class */
public class MongoContinuousStream implements ContinuousStream {
    private static final Logger LOGGER = LoggerFactory.getLogger(MongoContinuousStream.class);
    private final BsonDocumentToRowConverter bsonDocumentToRowConverter;
    private final ReadConfig readConfig;

    public MongoContinuousStream(StructType structType, ReadConfig readConfig) {
        Assertions.validateConfig(structType, structType2 -> {
            return !structType2.isEmpty();
        }, () -> {
            return "Mongo Continuous streams require a schema to be defined";
        });
        this.bsonDocumentToRowConverter = new BsonDocumentToRowConverter(structType);
        this.readConfig = readConfig;
    }

    public InputPartition[] planInputPartitions(Offset offset) {
        return new InputPartition[]{new MongoInputPartition(0, this.readConfig.getAggregationPipeline(), new ResumeTokenPartitionOffset(((ResumeTokenOffset) offset).getResumeToken()))};
    }

    public ContinuousPartitionReaderFactory createContinuousReaderFactory() {
        return new MongoStreamPartitionReaderFactory(this.bsonDocumentToRowConverter, this.readConfig);
    }

    public Offset mergeOffsets(PartitionOffset[] partitionOffsetArr) {
        Assertions.ensureState(() -> {
            return Boolean.valueOf(partitionOffsetArr.length == 1);
        }, () -> {
            return "Multiple offsets found when there should only be one.";
        });
        Assertions.ensureState(() -> {
            return Boolean.valueOf(partitionOffsetArr[0] instanceof ResumeTokenPartitionOffset);
        }, () -> {
            return String.format("Unexpected partition offset type. Expected ResumeTokenPartitionOffset` found `%s`", partitionOffsetArr[0].getClass());
        });
        return new ResumeTokenOffset(((ResumeTokenPartitionOffset) partitionOffsetArr[0]).getResumeToken());
    }

    public Offset initialOffset() {
        return ResumeTokenOffset.INITIAL_RESUME_TOKEN_OFFSET;
    }

    public Offset deserializeOffset(String str) {
        return ResumeTokenOffset.parse(str);
    }

    public void commit(Offset offset) {
        LOGGER.info("ContinuousStream commit: {}", offset);
    }

    public void stop() {
        LOGGER.info("ContinuousStream stopped.");
    }
}
