package org.tikv.common.operation.iterator;

import com.pingcap.tidb.tipb.DAGRequest;
import com.pingcap.tidb.tipb.EncodeType;
import com.pingcap.tidb.tipb.SelectResponse;
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.ExecutorCompletionService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.tikv.common.TiSession;
import org.tikv.common.exception.RegionTaskException;
import org.tikv.common.exception.TiClientInternalException;
import org.tikv.common.meta.TiDAGRequest;
import org.tikv.common.operation.SchemaInfer;
import org.tikv.common.region.RegionStoreClient;
import org.tikv.common.region.TiStoreType;
import org.tikv.common.util.ConcreteBackOffer;
import org.tikv.common.util.RangeSplitter;
import org.tikv.kvproto.Coprocessor;

/* loaded from: input_file:org/tikv/common/operation/iterator/DAGIterator.class */
/**
 * Iterator over the rows produced by a coprocessor DAG request that has been split into
 * per-region tasks.
 *
 * <p>Every region task is submitted to the session's table-scan thread pool when the iterator is
 * constructed. Results are then consumed in completion order (via
 * {@link ExecutorCompletionService#take()}), not in the order the tasks were submitted.
 *
 * <p>Two push-down modes are handled:
 *
 * <ul>
 *   <li>{@code NORMAL}: each task yields a single merged {@link SelectResponse}.
 *   <li>{@code STREAMING}: each task yields an {@code Iterator<SelectResponse>} that is drained
 *       response by response.
 * </ul>
 *
 * @param <T> element type produced by the concrete iterator
 */
public abstract class DAGIterator<T> extends CoprocessorIterator<T> {
    private static final Logger logger = LoggerFactory.getLogger(DAGIterator.class);
    private final TiDAGRequest.PushDownType pushDownType;
    private final TiStoreType storeType;
    private final long startTs;
    /** Encode type of the response whose chunks are currently being read. */
    protected EncodeType encodeType;
    /** Completion service used in STREAMING mode; stays {@code null} otherwise. */
    private ExecutorCompletionService<Iterator<SelectResponse>> streamingService;
    /** Completion service used in NORMAL mode; stays {@code null} otherwise. */
    private ExecutorCompletionService<SelectResponse> dagService;
    /** Most recently completed response (NORMAL mode). */
    private SelectResponse response;
    /** Response stream of the most recently completed task (STREAMING mode). */
    private Iterator<SelectResponse> responseIterator;

    /**
     * Creates the iterator and immediately submits one task per region to the table-scan pool.
     *
     * @param req the DAG request sent to every region
     * @param tasks region tasks to execute
     * @param session session providing the thread pool and the region store client builder
     * @param infer schema inference information, passed through to the superclass
     * @param pushDownType selects NORMAL or STREAMING processing
     * @param storeType store type handed to the region store client builder
     * @param startTs start timestamp used for the coprocessor requests
     */
    public DAGIterator(DAGRequest req, List<RangeSplitter.RegionTask> tasks, TiSession session, SchemaInfer infer, TiDAGRequest.PushDownType pushDownType, TiStoreType storeType, long startTs) {
        super(req, tasks, session, infer);
        this.pushDownType = pushDownType;
        this.storeType = storeType;
        this.startTs = startTs;
        switch (pushDownType) {
            case NORMAL:
                this.dagService = new ExecutorCompletionService<>(session.getThreadPoolForTableScan());
                break;
            case STREAMING:
                this.streamingService = new ExecutorCompletionService<>(session.getThreadPoolForTableScan());
                break;
        }
        submitTasks();
    }

    /** Submits every region task to the completion service matching the push-down type. */
    @Override
    void submitTasks() {
        for (RangeSplitter.RegionTask regionTask : this.regionTasks) {
            switch (this.pushDownType) {
                case NORMAL:
                    this.dagService.submit(() -> process(regionTask));
                    break;
                case STREAMING:
                    this.streamingService.submit(() -> processByStreaming(regionTask));
                    break;
            }
        }
    }

    /**
     * Advances through chunks, responses and region tasks until unread data is found.
     *
     * @return {@code true} if the current chunk has unread bytes, {@code false} once {@code eof}
     *     is set or every region task has been consumed
     */
    @Override
    public boolean hasNext() {
        if (this.eof) {
            return false;
        }
        while (!hasUnreadChunkData()) {
            if (tryAdvanceChunkIndex()) {
                // Another chunk is left in the current chunk list.
                createDataInputReader();
            } else if (this.pushDownType == TiDAGRequest.PushDownType.STREAMING) {
                // Try the next response of the current stream first, then the next region.
                if (!advanceNextResponse() && !readNextRegionChunks()) {
                    return false;
                }
            } else if (!readNextRegionChunks()) {
                return false;
            }
        }
        return true;
    }

    /** Returns whether the chunk at the current index still has bytes to read. */
    private boolean hasUnreadChunkData() {
        return this.chunkList != null
                && this.chunkIndex < this.chunkList.size()
                && this.dataInput.available() > 0;
    }

    /**
     * Returns whether a response is available for the current push-down type.
     *
     * @throws IllegalArgumentException if the push-down type is neither NORMAL nor STREAMING
     */
    private boolean hasMoreResponse() {
        switch (this.pushDownType) {
            case NORMAL:
                return this.response != null;
            case STREAMING:
                return this.responseIterator != null && this.responseIterator.hasNext();
            default:
                throw new IllegalArgumentException("Invalid push down type:" + this.pushDownType);
        }
    }

    /**
     * Loads the chunk list and encode type of the next available response and positions the
     * reader on its first chunk.
     *
     * @return {@code false} if there is no response or the response carries no chunks
     */
    private boolean advanceNextResponse() {
        if (!hasMoreResponse()) {
            return false;
        }
        switch (this.pushDownType) {
            case NORMAL:
                this.chunkList = this.response.getChunksList();
                this.encodeType = this.response.getEncodeType();
                break;
            case STREAMING:
                SelectResponse next = this.responseIterator.next();
                this.chunkList = next.getChunksList();
                this.encodeType = next.getEncodeType();
                break;
        }
        if (this.chunkList == null || this.chunkList.isEmpty()) {
            return false;
        }
        this.chunkIndex = 0;
        createDataInputReader();
        return true;
    }

    /**
     * Consumes completed region tasks until one provides chunks.
     *
     * @return {@code true} if chunks were loaded, {@code false} if no region task is left
     */
    private boolean readNextRegionChunks() {
        while (hasNextRegionTask()) {
            if (doReadNextRegionChunks()) {
                return true;
            }
        }
        return false;
    }

    /** Returns whether there are still region task results that have not been taken. */
    private boolean hasNextRegionTask() {
        return !this.eof && this.regionTasks != null && this.taskIndex < this.regionTasks.size();
    }

    /**
     * Blocks until the next region task completes, then loads its first response.
     *
     * @return the result of {@link #advanceNextResponse()} for the completed task
     * @throws TiClientInternalException if waiting is interrupted, the task failed, or loading the
     *     response failed
     */
    private boolean doReadNextRegionChunks() {
        try {
            switch (this.pushDownType) {
                case NORMAL:
                    this.response = this.dagService.take().get();
                    break;
                case STREAMING:
                    this.responseIterator = this.streamingService.take().get();
                    break;
            }
            this.taskIndex++;
            return advanceNextResponse();
        } catch (InterruptedException e) {
            // Restore the interrupt flag so callers further up can still observe the interruption.
            Thread.currentThread().interrupt();
            throw new TiClientInternalException("Error reading region:", e);
        } catch (Exception e) {
            throw new TiClientInternalException("Error reading region:", e);
        }
    }

    /**
     * Executes one region task in NORMAL mode. Follow-up tasks returned by the store client are
     * processed too, and the chunks of all collected responses are merged into one response.
     *
     * @param regionTask the initial task to run
     * @return a response holding all collected chunks; its encode type is the one of the last
     *     collected response and is left unset when nothing was collected
     * @throws RegionTaskException if handling any task fails; {@code eof} is set beforehand
     */
    private SelectResponse process(RangeSplitter.RegionTask regionTask) {
        ArrayDeque<RangeSplitter.RegionTask> remainTasks = new ArrayDeque<>();
        ArrayDeque<SelectResponse> responseQueue = new ArrayDeque<>();
        remainTasks.add(regionTask);
        ConcreteBackOffer backOffer = ConcreteBackOffer.newCopNextMaxBackOff();
        // Element type inferred from the Long version argument — TODO confirm against RegionStoreClient.
        HashSet<Long> resolvedLocks = new HashSet<>();

        RangeSplitter.RegionTask task;
        // ArrayDeque never holds null elements, so a null poll() means the queue is drained.
        while ((task = remainTasks.poll()) != null) {
            List<Coprocessor.KeyRange> ranges = task.getRanges();
            try {
                RegionStoreClient client = this.session.getRegionStoreClientBuilder().build(task.getRegion(), task.getStore(), this.storeType);
                // Hand the locks collected so far to the new client before issuing the request.
                client.addResolvedLocks(this.startTs, resolvedLocks);
                List<RangeSplitter.RegionTask> followUps = client.coprocess(backOffer, this.dagRequest, ranges, responseQueue, this.startTs);
                if (followUps != null) {
                    remainTasks.addAll(followUps);
                }
                resolvedLocks.addAll(client.getResolvedLocks(this.startTs));
            } catch (Throwable th) {
                logger.error("Process region tasks failed, remain {} tasks not executed due to", remainTasks.size(), th);
                this.eof = true;
                throw new RegionTaskException("Handle region task failed:", th);
            }
        }

        SelectResponse.Builder merged = SelectResponse.newBuilder();
        for (SelectResponse partial : responseQueue) {
            // The last response wins. Setting the type only from real responses avoids passing
            // null to the protobuf setter when nothing was collected.
            merged.setEncodeType(partial.getEncodeType());
            merged.addAllChunks(partial.getChunksList());
        }
        return merged.build();
    }

    /**
     * Executes one region task in STREAMING mode.
     *
     * @param regionTask the task to run
     * @return the response stream, or {@code null} if the store client returned none (in which case
     *     {@code eof} is set)
     * @throws TiClientInternalException if building the client or starting the stream fails
     */
    private Iterator<SelectResponse> processByStreaming(RangeSplitter.RegionTask regionTask) {
        try {
            RegionStoreClient client = this.session.getRegionStoreClientBuilder().build(regionTask.getRegion(), regionTask.getStore(), this.storeType);
            Iterator<SelectResponse> stream = client.coprocessStreaming(this.dagRequest, regionTask.getRanges(), this.startTs);
            if (stream == null) {
                this.eof = true;
                return null;
            }
            return stream;
        } catch (Exception e) {
            // The previous message ("Error Closing Store client.") did not describe this failure.
            throw new TiClientInternalException("Error processing streaming coprocessor request:", e);
        }
    }
}
