package io.datarouter.filesystem.snapshot.writer;

import io.datarouter.bytes.EmptyArray;
import io.datarouter.bytes.codec.bytestringcodec.CsvIntByteStringCodec;
import io.datarouter.filesystem.snapshot.block.root.RootBlock;
import io.datarouter.filesystem.snapshot.encode.BranchBlockEncoder;
import io.datarouter.filesystem.snapshot.encode.LeafBlockEncoder;
import io.datarouter.filesystem.snapshot.encode.ValueBlockEncoder;
import io.datarouter.filesystem.snapshot.entry.SnapshotEntry;
import io.datarouter.filesystem.snapshot.key.SnapshotKey;
import io.datarouter.filesystem.snapshot.storage.block.SnapshotBlockStorage;
import io.datarouter.filesystem.snapshot.storage.file.SnapshotFileStorage;
import io.datarouter.scanner.Scanner;
import io.datarouter.util.concurrent.BlockingDequeTool;
import io.datarouter.util.concurrent.CountDownLatchTool;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:io/datarouter/filesystem/snapshot/writer/SnapshotWriter.class */
/**
 * Accepts batches of {@link SnapshotEntry}s, hands them through a bounded queue to a dedicated writer thread,
 * and encodes them into leaf, branch and value blocks that are submitted to a {@link SnapshotBlockWriter}.
 *
 * <p>Usage as visible in this class: call {@link #addBatch(List)} any number of times, then {@link #complete()}
 * to flush the partially filled blocks and obtain the root block. {@link #close()} interrupts the writer thread.
 *
 * <p>Threading: all encoder state (leaf/branch/value encoders and their counters) is mutated only by the writer
 * thread until it releases {@code writerThreadCompletionLatch}; {@link #complete()} touches that state only after
 * awaiting the latch. If the writer thread dies (for example because an entry fails validation), the failure is
 * recorded and rethrown from the next {@link #addBatch(List)} or {@link #complete()} call instead of leaving the
 * caller blocked forever.
 *
 * <p>NOTE(review): {@code lastStatusLogMs} is unsynchronized, so {@link #addBatch(List)} and {@link #complete()}
 * look like they are meant to be called from a single producer thread - confirm against callers.
 */
public class SnapshotWriter implements AutoCloseable {
    private static final Logger logger = LoggerFactory.getLogger(SnapshotWriter.class);

    private final SnapshotKey snapshotKey;
    private final SnapshotWriterTracker tracker;
    private final SnapshotWriterConfig config;
    private final SnapshotBlockWriter blockWriter;
    private final Thread writerThread;
    private final long startTimeMs;
    private final int numColumns;

    // hand-off between producers and the writer thread
    private final LinkedBlockingDeque<Message> messages;
    private final CountDownLatch writerThreadCompletionLatch = new CountDownLatch(1);
    // set by the writer thread just before it dies abnormally; read by producers
    private volatile Throwable writerThreadFailure;

    // status logging, touched by the threads calling addBatch/complete
    private long lastStatusLogMs = System.currentTimeMillis();

    // encoder state, owned by the writer thread until the completion latch is released
    private LeafBlockEncoder leafBlockEncoder;
    private long numKeys;
    private SnapshotEntry lastEntry;
    private int numLeafBlocks = 0;
    private final List<BranchBlockEncoder> branchBlockEncoders = new ArrayList<>();
    private final List<Integer> numBranchBlocksByLevel = new ArrayList<>();
    private final List<ValueBlockEncoder> valueBlockEncoders = new ArrayList<>();
    private final List<Integer> numValueBlocksByColumn = new ArrayList<>();
    private final List<Integer> numValuesInBlockByColumn = new ArrayList<>();

    /**
     * Queue element: either a batch of entries to encode, or the terminal marker that stops the writer thread.
     */
    private static class Message {
        final List<SnapshotEntry> entries;
        final boolean isLast;

        Message(List<SnapshotEntry> entries, boolean isLast) {
            this.entries = entries;
            this.isLast = isLast;
        }

        /** Wraps a batch of entries that should be encoded. */
        static Message addBatch(List<SnapshotEntry> entries) {
            return new Message(entries, false);
        }

        /** Creates the empty terminal message that ends the writer loop. */
        static Message last() {
            return new Message(List.of(), true);
        }
    }

    /**
     * Creates the writer, one value block encoder per configured column, and starts the writer thread.
     *
     * @param snapshotKey identifies the snapshot; also used in the writer thread name and in log output
     * @param snapshotFileStorage passed through to the {@link SnapshotBlockWriter}
     * @param snapshotBlockStorage passed through to the {@link SnapshotBlockWriter}
     * @param snapshotWriterConfig supplies queue length, column count, block sizes and encoder factories
     * @param executorService passed through to the {@link SnapshotBlockWriter}
     */
    public SnapshotWriter(SnapshotKey snapshotKey, SnapshotFileStorage snapshotFileStorage, SnapshotBlockStorage snapshotBlockStorage, SnapshotWriterConfig snapshotWriterConfig, ExecutorService executorService) {
        this.snapshotKey = snapshotKey;
        this.tracker = new SnapshotWriterTracker(snapshotKey);
        this.config = snapshotWriterConfig;
        this.blockWriter = new SnapshotBlockWriter(snapshotKey, this.tracker, snapshotBlockStorage, snapshotFileStorage, snapshotWriterConfig, executorService);
        this.messages = new LinkedBlockingDeque<>(snapshotWriterConfig.batchQueueLength());
        this.leafBlockEncoder = snapshotWriterConfig.leafBlockEncoderSupplier().get();
        this.numColumns = snapshotWriterConfig.numColumns();
        for (int column = 0; column < this.numColumns; ++column) {
            this.valueBlockEncoders.add(snapshotWriterConfig.valueBlockEncoderSupplier().get());
            this.numValueBlocksByColumn.add(0);
            this.numValuesInBlockByColumn.add(0);
        }
        this.startTimeMs = System.currentTimeMillis();
        // start the thread last so that every field above is assigned before it can run
        this.writerThread = startWriterThread();
    }

    /** Creates and starts the thread that drains {@code messages}; named after this class and the snapshot key. */
    private Thread startWriterThread() {
        String threadName = String.join("-", getClass().getSimpleName(), snapshotKey.toString());
        Thread thread = new Thread(this::runWriterLoop, threadName);
        thread.start();
        return thread;
    }

    /**
     * Writer thread body: polls messages and encodes their entries until the terminal message is seen.
     *
     * <p>On any failure the cause is recorded for the producers, the queue is cleared so that a producer blocked on
     * a full queue is released, and the exception is rethrown so the thread's uncaught-exception handling still
     * reports it. The completion latch is always released so {@link #complete()} cannot wait forever.
     */
    private void runWriterLoop() {
        try {
            Message message;
            do {
                long pollStartNs = System.nanoTime();
                message = BlockingDequeTool.pollForever(messages);
                tracker.readStallNs.incrementBy(System.nanoTime() - pollStartNs);
                message.entries.forEach(this::add);
                tracker.entriesQueued.decrementBySize(message.entries);
                tracker.entriesProcessed.incrementBySize(message.entries);
            } while (!message.isLast);
        } catch (RuntimeException | Error e) {
            // publish the failure before releasing anyone who is waiting on the queue or the latch
            writerThreadFailure = e;
            messages.clear();
            throw e;
        } finally {
            writerThreadCompletionLatch.countDown();
        }
    }

    /**
     * Surfaces a writer thread failure on the calling thread.
     *
     * @throws IllegalStateException wrapping the original cause if the writer thread terminated abnormally
     */
    private void throwIfWriterThreadFailed() {
        Throwable failure = writerThreadFailure;
        if (failure != null) {
            throw new IllegalStateException("writer thread failed for snapshotKey=" + snapshotKey, failure);
        }
    }

    /** Interrupts the writer thread. */
    @Override // java.lang.AutoCloseable
    public void close() {
        this.writerThread.interrupt();
    }

    /**
     * Queues a batch of entries for the writer thread, updates the queued-entries counter, and occasionally logs
     * progress.
     *
     * @param list entries to encode; each must have exactly the configured number of column values, otherwise the
     *        writer thread fails while processing the batch
     * @throws IllegalStateException if the writer thread has already failed
     */
    public void addBatch(List<SnapshotEntry> list) {
        throwIfWriterThreadFailed();
        BlockingDequeTool.put(messages, Message.addBatch(list));
        // a put that was blocked on a full queue may have been released by the failing writer thread
        throwIfWriterThreadFailed();
        tracker.entriesQueued.incrementBySize(list);
        logStatusOccasional();
    }

    /**
     * Encodes one entry: validates it, adds its key to the current leaf block and its column values to the
     * per-column value blocks, and submits any block that reached its configured size. Runs on the writer thread.
     *
     * @throws IllegalArgumentException if the column count is wrong or the sort check below fails
     */
    private void add(SnapshotEntry entry) {
        if (numColumns != entry.columnValues.length) {
            throw new IllegalArgumentException(String.format("Expected %s values but found %s", numColumns, entry.columnValues.length));
        }

        // NOTE(review): ordering is only verified for the first entry of each leaf block (encoder is empty);
        // presumably ordering inside a block is enforced elsewhere - confirm.
        if (config.sorted() && lastEntry != null && leafBlockEncoder.numRecords() == 0) {
            int diff = Arrays.compareUnsigned(
                    entry.keySlab(), entry.keyFrom(), entry.keyTo(),
                    lastEntry.keySlab(), lastEntry.keyFrom(), lastEntry.keyTo());
            if (diff <= 0) {
                throw new IllegalArgumentException(String.format(
                        "key=[%s] must sort after lastKey=[%s]",
                        CsvIntByteStringCodec.INSTANCE.encode(entry.key()),
                        CsvIntByteStringCodec.INSTANCE.encode(lastEntry.key())));
            }
        }

        // capture where each column value is about to land, before the value encoders are touched
        long keyId = numKeys;
        int[] valueBlockIds;
        int[] valueIndexes;
        if (numColumns == 0) {
            valueBlockIds = EmptyArray.INT;
            valueIndexes = EmptyArray.INT;
        } else {
            valueBlockIds = new int[numColumns];
            valueIndexes = new int[numColumns];
            for (int column = 0; column < numColumns; ++column) {
                valueBlockIds[column] = numValueBlocksByColumn.get(column);
                valueIndexes[column] = numValuesInBlockByColumn.get(column);
            }
        }

        // key
        leafBlockEncoder.add(numLeafBlocks, keyId, entry, valueBlockIds, valueIndexes);
        if (leafBlockEncoder.numBytes() >= config.leafBlockSize()) {
            addBranchEntry(0, keyId, entry, numLeafBlocks);
            blockWriter.submitLeaf(leafBlockEncoder);
            leafBlockEncoder = config.leafBlockEncoderSupplier().get();
            ++numLeafBlocks;
        }

        // values
        for (int column = 0; column < numColumns; ++column) {
            ValueBlockEncoder valueBlockEncoder = valueBlockEncoders.get(column);
            valueBlockEncoder.add(entry, column);
            numValuesInBlockByColumn.set(column, numValuesInBlockByColumn.get(column) + 1);
            if (valueBlockEncoder.numBytes() >= config.valueBlockSize()) {
                int valueBlockId = numValueBlocksByColumn.get(column);
                blockWriter.submitValueBlock(column, valueBlockId, valueBlockEncoder);
                valueBlockEncoders.set(column, config.valueBlockEncoderSupplier().get());
                numValueBlocksByColumn.set(column, valueBlockId + 1);
                numValuesInBlockByColumn.set(column, 0);
            }
        }

        ++numKeys;
        lastEntry = entry;
    }

    /**
     * Adds a pointer to a finished child block into the branch encoder at {@code level}, creating that level on
     * first use. When the encoder reaches the configured branch block size, a pointer to it is first added one
     * level up (recursively), then it is submitted and replaced by a fresh encoder.
     */
    private void addBranchEntry(int level, long keyId, SnapshotEntry entry, int childBlockId) {
        if (level > branchBlockEncoders.size() - 1) {
            branchBlockEncoders.add(config.branchBlockEncoderFactory().apply(level));
            numBranchBlocksByLevel.add(0);
        }
        BranchBlockEncoder encoder = branchBlockEncoders.get(level);
        int blockId = numBranchBlocksByLevel.get(level);
        encoder.add(blockId, keyId, entry, childBlockId);
        if (encoder.numBytes() >= config.branchBlockSize()) {
            addBranchEntry(level + 1, keyId, entry, blockId);
            blockWriter.submitBranch(encoder);
            branchBlockEncoders.set(level, config.branchBlockEncoderFactory().apply(level));
            numBranchBlocksByLevel.set(level, blockId + 1);
        }
    }

    /**
     * Signals the writer thread to stop, waits for it, flushes every partially filled value, leaf and branch
     * block, completes the block writer and writes the root block.
     *
     * @return the root block, or empty if no entries were added
     * @throws IllegalStateException if the writer thread failed before or while draining the queue
     */
    public Optional<RootBlock> complete() {
        throwIfWriterThreadFailed();
        BlockingDequeTool.put(messages, Message.last());
        CountDownLatchTool.await(writerThreadCompletionLatch);
        // the latch is also released when the writer thread dies, so check before flushing partial state
        throwIfWriterThreadFailed();

        IntStream.range(0, valueBlockEncoders.size()).forEach(this::flushPartialValueBlock);
        flushPartialLeafBlock();
        flushPartialBranchBlocks();
        blockWriter.complete();

        if (numKeys == 0) {
            return Optional.empty();
        }
        RootBlock rootBlock = blockWriter.flushRootBlock(startTimeMs, numBranchBlocksByLevel, numValueBlocksByColumn, branchBlockEncoders.size(), numKeys, numLeafBlocks);
        logStatus();
        String rootSummary = Scanner.of(rootBlock.toKeyValueStrings().entrySet())
                .map(entry -> entry.getKey() + "=" + entry.getValue())
                .collect(Collectors.joining(", "));
        logger.warn("Completed group={}, id={}, {}", snapshotKey.groupId(), snapshotKey.snapshotId(), rootSummary);
        return Optional.of(rootBlock);
    }

    /** Submits the value block of {@code column} if it holds any records and bumps that column's block count. */
    private void flushPartialValueBlock(int column) {
        ValueBlockEncoder valueBlockEncoder = valueBlockEncoders.get(column);
        if (valueBlockEncoder.numRecords() > 0) {
            int valueBlockId = numValueBlocksByColumn.get(column);
            blockWriter.submitValueBlock(column, valueBlockId, valueBlockEncoder);
            numValueBlocksByColumn.set(column, valueBlockId + 1);
        }
    }

    /** Submits the current leaf block if it holds any records, after registering it in the lowest branch level. */
    private void flushPartialLeafBlock() {
        if (leafBlockEncoder.numRecords() > 0) {
            // NOTE(review): add() passes the entry's own index as key id, while numKeys here is the total count
            // (one past the last entry's index) - confirm this is intended.
            addBranchEntry(0, numKeys, lastEntry, numLeafBlocks);
            blockWriter.submitLeaf(leafBlockEncoder);
            ++numLeafBlocks;
        }
    }

    /**
     * Submits every non-empty branch block from the lowest level upwards. Each block except the one at the
     * current top level is first registered in the level above it.
     */
    private void flushPartialBranchBlocks() {
        // the number of levels to visit is fixed up front, matching a range computed once
        int numLevels = branchBlockEncoders.size();
        for (int level = 0; level < numLevels; ++level) {
            BranchBlockEncoder encoder = branchBlockEncoders.get(level);
            if (encoder.numRecords() == 0) {
                continue;
            }
            int blockId = numBranchBlocksByLevel.get(level);
            if (level != branchBlockEncoders.size() - 1) {
                addBranchEntry(level + 1, numKeys, lastEntry, blockId);
            }
            blockWriter.submitBranch(encoder);
            branchBlockEncoders.set(level, config.branchBlockEncoderFactory().apply(level));
            numBranchBlocksByLevel.set(level, blockId + 1);
        }
    }

    /** Logs progress if more than the configured log period has passed since the last status log. */
    private void logStatusOccasional() {
        if (System.currentTimeMillis() - lastStatusLogMs > config.logPeriodMs()) {
            logStatus();
        }
    }

    /** Logs the tracker's summary for the elapsed time since construction and resets the log timer. */
    private void logStatus() {
        logger.warn("{}", tracker.toLog(System.currentTimeMillis() - startTimeMs));
        lastStatusLogMs = System.currentTimeMillis();
    }
}
