package org.apache.spark.sql.comet.execution.shuffle;

import java.io.File;
import java.io.IOException;
import java.util.Collections;
import java.util.Comparator;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import org.apache.comet.CometConf$;
import org.apache.comet.Native;
import org.apache.comet.shaded.guava.annotations.VisibleForTesting;
import org.apache.spark.SparkConf;
import org.apache.spark.TaskContext;
import org.apache.spark.executor.ShuffleWriteMetrics;
import org.apache.spark.memory.TaskMemoryManager;
import org.apache.spark.serializer.SerializationStream;
import org.apache.spark.serializer.SerializerInstance;
import org.apache.spark.shuffle.ShuffleWriteMetricsReporter;
import org.apache.spark.shuffle.comet.CometShuffleMemoryAllocator;
import org.apache.spark.shuffle.sort.RowPartition;
import org.apache.spark.sql.catalyst.expressions.UnsafeRow;
import org.apache.spark.sql.types.StructType;
import org.apache.spark.storage.FileSegment;
import org.apache.spark.unsafe.Platform;
import org.apache.spark.unsafe.UnsafeAlignedOffset;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import scala.reflect.ClassTag;
import scala.reflect.ClassTag$;

/* loaded from: input_file:org/apache/spark/sql/comet/execution/shuffle/CometDiskBlockWriter.class */
public final class CometDiskBlockWriter {
    private static final Logger logger;
    private static final ClassTag<Object> OBJECT_CLASS_TAG;
    private static final LinkedList<CometDiskBlockWriter> currentWriters;
    private final TaskContext taskContext;

    @VisibleForTesting
    static final int DEFAULT_INITIAL_SER_BUFFER_SIZE = 1048576;
    static final int MAXIMUM_PAGE_SIZE_BYTES = 134217728;
    private final CometShuffleMemoryAllocator allocator;
    private final SerializerInstance serializer;
    private final StructType schema;
    private final ShuffleWriteMetricsReporter writeMetrics;
    private final File file;
    private final boolean isAsync;
    private final int asyncThreadNum;
    private final ExecutorService threadPool;
    private ExposedByteArrayOutputStream serBuffer;
    private SerializationStream serOutputStream;
    static final /* synthetic */ boolean $assertionsDisabled;
    private ConcurrentLinkedQueue<Future<Void>> asyncSpillTasks = new ConcurrentLinkedQueue<>();
    private final LinkedList<ArrowIPCWriter> spillingWriters = new LinkedList<>();
    private final int uaoSize = UnsafeAlignedOffset.getUaoSize();
    private long totalWritten = 0;
    private boolean initialized = false;
    private boolean spilling = false;
    private long outputRecords = 0;
    private long insertRecords = 0;
    private final Native nativeLib = new Native();
    private final int columnarBatchSize = ((Integer) CometConf$.MODULE$.COMET_COLUMNAR_SHUFFLE_BATCH_SIZE().get()).intValue();
    private final int numElementsForSpillThreshold = ((Integer) CometConf$.MODULE$.COMET_COLUMNAR_SHUFFLE_SPILL_THRESHOLD().get()).intValue();
    private final double preferDictionaryRatio = ((Double) CometConf$.MODULE$.COMET_SHUFFLE_PREFER_DICTIONARY_RATIO().get()).doubleValue();
    private ArrowIPCWriter activeWriter = new ArrowIPCWriter();

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:org/apache/spark/sql/comet/execution/shuffle/CometDiskBlockWriter$ArrowIPCWriter.class */
    public class ArrowIPCWriter extends SpillWriter {
        private final RowPartition rowPartition;

        ArrowIPCWriter() {
            this.rowPartition = new RowPartition(CometDiskBlockWriter.this.columnarBatchSize);
            this.allocatedPages = new LinkedList<>();
            this.allocator = CometDiskBlockWriter.this.allocator;
            this.nativeLib = CometDiskBlockWriter.this.nativeLib;
            this.dataTypes = serializeSchema(CometDiskBlockWriter.this.schema);
        }

        void insertRecord(Object obj, long j, int i) {
            Object baseObject = this.currentPage.getBaseObject();
            this.rowPartition.addRow(this.allocator.getOffsetInPage(this.allocator.encodePageNumberAndOffset(this.currentPage, this.pageCursor)) + CometDiskBlockWriter.this.uaoSize + 4, i - 4);
            UnsafeAlignedOffset.putSize(baseObject, this.pageCursor, i);
            this.pageCursor += CometDiskBlockWriter.this.uaoSize;
            Platform.copyMemory(obj, j, baseObject, this.pageCursor, i);
            this.pageCursor += i;
        }

        int numRecords() {
            return this.rowPartition.getNumRows();
        }

        long doSpilling(boolean z) throws IOException {
            long doSpilling;
            ShuffleWriteMetricsReporter shuffleWriteMetrics = z ? CometDiskBlockWriter.this.writeMetrics : new ShuffleWriteMetrics();
            synchronized (CometDiskBlockWriter.this.file) {
                CometDiskBlockWriter.this.outputRecords += this.rowPartition.getNumRows();
                doSpilling = doSpilling(this.dataTypes, CometDiskBlockWriter.this.file, this.rowPartition, shuffleWriteMetrics, CometDiskBlockWriter.this.preferDictionaryRatio);
            }
            synchronized (CometDiskBlockWriter.this.writeMetrics) {
                if (!z) {
                    CometDiskBlockWriter.this.writeMetrics.incRecordsWritten(((ShuffleWriteMetrics) shuffleWriteMetrics).recordsWritten());
                    CometDiskBlockWriter.this.taskContext.taskMetrics().incDiskBytesSpilled(((ShuffleWriteMetrics) shuffleWriteMetrics).bytesWritten());
                }
            }
            return doSpilling;
        }

        @Override // org.apache.spark.sql.comet.execution.shuffle.SpillWriter
        protected void spill(int i) throws IOException {
            synchronized (CometDiskBlockWriter.currentWriters) {
                Collections.sort(CometDiskBlockWriter.currentWriters, new Comparator<CometDiskBlockWriter>() { // from class: org.apache.spark.sql.comet.execution.shuffle.CometDiskBlockWriter.ArrowIPCWriter.1
                    @Override // java.util.Comparator
                    public int compare(CometDiskBlockWriter cometDiskBlockWriter, CometDiskBlockWriter cometDiskBlockWriter2) {
                        return Long.compare(cometDiskBlockWriter2.getActiveMemoryUsage(), cometDiskBlockWriter.getActiveMemoryUsage());
                    }
                });
                Iterator<CometDiskBlockWriter> it = CometDiskBlockWriter.currentWriters.iterator();
                while (it.hasNext()) {
                    it.next().doSpill(true);
                    if (this.allocator.getAvailableMemory() >= i) {
                        break;
                    }
                }
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public CometDiskBlockWriter(File file, TaskMemoryManager taskMemoryManager, TaskContext taskContext, SerializerInstance serializerInstance, StructType structType, ShuffleWriteMetricsReporter shuffleWriteMetricsReporter, SparkConf sparkConf, boolean z, int i, ExecutorService executorService) {
        this.allocator = CometShuffleMemoryAllocator.getInstance(sparkConf, taskMemoryManager, Math.min(134217728L, taskMemoryManager.pageSizeBytes()));
        this.taskContext = taskContext;
        this.serializer = serializerInstance;
        this.schema = structType;
        this.writeMetrics = shuffleWriteMetricsReporter;
        this.file = file;
        this.isAsync = z;
        this.asyncThreadNum = i;
        this.threadPool = executorService;
        synchronized (currentWriters) {
            currentWriters.add(this);
        }
    }

    public void setChecksumAlgo(String str) {
        this.activeWriter.setChecksumAlgo(str);
    }

    public void setChecksum(long j) {
        this.activeWriter.setChecksum(j);
    }

    public long getChecksum() {
        return this.activeWriter.getChecksum();
    }

    private void doSpill(boolean z) throws IOException {
        if (this.spilling || this.activeWriter.numRecords() == 0) {
            return;
        }
        this.spilling = true;
        if (!this.isAsync || z) {
            synchronized (this) {
                this.totalWritten += this.activeWriter.doSpilling(false);
                this.activeWriter.freeMemory();
            }
        } else {
            while (this.asyncSpillTasks.size() == this.asyncThreadNum) {
                Iterator<Future<Void>> it = this.asyncSpillTasks.iterator();
                while (true) {
                    if (it.hasNext()) {
                        Future<Void> next = it.next();
                        if (next.isDone()) {
                            this.asyncSpillTasks.remove(next);
                            break;
                        }
                    }
                }
            }
            final ArrowIPCWriter arrowIPCWriter = this.activeWriter;
            this.activeWriter = new ArrowIPCWriter();
            this.spillingWriters.add(arrowIPCWriter);
            this.asyncSpillTasks.add(this.threadPool.submit(new Runnable() { // from class: org.apache.spark.sql.comet.execution.shuffle.CometDiskBlockWriter.1
                @Override // java.lang.Runnable
                public void run() {
                    try {
                        try {
                            CometDiskBlockWriter.this.totalWritten += arrowIPCWriter.doSpilling(false);
                            arrowIPCWriter.freeMemory();
                            CometDiskBlockWriter.this.spillingWriters.remove(arrowIPCWriter);
                        } catch (IOException e) {
                            throw new RuntimeException(e);
                        }
                    } catch (Throwable th) {
                        arrowIPCWriter.freeMemory();
                        CometDiskBlockWriter.this.spillingWriters.remove(arrowIPCWriter);
                        throw th;
                    }
                }
            }, null));
        }
        this.spilling = false;
    }

    public long getOutputRecords() {
        return this.outputRecords;
    }

    public void insertRow(UnsafeRow unsafeRow, int i) throws IOException {
        this.insertRecords++;
        if (!this.initialized) {
            this.serBuffer = new ExposedByteArrayOutputStream(1048576);
            this.serOutputStream = this.serializer.serializeStream(this.serBuffer);
            this.initialized = true;
        }
        this.serBuffer.reset();
        this.serOutputStream.writeKey(Integer.valueOf(i), OBJECT_CLASS_TAG);
        this.serOutputStream.writeValue(unsafeRow, OBJECT_CLASS_TAG);
        this.serOutputStream.flush();
        int size = this.serBuffer.size();
        if (!$assertionsDisabled && size <= 0) {
            throw new AssertionError();
        }
        synchronized (this) {
            if (this.activeWriter.numRecords() >= this.numElementsForSpillThreshold || this.activeWriter.numRecords() >= this.columnarBatchSize) {
                logger.info("Spilling data because number of spilledRecords crossed the threshold " + Math.min(this.numElementsForSpillThreshold, this.columnarBatchSize));
                doSpill(false);
                if (this.activeWriter.numRecords() != 0) {
                    throw new RuntimeException("activeWriter.numRecords()(" + this.activeWriter.numRecords() + ") != 0");
                }
            }
            int i2 = size + this.uaoSize;
            if (!this.activeWriter.acquireNewPageIfNecessary(i2)) {
                this.activeWriter.initialCurrentPage(i2);
            }
            this.activeWriter.insertRecord(this.serBuffer.getBuf(), Platform.BYTE_ARRAY_OFFSET, size);
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public FileSegment close() throws IOException {
        if (this.isAsync) {
            Iterator<Future<Void>> it = this.asyncSpillTasks.iterator();
            while (it.hasNext()) {
                try {
                    it.next().get();
                } catch (Exception e) {
                    throw new RuntimeException(e);
                }
            }
        }
        this.totalWritten += this.activeWriter.doSpilling(true);
        if (this.outputRecords != this.insertRecords) {
            long j = this.outputRecords;
            long j2 = this.insertRecords;
            RuntimeException runtimeException = new RuntimeException("outputRecords(" + j + ") != insertRecords(" + runtimeException + "). Please file a bug report.");
            throw runtimeException;
        }
        this.serBuffer = null;
        this.serOutputStream = null;
        this.activeWriter.freeMemory();
        synchronized (currentWriters) {
            currentWriters.remove(this);
        }
        return new FileSegment(this.file, 0L, this.totalWritten);
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public File getFile() {
        return this.file;
    }

    long getActiveMemoryUsage() {
        return this.activeWriter.getMemoryUsage();
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public void freeMemory() {
        Iterator<ArrowIPCWriter> it = this.spillingWriters.iterator();
        while (it.hasNext()) {
            it.next().freeMemory();
        }
        this.activeWriter.freeMemory();
    }

    static {
        $assertionsDisabled = !CometDiskBlockWriter.class.desiredAssertionStatus();
        logger = LoggerFactory.getLogger(CometDiskBlockWriter.class);
        OBJECT_CLASS_TAG = ClassTag$.MODULE$.Object();
        currentWriters = new LinkedList<>();
    }
}
