package io.datarouter.plugin.copytable.tableprocessor;

import io.datarouter.instrumentation.count.Counters;
import io.datarouter.model.databean.Databean;
import io.datarouter.model.key.primary.PrimaryKey;
import io.datarouter.plugin.copytable.CopyTableService;
import io.datarouter.plugin.copytable.config.DatarouterCopyTableExecutors;
import io.datarouter.scanner.ParallelScannerContext;
import io.datarouter.scanner.Scanner;
import io.datarouter.storage.config.Config;
import io.datarouter.storage.node.DatarouterNodes;
import io.datarouter.storage.node.op.combo.SortedMapStorage;
import io.datarouter.storage.util.PrimaryKeyPercentCodecTool;
import io.datarouter.util.collection.ListTool;
import io.datarouter.util.number.NumberFormatter;
import io.datarouter.util.tuple.Range;
import java.util.Objects;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;
import javax.inject.Inject;
import javax.inject.Singleton;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Scans one key span of a sorted-map node and hands the rows, in batches, to a caller-supplied
 * {@link TableProcessor}.
 *
 * <p>Failures while scanning or processing are not propagated: they are captured in the returned
 * {@link TableProcessorSpanResult} together with the key of the last batch that made it through
 * the pipeline, so that the caller can decide whether and where to resume.
 */
@Singleton
public class TableProcessorService {
    private static final Logger logger = LoggerFactory.getLogger(TableProcessorService.class);

    /** Scan configuration shared by every span; requests an output batch size of 1000. */
    private static final Config SCAN_CONFIG = new Config().setOutputBatchSize(1000);

    /**
     * Argument passed to {@code sample(..)} ahead of the progress log.
     * NOTE(review): presumably "log every 10th batch" - confirm against the Scanner API.
     */
    private static final long PROGRESS_LOG_SAMPLE_INTERVAL = 10L;

    @Inject
    private DatarouterNodes nodes;

    @Inject
    private TableProcessorScannerContext scannerContext;

    /** Builds the {@link ParallelScannerContext} used to run the processor on the injected executor. */
    @Singleton
    public static class TableProcessorScannerContext {

        @Inject
        private DatarouterCopyTableExecutors.DatarouterTableProcessorExecutor executor;

        /**
         * Creates a context for the requested thread count.
         *
         * <p>NOTE(review): the third constructor argument ({@code false}) and the fourth
         * ({@code numThreads > 1}) are presumably "allow unordered results" and "enabled" - confirm
         * against {@link ParallelScannerContext}.
         *
         * @param numThreads thread count forwarded to the context
         * @return a new context backed by the injected executor
         */
        public ParallelScannerContext get(int numThreads) {
            return new ParallelScannerContext(this.executor, numThreads, false, numThreads > 1);
        }
    }

    /** Immutable outcome of processing one span. */
    public static class TableProcessorSpanResult {
        /** {@code true} when the whole span was processed without an exception. */
        public final boolean success;
        /** The failure that stopped processing, or {@code null} on success. */
        public final Throwable exception;
        /** Number of rows counted as processed when the span ended. */
        public final long numCopied;
        /** Encoded key of the last processed row on failure; {@code null} on success or if nothing was processed. */
        public final String resumeFromKeyString;

        /**
         * @param success whether the span completed
         * @param exception the failure, or {@code null}
         * @param numCopied rows counted as processed
         * @param resumeFromKeyString encoded key to resume from, or {@code null}
         */
        public TableProcessorSpanResult(boolean success, Throwable exception, long numCopied,
                String resumeFromKeyString) {
            this.success = success;
            this.exception = exception;
            this.numCopied = numCopied;
            this.resumeFromKeyString = resumeFromKeyString;
        }
    }

    /**
     * Scans the key range of the named node and passes each batch of rows to the processor.
     *
     * <p>The range is built with the first key flagged {@code false} and the second flagged
     * {@code true}, i.e. presumably (fromKey, toKey] - confirm against {@link Range}.
     *
     * @param nodeName name used to look the node up; also used in counter names and log lines
     * @param fromKeyExclusiveString percent-encoded primary key for the start of the range
     * @param toKeyInclusiveString percent-encoded primary key for the end of the range
     * @param tableProcessor supplies the consumer that receives each batch as a {@link Scanner}
     * @param numThreads thread count for the parallel processing stage
     * @param batchSize number of rows per batch handed to the processor
     * @param batchId identifier of this span, used only for logging
     * @param numBatches total number of spans, used only for logging
     * @return a success result, or a failure result carrying the exception and the resume key
     * @throws NullPointerException if no node is registered under {@code nodeName}
     */
    public <PK extends PrimaryKey<PK>, D extends Databean<PK, D>> TableProcessorSpanResult runTableProcessor(
            String nodeName,
            String fromKeyExclusiveString,
            String toKeyInclusiveString,
            TableProcessor<PK, D> tableProcessor,
            int numThreads,
            int batchSize,
            long batchId,
            long numBatches) {
        // The node is looked up by name only, so its key/databean types have to be asserted here.
        @SuppressWarnings("unchecked")
        SortedMapStorage.SortedMapStorageNode<PK, D, ?> node =
                (SortedMapStorage.SortedMapStorageNode<PK, D, ?>) this.nodes.getNode(nodeName);
        Objects.requireNonNull(node, nodeName + " not found");

        PK fromKeyExclusive = PrimaryKeyPercentCodecTool.decode(
                node.getFieldInfo().getPrimaryKeySupplier(), fromKeyExclusiveString);
        PK toKeyInclusive = PrimaryKeyPercentCodecTool.decode(
                node.getFieldInfo().getPrimaryKeySupplier(), toKeyInclusiveString);
        Consumer<Scanner<D>> processor = tableProcessor.get();
        Range<PK> range = new Range<>(fromKeyExclusive, false, toKeyInclusive, true);

        AtomicLong numScanned = new AtomicLong();
        AtomicLong numProcessed = new AtomicLong();
        AtomicReference<PK> lastKey = new AtomicReference<>();
        try {
            node.scan(range, SCAN_CONFIG)
                    .each(databean -> numScanned.incrementAndGet())
                    .each(databean -> Counters.inc("tableProcessor " + nodeName + " read"))
                    .batch(batchSize)
                    .parallel(this.scannerContext.get(numThreads))
                    .each(batch -> processor.accept(Scanner.of(batch)))
                    // Bookkeeping stays in separate stages after the processor call so that a batch
                    // is only counted, and its key only recorded, once the processor has returned.
                    .each(batch -> Counters.inc("tableProcessor " + nodeName + " processed"))
                    .each(batch -> numProcessed.addAndGet(batch.size()))
                    .each(batch -> lastKey.set(ListTool.getLast(batch).getKey()))
                    .sample(PROGRESS_LOG_SAMPLE_INTERVAL, true)
                    .forEach(batch -> logProgress(false, numScanned.get(), numProcessed.get(), batchId,
                            numBatches, nodeName, lastKey.get(), null));
            logProgress(true, numScanned.get(), numProcessed.get(), batchId, numBatches, nodeName,
                    lastKey.get(), null);
            return new TableProcessorSpanResult(true, null, numProcessed.get(), null);
        } catch (Throwable throwable) {
            // Deliberately broad: any failure is reported through the result instead of being thrown.
            PK resumeFromKey = lastKey.get();
            logProgress(false, numScanned.get(), numProcessed.get(), batchId, numBatches, nodeName,
                    resumeFromKey, throwable);
            String resumeFromKeyString = resumeFromKey == null
                    ? null
                    : PrimaryKeyPercentCodecTool.encode(resumeFromKey);
            return new TableProcessorSpanResult(false, throwable, numProcessed.get(), resumeFromKeyString);
        }
    }

    /**
     * Writes one progress line at WARN level.
     *
     * @param finished selects the "finished" or "intermediate" prefix
     * @param numScanned rows read so far
     * @param numProcessed rows counted as processed so far
     * @param batchId identifier of the current span
     * @param numBatches total number of spans
     * @param nodeName node being processed
     * @param lastKey key of the last processed row, or {@code null} if none yet
     * @param throwable failure to attach to the log line, or {@code null}
     */
    private <PK extends PrimaryKey<PK>> void logProgress(
            boolean finished,
            long numScanned,
            long numProcessed,
            long batchId,
            long numBatches,
            String nodeName,
            PK lastKey,
            Throwable throwable) {
        String finishedString = finished ? "finished" : "intermediate";
        String lastKeyString = lastKey == null ? null : PrimaryKeyPercentCodecTool.encode(lastKey);
        // The throwable is passed last, after the seven placeholder values, so that SLF4J can
        // treat it as the exception of the log event rather than as a message argument.
        logger.warn("{} scanned {} processed {} for batch {}/{} from {} through {}",
                finishedString,
                NumberFormatter.addCommas(Long.valueOf(numScanned)),
                NumberFormatter.addCommas(Long.valueOf(numProcessed)),
                NumberFormatter.addCommas(Long.valueOf(batchId)),
                NumberFormatter.addCommas(Long.valueOf(numBatches)),
                nodeName,
                lastKeyString,
                throwable);
    }
}
