package crawlercommons.urlfrontier.service.rocksdb;

import com.google.protobuf.InvalidProtocolBufferException;
import crawlercommons.urlfrontier.CrawlID;
import crawlercommons.urlfrontier.Urlfrontier;
import crawlercommons.urlfrontier.service.AbstractFrontierService;
import crawlercommons.urlfrontier.service.QueueInterface;
import crawlercommons.urlfrontier.service.QueueWithinCrawl;
import io.grpc.stub.StreamObserver;
import java.io.Closeable;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.FileVisitOption;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.text.DecimalFormat;
import java.time.Instant;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.stream.Stream;
import org.rocksdb.BlockBasedTableConfig;
import org.rocksdb.BloomFilter;
import org.rocksdb.ColumnFamilyDescriptor;
import org.rocksdb.ColumnFamilyHandle;
import org.rocksdb.ColumnFamilyOptions;
import org.rocksdb.DBOptions;
import org.rocksdb.RocksDB;
import org.rocksdb.RocksDBException;
import org.rocksdb.RocksIterator;
import org.rocksdb.Statistics;
import org.rocksdb.StatsLevel;
import org.rocksdb.TableFormatConfig;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * URL Frontier service storing its data in an embedded RocksDB instance.
 *
 * <p>Two column families are used:
 *
 * <ul>
 *   <li>the default one (handle {@link #URLS_CF}) maps {@code crawl_queue_url} to the key under
 *       which the URL is scheduled, or to an empty value once the URL is completed;
 *   <li>{@code queues} (handle {@link #QUEUES_CF}) maps {@code crawl_queue_date_url} to the
 *       serialised {@link Urlfrontier.URLInfo}; the zero-padded date keeps the entries of a queue
 *       sorted by the date they become fetchable.
 * </ul>
 *
 * <p>Underscores inside crawl IDs and queue keys are escaped as {@code %5F} (see {@link
 * #normalise(QueueWithinCrawl)}) so that {@code _} can act as the field separator in keys.
 */
public class RocksDBService extends AbstractFrontierService implements Closeable {

    private static final Logger LOG = LoggerFactory.getLogger(RocksDBService.class);

    /**
     * Formats refetch dates on 10 zero-padded digits. {@link DecimalFormat} is not thread-safe and
     * separate {@code putURLs} streams can be served concurrently, hence one instance per thread.
     */
    private static final ThreadLocal<DecimalFormat> DATE_FORMAT =
            ThreadLocal.withInitial(() -> new DecimalFormat("0000000000"));

    /** Index in {@link #columnFamilyHandleList} of the default column family. */
    private static final int URLS_CF = 0;

    /** Index in {@link #columnFamilyHandleList} of the {@code queues} column family. */
    private static final int QUEUES_CF = 1;

    static {
        RocksDB.loadLibrary();
    }

    private RocksDB rocksDB;
    private final List<ColumnFamilyHandle> columnFamilyHandleList;
    private Statistics statistics;

    /**
     * Queues whose deletion is in progress (each key maps to itself); {@code putURLs} rejects URLs
     * for them and a second deletion request for the same queue is a no-op.
     */
    private final ConcurrentHashMap<QueueWithinCrawl, QueueWithinCrawl> queuesBeingDeleted;

    /** Creates a service with the default configuration. */
    public RocksDBService() {
        this(new HashMap<>());
    }

    /**
     * Opens (creating it if needed) the RocksDB instance and rebuilds the in-memory queues.
     *
     * <p>Configuration keys read: {@code rocksdb.path} (default {@code ./rocksdb}), {@code
     * rocksdb.purge}, {@code rocksdb.stats}, {@code rocksdb.bloom.filters}, {@code
     * rocksdb.max_bytes_for_level_base}, {@code rocksdb.max_background_jobs} and {@code
     * rocksdb.max_subcompactions}.
     *
     * @param map service configuration
     * @throws RuntimeException wrapping any {@link RocksDBException} raised when opening the DB
     */
    public RocksDBService(Map<String, String> map) {
        this.columnFamilyHandleList = new ArrayList<>();
        this.queuesBeingDeleted = new ConcurrentHashMap<>();

        final String path = map.getOrDefault("rocksdb.path", "./rocksdb");
        LOG.info("RocksDB data stored in {} ", path);

        if (map.containsKey("rocksdb.purge")) {
            purge(path);
        }

        if (map.containsKey("rocksdb.stats")) {
            this.statistics = new Statistics();
            this.statistics.setStatsLevel(StatsLevel.ALL);
        }

        final boolean bloomFilters = map.containsKey("rocksdb.bloom.filters");

        try (ColumnFamilyOptions columnFamilyOptions = new ColumnFamilyOptions()) {
            final String maxBytesForLevelBase = map.get("rocksdb.max_bytes_for_level_base");
            if (maxBytesForLevelBase != null) {
                columnFamilyOptions.setMaxBytesForLevelBase(Long.parseLong(maxBytesForLevelBase));
            }
            columnFamilyOptions.optimizeUniversalStyleCompaction();
            if (bloomFilters) {
                LOG.info("Configuring Bloom filters");
                columnFamilyOptions.setTableFormatConfig(
                        new BlockBasedTableConfig().setFilterPolicy(new BloomFilter(10.0d, false)));
            }

            // both column families share the same options; order must match URLS_CF / QUEUES_CF
            final List<ColumnFamilyDescriptor> descriptors =
                    Arrays.asList(
                            new ColumnFamilyDescriptor(
                                    RocksDB.DEFAULT_COLUMN_FAMILY, columnFamilyOptions),
                            new ColumnFamilyDescriptor(
                                    "queues".getBytes(StandardCharsets.UTF_8),
                                    columnFamilyOptions));

            final long start = System.currentTimeMillis();

            try (DBOptions dbOptions = new DBOptions()) {
                dbOptions.setCreateIfMissing(true).setCreateMissingColumnFamilies(true);

                final String maxBackgroundJobs = map.get("rocksdb.max_background_jobs");
                if (maxBackgroundJobs != null) {
                    dbOptions.setMaxBackgroundJobs(Integer.parseInt(maxBackgroundJobs));
                }

                final String maxSubcompactions = map.get("rocksdb.max_subcompactions");
                if (maxSubcompactions != null) {
                    dbOptions.setMaxSubcompactions(Integer.parseInt(maxSubcompactions));
                }

                if (this.statistics != null) {
                    LOG.info("Allowing stats from RocksDB to be displayed when GetStats is called");
                    dbOptions.setStatistics(this.statistics);
                }

                this.rocksDB = RocksDB.open(dbOptions, path, descriptors, this.columnFamilyHandleList);
            }

            final long loaded = System.currentTimeMillis();
            LOG.info("RocksDB loaded in {} msec", loaded - start);

            LOG.info("Scanning tables to rebuild queues... (can take a long time)");
            recoveryQscan();
            LOG.info(
                    "{} queues discovered in {} msec",
                    this.queues.size(),
                    System.currentTimeMillis() - loaded);
        } catch (RocksDBException e) {
            LOG.error("RocksDB exception ", e);
            throw new RuntimeException(e);
        }
    }

    /**
     * Recursively deletes {@code path}, children before their parents. Failures are logged, not
     * thrown; a missing path is silently ignored.
     */
    private static void purge(String path) {
        final Path root = Paths.get(path);
        if (!Files.exists(root)) {
            return;
        }
        // the stream returned by Files.walk keeps directories open until it is closed
        try (Stream<Path> paths = Files.walk(root)) {
            paths.sorted(Comparator.reverseOrder()).forEach(p -> p.toFile().delete());
        } catch (IOException e) {
            LOG.error("Couldn't delete path {}", path, e);
        }
    }

    /**
     * Rebuilds {@link #queues} from the persisted data. Active counts come from the scheduling
     * column family; completed counts come from empty values in the default column family, whose
     * non-empty values are also used to cross-check the active counts.
     *
     * @throws RuntimeException if both column families disagree on the active count of a queue
     */
    private void recoveryQscan() {
        LOG.info("Recovering queues from existing RocksDB");

        // every entry of the scheduling table is an active URL
        try (RocksIterator it = rocksDB.newIterator(columnFamilyHandleList.get(QUEUES_CF))) {
            for (it.seekToFirst(); it.isValid(); it.next()) {
                final QueueWithinCrawl qk =
                        parseAndDeNormalise(new String(it.key(), StandardCharsets.UTF_8));
                ((QueueMetadata) this.queues.computeIfAbsent(qk, k -> new QueueMetadata()))
                        .incrementActive();
            }
        }

        // keys are sorted, so the URLs of one queue are contiguous in the default column family
        QueueWithinCrawl current = null;
        long activeInCurrent = 0;
        try (RocksIterator it = rocksDB.newIterator(columnFamilyHandleList.get(URLS_CF))) {
            for (it.seekToFirst(); it.isValid(); it.next()) {
                final QueueWithinCrawl qk =
                        parseAndDeNormalise(new String(it.key(), StandardCharsets.UTF_8));
                if (current == null) {
                    current = qk;
                } else if (!current.equals(qk)) {
                    checkActiveCount(current, activeInCurrent);
                    current = qk;
                    activeInCurrent = 0;
                }
                final QueueMetadata metadata =
                        (QueueMetadata) this.queues.computeIfAbsent(qk, k -> new QueueMetadata());
                if (it.value().length == 0) {
                    metadata.incrementCompleted();
                } else {
                    activeInCurrent++;
                }
            }
        }
        if (current != null) {
            checkActiveCount(current, activeInCurrent);
        }
    }

    /** Fails the recovery if {@code queue} does not hold {@code expected} active URLs. */
    private void checkActiveCount(QueueWithinCrawl queue, long expected) {
        if (this.queues.get(queue).countActive() != expected) {
            throw new RuntimeException("Incorrect number of active URLs for queue " + queue);
        }
    }

    /**
     * Sends the URLs of a queue that are due by {@code now} and not currently held, scanning its
     * scheduling entries in date order.
     *
     * @param queueInterface metadata of the queue (expected to be a {@link QueueMetadata})
     * @param queueWithinCrawl identifier of the queue
     * @param maxURLsPerQueue maximum number of URLs to send
     * @param secsUntilRequestable delay added to {@code now} while a sent URL stays held
     * @param now current time, in the same unit as the stored refetch dates
     * @param responseObserver receives the URLs
     * @return the number of URLs sent
     */
    @Override
    protected int sendURLsForQueue(
            QueueInterface queueInterface,
            QueueWithinCrawl queueWithinCrawl,
            int maxURLsPerQueue,
            int secsUntilRequestable,
            long now,
            StreamObserver<Urlfrontier.URLInfo> responseObserver) {
        final QueueMetadata metadata = (QueueMetadata) queueInterface;
        // escaped crawl and queue parts contain no '_', so this prefix matches this queue only
        final String prefix = normalise(queueWithinCrawl) + "_";
        int sent = 0;

        try (RocksIterator it = rocksDB.newIterator(columnFamilyHandleList.get(QUEUES_CF))) {
            for (it.seek(prefix.getBytes(StandardCharsets.UTF_8));
                    it.isValid() && sent < maxURLsPerQueue;
                    it.next()) {
                final String key = new String(it.key(), StandardCharsets.UTF_8);
                if (!key.startsWith(prefix)) {
                    break; // past the last entry of this queue
                }
                // key is prefix + date + "_" + url; the URL itself may contain '_'
                final int dateEnd = key.indexOf('_', prefix.length());
                if (Long.parseLong(key.substring(prefix.length(), dateEnd)) > now) {
                    break; // entries are in date order: nothing further is due yet
                }
                final String url = key.substring(dateEnd + 1);
                if (!metadata.isHeld(url, now)) {
                    try {
                        responseObserver.onNext(Urlfrontier.URLInfo.parseFrom(it.value()));
                        metadata.holdUntil(url, now + secsUntilRequestable);
                        sent++;
                    } catch (InvalidProtocolBufferException e) {
                        LOG.error("Caught unlikely error ", e);
                    }
                }
            }
        }
        return sent;
    }

    /**
     * Returns an observer that stores every incoming URL and acknowledges it by echoing its URL.
     */
    @Override
    public StreamObserver<Urlfrontier.URLItem> putURLs(
            final StreamObserver<Urlfrontier.String> streamObserver) {
        return new StreamObserver<Urlfrontier.URLItem>() {
            @Override
            public void onNext(Urlfrontier.URLItem item) {
                putURL(item, streamObserver);
            }

            @Override
            public void onError(Throwable t) {
                LOG.error("Throwable caught", t);
            }

            @Override
            public void onCompleted() {
                streamObserver.onCompleted();
            }
        };
    }

    /**
     * Stores a single URL and acknowledges it on {@code ack}.
     *
     * <p>Discovered URLs are scheduled for now and ignored if already known. Known URLs replace any
     * existing entry; a refetch date of 0 records them as completed. URLs with a malformed or too
     * long key, or whose queue is being deleted, are acknowledged without being stored. A {@link
     * RocksDBException} is only logged and the URL is not acknowledged.
     */
    private void putURL(Urlfrontier.URLItem item, StreamObserver<Urlfrontier.String> ack) {
        final boolean discovered = item.hasDiscovered();
        Urlfrontier.URLInfo info;
        final long refetchableFromDate;
        if (discovered) {
            info = item.getDiscovered().getInfo();
            refetchableFromDate = Instant.now().getEpochSecond();
        } else {
            final Urlfrontier.KnownURLItem known = item.getKnown();
            info = known.getInfo();
            refetchableFromDate = known.getRefetchableFromDate();
        }

        String key = info.getKey();
        final String url = info.getUrl();
        final String crawlID = CrawlID.normaliseCrawlID(info.getCrawlID());

        if (key.isEmpty()) {
            LOG.debug("key missing for {}", url);
            key = provideMissingKey(url);
            if (key == null) {
                LOG.error("Malformed URL {}", url);
                acknowledge(ack, url);
                return;
            }
            info = Urlfrontier.URLInfo.newBuilder(info).setKey(key).setCrawlID(crawlID).build();
        }

        if (key.length() > 255) {
            LOG.error("Key too long: {}", key);
            acknowledge(ack, url);
            return;
        }

        final QueueWithinCrawl qk = QueueWithinCrawl.get(key, crawlID);
        if (queuesBeingDeleted.containsKey(qk)) {
            LOG.info("Not adding {} as its queue {} is being deleted", url, key);
            acknowledge(ack, url);
            return;
        }

        final byte[] existenceKey = (normalise(qk) + "_" + url).getBytes(StandardCharsets.UTF_8);
        try {
            final byte[] previous = rocksDB.get(columnFamilyHandleList.get(URLS_CF), existenceKey);
            if (previous != null && discovered) {
                // already known: a rediscovery must not reset its state
                acknowledge(ack, url);
                return;
            }

            final QueueMetadata metadata =
                    (QueueMetadata) this.queues.computeIfAbsent(qk, k -> new QueueMetadata());

            if (previous != null) {
                // an empty value marks a completed URL, which has no scheduling entry
                final boolean wasActive = previous.length > 0;
                try {
                    if (wasActive) {
                        rocksDB.delete(columnFamilyHandleList.get(QUEUES_CF), previous);
                    }
                    metadata.removeFromProcessed(url);
                    if (wasActive) {
                        metadata.decrementActive();
                    }
                    // NOTE(review): a previously completed URL stays in the completed count;
                    // QueueMetadata exposes no visible way to decrement it here — confirm.
                } catch (RocksDBException e) {
                    LOG.error("RocksDB exception", e);
                }
            }

            final byte[] value;
            if (discovered || refetchableFromDate != 0) {
                final byte[] schedulingKey =
                        (normalise(qk)
                                        + "_"
                                        + DATE_FORMAT.get().format(refetchableFromDate)
                                        + "_"
                                        + url)
                                .getBytes(StandardCharsets.UTF_8);
                rocksDB.put(
                        columnFamilyHandleList.get(QUEUES_CF), schedulingKey, info.toByteArray());
                metadata.incrementActive();
                value = schedulingKey;
            } else {
                // completed: keep an empty marker so that later rediscoveries are ignored
                value = new byte[0];
                metadata.incrementCompleted();
            }
            rocksDB.put(columnFamilyHandleList.get(URLS_CF), existenceKey, value);
            acknowledge(ack, url);
        } catch (RocksDBException e) {
            LOG.error("RocksDB exception", e);
        }
    }

    /** Acknowledges {@code url} to the client. */
    private static void acknowledge(StreamObserver<Urlfrontier.String> ack, String url) {
        ack.onNext(Urlfrontier.String.newBuilder().setValue(url).build());
    }

    /**
     * Deletes a queue and all its URLs, replying with the number of URLs it held (active plus
     * completed). Replies 0 if the queue is unknown or already being deleted.
     */
    @Override
    public void deleteQueue(
            Urlfrontier.QueueWithinCrawlParams queueWithinCrawlParams,
            StreamObserver<Urlfrontier.Integer> streamObserver) {
        // crawl IDs are normalised the same way when queues are created in putURLs
        final QueueWithinCrawl qk =
                QueueWithinCrawl.get(
                        queueWithinCrawlParams.getKey(),
                        CrawlID.normaliseCrawlID(queueWithinCrawlParams.getCrawlID()));

        long removed = 0;
        // atomic claim: a concurrent deletion of the same queue sees it and does nothing
        if (queuesBeingDeleted.putIfAbsent(qk, qk) == null) {
            try {
                deletePrefixLogged(normalise(qk) + "_");
                final QueueInterface metadata = this.queues.remove(qk);
                if (metadata != null) {
                    removed = metadata.countActive() + metadata.getCountCompleted();
                }
            } finally {
                queuesBeingDeleted.remove(qk);
            }
        }

        streamObserver.onNext(Urlfrontier.Integer.newBuilder().setValue(removed).build());
        streamObserver.onCompleted();
    }

    /**
     * Escapes a crawl ID or queue key and joins them into the key prefix used in both column
     * families: {@code crawl_queue}, with {@code _} inside each part replaced by {@code %5F}.
     */
    public static final String normalise(QueueWithinCrawl queueWithinCrawl) {
        return escape(queueWithinCrawl.getCrawlid()) + "_" + escape(queueWithinCrawl.getQueue());
    }

    /**
     * Parses the crawl and queue parts at the start of a stored key, undoing the escaping applied
     * by {@link #normalise(QueueWithinCrawl)}.
     */
    public static final QueueWithinCrawl parseAndDeNormalise(String str) {
        final int first = str.indexOf('_');
        final int second = str.indexOf('_', first + 1);
        final String crawl = unescape(str.substring(0, first));
        final String queue = unescape(str.substring(first + 1, second));
        return QueueWithinCrawl.get(queue, crawl);
    }

    /** Replaces the separator character {@code _} with {@code %5F}. */
    private static String escape(String part) {
        return part.replace("_", "%5F");
    }

    /** Reverses {@link #escape(String)}. */
    private static String unescape(String part) {
        return part.replace("%5F", "_");
    }

    /** Logs the RocksDB statistics, when enabled, before delegating to the generic stats. */
    @Override
    public void getStats(
            Urlfrontier.QueueWithinCrawlParams queueWithinCrawlParams,
            StreamObserver<Urlfrontier.Stats> streamObserver) {
        if (this.statistics != null) {
            LOG.info("RocksDB stats: {}", this.statistics);
        }
        super.getStats(queueWithinCrawlParams, streamObserver);
    }

    /** Releases the column family handles, the database and the statistics, in that order. */
    @Override
    public void close() throws IOException {
        // column family handles must be released before the database itself
        for (ColumnFamilyHandle handle : this.columnFamilyHandleList) {
            handle.close();
        }
        if (this.rocksDB != null) {
            try {
                this.rocksDB.close();
            } catch (Exception e) {
                LOG.error("Closing ", e);
            }
        }
        // the statistics object was handed to the DB options: release it once the DB is closed
        if (this.statistics != null) {
            this.statistics.close();
        }
    }

    /**
     * Deletes every queue of a crawl and all their URLs, replying with the number of URLs removed
     * (active plus completed). Queues already being deleted elsewhere are not counted.
     */
    @Override
    public void deleteCrawl(
            Urlfrontier.String string, StreamObserver<Urlfrontier.Integer> streamObserver) {
        final String crawlID = CrawlID.normaliseCrawlID(string.getValue());
        long removed = 0;

        synchronized (this.queues) {
            // claim the crawl's queues first so putURLs stops feeding them during the deletion
            final List<QueueWithinCrawl> claimed = new ArrayList<>();
            for (QueueWithinCrawl qk : this.queues.keySet().toArray(new QueueWithinCrawl[0])) {
                if (qk.getCrawlid().equals(crawlID) && queuesBeingDeleted.putIfAbsent(qk, qk) == null) {
                    claimed.add(qk);
                }
            }
            try {
                deletePrefixLogged(escape(crawlID) + "_");
                for (QueueWithinCrawl qk : claimed) {
                    final QueueInterface metadata = this.queues.remove(qk);
                    if (metadata != null) {
                        removed += metadata.countActive() + metadata.getCountCompleted();
                    }
                }
            } finally {
                for (QueueWithinCrawl qk : claimed) {
                    queuesBeingDeleted.remove(qk);
                }
            }
        }

        streamObserver.onNext(Urlfrontier.Integer.newBuilder().setValue(removed).build());
        streamObserver.onCompleted();
    }

    /** Runs {@link #deletePrefix(String)}, logging instead of propagating RocksDB failures. */
    private void deletePrefixLogged(String prefix) {
        try {
            deletePrefix(prefix);
        } catch (RocksDBException e) {
            LOG.error("Exception caught when deleting ranges - {}", prefix, e);
        }
    }

    /**
     * Deletes, from both column families, every key starting with {@code prefix}.
     *
     * <p>Because {@code _} is escaped inside the crawl and queue parts, a prefix ending with
     * {@code _} selects exactly one crawl ({@code crawl_}) or one queue ({@code crawl_queue_}).
     */
    private void deletePrefix(String prefix) throws RocksDBException {
        final byte[] begin = prefix.getBytes(StandardCharsets.UTF_8);
        final byte[] end = prefixUpperBound(begin);
        // deleteRange removes [begin, end)
        rocksDB.deleteRange(columnFamilyHandleList.get(QUEUES_CF), begin, end);
        rocksDB.deleteRange(columnFamilyHandleList.get(URLS_CF), begin, end);
    }

    /**
     * Returns the exclusive upper bound, in unsigned bytewise order, of the keys starting with
     * {@code prefix}: trailing 0xFF bytes are dropped and the last remaining byte is incremented.
     *
     * @throws IllegalArgumentException if the prefix is empty or consists only of 0xFF bytes
     */
    private static byte[] prefixUpperBound(byte[] prefix) {
        for (int i = prefix.length - 1; i >= 0; i--) {
            if (prefix[i] != (byte) 0xFF) {
                final byte[] end = Arrays.copyOf(prefix, i + 1);
                end[i]++;
                return end;
            }
        }
        throw new IllegalArgumentException("No upper bound for prefix " + Arrays.toString(prefix));
    }
}
