package com.mongodb.spark.sql.connector.write;

import com.mongodb.spark.sql.connector.config.WriteConfig;
import com.mongodb.spark.sql.connector.exceptions.DataException;
import com.mongodb.spark.sql.connector.schema.RowToBsonDocumentConverter;
import java.util.Arrays;
import java.util.Objects;
import org.apache.spark.sql.connector.write.LogicalWriteInfo;
import org.apache.spark.sql.connector.write.PhysicalWriteInfo;
import org.apache.spark.sql.connector.write.WriterCommitMessage;
import org.apache.spark.sql.connector.write.streaming.StreamingDataWriterFactory;
import org.apache.spark.sql.connector.write.streaming.StreamingWrite;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:com/mongodb/spark/sql/connector/write/MongoStreamingWrite.class */
/**
 * {@link StreamingWrite} implementation that hands out {@link MongoDataWriterFactory} instances
 * and reports the outcome of each commit or abort call.
 *
 * <p>When constructed with the truncate flag set, the target collection is dropped each time a
 * writer factory is requested.
 */
public class MongoStreamingWrite implements StreamingWrite {
    private static final Logger LOGGER = LoggerFactory.getLogger(MongoStreamingWrite.class);

    private final LogicalWriteInfo info;
    private final WriteConfig writeConfig;
    private final RowToBsonDocumentConverter rowToBsonDocumentConverter;
    private final boolean truncate;

    /**
     * Creates the streaming write.
     *
     * @param logicalWriteInfo the logical write info; its query id is used in log and error messages
     * @param rowToBsonDocumentConverter the converter passed on to every created writer factory
     * @param writeConfig the write configuration, also used to reach the collection when truncating
     * @param z the truncate flag: when {@code true} the collection is dropped before a writer
     *     factory is returned
     */
    public MongoStreamingWrite(LogicalWriteInfo logicalWriteInfo, RowToBsonDocumentConverter rowToBsonDocumentConverter, WriteConfig writeConfig, boolean z) {
        this.info = logicalWriteInfo;
        this.rowToBsonDocumentConverter = rowToBsonDocumentConverter;
        this.writeConfig = writeConfig;
        this.truncate = z;
    }

    /**
     * Returns a new {@link MongoDataWriterFactory} built from the configured converter and write
     * config, dropping the collection first when truncation was requested.
     *
     * @param physicalWriteInfo the physical write info; not used by this implementation
     * @return a new writer factory
     */
    public StreamingDataWriterFactory createStreamingWriterFactory(PhysicalWriteInfo physicalWriteInfo) {
        if (truncate) {
            writeConfig.doWithCollection(collection -> collection.drop());
        }
        return new MongoDataWriterFactory(rowToBsonDocumentConverter, writeConfig);
    }

    /**
     * Logs, at debug level, the query id and the number of commit messages received.
     *
     * @param j the epoch id; not used by this implementation
     * @param writerCommitMessageArr the commit messages; only the array length is reported
     */
    public void commit(long j, WriterCommitMessage[] writerCommitMessageArr) {
        LOGGER.debug("Write committed for: {}, with {} task(s).", info.queryId(), writerCommitMessageArr.length);
    }

    /**
     * Always fails with a {@link DataException} describing the aborted write.
     *
     * @param j the epoch id, included in the exception message
     * @param writerCommitMessageArr the commit messages; non-null entries are counted as completed
     *     tasks
     * @throws DataException on every invocation
     */
    public void abort(long j, WriterCommitMessage[] writerCommitMessageArr) {
        String queryId = info.queryId();
        // Only non-null entries are counted as completed tasks.
        long completedTasks = Arrays.stream(writerCommitMessageArr).filter(Objects::nonNull).count();
        int totalTasks = writerCommitMessageArr.length;
        String message = String.format("Write aborted for: %s. %s/%s tasks completed. EpochId: %s", queryId, completedTasks, totalTasks, j);
        throw new DataException(message);
    }
}
