package crawlercommons.urlfrontier.service;

import ch.qos.logback.classic.Level;
import ch.qos.logback.classic.LoggerContext;
import ch.qos.logback.core.CoreConstants;
import crawlercommons.urlfrontier.CrawlID;
import crawlercommons.urlfrontier.URLFrontierGrpc;
import crawlercommons.urlfrontier.Urlfrontier;
import io.grpc.Status;
import io.grpc.StatusRuntimeException;
import io.grpc.stub.StreamObserver;
import io.prometheus.client.Counter;
import io.prometheus.client.Summary;
import java.io.Closeable;
import java.io.IOException;
import java.time.Instant;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.ignite.internal.processors.cache.persistence.defragmentation.maintenance.DefragmentationParameters;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:crawlercommons/urlfrontier/service/AbstractFrontierService.class */
public abstract class AbstractFrontierService extends URLFrontierGrpc.URLFrontierImplBase implements Closeable {
    private static final Logger LOG = LoggerFactory.getLogger((Class<?>) AbstractFrontierService.class);

    // --- Prometheus metrics -------------------------------------------------
    // The getURLs_* metrics are private to this class; they are presumably updated by
    // getURLs(), whose body is not available in this (decompiled) source.
    private static final Counter getURLs_calls = Counter.build().name("frontier_getURLs_calls_total").help("Number of times getURLs has been called.").register();
    private static final Counter getURLs_urls_count = Counter.build().name("frontier_getURLs_total").help("Number of URLs returned.").register();
    private static final Summary getURLs_Latency = Summary.build().name("frontier_getURLs_latency_seconds").help("getURLs latency in seconds.").register();
    // The putURLs_* metrics are protected so that subclasses can update them.
    // Only putURLs_calls is incremented in this class (in putURLs()).
    protected static final Counter putURLs_calls = Counter.build().name("frontier_putURLs_calls_total").help("Number of times putURLs has been called.").register();
    protected static final Counter putURLs_urls_count = Counter.build().name("frontier_putURLs_total").help("Number of URLs sent to the Frontier").register();
    // Labelled counter: callers must supply a value for the "discovered" label.
    protected static final Counter putURLs_discovered_count = Counter.build().name("frontier_putURLs_discovered_total").help("Count of discovered URLs sent to the Frontier").labelNames("discovered").register();
    protected static final Counter putURLs_alreadyknown_count = Counter.build().name("frontier_putURLs_ignored_total").help("Number of discovered URLs already known to the Frontier").register();
    protected static final Counter putURLs_completed_count = Counter.build().name("frontier_putURLs_completed_total").help("Number of completed URLs").register();

    // --- Instance state -----------------------------------------------------
    // Flag toggled through the setActive RPC and reported by getActive; starts as true.
    // NOTE(review): written and read from different threads without being volatile - confirm
    // whether visibility is guaranteed some other way.
    private boolean active;
    // Delay applied by setDelay when no queue key is given; initialised to 1 by the constructor.
    // The decompiler fragments of getURLs suggest it is used when a queue's own delay is -1
    // - TODO confirm against the original source.
    private int defaultDelayForQueues;
    // "host:port" string built by setHostAndPort; null until that method is called.
    protected String address;
    // Node addresses; null until setNodes or listNodes assigns it.
    private List<String> nodes;
    // Queues keyed by (crawl, queue). Created as Collections.synchronizedMap(LinkedHashMap),
    // so iteration must be done inside synchronized (getQueues()) { ... }.
    private final Map<QueueWithinCrawl, QueueInterface> queues;
    // Fixed-size thread pool; presumably runs the per-queue reads of getURLs (see the
    // decompiler fragments above that method) - TODO confirm.
    protected final ExecutorService readExecutorService;
    // Fixed-size thread pool on which putURLs runs putURLItem for every incoming item.
    protected final ExecutorService writeExecutorService;
    // Set to true by close(); RPCs that modify state check it through isClosing().
    // NOTE(review): not volatile although polled from RPC threads - confirm.
    private boolean closing;

    /* JADX INFO: Access modifiers changed from: protected */
    /**
     * Creates a service with no explicit configuration, so that every setting
     * falls back to its default value.
     */
    public AbstractFrontierService() {
        this(Collections.<String, String>emptyMap());
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /**
     * Creates a service and sizes its two thread pools from the configuration.
     *
     * <p>Recognised keys are {@code read.thread.num} and {@code write.thread.num}; when a key is
     * absent the pool gets a quarter of the available processors, with a minimum of one thread.
     *
     * @param map configuration entries; must not be {@code null}
     * @throws NumberFormatException if a configured thread count is not a valid integer
     */
    public AbstractFrontierService(Map<String, String> map) {
        this.closing = false;
        this.active = true;
        this.defaultDelayForQueues = 1;
        this.queues = Collections.synchronizedMap(new LinkedHashMap<>());

        final int cores = Runtime.getRuntime().availableProcessors();
        LOG.info("Available processor(s) {}", cores);

        // Default for both pools: a quarter of the cores, but never fewer than one thread.
        final String fallbackThreadCount = String.valueOf(Math.max(cores / 4, 1));

        final int readers = Integer.parseInt(map.getOrDefault("read.thread.num", fallbackThreadCount));
        LOG.info("Using {} threads for reading from queues", readers);
        this.readExecutorService = Executors.newFixedThreadPool(readers);

        final int writers = Integer.parseInt(map.getOrDefault("write.thread.num", fallbackThreadCount));
        this.writeExecutorService = Executors.newFixedThreadPool(writers);
        LOG.info("Using {} threads for writing to queues", writers);
    }

    /**
     * Gives access to the live map of queues held by this service.
     *
     * @return the internal queue map (not a copy); callers in this class synchronize on it while
     *     iterating
     */
    public Map<QueueWithinCrawl, QueueInterface> getQueues() {
        return queues;
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /**
     * Tells whether {@link #close()} has been called on this service.
     *
     * @return {@code true} once the service has started shutting down
     */
    public boolean isClosing() {
        return closing;
    }

    /**
     * Reads the service-wide default queue delay.
     *
     * @return the current default delay
     */
    public int getDefaultDelayForQueues() {
        return defaultDelayForQueues;
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /**
     * Gives access to the list of node addresses known to this service.
     *
     * @return the internal list (not a copy), or {@code null} if it has not been assigned yet
     */
    public List<String> getNodes() {
        return nodes;
    }

    /**
     * Replaces the service-wide default queue delay.
     *
     * @param i the new default delay
     */
    public void setDefaultDelayForQueues(int i) {
        defaultDelayForQueues = i;
    }

    /**
     * Reads the flag last set through the {@code setActive} RPC.
     *
     * @return the current value of the active flag
     */
    protected boolean isActive() {
        return active;
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Records the address of this service in {@code host:port} form.
     *
     * @param str the host name or IP address
     * @param i the port number
     */
    public void setHostAndPort(String str, int i) {
        final String hostAndPort = str + ":" + i;
        address = hostAndPort;
    }

    /**
     * Reads the address recorded by {@link #setHostAndPort(String, int)}.
     *
     * @return the address in {@code host:port} form, or {@code null} if it was never set
     */
    public String getHostAndPort() {
        return address;
    }

    /**
     * Stores the list of node addresses, sorts it and logs every entry.
     *
     * <p>The list is kept by reference and sorted in place, so the caller's list is reordered and
     * must be modifiable.
     *
     * @param list node addresses; must not be {@code null}
     */
    public void setNodes(List<String> list) {
        nodes = list;
        Collections.sort(nodes);
        LOG.debug(address);

        int position = 0;
        for (String node : nodes) {
            LOG.info("Node {}: {}", position, node);
            position++;
        }
    }

    /**
     * Sends the distinct crawl IDs that currently own at least one queue.
     *
     * @param local request message; not read by this implementation
     * @param streamObserver receives a single {@code StringList} and is then completed
     */
    @Override // crawlercommons.urlfrontier.URLFrontierGrpc.URLFrontierImplBase
    public void listCrawls(Urlfrontier.Local local, StreamObserver<Urlfrontier.StringList> streamObserver) {
        final HashSet<String> crawlIds = new HashSet<>();
        // The queue map is a synchronized wrapper: iteration has to hold its monitor.
        synchronized (getQueues()) {
            for (QueueWithinCrawl queueId : getQueues().keySet()) {
                crawlIds.add(queueId.getCrawlid());
            }
        }
        final Urlfrontier.StringList response = Urlfrontier.StringList.newBuilder().addAllValues(crawlIds).build();
        streamObserver.onNext(response);
        streamObserver.onCompleted();
    }

    /**
     * Removes every queue that belongs to the given crawl and reports how many active URLs those
     * queues held.
     *
     * <p>Nothing is removed while the service is closing; in that case {@code -1} is reported.
     *
     * @param deleteCrawlMessage carries the crawl ID, which is normalised before matching
     * @param streamObserver receives the sum of {@code countActive()} over the removed queues
     *     ({@code 0} if none matched, {@code -1} if the service is closing) and is then completed
     */
    @Override // crawlercommons.urlfrontier.URLFrontierGrpc.URLFrontierImplBase
    public void deleteCrawl(Urlfrontier.DeleteCrawlMessage deleteCrawlMessage, StreamObserver<Urlfrontier.Long> streamObserver) {
        long removedUrls = -1;
        if (!isClosing()) {
            removedUrls = 0;
            final String crawlId = CrawlID.normaliseCrawlID(deleteCrawlMessage.getValue());
            // Hold the map's monitor for the whole scan so that matching and removal are atomic
            // with respect to the other RPCs.
            synchronized (getQueues()) {
                final Iterator<Map.Entry<QueueWithinCrawl, QueueInterface>> entries = getQueues().entrySet().iterator();
                while (entries.hasNext()) {
                    final Map.Entry<QueueWithinCrawl, QueueInterface> entry = entries.next();
                    if (entry.getKey().getCrawlid().equals(crawlId)) {
                        removedUrls += entry.getValue().countActive();
                        // Iterator.remove is the only safe way to drop an entry mid-iteration.
                        entries.remove();
                    }
                }
            }
        }
        streamObserver.onNext(Urlfrontier.Long.newBuilder().setValue(removedUrls).build());
        streamObserver.onCompleted();
    }

    /**
     * Stores the requested value of the active flag and acknowledges with an empty message.
     *
     * @param active carries the new state
     * @param streamObserver receives one {@code Empty} message and is then completed
     */
    @Override // crawlercommons.urlfrontier.URLFrontierGrpc.URLFrontierImplBase
    public void setActive(Urlfrontier.Active active, StreamObserver<Urlfrontier.Empty> streamObserver) {
        final boolean requestedState = active.getState();
        this.active = requestedState;
        final Urlfrontier.Empty acknowledgement = Urlfrontier.Empty.getDefaultInstance();
        streamObserver.onNext(acknowledgement);
        streamObserver.onCompleted();
    }

    /**
     * Reports the current value of the active flag.
     *
     * @param local request message; not read by this implementation
     * @param streamObserver receives one {@code Boolean} message and is then completed
     */
    @Override // crawlercommons.urlfrontier.URLFrontierGrpc.URLFrontierImplBase
    public void getActive(Urlfrontier.Local local, StreamObserver<Urlfrontier.Boolean> streamObserver) {
        final Urlfrontier.Boolean.Builder reply = Urlfrontier.Boolean.newBuilder();
        reply.setState(active);
        streamObserver.onNext(reply.build());
        streamObserver.onCompleted();
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /**
     * Derives a queue key from a URL by extracting its host part.
     *
     * <p>Everything up to and including the first {@code "://"} is dropped, then the remainder is
     * cut at the first {@code ':'}, {@code '/'}, {@code '?'} or {@code '&'}. For example
     * {@code "http://example.com:8080/a?b=c"} yields {@code "example.com"}.
     *
     * <p>The delimiters are written as literals: they are part of URL syntax and should not be
     * borrowed from constants of unrelated libraries.
     *
     * @param str the URL to derive a key from; may be {@code null}
     * @return the extracted host, or {@code null} if {@code str} is {@code null} or nothing is
     *     left after trimming
     */
    public String provideMissingKey(String str) {
        if (str == null) {
            return null;
        }
        String host = str;
        final int schemeEnd = host.indexOf("://");
        if (schemeEnd != -1) {
            host = host.substring(schemeEnd + 3);
        }
        // Port, path, query and parameter separators, applied in the historical order.
        for (char delimiter : new char[] {':', '/', '?', '&'}) {
            final int cut = host.indexOf(delimiter);
            if (cut != -1) {
                host = host.substring(0, cut);
            }
        }
        return host.isEmpty() ? null : host;
    }

    /**
     * Sends one page of queue names for a crawl.
     *
     * <p>A queue is listed when it belongs to the requested crawl and, unless inactive queues
     * were asked for, is neither blocked at the current time nor empty of active URLs. At most
     * {@code size} names are returned; a size of {@code 0} means 100.
     *
     * <p>NOTE(review): the start offset is compared with the position of the entry within the
     * whole queue map (all crawls, including filtered-out queues), not within the filtered
     * result - confirm that this is the intended pagination semantics.
     *
     * @param pagination page size, start offset, crawl ID and the include-inactive switch
     * @param streamObserver receives a single {@code QueueList} and is then completed
     */
    @Override // crawlercommons.urlfrontier.URLFrontierGrpc.URLFrontierImplBase
    public void listQueues(Urlfrontier.Pagination pagination, StreamObserver<Urlfrontier.QueueList> streamObserver) {
        long maxQueues = pagination.getSize();
        final long start = pagination.getStart();
        final boolean includeInactive = pagination.getIncludeInactive();
        final String crawlId = CrawlID.normaliseCrawlID(pagination.getCrawlID());
        if (maxQueues == 0) {
            maxQueues = 100;
        }
        LOG.info("Received request to list queues [size {}; start {}; inactive {}]", maxQueues, start, includeInactive);

        final long now = Instant.now().getEpochSecond();
        final Urlfrontier.QueueList.Builder page = Urlfrontier.QueueList.newBuilder();
        long position = -1;
        long sent = 0;
        synchronized (getQueues()) {
            for (Map.Entry<QueueWithinCrawl, QueueInterface> entry : getQueues().entrySet()) {
                // Strict bound: stop as soon as the page is full (the former "<=" test let one
                // extra queue through).
                if (sent >= maxQueues) {
                    break;
                }
                position++;
                if (!entry.getKey().getCrawlid().equals(crawlId)) {
                    continue;
                }
                final QueueInterface queue = entry.getValue();
                // Skip queues that are still blocked.
                if (!includeInactive && queue.getBlockedUntil() >= now) {
                    continue;
                }
                // Skip queues without any active URL.
                if (!includeInactive && queue.countActive() <= 0) {
                    continue;
                }
                if (position < start) {
                    continue;
                }
                page.addValues(entry.getKey().getQueue());
                sent++;
            }
        }
        streamObserver.onNext(page.build());
        streamObserver.onCompleted();
    }

    /**
     * Blocks a queue until the time given in the request.
     *
     * <p>The request is silently ignored when the service is closing or the queue is unknown; an
     * empty acknowledgement is sent in every case.
     *
     * @param blockQueueParams identifies the queue (key and crawl ID) and carries the time
     * @param streamObserver receives one {@code Empty} message and is then completed
     */
    @Override // crawlercommons.urlfrontier.URLFrontierGrpc.URLFrontierImplBase
    public void blockQueueUntil(Urlfrontier.BlockQueueParams blockQueueParams, StreamObserver<Urlfrontier.Empty> streamObserver) {
        QueueInterface target = null;
        if (!isClosing()) {
            final QueueWithinCrawl queueId = QueueWithinCrawl.get(blockQueueParams.getKey(), blockQueueParams.getCrawlID());
            target = getQueues().get(queueId);
        }
        if (target != null) {
            target.setBlockedUntil(blockQueueParams.getTime());
        }
        streamObserver.onNext(Urlfrontier.Empty.getDefaultInstance());
        streamObserver.onCompleted();
    }

    /**
     * Sets the delay of one queue or, when the request has no key, the default delay.
     *
     * <p>The request is silently ignored when the service is closing or the named queue is
     * unknown; an empty acknowledgement is sent in every case.
     *
     * @param queueDelayParams carries the queue key (may be empty), the crawl ID and the delay
     * @param streamObserver receives one {@code Empty} message and is then completed
     */
    @Override // crawlercommons.urlfrontier.URLFrontierGrpc.URLFrontierImplBase
    public void setDelay(Urlfrontier.QueueDelayParams queueDelayParams, StreamObserver<Urlfrontier.Empty> streamObserver) {
        if (!isClosing()) {
            if (!queueDelayParams.getKey().isEmpty()) {
                final QueueWithinCrawl queueId = QueueWithinCrawl.get(queueDelayParams.getKey(), queueDelayParams.getCrawlID());
                final QueueInterface target = getQueues().get(queueId);
                if (target != null) {
                    target.setDelay(queueDelayParams.getDelayRequestable());
                }
            } else {
                // No key: the delay becomes the default.
                setDefaultDelayForQueues(queueDelayParams.getDelayRequestable());
            }
        }
        streamObserver.onNext(Urlfrontier.Empty.getDefaultInstance());
        streamObserver.onCompleted();
    }

    /**
     * Removes a single queue and reports how many active URLs it held.
     *
     * @param queueWithinCrawlParams identifies the queue by key and crawl ID
     * @param streamObserver receives the removed queue's {@code countActive()}, {@code 0} if no
     *     such queue exists, or {@code -1} if the service is closing; it is then completed
     */
    @Override // crawlercommons.urlfrontier.URLFrontierGrpc.URLFrontierImplBase
    public void deleteQueue(Urlfrontier.QueueWithinCrawlParams queueWithinCrawlParams, StreamObserver<Urlfrontier.Long> streamObserver) {
        long removedUrls = -1;
        if (!isClosing()) {
            final QueueWithinCrawl queueId = QueueWithinCrawl.get(queueWithinCrawlParams.getKey(), queueWithinCrawlParams.getCrawlID());
            final QueueInterface removed = getQueues().remove(queueId);
            // remove() yields null for an unknown queue: report that nothing was deleted instead
            // of failing the call with a NullPointerException.
            removedUrls = (removed == null) ? 0 : removed.countActive();
        }
        streamObserver.onNext(Urlfrontier.Long.newBuilder().setValue(removedUrls).build());
        streamObserver.onCompleted();
    }

    /**
     * Sends aggregated statistics for one queue or for every queue of a crawl.
     *
     * <p>With an empty key all queues of the (normalised) crawl ID are aggregated; otherwise only
     * the named queue is, if it exists. The reply carries the number of queues, the total of
     * active URLs, the total in process, and the counts {@code completed} and
     * {@code active_queues} (queues with at least one URL active or in process).
     *
     * @param queueWithinCrawlParams carries the queue key (may be empty) and the crawl ID
     * @param streamObserver receives a single {@code Stats} message and is then completed
     */
    @Override // crawlercommons.urlfrontier.URLFrontierGrpc.URLFrontierImplBase
    public void getStats(Urlfrontier.QueueWithinCrawlParams queueWithinCrawlParams, StreamObserver<Urlfrontier.Stats> streamObserver) {
        LOG.info("Received stats request");

        final String crawlId = CrawlID.normaliseCrawlID(queueWithinCrawlParams.getCrawlID());
        final List<QueueInterface> selected = new ArrayList<>();

        // Step 1: pick the queues to report on.
        if (!queueWithinCrawlParams.getKey().isEmpty()) {
            final QueueWithinCrawl queueId = QueueWithinCrawl.get(queueWithinCrawlParams.getKey(), queueWithinCrawlParams.getCrawlID());
            final QueueInterface single = getQueues().get(queueId);
            if (single != null) {
                selected.add(single);
            }
        } else {
            synchronized (getQueues()) {
                for (Map.Entry<QueueWithinCrawl, QueueInterface> candidate : getQueues().entrySet()) {
                    if (candidate.getKey().getCrawlid().equals(crawlId)) {
                        selected.add(candidate.getValue());
                    }
                }
            }
        }

        // Step 2: aggregate their counters.
        final long now = Instant.now().getEpochSecond();
        int totalInProcess = 0;
        int queueCount = 0;
        int totalActive = 0;
        long totalCompleted = 0;
        long activeQueues = 0;
        synchronized (getQueues()) {
            for (QueueInterface queue : selected) {
                final int inProcess = queue.getInProcess(now);
                final int active = queue.countActive();
                if (inProcess > 0 || active > 0) {
                    activeQueues++;
                }
                totalInProcess += inProcess;
                queueCount++;
                totalActive += active;
                totalCompleted += queue.getCountCompleted();
            }
        }

        // Step 3: build and send the reply.
        final Map<String, Long> counts = new HashMap<>();
        counts.put("completed", totalCompleted);
        counts.put("active_queues", activeQueues);

        final Urlfrontier.Stats stats = Urlfrontier.Stats.newBuilder()
                .setNumberOfQueues(queueCount)
                .setSize(totalActive)
                .setInProcess(totalInProcess)
                .putAllCounts(counts)
                .setCrawlID(crawlId)
                .build();
        streamObserver.onNext(stats);
        streamObserver.onCompleted();
    }

    /* JADX WARN: Code restructure failed: missing block: B:62:0x0278, code lost:
    
        if (r24 == null) goto L106;
     */
    /* JADX WARN: Code restructure failed: missing block: B:64:0x0285, code lost:
    
        if (r0.getCrawlid().equals(r24) != false) goto L104;
     */
    /* JADX WARN: Code restructure failed: missing block: B:67:0x0295, code lost:
    
        if (r0.getBlockedUntil() < r0) goto L108;
     */
    /* JADX WARN: Code restructure failed: missing block: B:69:0x029b, code lost:
    
        r36 = r0.getDelay();
     */
    /* JADX WARN: Code restructure failed: missing block: B:70:0x02a7, code lost:
    
        if (r36 != (-1)) goto L86;
     */
    /* JADX WARN: Code restructure failed: missing block: B:71:0x02aa, code lost:
    
        r36 = getDefaultDelayForQueues();
     */
    /* JADX WARN: Code restructure failed: missing block: B:73:0x02be, code lost:
    
        if ((r0.getLastProduced() + r36) < r0) goto L109;
     */
    /* JADX WARN: Code restructure failed: missing block: B:76:0x02cf, code lost:
    
        if (r0.getInProcess(r0) < r20) goto L110;
     */
    /* JADX WARN: Code restructure failed: missing block: B:78:0x02d5, code lost:
    
        r0.incrementAndGet();
        r4 = r20;
        r5 = r21;
        r14.readExecutorService.execute(() -> { // java.lang.Runnable.run():void
            r1.lambda$getURLs$0(r2, r3, r4, r5, r6, r7, r8, r9, r10, r11);
        });
     */
    /**
     * Handler for the {@code getURLs} RPC.
     *
     * <p><strong>The body of this method is missing:</strong> the decompiler could not
     * reconstruct it, so as written the method only throws
     * {@link UnsupportedOperationException}. It has to be restored from the original sources
     * (or from the bytecode dump) before this class can be used.
     *
     * <p>NOTE(review): the decompiler fragments kept above this method suggest that the original
     * filters queues by crawl ID and by their blocked-until time, falls back to
     * {@link #getDefaultDelayForQueues()} when a queue's delay is {@code -1}, compares the
     * queue's in-process count with a limit, and submits per-queue work to
     * {@code readExecutorService} - confirm all of this against the original source.
     *
     * @param r15 the request parameters
     * @param r16 the observer the URLs are meant to be streamed to
     * @throws UnsupportedOperationException always, in this decompiled form
     */
    @Override // crawlercommons.urlfrontier.URLFrontierGrpc.URLFrontierImplBase
    /*
        Code decompiled incorrectly, please refer to instructions dump.
        To view partially-correct add '--show-bad-code' argument
    */
    public void getURLs(crawlercommons.urlfrontier.Urlfrontier.GetParams r15, io.grpc.stub.StreamObserver<crawlercommons.urlfrontier.Urlfrontier.URLInfo> r16) {
        /*
            Method dump skipped, instructions count: 869
            To view this dump add '--comments-level debug' option
        */
        throw new UnsupportedOperationException("Method not decompiled: crawlercommons.urlfrontier.service.AbstractFrontierService.getURLs(crawlercommons.urlfrontier.Urlfrontier$GetParams, io.grpc.stub.StreamObserver):void");
    }

    /**
     * Storage-specific hook implemented by concrete frontier services.
     *
     * <p>NOTE(review): no call site is visible in this decompiled source. Judging by the name and
     * by the decompiler fragments of {@code getURLs}, it presumably sends URLs of one queue to
     * the observer and returns how many were sent; the meaning of the two {@code int} arguments
     * and of the {@code long} argument cannot be determined from here - confirm against the
     * original source.
     *
     * @param queueInterface the queue to read from
     * @param queueWithinCrawl the identifier of that queue
     * @param i presumably a limit on the number of URLs - TODO confirm
     * @param i2 meaning not visible in this file - TODO confirm
     * @param j presumably a timestamp in epoch seconds - TODO confirm
     * @param synchronizedStreamObserver the observer the URLs are written to
     * @return presumably the number of URLs sent - TODO confirm
     */
    protected abstract int sendURLsForQueue(QueueInterface queueInterface, QueueWithinCrawl queueWithinCrawl, int i, int i2, long j, SynchronizedStreamObserver<Urlfrontier.URLInfo> synchronizedStreamObserver);

    /**
     * Changes the level of a logback logger at runtime.
     *
     * <p>The SLF4J logger factory is cast to logback's {@code LoggerContext}, so this only works
     * when logback is the bound logging backend.
     *
     * @param logLevelParams carries the logger (package) name and the level to apply
     * @param streamObserver receives one {@code Empty} message and is then completed
     */
    @Override // crawlercommons.urlfrontier.URLFrontierGrpc.URLFrontierImplBase
    public void setLogLevel(Urlfrontier.LogLevelParams logLevelParams, StreamObserver<Urlfrontier.Empty> streamObserver) {
        final LoggerContext context = (LoggerContext) LoggerFactory.getILoggerFactory();
        final String loggerName = logLevelParams.getPackage();
        final Level requestedLevel = Level.toLevel(logLevelParams.getLevel().toString());
        context.getLogger(loggerName).setLevel(requestedLevel);

        LOG.info("Log level for {} set to {}", loggerName, logLevelParams.getLevel().toString());

        streamObserver.onNext(Urlfrontier.Empty.getDefaultInstance());
        streamObserver.onCompleted();
    }

    /**
     * Sends the list of node addresses.
     *
     * <p>When no node is known yet, the list is created if necessary and this service's own
     * address is added to it, so the stored list is modified as a side effect.
     *
     * @param empty request message; not read by this implementation
     * @param streamObserver receives a single {@code StringList} and is then completed
     */
    @Override // crawlercommons.urlfrontier.URLFrontierGrpc.URLFrontierImplBase
    public void listNodes(Urlfrontier.Empty empty, StreamObserver<Urlfrontier.StringList> streamObserver) {
        if (nodes == null) {
            nodes = new ArrayList<>();
        }
        if (nodes.isEmpty()) {
            // No node known yet: advertise at least this instance.
            nodes.add(getHostAndPort());
        }
        final Urlfrontier.StringList.Builder reply = Urlfrontier.StringList.newBuilder();
        reply.addAllValues(nodes);
        streamObserver.onNext(reply.build());
        streamObserver.onCompleted();
    }

    /**
     * Opens a client stream of URL items and acknowledges each of them.
     *
     * <p>Every incoming item is handed to {@link #putURLItem} on {@code writeExecutorService};
     * the resulting status is sent back in an {@code AckMessage} whose ID is the item's ID or,
     * when that is empty, its URL. While the service is closing, items are acknowledged with
     * {@code FAIL} without being processed. The response stream is completed only after every
     * submitted item has been acknowledged.
     *
     * @param streamObserver the response stream on which acknowledgements are written
     * @return the observer that gRPC feeds with the client's items
     */
    @Override // crawlercommons.urlfrontier.URLFrontierGrpc.URLFrontierImplBase
    public StreamObserver<Urlfrontier.URLItem> putURLs(StreamObserver<Urlfrontier.AckMessage> streamObserver) {
        putURLs_calls.inc();
        // Acknowledgements are produced by several writer threads: go through the synchronized
        // wrapper rather than the raw response observer.
        final StreamObserver<Urlfrontier.AckMessage> wrapping = SynchronizedStreamObserver.wrapping(streamObserver, -1);
        return new StreamObserver<Urlfrontier.URLItem>() { // from class: crawlercommons.urlfrontier.service.AbstractFrontierService.1
            /** Number of items submitted to the executor and not acknowledged yet. */
            final AtomicInteger unacked = new AtomicInteger();

            @Override // io.grpc.stub.StreamObserver
            public void onNext(Urlfrontier.URLItem uRLItem) {
                final String url = uRLItem.hasDiscovered() ? uRLItem.getDiscovered().getInfo().getUrl() : uRLItem.getKnown().getInfo().getUrl();
                final Urlfrontier.AckMessage.Builder ack = Urlfrontier.AckMessage.newBuilder();
                final String id = uRLItem.getID();
                ack.setID(id == null || id.isEmpty() ? url : id);

                if (AbstractFrontierService.this.isClosing()) {
                    wrapping.onNext(ack.setStatus(Urlfrontier.AckMessage.Status.FAIL).build());
                    return;
                }

                this.unacked.incrementAndGet();
                AbstractFrontierService.this.writeExecutorService.execute(() -> {
                    // FAIL unless the item is stored successfully.
                    Urlfrontier.AckMessage.Status status = Urlfrontier.AckMessage.Status.FAIL;
                    try {
                        status = AbstractFrontierService.this.putURLItem(uRLItem);
                        AbstractFrontierService.LOG.debug("putURL -> {} got status {}", url, status);
                    } catch (RuntimeException e) {
                        AbstractFrontierService.LOG.error("Error while putting URL {}", url, e);
                    } finally {
                        // Always acknowledge and always decrement, even if storing the item or
                        // writing the ack fails; otherwise onCompleted() would wait forever.
                        try {
                            wrapping.onNext(ack.setStatus(status).build());
                        } finally {
                            this.unacked.decrementAndGet();
                        }
                    }
                });
            }

            @Override // io.grpc.stub.StreamObserver
            public void onError(Throwable th) {
                // A cancellation by the client is not worth an error log.
                if ((th instanceof StatusRuntimeException) && ((StatusRuntimeException) th).getStatus().getCode().equals(Status.Code.CANCELLED)) {
                    return;
                }
                AbstractFrontierService.LOG.error("Error reported {}", th.getMessage());
            }

            @Override // io.grpc.stub.StreamObserver
            public void onCompleted() {
                // Wait until every submitted item has been acknowledged. An interrupt must not
                // cut the wait short, but re-setting the flag inside the loop would make every
                // following sleep() fail immediately and turn the loop into a busy spin, so the
                // flag is restored once the wait is over.
                boolean interrupted = false;
                try {
                    while (this.unacked.get() != 0) {
                        try {
                            Thread.sleep(10L);
                        } catch (InterruptedException e) {
                            interrupted = true;
                        }
                    }
                } finally {
                    if (interrupted) {
                        Thread.currentThread().interrupt();
                    }
                }
                wrapping.onCompleted();
            }
        };
    }

    /**
     * Storage-specific handling of one item received through {@code putURLs}.
     *
     * <p>{@code putURLs} invokes this method on a {@code writeExecutorService} thread, once per
     * incoming item, and sends the returned status back to the client in the item's
     * acknowledgement.
     *
     * @param uRLItem the item received from the client
     * @return the status to report for this item
     */
    protected abstract Urlfrontier.AckMessage.Status putURLItem(Urlfrontier.URLItem uRLItem);

    /**
     * Marks the service as closing and shuts down both thread pools.
     *
     * <p>Each pool gets up to 500 ms to finish the tasks it already accepted before it is shut
     * down forcibly. If the calling thread is interrupted while waiting, both pools are shut
     * down forcibly at once and the thread's interrupt status is restored.
     *
     * @throws IOException declared by {@link Closeable}; not thrown by this implementation
     */
    @Override // java.io.Closeable, java.lang.AutoCloseable
    public void close() throws IOException {
        this.closing = true;
        this.writeExecutorService.shutdown();
        this.readExecutorService.shutdown();
        try {
            if (!this.writeExecutorService.awaitTermination(500L, TimeUnit.MILLISECONDS)) {
                this.writeExecutorService.shutdownNow();
            }
            if (!this.readExecutorService.awaitTermination(500L, TimeUnit.MILLISECONDS)) {
                this.readExecutorService.shutdownNow();
            }
        } catch (InterruptedException e) {
            this.writeExecutorService.shutdownNow();
            this.readExecutorService.shutdownNow();
            // Do not swallow the interrupt: let the caller see that it was interrupted.
            Thread.currentThread().interrupt();
        }
    }
}
