package org.apache.spark.sql.comet.execution.shuffle;

import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.nio.channels.FileChannel;
import java.util.Optional;
import java.util.concurrent.ExecutorService;
import org.apache.comet.CometConf$;
import org.apache.comet.shaded.guava.io.Closeables;
import org.apache.spark.Partitioner;
import org.apache.spark.ShuffleDependency;
import org.apache.spark.SparkConf;
import org.apache.spark.TaskContext;
import org.apache.spark.internal.config.package$;
import org.apache.spark.memory.TaskMemoryManager;
import org.apache.spark.network.shuffle.checksum.ShuffleChecksumHelper;
import org.apache.spark.scheduler.MapStatus;
import org.apache.spark.scheduler.MapStatus$;
import org.apache.spark.serializer.SerializerInstance;
import org.apache.spark.shuffle.ShuffleWriteMetricsReporter;
import org.apache.spark.shuffle.ShuffleWriter;
import org.apache.spark.shuffle.api.ShuffleExecutorComponents;
import org.apache.spark.shuffle.api.ShuffleMapOutputWriter;
import org.apache.spark.shuffle.api.ShufflePartitionWriter;
import org.apache.spark.shuffle.api.WritableByteChannelWrapper;
import org.apache.spark.shuffle.comet.CometShuffleChecksumSupport;
import org.apache.spark.sql.catalyst.expressions.UnsafeRow;
import org.apache.spark.sql.types.StructType;
import org.apache.spark.storage.BlockManager;
import org.apache.spark.storage.FileSegment;
import org.apache.spark.util.Utils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import scala.None$;
import scala.Option;
import scala.Product2;
import scala.collection.Iterator;

/* loaded from: input_file:org/apache/spark/sql/comet/execution/shuffle/CometBypassMergeSortShuffleWriter.class */
/**
 * Shuffle writer that routes every incoming row to a per-partition {@link CometDiskBlockWriter}
 * backed by its own temporary file, and afterwards copies those files, in partition order, into
 * the {@link ShuffleMapOutputWriter} obtained from the {@link ShuffleExecutorComponents}.
 *
 * <p>Life cycle as implemented here: {@link #write(Iterator)} produces the map output and records
 * the resulting {@link MapStatus}; {@link #stop(boolean)} either hands that status back (success)
 * or releases whatever per-partition writers are still held (failure).
 *
 * <p>The class keeps mutable per-task state in plain fields without any synchronization.
 */
final class CometBypassMergeSortShuffleWriter<K, V> extends ShuffleWriter<K, V> implements CometShuffleChecksumSupport {
    private static final Logger logger = LoggerFactory.getLogger(CometBypassMergeSortShuffleWriter.class);

    /** Configured shuffle file buffer size multiplied by 1024; not read anywhere in this class. */
    private final int fileBufferSize;
    /** Value of {@code spark.file.transferTo} (default {@code true}); selects channel vs. stream copy. */
    private final boolean transferToEnabled;
    private final int numPartitions;
    private final BlockManager blockManager;
    private final TaskMemoryManager memoryManager;
    private final TaskContext taskContext;
    private final SerializerInstance serializer;
    private final Partitioner partitioner;
    private final ShuffleWriteMetricsReporter writeMetrics;
    private final int shuffleId;
    private final long mapId;
    private final ShuffleExecutorComponents shuffleExecutorComponents;
    private final StructType schema;

    /** One writer per partition; non-null only between writer creation in write() and the final copy. */
    private CometDiskBlockWriter[] partitionWriters;
    /** File segment returned by closing each partition writer, indexed by partition id. */
    private FileSegment[] partitionWriterSegments;
    /** Set by a successful write(); returned from stop(true). */
    private MapStatus mapStatus;
    private long[] partitionLengths;
    /** Per-partition checksum values; a zero-length array means no checksum is set on the writers. */
    private final long[] partitionChecksums;
    private final boolean isAsync;
    private final int asyncThreadNum;
    /** Shared shuffle thread pool when async mode is on, otherwise {@code null}. */
    private final ExecutorService threadPool;
    /** Guards stop() against running its body more than once. */
    private boolean stopping = false;
    private final SparkConf conf;

    /**
     * Captures the collaborators and configuration needed to write one map task's shuffle output.
     *
     * @param blockManager source of the shuffle server id, temp shuffle files and the serializer manager
     * @param taskMemoryManager passed through to every {@link CometDiskBlockWriter}
     * @param taskContext passed through to every {@link CometDiskBlockWriter}
     * @param cometBypassMergeSortShuffleHandle handle whose dependency supplies the serializer,
     *     shuffle id, partitioner and (via {@link CometShuffleDependency}) the row schema
     * @param j the map id of this task
     * @param sparkConf configuration used for buffer size, transferTo and checksum settings
     * @param shuffleWriteMetricsReporter sink for write-time metrics
     * @param shuffleExecutorComponents factory for the {@link ShuffleMapOutputWriter}
     */
    /* JADX INFO: Access modifiers changed from: package-private */
    public CometBypassMergeSortShuffleWriter(BlockManager blockManager, TaskMemoryManager taskMemoryManager, TaskContext taskContext, CometBypassMergeSortShuffleHandle<K, V> cometBypassMergeSortShuffleHandle, long j, SparkConf sparkConf, ShuffleWriteMetricsReporter shuffleWriteMetricsReporter, ShuffleExecutorComponents shuffleExecutorComponents) {
        this.fileBufferSize = ((int) ((Long) sparkConf.get(package$.MODULE$.SHUFFLE_FILE_BUFFER_SIZE())).longValue()) * 1024;
        this.transferToEnabled = sparkConf.getBoolean("spark.file.transferTo", true);
        this.conf = sparkConf;
        this.blockManager = blockManager;
        this.memoryManager = taskMemoryManager;
        this.taskContext = taskContext;
        ShuffleDependency dependency = cometBypassMergeSortShuffleHandle.dependency();
        this.mapId = j;
        this.serializer = dependency.serializer().newInstance();
        this.shuffleId = dependency.shuffleId();
        this.partitioner = dependency.partitioner();
        this.numPartitions = this.partitioner.numPartitions();
        this.writeMetrics = shuffleWriteMetricsReporter;
        this.shuffleExecutorComponents = shuffleExecutorComponents;
        // The dependency is expected to be a CometShuffleDependency carrying a schema; the cast
        // and Option.get() fail fast otherwise.
        this.schema = (StructType) ((CometShuffleDependency) dependency).schema().get();
        this.partitionChecksums = createPartitionChecksums(this.numPartitions, sparkConf);
        this.isAsync = ((Boolean) CometConf$.MODULE$.COMET_COLUMNAR_SHUFFLE_ASYNC_ENABLED().get()).booleanValue();
        this.asyncThreadNum = ((Integer) CometConf$.MODULE$.COMET_COLUMNAR_SHUFFLE_ASYNC_THREAD_NUM().get()).intValue();
        if (this.isAsync) {
            logger.info("Async shuffle writer enabled");
            this.threadPool = ShuffleThreadPool.getThreadPool();
        } else {
            logger.info("Async shuffle writer disabled");
            this.threadPool = null;
        }
    }

    /**
     * Writes all records of this map task.
     *
     * <p>An empty iterator commits an empty output immediately. Otherwise one
     * {@link CometDiskBlockWriter} is created per partition, each record's value (cast to
     * {@link UnsafeRow}) is inserted into the writer chosen by the partitioner, all writers are
     * closed, and the resulting files are copied into the map output writer.
     *
     * @param iterator the key/value records to shuffle
     * @throws IOException if writing or committing fails; the map output writer is aborted first
     * @throws RuntimeException if the number of consumed rows differs from the number of records
     *     reported by the partition writers
     */
    @Override
    public void write(Iterator<Product2<K, V>> iterator) throws IOException {
        assert this.partitionWriters == null;
        ShuffleMapOutputWriter mapOutputWriter = this.shuffleExecutorComponents.createMapOutputWriter(this.shuffleId, this.mapId, this.numPartitions);
        try {
            if (!iterator.hasNext()) {
                this.partitionLengths = mapOutputWriter.commitAllPartitions(ShuffleChecksumHelper.EMPTY_CHECKSUM_VALUE).getPartitionLengths();
                this.mapStatus = MapStatus$.MODULE$.apply(this.blockManager.shuffleServerId(), this.partitionLengths, this.mapId);
                return;
            }

            // Opening the per-partition writers is accounted as shuffle write time.
            long openStartTime = System.nanoTime();
            this.partitionWriters = new CometDiskBlockWriter[this.numPartitions];
            this.partitionWriterSegments = new FileSegment[this.numPartitions];
            String checksumAlgorithm = getChecksumAlgorithm(this.conf);
            for (int i = 0; i < this.numPartitions; i++) {
                File tempFile = (File) this.blockManager.diskBlockManager().createTempShuffleBlock()._2();
                CometDiskBlockWriter writer = new CometDiskBlockWriter(tempFile, this.memoryManager, this.taskContext, this.serializer, this.schema, this.writeMetrics, this.conf, this.isAsync, this.asyncThreadNum, this.threadPool);
                if (this.partitionChecksums.length > 0) {
                    writer.setChecksum(this.partitionChecksums[i]);
                    writer.setChecksumAlgo(checksumAlgorithm);
                }
                this.partitionWriters[i] = writer;
            }
            this.writeMetrics.incWriteTime(System.nanoTime() - openStartTime);

            long outputRows = 0;
            while (iterator.hasNext()) {
                outputRows++;
                Product2<K, V> record = iterator.next();
                // Resolve the target partition once and reuse it for both the lookup and the insert.
                int partitionId = this.partitioner.getPartition(record._1());
                this.partitionWriters[partitionId].insertRow((UnsafeRow) record._2(), partitionId);
            }

            long spillRecords = 0;
            for (int i = 0; i < this.numPartitions; i++) {
                CometDiskBlockWriter writer = this.partitionWriters[i];
                this.partitionWriterSegments[i] = writer.close();
                spillRecords += writer.getOutputRecords();
            }

            // Sanity check: every row read from the iterator must be accounted for by a writer.
            if (outputRows != spillRecords) {
                throw new RuntimeException("outputRows(" + outputRows + ") != spillRecords(" + spillRecords + "). Please file a bug report.");
            }

            this.partitionLengths = writePartitionedData(mapOutputWriter);
            this.mapStatus = MapStatus$.MODULE$.apply(this.blockManager.shuffleServerId(), this.partitionLengths, this.mapId);
        } catch (Exception e) {
            try {
                mapOutputWriter.abort(e);
            } catch (Exception abortFailure) {
                logger.error("Failed to abort the writer after failing to write map output.", abortFailure);
                e.addSuppressed(abortFailure);
            }
            throw e;
        }
    }

    /**
     * @return the partition lengths recorded by the last successful {@link #write(Iterator)},
     *     or {@code null} if none has completed
     */
    @Override
    public long[] getPartitionLengths() {
        return this.partitionLengths;
    }

    /**
     * Copies every per-partition temp file into the map output writer and commits the output.
     *
     * <p>When partition writers are present, their checksums are collected first, then each
     * existing file is copied (via channel when transferTo is enabled, encryption is off and the
     * partition writer offers a channel; via stream otherwise) and deleted. The elapsed time of
     * the whole phase is added to the write-time metric exactly once.
     *
     * @param shuffleMapOutputWriter destination for the partition data
     * @return the partition lengths reported by the commit
     * @throws IOException if copying or committing fails
     */
    private long[] writePartitionedData(ShuffleMapOutputWriter shuffleMapOutputWriter) throws IOException {
        if (this.partitionWriters != null) {
            long writeStartTime = System.nanoTime();
            boolean encryptionEnabled = this.blockManager.serializerManager().encryptionEnabled();

            for (int i = 0; i < this.partitionChecksums.length; i++) {
                this.partitionChecksums[i] = this.partitionWriters[i].getChecksum();
            }

            try {
                for (int i = 0; i < this.numPartitions; i++) {
                    File file = this.partitionWriterSegments[i].file();
                    ShufflePartitionWriter partitionWriter = shuffleMapOutputWriter.getPartitionWriter(i);
                    if (!file.exists()) {
                        continue;
                    }
                    if (this.transferToEnabled && !encryptionEnabled) {
                        Optional<WritableByteChannelWrapper> maybeChannel = partitionWriter.openChannelWrapper();
                        if (maybeChannel.isPresent()) {
                            writePartitionedDataWithChannel(file, maybeChannel.get());
                        } else {
                            writePartitionedDataWithStream(file, partitionWriter);
                        }
                    } else {
                        writePartitionedDataWithStream(file, partitionWriter);
                    }
                    if (!file.delete()) {
                        logger.error("Unable to delete file for partition {}", Integer.valueOf(i));
                    }
                }
            } finally {
                // Recorded once for the whole copy phase, on success and on failure alike.
                this.writeMetrics.incWriteTime(System.nanoTime() - writeStartTime);
            }
            this.partitionWriters = null;
        }
        return shuffleMapOutputWriter.commitAllPartitions(this.partitionChecksums).getPartitionLengths();
    }

    /**
     * Copies {@code file} into the given output channel using NIO and closes both ends.
     *
     * <p>Close failures are passed to {@code Closeables.close} with the swallow flag set only
     * when the copy itself already failed, so the copy's exception is the one that propagates.
     *
     * @param file per-partition temp file to read
     * @param writableByteChannelWrapper destination channel wrapper; always closed by this method
     * @throws IOException if opening, copying or (after a successful copy) closing fails
     */
    private void writePartitionedDataWithChannel(File file, WritableByteChannelWrapper writableByteChannelWrapper) throws IOException {
        boolean copyThrewException = true;
        try {
            FileInputStream in = new FileInputStream(file);
            try (FileChannel inputChannel = in.getChannel()) {
                Utils.copyFileStreamNIO(inputChannel, writableByteChannelWrapper.channel(), 0L, inputChannel.size());
                copyThrewException = false;
            } finally {
                Closeables.close(in, copyThrewException);
            }
        } finally {
            Closeables.close(writableByteChannelWrapper, copyThrewException);
        }
    }

    /**
     * Copies {@code file} into the partition writer's output stream, wrapped for encryption by
     * the serializer manager, and closes both streams.
     *
     * <p>Close failures are passed to {@code Closeables.close} with the swallow flag set only
     * when the copy itself already failed, so the copy's exception is the one that propagates.
     *
     * @param file per-partition temp file to read
     * @param shufflePartitionWriter partition writer whose stream receives the bytes
     * @throws IOException if opening, copying or (after a successful copy) closing fails
     */
    private void writePartitionedDataWithStream(File file, ShufflePartitionWriter shufflePartitionWriter) throws IOException {
        boolean copyThrewException = true;
        FileInputStream in = new FileInputStream(file);
        try {
            OutputStream out = this.blockManager.serializerManager().wrapForEncryption(shufflePartitionWriter.openStream());
            try {
                Utils.copyStream(in, out, false, false);
                copyThrewException = false;
            } finally {
                Closeables.close(out, copyThrewException);
            }
        } finally {
            Closeables.close(in, copyThrewException);
        }
    }

    /**
     * Finishes this writer. Only the first call does any work; later calls return an empty option.
     *
     * @param z {@code true} to report success and obtain the {@link MapStatus} produced by
     *     {@link #write(Iterator)}; {@code false} to release memory and delete the temp files of
     *     any partition writers that are still held
     * @return the map status on success, otherwise an empty option
     * @throws IllegalStateException if {@code z} is {@code true} but no map status was recorded
     */
    @Override
    public Option<MapStatus> stop(boolean z) {
        if (this.stopping) {
            return None$.empty();
        }
        this.stopping = true;
        if (z) {
            if (this.mapStatus == null) {
                throw new IllegalStateException("Cannot call stop(true) without having called write()");
            }
            return Option.apply(this.mapStatus);
        }
        if (this.partitionWriters != null) {
            try {
                for (CometDiskBlockWriter writer : this.partitionWriters) {
                    // Slots stay null when write() failed while the writers were still being created.
                    if (writer == null) {
                        continue;
                    }
                    writer.freeMemory();
                    File file = writer.getFile();
                    if (!file.delete()) {
                        logger.error("Error while deleting file {}", file.getAbsolutePath());
                    }
                }
            } finally {
                this.partitionWriters = null;
            }
        }
        return None$.empty();
    }
}
