package org.apache.spark.sql.comet.execution.shuffle;

import java.io.InputStream;
import org.apache.comet.shaded.arrow.memory.rounding.SegmentRoundingPolicy;
import org.apache.spark.InterruptibleIterator;
import org.apache.spark.MapOutputTracker;
import org.apache.spark.SparkConf;
import org.apache.spark.SparkEnv$;
import org.apache.spark.TaskContext;
import org.apache.spark.internal.Logging;
import org.apache.spark.internal.config.package$;
import org.apache.spark.io.CompressionCodec$;
import org.apache.spark.serializer.SerializerManager;
import org.apache.spark.shuffle.BaseShuffleHandle;
import org.apache.spark.shuffle.ShuffleReadMetricsReporter;
import org.apache.spark.shuffle.ShuffleReader;
import org.apache.spark.sql.vectorized.ColumnarBatch;
import org.apache.spark.storage.BlockId;
import org.apache.spark.storage.BlockManager;
import org.apache.spark.storage.BlockManagerId;
import org.apache.spark.storage.ShuffleBlockFetcherIterator;
import org.apache.spark.storage.ShuffleBlockFetcherIterator$;
import org.apache.spark.util.CompletionIterator$;
import org.slf4j.Logger;
import scala.Function0;
import scala.MatchError;
import scala.None$;
import scala.Product2;
import scala.Some;
import scala.Tuple2;
import scala.Tuple3;
import scala.collection.Iterator;
import scala.collection.Seq;
import scala.collection.SeqLike;
import scala.math.Ordering;
import scala.reflect.ScalaSignature;
import scala.runtime.BoxedUnit;
import scala.runtime.BoxesRunTime;
import scala.runtime.ObjectRef;

/* compiled from: CometBlockStoreShuffleReader.scala */
@ScalaSignature(bytes = "\u0006\u0001\u0005eg\u0001\u0002\f\u0018\u0001\u0019B\u0001b\u0012\u0001\u0003\u0002\u0003\u0006I\u0001\u0013\u0005\t\u001f\u0002\u0011\t\u0011)A\u0005!\"Aq\u000f\u0001B\u0001B\u0003%\u0001\u0010\u0003\u0005}\u0001\t\u0005\t\u0015!\u0003~\u0011)\t\t\u0001\u0001B\u0001B\u0003%\u00111\u0001\u0005\u000b\u0003\u001f\u0001!\u0011!Q\u0001\n\u0005E\u0001BCA\f\u0001\t\u0005\t\u0015!\u0003\u0002\u001a!Q\u0011q\u0004\u0001\u0003\u0002\u0003\u0006I!!\t\t\u000f\u0005\u001d\u0002\u0001\"\u0001\u0002*!I\u0011q\t\u0001C\u0002\u0013%\u0011\u0011\n\u0005\t\u00033\u0002\u0001\u0015!\u0003\u0002L!9\u0011q\r\u0001\u0005\n\u0005%\u0004bBA@\u0001\u0011\u0005\u0013\u0011\u0011\u0005\b\u0003\u0017\u0003A\u0011BAG\u000f%\tyiFA\u0001\u0012\u0003\t\tJ\u0002\u0005\u0017/\u0005\u0005\t\u0012AAJ\u0011\u001d\t9\u0003\u0005C\u0001\u0003+C\u0011\"a&\u0011#\u0003%\t!!'\t\u0013\u0005U\u0006#%A\u0005\u0002\u0005]\u0006\"CAa!E\u0005I\u0011AAb\u0011%\ti\rEI\u0001\n\u0003\tyM\u0001\u000fD_6,GO\u00117pG.\u001cFo\u001c:f'\",hM\u001a7f%\u0016\fG-\u001a:\u000b\u0005aI\u0012aB:ik\u001a4G.\u001a\u0006\u00035m\t\u0011\"\u001a=fGV$\u0018n\u001c8\u000b\u0005qi\u0012!B2p[\u0016$(B\u0001\u0010 \u0003\r\u0019\u0018\u000f\u001c\u0006\u0003A\u0005\nQa\u001d9be.T!AI\u0012\u0002\r\u0005\u0004\u0018m\u00195f\u0015\u0005!\u0013aA8sO\u000e\u0001QcA\u00146\u007fM!\u0001\u0001\u000b\u0018B!\tIC&D\u0001+\u0015\u0005Y\u0013!B:dC2\f\u0017BA\u0017+\u0005\u0019\te.\u001f*fMB!q&M\u001a?\u001b\u0005\u0001$B\u0001\r \u0013\t\u0011\u0004GA\u0007TQV4g\r\\3SK\u0006$WM\u001d\t\u0003iUb\u0001\u0001B\u00037\u0001\t\u0007qGA\u0001L#\tA4\b\u0005\u0002*s%\u0011!H\u000b\u0002\b\u001d>$\b.\u001b8h!\tIC(\u0003\u0002>U\t\u0019\u0011I\\=\u0011\u0005QzD!\u0002!\u0001\u0005\u00049$!A\"\u0011\u0005\t+U\"A\"\u000b\u0005\u0011{\u0012\u0001C5oi\u0016\u0014h.\u00197\n\u0005\u0019\u001b%a\u0002'pO\u001eLgnZ\u0001\u0007Q\u0006tG\r\\31\u0005%k\u0005#B\u0018Kg1s\u0014BA&1\u0005E\u0011\u0015m]3TQV4g\r\\3IC:$G.\u001a\t\u0003i5#\u0011BT\u0001\u0002\u0002\u0003\u0005)\u0011A\u001c\u0003\u0007}#\u0013'A\bcY>\u001c7n\u001d\"z\u0003\u0012$'/Z:t!\r\t\u0016\f\u0018\b\u0003%^s!a\u0015,\u000e\u0003QS!!V\u0013\u0002\rq\u0012xn\u001c;?\u0013\u0005Y\u0013B\u0001-+\u0003\u001d\u0001\u0018mY6bO\u0016L!AW.\u0003\u0011%#XM]1u_JT!\u0001\u0017\u0016\u0011\t%jv,Z\u0005\u0003=*\u0012a\u0001V;qY\u0016\u0014\u0004C\u00011d\u001b\u0005\t'B\u00012 \u0003\u001d\u0019Ho\u001c:bO\u0016L!\u0001Z1\u0003\u001d\tcwnY6NC:\fw-\u001a:JIB\u0019a-[6\u000e\u0003\u001dT!\u0001\u001b\u0016\u0002\u0015\r|G\u000e\\3di&|g.\u0003\u0002kO\n\u00191+Z9\u0011\u000b%bg.\u001d;\n\u00055T#A\u0002+va2,7\u0007\u0005\u0002a_&\u0011\u0001/\u0019\u0002\b\u00052|7m[%e!\tI#/\u0003\u0002tU\t!Aj\u001c8h!\tIS/\u0003\u0002wU\t\u0019\u0011J\u001c;\u0002\u000f\r|g\u000e^3yiB\u0011\u0011P_\u0007\u0002?%\u00111p\b\u0002\f)\u0006\u001c8nQ8oi\u0016DH/A\u0006sK\u0006$W*\u001a;sS\u000e\u001c\bCA\u0018\u007f\u0013\ty\bG\u0001\u000eTQV4g\r\\3SK\u0006$W*\u001a;sS\u000e\u001c(+\u001a9peR,'/A\ttKJL\u0017\r\\5{KJl\u0015M\\1hKJ\u0004B!!\u0002\u0002\f5\u0011\u0011q\u0001\u0006\u0004\u0003\u0013y\u0012AC:fe&\fG.\u001b>fe&!\u0011QBA\u0004\u0005E\u0019VM]5bY&TXM]'b]\u0006<WM]\u0001\rE2|7m['b]\u0006<WM\u001d\t\u0004A\u0006M\u0011bAA\u000bC\na!\t\\8dW6\u000bg.Y4fe\u0006\u0001R.\u00199PkR\u0004X\u000f\u001e+sC\u000e\\WM\u001d\t\u0004s\u0006m\u0011bAA\u000f?\t\u0001R*\u00199PkR\u0004X\u000f\u001e+sC\u000e\\WM]\u0001\u0011g\"|W\u000f\u001c3CCR\u001c\u0007NR3uG\"\u00042!KA\u0012\u0013\r\t)C\u000b\u0002\b\u0005>|G.Z1o\u0003\u0019a\u0014N\\5u}Q\u0011\u00121FA\u0018\u0003s\tY$!\u0010\u0002@\u0005\u0005\u00131IA#!\u0015\ti\u0003A\u001a?\u001b\u00059\u0002BB$\n\u0001\u0004\t\t\u0004\r\u0003\u00024\u0005]\u0002CB\u0018Kg\u0005Ub\bE\u00025\u0003o!!BTA\u0018\u0003\u0003\u0005\tQ!\u00018\u0011\u0015y\u0015\u00021\u0001Q\u0011\u00159\u0018\u00021\u0001y\u0011\u0015a\u0018\u00021\u0001~\u0011%\t\t!\u0003I\u0001\u0002\u0004\t\u0019\u0001C\u0005\u0002\u0010%\u0001\n\u00111\u0001\u0002\u0012!I\u0011qC\u0005\u0011\u0002\u0003\u0007\u0011\u0011\u0004\u0005\n\u0003?I\u0001\u0013!a\u0001\u0003C\t1\u0001Z3q+\t\tY\u0005\r\u0005\u0002N\u0005U\u0013QLA2!)\ti#a\u0014\u0002T\u0005m\u0013\u0011M\u0005\u0004\u0003#:\"AF\"p[\u0016$8\u000b[;gM2,G)\u001a9f]\u0012,gnY=\u0011\u0007Q\n)\u0006\u0002\u0006\u0002X-\t\t\u0011!A\u0003\u0002]\u00121a\u0018\u00133\u0003\u0011!W\r\u001d\u0011\u0011\u0007Q\ni\u0006\u0002\u0006\u0002`-\t\t\u0011!A\u0003\u0002]\u00121a\u0018\u00134!\r!\u00141\r\u0003\u000b\u0003KZ\u0011\u0011!A\u0001\u0006\u00039$aA0%i\u0005ia-\u001a;dQ&#XM]1u_J,\"!a\u001b\u0011\tEK\u0016Q\u000e\t\u0006Sus\u0017q\u000e\t\u0005\u0003c\nY(\u0004\u0002\u0002t)!\u0011QOA<\u0003\tIwN\u0003\u0002\u0002z\u0005!!.\u0019<b\u0013\u0011\ti(a\u001d\u0003\u0017%s\u0007/\u001e;TiJ,\u0017-\\\u0001\u0005e\u0016\fG\r\u0006\u0002\u0002\u0004B!\u0011+WAC!\u0015I\u0013qQ\u001a?\u0013\r\tII\u000b\u0002\t!J|G-^2ue\u0005ab-\u001a;dQ\u000e{g\u000e^5ok>,8O\u00117pG.\u001c\u0018J\u001c\"bi\u000eDWCAA\u0011\u0003q\u0019u.\\3u\u00052|7m[*u_J,7\u000b[;gM2,'+Z1eKJ\u00042!!\f\u0011'\t\u0001\u0002\u0006\u0006\u0002\u0002\u0012\u0006YB\u0005\\3tg&t\u0017\u000e\u001e\u0013he\u0016\fG/\u001a:%I\u00164\u0017-\u001e7uIU*b!a'\u00022\u0006MVCAAOU\u0011\t\u0019!a(,\u0005\u0005\u0005\u0006\u0003BAR\u0003[k!!!*\u000b\t\u0005\u001d\u0016\u0011V\u0001\nk:\u001c\u0007.Z2lK\u0012T1!a++\u0003)\tgN\\8uCRLwN\\\u0005\u0005\u0003_\u000b)KA\tv]\u000eDWmY6fIZ\u000b'/[1oG\u0016$QA\u000e\nC\u0002]\"Q\u0001\u0011\nC\u0002]\n1\u0004\n7fgNLg.\u001b;%OJ,\u0017\r^3sI\u0011,g-Y;mi\u00122TCBA]\u0003{\u000by,\u0006\u0002\u0002<*\"\u0011\u0011CAP\t\u001514C1\u00018\t\u0015\u00015C1\u00018\u0003m!C.Z:tS:LG\u000fJ4sK\u0006$XM\u001d\u0013eK\u001a\fW\u000f\u001c;%oU1\u0011QYAe\u0003\u0017,\"!a2+\t\u0005e\u0011q\u0014\u0003\u0006mQ\u0011\ra\u000e\u0003\u0006\u0001R\u0011\raN\u0001\u001cI1,7o]5oSR$sM]3bi\u0016\u0014H\u0005Z3gCVdG\u000f\n\u001d\u0016\r\u0005E\u0017Q[Al+\t\t\u0019N\u000b\u0003\u0002\"\u0005}E!\u0002\u001c\u0016\u0005\u00049D!\u0002!\u0016\u0005\u00049\u0004")
/* loaded from: input_file:org/apache/spark/sql/comet/execution/shuffle/CometBlockStoreShuffleReader.class */
public class CometBlockStoreShuffleReader<K, C> implements ShuffleReader<K, C>, Logging {
    private final Iterator<Tuple2<BlockManagerId, Seq<Tuple3<BlockId, Object, Object>>>> blocksByAddress;
    private final TaskContext context;
    private final ShuffleReadMetricsReporter readMetrics;
    private final SerializerManager serializerManager;
    private final BlockManager blockManager;
    private final MapOutputTracker mapOutputTracker;
    private final boolean shouldBatchFetch;
    private final CometShuffleDependency<?, ?, ?> dep;
    private transient Logger org$apache$spark$internal$Logging$$log_;

    public String logName() {
        return Logging.logName$(this);
    }

    public Logger log() {
        return Logging.log$(this);
    }

    public void logInfo(Function0<String> function0) {
        Logging.logInfo$(this, function0);
    }

    public void logDebug(Function0<String> function0) {
        Logging.logDebug$(this, function0);
    }

    public void logTrace(Function0<String> function0) {
        Logging.logTrace$(this, function0);
    }

    public void logWarning(Function0<String> function0) {
        Logging.logWarning$(this, function0);
    }

    public void logError(Function0<String> function0) {
        Logging.logError$(this, function0);
    }

    public void logInfo(Function0<String> function0, Throwable th) {
        Logging.logInfo$(this, function0, th);
    }

    public void logDebug(Function0<String> function0, Throwable th) {
        Logging.logDebug$(this, function0, th);
    }

    public void logTrace(Function0<String> function0, Throwable th) {
        Logging.logTrace$(this, function0, th);
    }

    public void logWarning(Function0<String> function0, Throwable th) {
        Logging.logWarning$(this, function0, th);
    }

    public void logError(Function0<String> function0, Throwable th) {
        Logging.logError$(this, function0, th);
    }

    public boolean isTraceEnabled() {
        return Logging.isTraceEnabled$(this);
    }

    public void initializeLogIfNecessary(boolean z) {
        Logging.initializeLogIfNecessary$(this, z);
    }

    public boolean initializeLogIfNecessary(boolean z, boolean z2) {
        return Logging.initializeLogIfNecessary$(this, z, z2);
    }

    public boolean initializeLogIfNecessary$default$2() {
        return Logging.initializeLogIfNecessary$default$2$(this);
    }

    public void initializeForcefully(boolean z, boolean z2) {
        Logging.initializeForcefully$(this, z, z2);
    }

    public Logger org$apache$spark$internal$Logging$$log_() {
        return this.org$apache$spark$internal$Logging$$log_;
    }

    public void org$apache$spark$internal$Logging$$log__$eq(Logger logger) {
        this.org$apache$spark$internal$Logging$$log_ = logger;
    }

    private CometShuffleDependency<?, ?, ?> dep() {
        return this.dep;
    }

    private Iterator<Tuple2<BlockId, InputStream>> fetchIterator() {
        return new ShuffleBlockFetcherIterator(this.context, this.blockManager.blockStoreClient(), this.blockManager, this.mapOutputTracker, this.blocksByAddress.map(tuple2 -> {
            return new Tuple2(tuple2._1(), ((SeqLike) tuple2._2()).toSeq());
        }), (blockId, inputStream) -> {
            ShuffleType shuffleType = this.dep().shuffleType();
            CometColumnarShuffle$ cometColumnarShuffle$ = CometColumnarShuffle$.MODULE$;
            return (shuffleType != null ? !shuffleType.equals(cometColumnarShuffle$) : cometColumnarShuffle$ != null) ? inputStream : this.serializerManager.wrapForEncryption(inputStream);
        }, BoxesRunTime.unboxToLong(SparkEnv$.MODULE$.get().conf().get(package$.MODULE$.REDUCER_MAX_SIZE_IN_FLIGHT())) * SegmentRoundingPolicy.MIN_SEGMENT_SIZE * SegmentRoundingPolicy.MIN_SEGMENT_SIZE, BoxesRunTime.unboxToInt(SparkEnv$.MODULE$.get().conf().get(package$.MODULE$.REDUCER_MAX_REQS_IN_FLIGHT())), BoxesRunTime.unboxToInt(SparkEnv$.MODULE$.get().conf().get(package$.MODULE$.REDUCER_MAX_BLOCKS_IN_FLIGHT_PER_ADDRESS())), BoxesRunTime.unboxToLong(SparkEnv$.MODULE$.get().conf().get(package$.MODULE$.MAX_REMOTE_BLOCK_SIZE_FETCH_TO_MEM())), BoxesRunTime.unboxToInt(SparkEnv$.MODULE$.get().conf().get(package$.MODULE$.SHUFFLE_MAX_ATTEMPTS_ON_NETTY_OOM())), BoxesRunTime.unboxToBoolean(SparkEnv$.MODULE$.get().conf().get(package$.MODULE$.SHUFFLE_DETECT_CORRUPT())), BoxesRunTime.unboxToBoolean(SparkEnv$.MODULE$.get().conf().get(package$.MODULE$.SHUFFLE_DETECT_CORRUPT_MEMORY())), BoxesRunTime.unboxToBoolean(SparkEnv$.MODULE$.get().conf().get(package$.MODULE$.SHUFFLE_CHECKSUM_ENABLED())), (String) SparkEnv$.MODULE$.get().conf().get(package$.MODULE$.SHUFFLE_CHECKSUM_ALGORITHM()), this.readMetrics, fetchContinuousBlocksInBatch(), ShuffleBlockFetcherIterator$.MODULE$.$lessinit$greater$default$18()).toCompletionIterator();
    }

    public Iterator<Product2<K, C>> read() {
        ObjectRef create = ObjectRef.create((Object) null);
        this.context.addTaskCompletionListener(taskContext -> {
            $anonfun$read$1(create, taskContext);
            return BoxedUnit.UNIT;
        });
        InterruptibleIterator interruptibleIterator = new InterruptibleIterator(this.context, CompletionIterator$.MODULE$.apply(fetchIterator().flatMap(tuple2 -> {
            if (((NativeBatchDecoderIterator) create.elem) != null) {
                ((NativeBatchDecoderIterator) create.elem).close();
            }
            create.elem = new NativeBatchDecoderIterator((InputStream) tuple2._2(), this.context, this.dep().decodeTime());
            return (NativeBatchDecoderIterator) create.elem;
        }).map(columnarBatch -> {
            return new Tuple2(BoxesRunTime.boxToInteger(0), columnarBatch);
        }).map(tuple22 -> {
            this.readMetrics.incRecordsRead(((ColumnarBatch) tuple22._2()).numRows());
            return tuple22;
        }), () -> {
            this.context.taskMetrics().mergeShuffleReadMetrics();
        }));
        if (dep().aggregator().isDefined()) {
            throw new UnsupportedOperationException("aggregate not allowed");
        }
        Some keyOrdering = dep().keyOrdering();
        if ((keyOrdering instanceof Some) && (keyOrdering.value() instanceof Ordering)) {
            throw new UnsupportedOperationException("order not allowed");
        }
        if (None$.MODULE$.equals(keyOrdering)) {
            return interruptibleIterator instanceof InterruptibleIterator ? interruptibleIterator : new InterruptibleIterator(this.context, interruptibleIterator);
        }
        throw new MatchError(keyOrdering);
    }

    private boolean fetchContinuousBlocksInBatch() {
        SparkConf conf = SparkEnv$.MODULE$.get().conf();
        boolean supportsRelocationOfSerializedObjects = dep().serializer().supportsRelocationOfSerializedObjects();
        boolean unboxToBoolean = BoxesRunTime.unboxToBoolean(conf.get(package$.MODULE$.SHUFFLE_COMPRESS()));
        boolean supportsConcatenationOfSerializedStreams = unboxToBoolean ? CompressionCodec$.MODULE$.supportsConcatenationOfSerializedStreams(CompressionCodec$.MODULE$.createCodec(conf)) : true;
        boolean unboxToBoolean2 = BoxesRunTime.unboxToBoolean(conf.get(package$.MODULE$.SHUFFLE_USE_OLD_FETCH_PROTOCOL()));
        boolean unboxToBoolean3 = BoxesRunTime.unboxToBoolean(conf.get(package$.MODULE$.IO_ENCRYPTION_ENABLED()));
        boolean z = this.shouldBatchFetch && supportsRelocationOfSerializedObjects && !((unboxToBoolean && !supportsConcatenationOfSerializedStreams) || unboxToBoolean2 || unboxToBoolean3);
        if (this.shouldBatchFetch && !z) {
            logDebug(() -> {
                return new StringBuilder(266).append("The feature tag of continuous shuffle block fetching is set to true, but we can not enable the feature because other conditions are not satisfied. ").append("Shuffle compress: ").append(unboxToBoolean).append(", serializer relocatable: ").append(supportsRelocationOfSerializedObjects).append(", ").append("codec concatenation: ").append(supportsConcatenationOfSerializedStreams).append(", use old shuffle fetch protocol: ").append(unboxToBoolean2).append(", io encryption: ").append(unboxToBoolean3).append(".").toString();
            });
        }
        return z;
    }

    public static final /* synthetic */ void $anonfun$read$1(ObjectRef objectRef, TaskContext taskContext) {
        if (((NativeBatchDecoderIterator) objectRef.elem) != null) {
            ((NativeBatchDecoderIterator) objectRef.elem).close();
        }
    }

    public CometBlockStoreShuffleReader(BaseShuffleHandle<K, ?, C> baseShuffleHandle, Iterator<Tuple2<BlockManagerId, Seq<Tuple3<BlockId, Object, Object>>>> iterator, TaskContext taskContext, ShuffleReadMetricsReporter shuffleReadMetricsReporter, SerializerManager serializerManager, BlockManager blockManager, MapOutputTracker mapOutputTracker, boolean z) {
        this.blocksByAddress = iterator;
        this.context = taskContext;
        this.readMetrics = shuffleReadMetricsReporter;
        this.serializerManager = serializerManager;
        this.blockManager = blockManager;
        this.mapOutputTracker = mapOutputTracker;
        this.shouldBatchFetch = z;
        Logging.$init$(this);
        this.dep = (CometShuffleDependency) baseShuffleHandle.dependency();
    }
}
