package io.datarouter.snapshotmanager;

import io.datarouter.filesystem.snapshot.group.SnapshotGroup;
import io.datarouter.filesystem.snapshot.group.dto.SnapshotKeyAndNumRecords;
import io.datarouter.filesystem.snapshot.group.dto.SnapshotWriteResult;
import io.datarouter.filesystem.snapshot.key.SnapshotKey;
import io.datarouter.filesystem.snapshot.reader.ScanningSnapshotReader;
import io.datarouter.filesystem.snapshot.reader.record.SnapshotLeafRecord;
import io.datarouter.filesystem.snapshot.writer.SnapshotWriterConfig;
import io.datarouter.scanner.Scanner;
import java.lang.invoke.MethodHandles;
import java.lang.invoke.MethodType;
import java.lang.runtime.ObjectMethods;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutorService;
import java.util.function.Supplier;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:io/datarouter/snapshotmanager/SnapshotMerger.class */
/**
 * Merges the snapshots of a "merge" {@link SnapshotGroup} down to a single snapshot.
 *
 * <p>Snapshots are combined {@code mergeFactor} at a time. Intermediate results are written back to
 * the merge group; the final combination (when the number of remaining snapshots is at most
 * {@code mergeFactor}) is written to the destination group. Every input snapshot is deleted from the
 * merge group once it has been combined.
 */
public class SnapshotMerger {
    private static final Logger logger = LoggerFactory.getLogger(SnapshotMerger.class);

    private final SnapshotMergerParams params;

    /**
     * Immutable configuration for a {@link SnapshotMerger}.
     *
     * @param shouldStop passed through to the snapshot writer on every write
     * @param mergeGroup group the input snapshots are read from and deleted from; also receives
     *        intermediate merge results
     * @param destinationGroup group that receives the final merged snapshot
     * @param readExec executor used when listing snapshots and when reading them
     * @param writeExec executor used when writing merged snapshots and when deleting inputs
     * @param mergeWriterConfig writer config used for intermediate results
     * @param destinationWriterConfig writer config used for the final result
     * @param prefetchThreads forwarded to each {@link ScanningSnapshotReader}
     * @param prefetchBlocks forwarded to each {@link ScanningSnapshotReader}
     * @param mergeFactor maximum number of snapshots combined in one pass; must be at least 2 for
     *        {@link SnapshotMerger#merge()} to make progress when more than one snapshot exists
     */
    public record SnapshotMergerParams(
            Supplier<Boolean> shouldStop,
            SnapshotGroup mergeGroup,
            SnapshotGroup destinationGroup,
            ExecutorService readExec,
            ExecutorService writeExec,
            SnapshotWriterConfig mergeWriterConfig,
            SnapshotWriterConfig destinationWriterConfig,
            int prefetchThreads,
            int prefetchBlocks,
            int mergeFactor) {
    }

    /**
     * @param params configuration describing the groups, executors and tuning values to use
     */
    public SnapshotMerger(SnapshotMergerParams params) {
        this.params = params;
    }

    /**
     * Repeatedly combines snapshots from the merge group until at most one tracked snapshot remains.
     *
     * <p>Each pass selects up to {@code mergeFactor} snapshots via
     * {@code minN(SnapshotKeyAndNumRecords.BY_NUM_RECORDS, mergeFactor)} (presumably the ones with the
     * fewest records - confirm against the comparator), combines them, and replaces them in the
     * in-memory bookkeeping with the newly written snapshot.
     *
     * <p>If the merge group holds zero or one snapshot to begin with, nothing is written or deleted.
     *
     * @throws IllegalArgumentException if more than one snapshot exists and {@code mergeFactor} is
     *         less than 2, since each pass would then replace at most one snapshot with one new
     *         snapshot and the loop could never terminate
     */
    public void merge() {
        // NOTE(review): the meaning of `false` and `10` is defined by SnapshotGroup's read ops - confirm there.
        Map<SnapshotKey, SnapshotKeyAndNumRecords> summaryByKey = params.mergeGroup()
                .keyReadOps(false)
                .scanSnapshotKeysAndRootBlocks(params.readExec(), 10)
                .map(SnapshotKeyAndNumRecords::new)
                .toMap(SnapshotKeyAndNumRecords::key);

        if (summaryByKey.size() > 1 && params.mergeFactor() < 2) {
            // Fail fast instead of rewriting and deleting snapshots forever.
            throw new IllegalArgumentException(
                    "mergeFactor must be at least 2 to merge multiple snapshots, but was " + params.mergeFactor());
        }

        while (summaryByKey.size() > 1) {
            // When everything that is left fits in one pass, this pass produces the final output.
            boolean finalPass = summaryByKey.size() <= params.mergeFactor();
            SnapshotGroup outputGroup = finalPass ? params.destinationGroup() : params.mergeGroup();
            SnapshotWriterConfig outputConfig = finalPass
                    ? params.destinationWriterConfig()
                    : params.mergeWriterConfig();

            Scanner.of(summaryByKey.values())
                    .minN(SnapshotKeyAndNumRecords.BY_NUM_RECORDS, params.mergeFactor())
                    .map(SnapshotKeyAndNumRecords::key)
                    .flush(inputKeys -> {
                        // Register the combined snapshot first; its inputs are dropped from the map below.
                        SnapshotWriteResult result = combineSnapshots(inputKeys, outputGroup, outputConfig);
                        summaryByKey.put(result.key, new SnapshotKeyAndNumRecords(result.toSnapshotKeyAndRoot()));
                    })
                    .forEach(summaryByKey::remove);
        }
    }

    /**
     * Writes one snapshot containing the collated, de-duplicated entries of the given input snapshots,
     * then deletes those input snapshots from the merge group.
     *
     * @param list keys of the snapshots to combine; they are always read from and deleted from the
     *        merge group
     * @param snapshotGroup group the combined snapshot is written to
     * @param snapshotWriterConfig writer configuration for the combined snapshot
     * @return the result reported by the snapshot writer
     */
    private SnapshotWriteResult combineSnapshots(
            List<SnapshotKey> list,
            SnapshotGroup snapshotGroup,
            SnapshotWriterConfig snapshotWriterConfig) {
        SnapshotWriteResult writeResult = Scanner.of(list)
                .map(inputKey -> new ScanningSnapshotReader(
                        inputKey,
                        params.readExec(),
                        params.prefetchThreads(),
                        params.mergeGroup(),
                        params.prefetchBlocks()))
                // Interleave the readers' leaf records in KEY_COMPARATOR order.
                .collate(reader -> reader.scanLeafRecords(0L), SnapshotLeafRecord.KEY_COMPARATOR)
                // Adjacent records whose keys are equal (per Arrays.equals) collapse into one.
                .deduplicateConsecutiveBy(SnapshotLeafRecord::key, Arrays::equals)
                .map(SnapshotLeafRecord::entry)
                .batch(10_000)
                .apply(batches -> snapshotGroup.writeOps().write(
                        snapshotWriterConfig,
                        batches,
                        params.writeExec(),
                        params.shouldStop()));

        // The inputs are redundant once the combined snapshot has been written.
        // NOTE(review): the meaning of `10` is defined by deleteSnapshot - confirm there.
        list.forEach(inputKey -> params.mergeGroup().deleteOps().deleteSnapshot(inputKey, params.writeExec(), 10));

        logger.warn("combined {}, {}", list.size(), list);
        return writeResult;
    }
}
