package org.apache.spark.shuffle.sort;

import java.io.File;
import java.io.IOException;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import javax.annotation.Nullable;
import org.apache.comet.CometConf$;
import org.apache.comet.Native;
import org.apache.spark.SparkConf;
import org.apache.spark.TaskContext;
import org.apache.spark.executor.ShuffleWriteMetrics;
import org.apache.spark.memory.SparkOutOfMemoryError;
import org.apache.spark.memory.TaskMemoryManager;
import org.apache.spark.shuffle.ShuffleWriteMetricsReporter;
import org.apache.spark.shuffle.comet.CometShuffleChecksumSupport;
import org.apache.spark.shuffle.comet.CometShuffleMemoryAllocator;
import org.apache.spark.shuffle.comet.CometShuffleMemoryAllocatorTrait;
import org.apache.spark.shuffle.comet.TooLargePageException;
import org.apache.spark.shuffle.sort.ShuffleInMemorySorter;
import org.apache.spark.sql.comet.execution.shuffle.ShuffleThreadPool;
import org.apache.spark.sql.comet.execution.shuffle.SpillInfo;
import org.apache.spark.sql.comet.execution.shuffle.SpillWriter;
import org.apache.spark.sql.types.StructType;
import org.apache.spark.storage.BlockManager;
import org.apache.spark.storage.TempShuffleBlockId;
import org.apache.spark.unsafe.Platform;
import org.apache.spark.unsafe.UnsafeAlignedOffset;
import org.apache.spark.unsafe.array.LongArray;
import org.apache.spark.util.Utils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import scala.Tuple2;

/* loaded from: input_file:org/apache/spark/shuffle/sort/CometShuffleExternalSorter.class */
public final class CometShuffleExternalSorter implements CometShuffleChecksumSupport {
    // Class logger; assigned in the static initializer at the bottom of the class.
    private static final Logger logger;
    // Partition count handed to every SpillInfo created by this sorter.
    private final int numPartitions;
    // Used to obtain temporary shuffle block ids/files for spill output.
    private final BlockManager blockManager;
    // Task whose metrics receive the memory/disk bytes-spilled counters.
    private final TaskContext taskContext;
    // Reporter for shuffle write metrics; also used as a lock object when spill metrics are merged in.
    private final ShuffleWriteMetricsReporter writeMetrics;
    // Row schema; serialized once per SpillSorter and passed to the spilling routine.
    private final StructType schema;
    // Element count used when allocating each sorter's pointer array and each RowPartition.
    private int initialSize;
    // Sorter currently receiving inserted records.
    private SpillSorter activeSpillSorter;
    // Per-partition checksum values; the code checks length > 0 before reading, so it may be empty.
    private final long[] partitionChecksums;
    // Checksum algorithm name passed to each SpillSorter before it writes a partition.
    private final String checksumAlgorithm;
    // Memory allocator shared by this sorter and all of its SpillSorters.
    private final CometShuffleMemoryAllocatorTrait allocator;
    // Thread pool for async spills; null when isAsync is false.
    private final ExecutorService threadPool;
    // Number of in-flight async spill tasks at which spill() waits for one to finish; 0 when not async.
    private final int threadNum;
    // Read from COMET_SHUFFLE_PREFER_DICTIONARY_RATIO and forwarded to the spilling routine.
    private final double preferDictionaryRatio;
    // Decompiler-exposed assertion flag; true when assertions are disabled for this class.
    static final /* synthetic */ boolean $assertionsDisabled;
    // NOTE: the field initializers below run before the constructor body, which relies on isAsync.
    // Sorters that have been handed to an async spill task and have not yet removed themselves.
    private final ConcurrentLinkedQueue<SpillSorter> spillingSorters = new ConcurrentLinkedQueue<>();
    // Spill files written so far; additions are made under synchronized (spills).
    private final LinkedList<SpillInfo> spills = new LinkedList<>();
    // Futures of submitted async spill tasks that have not yet been reaped.
    private ConcurrentLinkedQueue<Future<Void>> asyncSpillTasks = new ConcurrentLinkedQueue<>();
    // Guard that makes spill() return immediately while another spill() call is in progress.
    private boolean spilling = false;
    // Size in bytes of the record-length prefix written before each record.
    private final int uaoSize = UnsafeAlignedOffset.getUaoSize();
    // Record count in the active sorter at or above which insertRecord forces a spill.
    private final int numElementsForSpillThreshold = ((Integer) CometConf$.MODULE$.COMET_COLUMNAR_SHUFFLE_SPILL_THRESHOLD().get()).intValue();
    // Highest memory usage observed so far; evaluated here while no sorter exists yet.
    private long peakMemoryUsedBytes = getMemoryUsage();
    // Compression codec name forwarded to the spilling routine.
    private final String compressionCodec = CometConf$.MODULE$.COMET_EXEC_SHUFFLE_COMPRESSION_CODEC().get();
    // Compression level (read from the ZSTD level setting) forwarded to the spilling routine.
    private final int compressionLevel = ((Integer) CometConf$.MODULE$.COMET_EXEC_SHUFFLE_COMPRESSION_ZSTD_LEVEL().get()).intValue();
    // Whether spills are submitted to the thread pool instead of running on the calling thread.
    private final boolean isAsync = ((Boolean) CometConf$.MODULE$.COMET_COLUMNAR_SHUFFLE_ASYNC_ENABLED().get()).booleanValue();

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:org/apache/spark/shuffle/sort/CometShuffleExternalSorter$SpillSorter.class */
    public class SpillSorter extends SpillWriter {
        // Set by freeArray(); once true, getMemoryUsage() stops counting the in-memory sorter.
        private boolean freed = false;
        // Destination (file and per-partition lengths) for the next writeSortedFileNative call;
        // must be set through setSpillInfo before writing.
        private SpillInfo spillInfo = null;

        // In-memory sorter holding the encoded record pointers and partition ids.
        @Nullable
        private ShuffleInMemorySorter inMemSorter;
        // Pointer array backing inMemSorter; its base offset is handed to the native sort.
        private LongArray sorterArray;
        // Decompiler-exposed assertion flag; true when assertions are disabled.
        static final /* synthetic */ boolean $assertionsDisabled;

        SpillSorter() {
            this.allocator = CometShuffleExternalSorter.this.allocator;
            try {
                this.inMemSorter = new ShuffleInMemorySorter(this.allocator, 1, true);
                this.sorterArray = this.allocator.allocateArray(CometShuffleExternalSorter.this.initialSize);
                this.inMemSorter.expandPointerArray(this.sorterArray);
                this.allocatedPages = new LinkedList<>();
                this.nativeLib = new Native();
                this.dataTypes = serializeSchema(CometShuffleExternalSorter.this.schema);
            } catch (IllegalAccessError e) {
                throw new RuntimeException("Error loading in-memory sorter check class path -- see https://github.com/apache/arrow-datafusion-comet?tab=readme-ov-file#enable-comet-shuffle", e);
            }
        }

        /**
         * Delegates to {@code SpillWriter.freeMemory()} while holding this sorter's monitor.
         *
         * @return the value returned by the superclass implementation
         */
        @Override // org.apache.spark.sql.comet.execution.shuffle.SpillWriter
        public long freeMemory() {
            synchronized (this) {
                return super.freeMemory();
            }
        }

        /**
         * Reports this sorter's memory usage while holding its monitor.
         *
         * @return the superclass usage, plus the in-memory sorter's usage unless the pointer array
         *     has been freed or no in-memory sorter exists
         */
        @Override // org.apache.spark.sql.comet.execution.shuffle.SpillWriter
        public long getMemoryUsage() {
            synchronized (this) {
                long writerBytes = super.getMemoryUsage();
                if (this.freed || this.inMemSorter == null) {
                    return writerBytes;
                }
                return this.inMemSorter.getMemoryUsage() + writerBytes;
            }
        }

        /**
         * Spill hook inherited from {@code SpillWriter}; forwards to the enclosing sorter's
         * {@code spill()}. The argument is ignored.
         *
         * @param i unused
         * @throws IOException if the enclosing sorter's spill fails
         */
        @Override // org.apache.spark.sql.comet.execution.shuffle.SpillWriter
        protected void spill(int i) throws IOException {
            // The qualifier is required: an unqualified spill() would resolve to this class's spill(int).
            CometShuffleExternalSorter.this.spill();
        }

        /**
         * Frees the in-memory sorter and marks this sorter as freed, under this sorter's monitor,
         * so that {@link #getMemoryUsage()} no longer counts the in-memory sorter.
         */
        public void freeArray() {
            synchronized (this) {
                ShuffleInMemorySorter sorter = this.inMemSorter;
                sorter.free();
                this.freed = true;
            }
        }

        /**
         * Resets the in-memory sorter and gives it a freshly allocated pointer array of
         * {@code initialSize} elements so this sorter can be reused after a synchronous spill.
         */
        public void reset() {
            this.inMemSorter.reset();
            LongArray freshArray = this.allocator.allocateArray(CometShuffleExternalSorter.this.initialSize);
            this.sorterArray = freshArray;
            this.inMemSorter.expandPointerArray(freshArray);
        }

        /**
         * Sets the spill destination used by the next {@code writeSortedFileNative} call.
         *
         * @param spillInfo file and per-partition length holder to write into
         */
        void setSpillInfo(SpillInfo spillInfo) {
            this.spillInfo = spillInfo;
        }

        /**
         * @return the number of records currently held by the in-memory sorter
         */
        public int numRecords() {
            final int bufferedRecords = this.inMemSorter.numRecords();
            return bufferedRecords;
        }

        /**
         * Sorts the buffered record pointers natively and writes the rows, one partition at a time,
         * to the file of the {@code SpillInfo} set through {@link #setSpillInfo}.
         *
         * <p>Returns without writing anything (and without registering the spill) when the sorter
         * holds no records. Otherwise the spill is appended to the enclosing sorter's spill list
         * once the last partition has been written.
         *
         * @param z {@code true} when this is the final output file: metrics are reported directly
         *     to the task's write-metrics reporter. {@code false} for an intermediate spill: metrics
         *     are collected separately and then folded into records-written and disk-bytes-spilled.
         * @throws IOException if writing a partition fails
         */
        public void writeSortedFileNative(boolean z) throws IOException {
            long arrayAddress = this.sorterArray.getBaseOffset();
            int recordCount = this.inMemSorter.numRecords();
            // Native in-place sort of the pointer array.
            this.nativeLib.sortRowPartitionsNative(arrayAddress, recordCount);
            ShuffleInMemorySorter.ShuffleSorterIterator sortedRecords = new ShuffleInMemorySorter.ShuffleSorterIterator(recordCount, this.sorterArray, 0);
            if (!sortedRecords.hasNext()) {
                // Nothing buffered: do not create or register an empty spill.
                return;
            }
            ShuffleWriteMetricsReporter metrics = z ? CometShuffleExternalSorter.this.writeMetrics : new ShuffleWriteMetrics();
            int currentPartition = -1;
            RowPartition rowPartition = new RowPartition(CometShuffleExternalSorter.this.initialSize);
            while (sortedRecords.hasNext()) {
                sortedRecords.loadNext();
                int partitionId = sortedRecords.packedRecordPointer.getPartitionId();
                if (!$assertionsDisabled && partitionId < currentPartition) {
                    throw new AssertionError();
                }
                if (partitionId != currentPartition) {
                    // Partition boundary: flush the rows collected for the previous partition.
                    if (currentPartition != -1) {
                        writePartition(currentPartition, rowPartition, metrics);
                    }
                    currentPartition = partitionId;
                }
                long offsetInPage = this.allocator.getOffsetInPage(sortedRecords.packedRecordPointer.getRecordPointer());
                // Skip the length prefix (uaoSize) plus 4 further bytes of the stored record; the
                // row length is the stored size minus those 4 bytes.
                rowPartition.addRow(offsetInPage + CometShuffleExternalSorter.this.uaoSize + 4, UnsafeAlignedOffset.getSize((Object) null, offsetInPage) - 4);
            }
            if (currentPartition != -1) {
                // Flush the last partition with the same checksum handling as every other one.
                writePartition(currentPartition, rowPartition, metrics);
                synchronized (CometShuffleExternalSorter.this.spills) {
                    CometShuffleExternalSorter.this.spills.add(this.spillInfo);
                }
            }
            if (z) {
                return;
            }
            synchronized (CometShuffleExternalSorter.this.writeMetrics) {
                CometShuffleExternalSorter.this.writeMetrics.incRecordsWritten(((ShuffleWriteMetrics) metrics).recordsWritten());
                CometShuffleExternalSorter.this.taskContext.taskMetrics().incDiskBytesSpilled(((ShuffleWriteMetrics) metrics).bytesWritten());
            }
        }

        /**
         * Writes the rows collected for one partition and records the written length. When
         * checksums are enabled (non-empty checksum array), the partition's running checksum is
         * loaded before the write and stored back afterwards.
         */
        private void writePartition(int partition, RowPartition rows, ShuffleWriteMetricsReporter metrics) throws IOException {
            long[] checksums = CometShuffleExternalSorter.this.partitionChecksums;
            boolean checksumEnabled = checksums.length > 0;
            if (checksumEnabled) {
                setChecksum(checksums[partition]);
                setChecksumAlgo(CometShuffleExternalSorter.this.checksumAlgorithm);
            }
            this.spillInfo.partitionLengths[partition] = doSpilling(this.dataTypes, this.spillInfo.file, rows, metrics, CometShuffleExternalSorter.this.preferDictionaryRatio, CometShuffleExternalSorter.this.compressionCodec, CometShuffleExternalSorter.this.compressionLevel);
            if (checksumEnabled) {
                // Guarded: indexing an empty checksum array would throw.
                checksums[partition] = getChecksum();
            }
        }

        /**
         * @return whether the in-memory sorter reports room for one more record
         */
        public boolean hasSpaceForAnotherRecord() {
            final boolean hasRoom = this.inMemSorter.hasSpaceForAnotherRecord();
            return hasRoom;
        }

        /**
         * Hands a larger pointer array to the in-memory sorter and then records it as this
         * sorter's current array.
         *
         * @param longArray the replacement pointer array
         */
        public void expandPointerArray(LongArray longArray) {
            ShuffleInMemorySorter sorter = this.inMemSorter;
            sorter.expandPointerArray(longArray);
            // Updated only after the sorter accepted the new array.
            this.sorterArray = longArray;
        }

        /**
         * Copies one record into the current page, prefixed by its length, and registers its
         * encoded address with the in-memory sorter.
         *
         * @param obj base object of the source record
         * @param j offset of the source record relative to {@code obj}
         * @param i record length in bytes
         * @param i2 partition id stored alongside the record pointer
         */
        public void insertRecord(Object obj, long j, int i, int i2) {
            final Object pageBase = this.currentPage.getBaseObject();
            // The encoded address points at the length prefix, so capture it before advancing.
            final long recordAddress = this.allocator.encodePageNumberAndOffset(this.currentPage, this.pageCursor);
            UnsafeAlignedOffset.putSize(pageBase, this.pageCursor, i);
            this.pageCursor += CometShuffleExternalSorter.this.uaoSize;
            Platform.copyMemory(obj, j, pageBase, this.pageCursor, i);
            this.pageCursor += i;
            this.inMemSorter.insertRecord(recordAddress, i2);
        }

        // Decompiled form of the assertion-status check: the flag is true when assertions are
        // disabled for CometShuffleExternalSorter.
        static {
            $assertionsDisabled = !CometShuffleExternalSorter.class.desiredAssertionStatus();
        }
    }

    public CometShuffleExternalSorter(TaskMemoryManager taskMemoryManager, BlockManager blockManager, TaskContext taskContext, int i, int i2, SparkConf sparkConf, ShuffleWriteMetricsReporter shuffleWriteMetricsReporter, StructType structType) {
        this.allocator = CometShuffleMemoryAllocator.getInstance(sparkConf, taskMemoryManager, Math.min(134217728L, taskMemoryManager.pageSizeBytes()));
        this.blockManager = blockManager;
        this.taskContext = taskContext;
        this.numPartitions = i2;
        this.schema = structType;
        this.writeMetrics = shuffleWriteMetricsReporter;
        this.partitionChecksums = createPartitionChecksums(i2, sparkConf);
        this.checksumAlgorithm = getChecksumAlgorithm(sparkConf);
        this.initialSize = i;
        if (this.isAsync) {
            this.threadNum = ((Integer) CometConf$.MODULE$.COMET_COLUMNAR_SHUFFLE_ASYNC_THREAD_NUM().get()).intValue();
            if (!$assertionsDisabled && this.threadNum <= 0) {
                throw new AssertionError();
            }
            this.threadPool = ShuffleThreadPool.getThreadPool();
        } else {
            this.threadNum = 0;
            this.threadPool = null;
        }
        this.activeSpillSorter = new SpillSorter();
        this.preferDictionaryRatio = ((Double) CometConf$.MODULE$.COMET_SHUFFLE_PREFER_DICTIONARY_RATIO().get()).doubleValue();
    }

    /**
     * @return the internal per-partition checksum array itself (not a copy)
     */
    public long[] getChecksums() {
        return partitionChecksums;
    }

    /**
     * Spills the records buffered in the active sorter to a new temporary shuffle file.
     *
     * <p>Does nothing when a spill is already in progress, when there is no active sorter, or
     * when the active sorter holds no records. In async mode the write is submitted to the thread
     * pool, the call waits until fewer than {@code threadNum} tasks are pending, and a fresh
     * active sorter is installed. Otherwise the write runs on the calling thread and the active
     * sorter is reset for reuse.
     *
     * @throws IOException if writing the spill file fails, or if a previously submitted async
     *     spill task is found to have failed
     */
    public void spill() throws IOException {
        if (this.spilling || this.activeSpillSorter == null || this.activeSpillSorter.numRecords() == 0) {
            return;
        }
        this.spilling = true;
        try {
            Object[] logArgs = {
                Long.valueOf(Thread.currentThread().getId()),
                Utils.bytesToString(getMemoryUsage()),
                Integer.valueOf(this.spills.size()),
                this.spills.size() > 1 ? " times" : " time"
            };
            logger.info("Thread {} spilling sort data of {} to disk ({} {} so far)", logArgs);
            Tuple2 tempBlock = this.blockManager.diskBlockManager().createTempShuffleBlock();
            this.activeSpillSorter.setSpillInfo(new SpillInfo(this.numPartitions, (File) tempBlock._2(), (TempShuffleBlockId) tempBlock._1()));
            if (this.isAsync) {
                final SpillSorter spillingSorter = this.activeSpillSorter;
                Callable<Void> task = () -> {
                    spillingSorter.writeSortedFileNative(false);
                    long spilledBytes = spillingSorter.freeMemory();
                    spillingSorter.freeArray();
                    this.spillingSorters.remove(spillingSorter);
                    synchronized (this) {
                        this.taskContext.taskMetrics().incMemoryBytesSpilled(spilledBytes);
                    }
                    return null;
                };
                this.spillingSorters.add(spillingSorter);
                this.asyncSpillTasks.add(this.threadPool.submit(task));
                awaitAsyncSpillSlot();
                this.activeSpillSorter = new SpillSorter();
            } else {
                this.activeSpillSorter.writeSortedFileNative(false);
                long spilledBytes = this.activeSpillSorter.freeMemory();
                this.activeSpillSorter.reset();
                synchronized (this) {
                    this.taskContext.taskMetrics().incMemoryBytesSpilled(spilledBytes);
                }
            }
        } finally {
            // Always clear the guard; otherwise one failed spill would turn every later call into a no-op.
            this.spilling = false;
        }
    }

    /**
     * Polls while exactly {@code threadNum} async spill tasks are pending, reaping one finished
     * task per pass. A reaped task's outcome is checked so that a failed spill is reported instead
     * of being silently dropped from the pending queue.
     */
    private void awaitAsyncSpillSlot() throws IOException {
        while (this.asyncSpillTasks.size() == this.threadNum) {
            // Start a fresh scan on every pass: tasks not finished in this scan may be by the next.
            for (Future<Void> pending : this.asyncSpillTasks) {
                if (!pending.isDone()) {
                    continue;
                }
                this.asyncSpillTasks.remove(pending);
                try {
                    // The task is already done; this only surfaces its failure, if any.
                    pending.get();
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    throw new IOException(e);
                } catch (Exception e) {
                    throw new IOException(e);
                }
                break;
            }
        }
    }

    /**
     * Sums the memory usage of every sorter still being spilled asynchronously and of the active
     * sorter, if one exists.
     */
    private long getMemoryUsage() {
        long totalBytes = 0L;
        for (SpillSorter pendingSorter : spillingSorters) {
            totalBytes += pendingSorter.getMemoryUsage();
        }
        if (activeSpillSorter == null) {
            return totalBytes;
        }
        return totalBytes + activeSpillSorter.getMemoryUsage();
    }

    /** Raises the recorded peak when the current memory usage exceeds it. */
    private void updatePeakMemoryUsed() {
        final long currentBytes = getMemoryUsage();
        if (currentBytes <= peakMemoryUsedBytes) {
            return;
        }
        peakMemoryUsedBytes = currentBytes;
    }

    /**
     * Refreshes the peak from the current usage and returns it.
     *
     * @return the highest memory usage observed so far, in bytes
     */
    public long getPeakMemoryUsedBytes() {
        updatePeakMemoryUsed();
        final long peak = peakMemoryUsedBytes;
        return peak;
    }

    /**
     * Updates the peak-usage figure, then releases the memory and pointer arrays of every sorter
     * still listed as spilling (async mode only) and of the active sorter.
     *
     * @return the sum of the values returned by each sorter's {@code freeMemory()}
     */
    private long freeMemory() {
        updatePeakMemoryUsed();
        long releasedBytes = 0L;
        if (isAsync) {
            for (SpillSorter pendingSorter : spillingSorters) {
                releasedBytes += pendingSorter.freeMemory();
                pendingSorter.freeArray();
            }
        }
        releasedBytes += activeSpillSorter.freeMemory();
        activeSpillSorter.freeArray();
        return releasedBytes;
    }

    /**
     * Frees all sorter memory and deletes every recorded spill file that still exists, logging an
     * error for any file that cannot be deleted.
     */
    public void cleanupResources() {
        freeMemory();
        for (SpillInfo spill : spills) {
            if (!spill.file.exists()) {
                continue;
            }
            if (!spill.file.delete()) {
                logger.error("Unable to delete spill file {}", spill.file.getPath());
            }
        }
    }

    /**
     * Ensures the active sorter can accept one more record pointer, either by giving it a larger
     * pointer array or by spilling.
     *
     * @throws IOException if a spill triggered from here fails
     * @throws SparkOutOfMemoryError if spilling itself runs out of memory and the active sorter
     *     still has no room
     */
    private void growPointerArrayIfNecessary() throws IOException {
        if (!$assertionsDisabled && this.activeSpillSorter == null) {
            throw new AssertionError();
        }
        if (this.activeSpillSorter.hasSpaceForAnotherRecord()) {
            return;
        }
        try {
            // NOTE(review): SpillSorter.getMemoryUsage() adds the superclass usage to the in-memory
            // sorter's usage, so this request is sized from the sorter's total reported usage rather
            // than from the pointer array alone -- confirm this is intended.
            LongArray allocateArray = this.allocator.allocateArray((this.activeSpillSorter.getMemoryUsage() / 8) * 2);
            // Re-check after allocating (presumably because the allocation itself can trigger a
            // spill that makes room); if there is room now, the new array is not needed.
            if (this.activeSpillSorter.hasSpaceForAnotherRecord()) {
                this.allocator.freeArray(allocateArray);
            } else {
                this.activeSpillSorter.expandPointerArray(allocateArray);
            }
        } catch (TooLargePageException e) {
            // The requested array cannot be allocated as a page: spill instead of growing.
            spill();
        } catch (SparkOutOfMemoryError e2) {
            try {
                spill();
            } catch (SparkOutOfMemoryError e3) {
                // Out of memory even while spilling: tolerate it only if the sorter has room now.
                if (this.activeSpillSorter.hasSpaceForAnotherRecord()) {
                    return;
                }
                logger.error("Unable to grow the pointer array");
                throw e3;
            }
        }
    }

    /**
     * Buffers one serialized record in the active sorter, spilling first when the record-count
     * threshold has been reached and growing the pointer array when needed.
     *
     * @param obj base object of the source record
     * @param j offset of the source record relative to {@code obj}
     * @param i record length in bytes
     * @param i2 partition id of the record
     * @throws IOException if a spill triggered by this insert fails
     */
    public void insertRecord(Object obj, long j, int i, int i2) throws IOException {
        if (!$assertionsDisabled && this.activeSpillSorter == null) {
            throw new AssertionError();
        }
        final int spillThreshold = numElementsForSpillThreshold;
        if (activeSpillSorter.numRecords() >= spillThreshold) {
            logger.info("Spilling data because number of spilledRecords crossed the threshold " + spillThreshold);
            spill();
        }
        growPointerArrayIfNecessary();
        // Room for the record plus its length prefix.
        final int requiredBytes = i + uaoSize;
        // activeSpillSorter is re-read on every use on purpose: a spill may have replaced it.
        final boolean pageAcquired = activeSpillSorter.acquireNewPageIfNecessary(requiredBytes);
        if (!pageAcquired) {
            activeSpillSorter.initialCurrentPage(requiredBytes);
        }
        activeSpillSorter.insertRecord(obj, j, i, i2);
    }

    /**
     * Waits for all pending async spill tasks, writes the records still held by the active sorter
     * as the final file, frees all sorter memory, and returns every recorded spill.
     *
     * @return the recorded spills, in the order they were added
     * @throws IOException if an async spill task failed or the final write fails
     */
    public SpillInfo[] closeAndGetSpills() throws IOException {
        if (activeSpillSorter != null) {
            Tuple2 finalBlock = blockManager.diskBlockManager().createTempShuffleBlock();
            SpillInfo finalSpill = new SpillInfo(numPartitions, (File) finalBlock._2(), (TempShuffleBlockId) finalBlock._1());
            if (isAsync) {
                for (Future<Void> pending : asyncSpillTasks) {
                    try {
                        pending.get();
                    } catch (Exception failure) {
                        throw new IOException(failure);
                    }
                }
                asyncSpillTasks.clear();
            }
            activeSpillSorter.setSpillInfo(finalSpill);
            // true: this is the last file, so metrics go straight to the task's write metrics.
            activeSpillSorter.writeSortedFileNative(true);
            freeMemory();
        }
        final int spillCount = spills.size();
        return (SpillInfo[]) spills.toArray(new SpillInfo[spillCount]);
    }

    // Decompiled class initializer: caches whether assertions are disabled for this class and
    // creates the class logger.
    static {
        $assertionsDisabled = !CometShuffleExternalSorter.class.desiredAssertionStatus();
        logger = LoggerFactory.getLogger(CometShuffleExternalSorter.class);
    }
}
