package org.apache.comet.parquet;

import java.io.Closeable;
import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import org.apache.comet.CometConf;
import org.apache.comet.shaded.arrow.c.CometSchemaImporter;
import org.apache.comet.shaded.arrow.memory.BufferAllocator;
import org.apache.comet.shaded.arrow.memory.RootAllocator;
import org.apache.comet.shims.ShimBatchReader;
import org.apache.comet.shims.ShimFileFormat;
import org.apache.comet.vector.CometVector;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.parquet.HadoopReadOptions;
import org.apache.parquet.ParquetReadOptions;
import org.apache.parquet.Preconditions;
import org.apache.parquet.column.ColumnDescriptor;
import org.apache.parquet.column.page.PageReadStore;
import org.apache.parquet.hadoop.metadata.ParquetMetadata;
import org.apache.parquet.schema.MessageType;
import org.apache.parquet.schema.Type;
import org.apache.spark.TaskContext;
import org.apache.spark.TaskContext$;
import org.apache.spark.sql.catalyst.InternalRow;
import org.apache.spark.sql.comet.parquet.CometParquetReadSupport;
import org.apache.spark.sql.comet.shims.ShimTaskMetrics;
import org.apache.spark.sql.execution.datasources.PartitionedFile;
import org.apache.spark.sql.execution.datasources.parquet.ParquetToSparkSchemaConverter;
import org.apache.spark.sql.execution.metric.SQLMetric;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;
import org.apache.spark.sql.vectorized.ColumnarBatch;
import org.apache.spark.util.AccumulatorV2;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import scala.Option;

/* loaded from: input_file:org/apache/comet/parquet/BatchReader.class */
public class BatchReader extends RecordReader<Void, ColumnarBatch> implements Closeable {
    /** Logger for this class (previously registered under {@code FileReader} by mistake). */
    private static final Logger LOG = LoggerFactory.getLogger(BatchReader.class);
    /** Arrow allocator handed to every {@link CometSchemaImporter} this reader creates. */
    protected static final BufferAllocator ALLOCATOR = new RootAllocator();
    /** Hadoop configuration used to read Comet settings and to open the Parquet file. */
    private Configuration conf;
    /** Batch capacity passed to the column readers; also the upper bound on rows per batch. */
    private int capacity;
    /** Forwarded to {@code CometParquetReadSupport.clipParquetSchema} in {@code init()}. */
    private boolean isCaseSensitive;
    /** Forwarded to {@code CometParquetReadSupport.clipParquetSchema} in {@code init()}. */
    private boolean useFieldId;
    /** Forwarded to {@code CometParquetReadSupport.clipParquetSchema} in {@code init()}. */
    private boolean ignoreMissingIds;
    /** Schema of the partition columns appended after the file columns; may be {@code null}. */
    private StructType partitionSchema;
    /** Row holding the constant values of the partition columns. */
    private InternalRow partitionValues;
    /** The file (path, start offset, length) this reader scans. */
    private PartitionedFile file;
    /** SQL metrics looked up by name while reading; entries may be absent. */
    private final Map<String, SQLMetric> metrics;
    /** Number of rows already returned through batches. */
    private long rowsRead;
    /** Spark-side schema of the requested columns; derived from the file schema when not supplied. */
    private StructType sparkSchema;
    /** Parquet schema actually requested from the file (the clipped file schema). */
    private MessageType requestedSchema;
    /** Vectors backing {@link #currentBatch}; refreshed by every {@code nextBatch(int)} call. */
    private CometVector[] vectors;
    /** One reader per output column: file columns first, then partition columns. */
    private AbstractColumnReader[] columnReaders;
    /** Schema importer recreated for each row group and closed in {@code close()}. */
    private CometSchemaImporter importer;
    /** The batch object returned by {@code currentBatch()}. */
    private ColumnarBatch currentBatch;
    /** Handle of the submitted prefetch task, or {@code null} when prefetching was never requested. */
    private Future<Option<Throwable>> prefetchTask;
    /** Row groups produced by the prefetch task, each paired with a bytes-read figure. */
    private LinkedBlockingQueue<Pair<PageReadStore, Long>> prefetchQueue;
    /** Underlying Parquet file reader; created by {@code init()}. */
    private FileReader fileReader;
    /** Per file column: {@code true} when the column is not decoded from the file's pages. */
    private boolean[] missingColumns;
    /**
     * Set to {@code true} at the end of {@code init()}. Volatile because {@code init()} may run
     * on the prefetch thread while {@code nextBatch()} polls this flag from the consumer thread
     * without holding a lock.
     */
    private volatile boolean isInitialized;
    /** Parquet footer handed to the {@link FileReader}; may be {@code null}. */
    private ParquetMetadata footer;
    /** Total number of rows reported by the file reader for this split. */
    private long totalRowCount;
    /** Number of rows contained in the row groups loaded so far. */
    private long totalRowsLoaded;
    /** Value of {@code CometConf.COMET_USE_DECIMAL_128}, read in {@code init()}. */
    private boolean useDecimal128;
    /** Value of {@code CometConf.COMET_USE_LAZY_MATERIALIZATION}, read in {@code init()}. */
    private boolean useLazyMaterialization;
    /** Forwarded to {@code Utils.getColumnReader} when column readers are created. */
    private boolean useLegacyDateTimestamp;
    /** Spark task context captured at construction time; {@code null} outside of a task. */
    private final TaskContext taskContext;

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/comet/parquet/BatchReader$PrefetchTask.class */
    /**
     * Background task that initializes the enclosing reader and then reads every row group of the
     * file, handing each one to the consumer through {@code prefetchQueue}.
     *
     * <p>The task never throws a failure of its own work: any {@link Throwable} raised while
     * initializing or reading is returned wrapped in an {@link Option} so that the consuming
     * thread can rethrow it.
     */
    public class PrefetchTask implements Callable<Option<Throwable>> {
        private PrefetchTask() {
        }

        /**
         * Sums the calling thread's bytes-read counter over all registered file system statistics.
         */
        private long getBytesRead() {
            return FileSystem.getAllStatistics().stream()
                    .mapToLong(statistics -> statistics.getThreadStatistics().getBytesRead())
                    .sum();
        }

        /**
         * Runs {@code init()} and then enqueues row groups until the file reader returns
         * {@code null}.
         *
         * <p>Each queue entry carries the bytes read by this thread since the previous entry (or
         * since the task started, for the first entry). The consumer adds every entry's value to
         * the statistics, so the values must be per-row-group deltas; queueing the running total
         * would count earlier row groups repeatedly.
         *
         * @return an empty option on success, or the failure that stopped the task
         * @throws Exception only if closing the file reader's stream fails
         */
        @Override // java.util.concurrent.Callable
        public Option<Throwable> call() throws Exception {
            long lastBytesRead = getBytesRead();
            try {
                BatchReader.this.init();
                PageReadStore rowGroup;
                while ((rowGroup = BatchReader.this.fileReader.readNextRowGroup()) != null) {
                    long currentBytesRead = getBytesRead();
                    BatchReader.this.prefetchQueue.add(
                            Pair.of(rowGroup, Long.valueOf(currentBytesRead - lastBytesRead)));
                    lastBytesRead = currentBytesRead;
                }
                return Option.empty();
            } catch (Throwable th) {
                // Hand the failure to the consumer instead of losing it on the executor thread.
                return Option.apply(th);
            } finally {
                // fileReader is still null when init() failed before creating it.
                if (BatchReader.this.fileReader != null) {
                    BatchReader.this.fileReader.closeStream();
                }
            }
        }
    }

    /**
     * Creates a reader without partition columns.
     *
     * @param str path of the file to read
     * @param i batch capacity
     */
    public BatchReader(String str, int i) {
        this(str, i, null, null);
    }

    /**
     * Creates a reader that uses a freshly created Hadoop {@link Configuration}.
     *
     * @param str path of the file to read
     * @param i batch capacity
     * @param structType schema of the partition columns, or {@code null}
     * @param internalRow values of the partition columns, or {@code null}
     */
    public BatchReader(String str, int i, StructType structType, InternalRow internalRow) {
        this(new Configuration(), str, i, structType, internalRow);
    }

    /**
     * Creates a reader for the file at the given path with fixed Parquet read settings.
     *
     * <p>Note that the supplied configuration is modified: five {@code spark.sql.*} keys are
     * overwritten before it is stored. Case sensitivity, field-id matching and ignoring of
     * missing ids are all turned off for readers built this way.
     *
     * @param configuration Hadoop configuration to use (and mutate)
     * @param str path of the file to read
     * @param i batch capacity
     * @param structType schema of the partition columns, or {@code null}
     * @param internalRow values of the partition columns, or {@code null}
     */
    public BatchReader(Configuration configuration, String str, int i, StructType structType, InternalRow internalRow) {
        // Pin the Parquet-related Spark settings this reader relies on.
        configuration.set("spark.sql.legacy.parquet.nanosAsLong", "false");
        configuration.set("spark.sql.parquet.inferTimestampNTZ.enabled", "true");
        configuration.set("spark.sql.caseSensitive", "false");
        configuration.set("spark.sql.parquet.int96AsTimestamp", "false");
        configuration.set("spark.sql.parquet.binaryAsString", "false");

        this.conf = configuration;
        this.capacity = i;

        // Schema-clipping switches are fixed for this constructor.
        this.ignoreMissingIds = false;
        this.useFieldId = false;
        this.isCaseSensitive = false;

        this.partitionValues = internalRow;
        this.partitionSchema = structType;
        this.file = ShimBatchReader.newPartitionedFile(internalRow, str);

        this.metrics = new HashMap<>();
        this.taskContext = TaskContext$.MODULE$.get();
    }

    public BatchReader(AbstractColumnReader[] abstractColumnReaderArr) {
        int length = abstractColumnReaderArr.length;
        this.columnReaders = new AbstractColumnReader[length];
        this.vectors = new CometVector[length];
        this.currentBatch = new ColumnarBatch(this.vectors);
        this.isInitialized = true;
        this.taskContext = TaskContext$.MODULE$.get();
        this.metrics = new HashMap();
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Creates a reader from fully specified scan parameters. The file is not opened until
     * {@code init()} runs.
     *
     * @param configuration Hadoop configuration to use
     * @param partitionedFile file (path, start, length) to read
     * @param parquetMetadata Parquet footer passed on to the file reader
     * @param i batch capacity
     * @param structType Spark schema of the requested columns
     * @param z whether column matching is case sensitive
     * @param z2 whether to match columns by field id
     * @param z3 whether to ignore missing field ids
     * @param z4 whether to use the legacy date/timestamp handling
     * @param structType2 schema of the partition columns, or {@code null}
     * @param internalRow values of the partition columns
     * @param map SQL metrics keyed by name
     */
    public BatchReader(Configuration configuration, PartitionedFile partitionedFile, ParquetMetadata parquetMetadata, int i, StructType structType, boolean z, boolean z2, boolean z3, boolean z4, StructType structType2, InternalRow internalRow, Map<String, SQLMetric> map) {
        // What to read.
        this.file = partitionedFile;
        this.footer = parquetMetadata;
        this.sparkSchema = structType;
        this.partitionSchema = structType2;
        this.partitionValues = internalRow;

        // How to read it.
        this.conf = configuration;
        this.capacity = i;
        this.isCaseSensitive = z;
        this.useFieldId = z2;
        this.ignoreMissingIds = z3;
        this.useLegacyDateTimestamp = z4;

        // Runtime context.
        this.metrics = map;
        this.taskContext = TaskContext$.MODULE$.get();
    }

    /**
     * Opens the Parquet file and prepares schemas, column readers and the output batch.
     *
     * <p>The method resolves the requested Parquet schema (clipped to the Spark schema when one
     * was supplied), creates constant/row-index readers for columns that are not decoded from
     * the file, creates constant readers for partition columns, and finally marks the reader as
     * initialized and wakes up any thread waiting in {@code nextBatch()}.
     *
     * @throws URISyntaxException if the file path cannot be parsed as a URI
     * @throws IOException if the file cannot be opened or a required column is missing
     * @throws IllegalArgumentException if the clipped Parquet schema and the Spark schema differ
     *     in field count
     * @throws UnsupportedOperationException if a requested column's descriptor differs from the
     *     one in the file
     */
    public void init() throws URISyntaxException, IOException {
        // Comet feature switches, falling back to the defaults declared in CometConf.
        this.useDecimal128 = this.conf.getBoolean(CometConf.COMET_USE_DECIMAL_128().key(), ((Boolean) CometConf.COMET_USE_DECIMAL_128().defaultValue().get()).booleanValue());
        this.useLazyMaterialization = this.conf.getBoolean(CometConf.COMET_USE_LAZY_MATERIALIZATION().key(), ((Boolean) CometConf.COMET_USE_LAZY_MATERIALIZATION().defaultValue().get()).booleanValue());
        long start = this.file.start();
        long length = this.file.length();
        String sparkPath = this.file.filePath().toString();
        ParquetReadOptions.Builder builder = HadoopReadOptions.builder(this.conf, new Path(sparkPath));
        // Restrict the read to this split's byte range when the range is valid.
        if (start >= 0 && length >= 0) {
            builder = builder.withRange(start, start + length);
        }
        this.fileReader = new FileReader(CometInputFile.fromPath(new Path(new URI(sparkPath)), this.conf), this.footer, builder.build(), ReadOptions.builder(this.conf).build(), this.metrics);
        this.requestedSchema = this.fileReader.getFileMetaData().getSchema();
        // Keep the unclipped file schema: it is used below to tell present columns from missing ones.
        MessageType messageType = this.requestedSchema;
        if (this.sparkSchema == null) {
            // No Spark schema supplied: read every column of the file.
            this.sparkSchema = new ParquetToSparkSchemaConverter(this.conf).convert(this.requestedSchema);
        } else {
            this.requestedSchema = CometParquetReadSupport.clipParquetSchema(this.requestedSchema, this.sparkSchema, this.isCaseSensitive, this.useFieldId, this.ignoreMissingIds);
            if (this.requestedSchema.getFieldCount() != this.sparkSchema.size()) {
                throw new IllegalArgumentException(String.format("Spark schema has %d columns while Parquet schema has %d columns", Integer.valueOf(this.sparkSchema.size()), Integer.valueOf(this.requestedSchema.getColumns().size())));
            }
        }
        this.totalRowCount = this.fileReader.getRecordCount();
        List columns = this.requestedSchema.getColumns();
        // Output columns = file columns followed by partition columns (if any).
        int size = columns.size();
        if (this.partitionSchema != null) {
            size += this.partitionSchema.size();
        }
        this.columnReaders = new AbstractColumnReader[size];
        this.missingColumns = new boolean[columns.size()];
        List paths = this.requestedSchema.getPaths();
        StructField[] fields = this.sparkSchema.fields();
        // NOTE(review): the return value is discarded; presumably the call is kept for a
        // validation side effect inside ShimFileFormat - confirm against that helper.
        ShimFileFormat.findRowIndexColumnIndexInSchema(this.sparkSchema);
        for (int i = 0; i < this.requestedSchema.getFieldCount(); i++) {
            Type type = (Type) this.requestedSchema.getFields().get(i);
            // Only flat, non-repeated primitive columns are accepted.
            Preconditions.checkState(type.isPrimitive() && !type.isRepetition(Type.Repetition.REPEATED), "Complex type is not supported");
            String[] strArr = (String[]) paths.get(i);
            if (fields[i].name().equals(ShimFileFormat.ROW_INDEX_TEMPORARY_COLUMN_NAME())) {
                // Row-index column: served from the file reader's row indices, not from pages.
                this.columnReaders[i] = new RowIndexColumnReader(fields[i], this.capacity, this.fileReader.getRowIndices());
                this.missingColumns[i] = true;
            } else if (messageType.containsPath(strArr)) {
                // Column exists in the file; its descriptor must match the requested one exactly.
                if (!messageType.getColumnDescription(strArr).equals(columns.get(i))) {
                    throw new UnsupportedOperationException("Schema evolution is not supported");
                }
                // The reader for this column is created per row group in loadNextRowGroupIfNecessary().
                this.missingColumns[i] = false;
            } else {
                // Column absent from the file: a max definition level of 0 means it cannot be
                // null, so no constant substitute can be provided.
                if (((ColumnDescriptor) columns.get(i)).getMaxDefinitionLevel() == 0) {
                    throw new IOException("Required column '" + Arrays.toString(strArr) + "' is missing in data file " + sparkPath);
                }
                this.columnReaders[i] = new ConstantColumnReader(fields[i], this.capacity, this.useDecimal128);
                this.missingColumns[i] = true;
            }
        }
        if (this.partitionSchema != null) {
            StructField[] fields2 = this.partitionSchema.fields();
            // Partition columns occupy the slots after the file columns and read from partitionValues.
            for (int size2 = columns.size(); size2 < this.columnReaders.length; size2++) {
                int size3 = size2 - columns.size();
                this.columnReaders[size2] = new ConstantColumnReader(fields2[size3], this.capacity, this.partitionValues, size3, this.useDecimal128);
            }
        }
        this.vectors = new CometVector[size];
        this.currentBatch = new ColumnarBatch(this.vectors);
        this.fileReader.setRequestedSchema(this.requestedSchema.getColumns());
        if (this.taskContext != null) {
            // Report the number of row groups when the task exposes an accumulator named "NumRowGroupsAcc".
            Option<AccumulatorV2<?, ?>> taskAccumulator = ShimTaskMetrics.getTaskAccumulator(this.taskContext.taskMetrics());
            if (taskAccumulator.isDefined() && ((AccumulatorV2) taskAccumulator.get()).getClass().getSimpleName().equals("NumRowGroupsAcc")) {
                ((AccumulatorV2) taskAccumulator.get()).add(Integer.valueOf(this.fileReader.getRowGroups().size()));
            }
        }
        if (this.conf.getBoolean(CometConf.COMET_SCAN_PREFETCH_ENABLED().key(), ((Boolean) CometConf.COMET_SCAN_PREFETCH_ENABLED().defaultValue().get()).booleanValue())) {
            LOG.info("Prefetch enabled for BatchReader.");
            this.prefetchQueue = new LinkedBlockingQueue<>();
        }
        this.isInitialized = true;
        // Wake up a consumer that is waiting in nextBatch() for initialization to finish.
        synchronized (this) {
            notifyAll();
        }
    }

    /**
     * Replaces the Spark schema used by this reader.
     *
     * @param structType the new Spark schema
     */
    public void setSparkSchema(StructType structType) {
        this.sparkSchema = structType;
    }

    /**
     * Returns the internal column reader array itself (not a copy), so changes made to the
     * returned array are visible to this reader.
     */
    public AbstractColumnReader[] getColumnReaders() {
        return this.columnReaders;
    }

    /**
     * Does nothing; both arguments are ignored. The actual setup of this reader is performed by
     * {@code init()}.
     */
    public void initialize(InputSplit inputSplit, TaskAttemptContext taskAttemptContext) throws IOException, InterruptedException {
    }

    /**
     * Advances to the next batch by delegating to {@code nextBatch()}.
     *
     * @return {@code true} if a new batch is available
     * @throws IOException if reading fails
     */
    public boolean nextKeyValue() throws IOException {
        return nextBatch();
    }

    /**
     * Returns the current key. This reader has no keys, so the result is always {@code null}.
     *
     * <p>This is the {@code RecordReader} method; the decompiler had renamed it, which left the
     * abstract method of the superclass unimplemented.
     */
    public Void getCurrentKey() {
        return null;
    }

    /**
     * Alias for {@link #getCurrentKey()} under the decompiler-assigned name, kept so that any
     * code referring to that name keeps working.
     *
     * @deprecated use {@link #getCurrentKey()}
     */
    @Deprecated
    public Void m30getCurrentKey() {
        return getCurrentKey();
    }

    /**
     * Returns the current value, i.e. the batch produced by the last successful advance.
     *
     * <p>This is the {@code RecordReader} method; the decompiler had renamed it, which left the
     * abstract method of the superclass unimplemented.
     */
    public ColumnarBatch getCurrentValue() {
        return currentBatch();
    }

    /**
     * Alias for {@link #getCurrentValue()} under the decompiler-assigned name, kept so that any
     * code referring to that name keeps working.
     *
     * @deprecated use {@link #getCurrentValue()}
     */
    @Deprecated
    public ColumnarBatch m29getCurrentValue() {
        return getCurrentValue();
    }

    /**
     * Returns the fraction of rows returned so far.
     *
     * @return {@code rowsRead / totalRowCount}, or {@code 0.0f} while the total row count is
     *     zero (before {@code init()} has run, or for a split without rows) so that callers
     *     never receive NaN or Infinity
     */
    public float getProgress() {
        if (this.totalRowCount <= 0) {
            return 0.0f;
        }
        return ((float) this.rowsRead) / ((float) this.totalRowCount);
    }

    /** Returns the batch object that {@code nextBatch(int)} fills; the same instance is reused. */
    public ColumnarBatch currentBatch() {
        return this.currentBatch;
    }

    /**
     * Returns the future of the submitted prefetch task, or {@code null} if
     * {@code submitPrefetchTask} has not been called.
     */
    public Future<Option<Throwable>> getPrefetchTask() {
        return this.prefetchTask;
    }

    /**
     * Returns the queue of prefetched row groups, or {@code null} if {@code init()} has not
     * created one (it does so only when prefetching is enabled in the configuration).
     */
    public LinkedBlockingQueue<Pair<PageReadStore, Long>> getPrefetchQueue() {
        return this.prefetchQueue;
    }

    /**
     * Loads the next batch of rows into {@code currentBatch()}.
     *
     * <p>Without a prefetch task the reader must already be initialized. With a prefetch task,
     * initialization happens on another thread, so this method first waits for it to finish and
     * rethrows any failure the task reported.
     *
     * @return {@code true} if a batch was loaded, {@code false} when all rows have been returned
     * @throws IOException if reading fails; non-runtime failures (including those reported by
     *     the prefetch task) are wrapped in an {@code IOException}
     * @throws IllegalStateException if no prefetch task exists and {@code init()} was not called
     */
    public boolean nextBatch() throws IOException {
        if (this.prefetchTask == null) {
            Preconditions.checkState(this.isInitialized, "init() should be called first!");
        } else {
            awaitPrefetchInitialization();
        }
        if (this.rowsRead >= this.totalRowCount) {
            return false;
        }
        try {
            if (!loadNextRowGroupIfNecessary()) {
                return false;
            }
            return nextBatch((int) Math.min(this.capacity, this.totalRowsLoaded - this.rowsRead));
        } catch (RuntimeException e) {
            // Runtime exceptions are passed through unwrapped so callers can inspect them.
            throw e;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IOException(e);
        } catch (Throwable th) {
            throw new IOException(th);
        }
    }

    /**
     * Blocks until {@code init()} (running on the prefetch thread) has completed, rethrowing
     * the failure reported by the prefetch task if it finished with one.
     */
    private void awaitPrefetchInitialization() throws IOException {
        while (!this.isInitialized) {
            synchronized (this) {
                try {
                    // init() may complete between the loop check and acquiring the monitor, in
                    // which case its notifyAll() is already gone; re-check before waiting and
                    // keep the timeout short as a safety net.
                    if (!this.isInitialized) {
                        wait(100L);
                    }
                    if (this.prefetchTask.isDone()) {
                        Option<Throwable> failure = this.prefetchTask.get();
                        if (failure.isDefined()) {
                            throw ((Throwable) failure.get());
                        }
                    }
                } catch (RuntimeException e) {
                    throw e;
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    throw new IOException(e);
                } catch (Throwable th) {
                    throw new IOException(th);
                }
            }
        }
    }

    /**
     * Reads {@code i} rows from every column reader into the current batch and records how long
     * decoding and vector loading took.
     *
     * @param i number of rows to read
     * @return always {@code true}
     */
    public boolean nextBatch(int i) {
        long decodeNanos = 0L;
        long loadNanos = 0L;
        int column = 0;
        for (AbstractColumnReader reader : this.columnReaders) {
            // Time spent decoding the batch.
            long decodeStart = System.nanoTime();
            reader.readBatch(i);
            decodeNanos += System.nanoTime() - decodeStart;

            // Time spent obtaining the resulting vector.
            long loadStart = System.nanoTime();
            this.vectors[column] = reader.currentBatch();
            loadNanos += System.nanoTime() - loadStart;

            column++;
        }

        // Metrics are optional: only report the ones that were registered.
        SQLMetric decodeMetric = this.metrics.get("ParquetNativeDecodeTime");
        if (decodeMetric != null) {
            decodeMetric.add(decodeNanos);
        }
        SQLMetric loadMetric = this.metrics.get("ParquetNativeLoadTime");
        if (loadMetric != null) {
            loadMetric.add(loadNanos);
        }

        this.currentBatch.setNumRows(i);
        this.rowsRead += i;
        return true;
    }

    /**
     * Closes the column readers, the file reader and the schema importer.
     *
     * <p>The three groups are closed in nested {@code finally} blocks so that a failure while
     * closing one of them no longer prevents the following ones from being released. If more
     * than one close fails, the last failure is the one that propagates.
     *
     * @throws IOException if closing one of the resources fails
     */
    @Override // java.io.Closeable, java.lang.AutoCloseable
    public void close() throws IOException {
        try {
            if (this.columnReaders != null) {
                for (AbstractColumnReader abstractColumnReader : this.columnReaders) {
                    // Slots may be empty, e.g. file columns before the first row group is loaded.
                    if (abstractColumnReader != null) {
                        abstractColumnReader.close();
                    }
                }
            }
        } finally {
            try {
                if (this.fileReader != null) {
                    this.fileReader.close();
                    this.fileReader = null;
                }
            } finally {
                if (this.importer != null) {
                    this.importer.close();
                    this.importer = null;
                }
            }
        }
    }

    /**
     * Makes sure a row group with unread rows is loaded, reading (or dequeuing) the next one
     * when every row of the previous row groups has been returned.
     *
     * <p>For a newly loaded row group the schema importer is recreated and a fresh column reader
     * is created for every column that is decoded from the file.
     *
     * @return {@code true} if rows are available, {@code false} if there is no further row group
     * @throws Throwable any failure raised while reading, including one reported by the
     *     prefetch task
     */
    private boolean loadNextRowGroupIfNecessary() throws Throwable {
        if (this.rowsRead != this.totalRowsLoaded) {
            // The current row group still has unread rows.
            return true;
        }
        SQLMetric loadTimeMetric = this.metrics.get("ParquetLoadRowGroupTime");
        SQLMetric rowGroupsMetric = this.metrics.get("ParquetRowGroups");
        long startNs = System.nanoTime();
        PageReadStore rowGroup = null;
        if (this.prefetchTask == null || this.prefetchQueue == null) {
            rowGroup = this.fileReader.readNextRowGroup();
        } else {
            Pair<PageReadStore, Long> prefetched = takePrefetchedRowGroup();
            if (prefetched != null) {
                rowGroup = prefetched.getLeft();
                // The bytes were read on the prefetch thread; add them to the statistics here.
                long bytesRead = prefetched.getRight().longValue();
                FileSystem.getAllStatistics().forEach(statistics -> statistics.incrementBytesRead(bytesRead));
            }
        }
        if (loadTimeMetric != null) {
            loadTimeMetric.add(System.nanoTime() - startNs);
        }
        if (rowGroup == null) {
            return false;
        }
        if (rowGroupsMetric != null) {
            rowGroupsMetric.add(1L);
        }
        if (this.importer != null) {
            this.importer.close();
        }
        this.importer = new CometSchemaImporter(ALLOCATOR);
        List<ColumnDescriptor> columns = this.requestedSchema.getColumns();
        for (int i = 0; i < columns.size(); i++) {
            if (this.missingColumns[i]) {
                // Constant and row-index readers were created in init() and are kept as they are.
                continue;
            }
            if (this.columnReaders[i] != null) {
                this.columnReaders[i].close();
            }
            ColumnDescriptor descriptor = columns.get(i);
            ColumnReader columnReader = Utils.getColumnReader(this.sparkSchema.fields()[i].dataType(), descriptor, this.importer, this.capacity, this.useDecimal128, this.useLazyMaterialization, this.useLegacyDateTimestamp);
            columnReader.setPageReader(rowGroup.getPageReader(descriptor));
            this.columnReaders[i] = columnReader;
        }
        this.totalRowsLoaded += rowGroup.getRowCount();
        return true;
    }

    /**
     * Waits for the next prefetched row group without blocking forever: a plain {@code take()}
     * would never return if the prefetch task stopped with a failure after initialization.
     *
     * @return the next queue entry, or {@code null} if the task finished successfully and the
     *     queue is drained
     * @throws Throwable the failure reported by the prefetch task, if any
     */
    private Pair<PageReadStore, Long> takePrefetchedRowGroup() throws Throwable {
        while (true) {
            Pair<PageReadStore, Long> entry = this.prefetchQueue.poll(100L, TimeUnit.MILLISECONDS);
            if (entry != null) {
                return entry;
            }
            if (this.prefetchTask.isDone()) {
                // The task may have enqueued its last row group right before completing.
                entry = this.prefetchQueue.poll();
                if (entry != null) {
                    return entry;
                }
                Option<Throwable> failure = this.prefetchTask.get();
                if (failure.isDefined()) {
                    throw ((Throwable) failure.get());
                }
                return null;
            }
        }
    }

    /**
     * Submits a new {@code PrefetchTask} for this reader to the given executor and stores the
     * resulting future, which makes {@code nextBatch()} wait for the task's initialization.
     *
     * @param executorService executor that runs the prefetch task
     */
    public void submitPrefetchTask(ExecutorService executorService) {
        this.prefetchTask = executorService.submit(new PrefetchTask());
    }
}
