package crawlercommons.urlfrontier.service.cluster;

import com.google.common.cache.Cache;
import com.google.common.cache.CacheBuilder;
import com.google.common.cache.CacheLoader;
import com.google.common.cache.LoadingCache;
import com.google.common.cache.RemovalListener;
import com.google.common.cache.RemovalNotification;
import crawlercommons.urlfrontier.CrawlID;
import crawlercommons.urlfrontier.URLFrontierGrpc;
import crawlercommons.urlfrontier.Urlfrontier;
import crawlercommons.urlfrontier.service.AbstractFrontierService;
import crawlercommons.urlfrontier.service.QueueInterface;
import crawlercommons.urlfrontier.service.QueueWithinCrawl;
import crawlercommons.urlfrontier.service.SynchronizedStreamObserver;
import io.grpc.ManagedChannel;
import io.grpc.ManagedChannelBuilder;
import io.grpc.Status;
import io.grpc.StatusRuntimeException;
import io.grpc.stub.ServerCallStreamObserver;
import io.grpc.stub.StreamObserver;
import java.io.IOException;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.Map;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:crawlercommons/urlfrontier/service/cluster/DistributedFrontierService.class */
public abstract class DistributedFrontierService extends AbstractFrontierService {
    private static final Logger LOG = LoggerFactory.getLogger((Class<?>) DistributedFrontierService.class);
    protected boolean clusterMode = false;
    private final CacheLoader<String, ManagedChannel> channelLoader = new CacheLoader<String, ManagedChannel>() { // from class: crawlercommons.urlfrontier.service.cluster.DistributedFrontierService.1
        /* JADX WARN: Type inference failed for: r0v2, types: [io.grpc.ManagedChannelBuilder] */
        @Override // com.google.common.cache.CacheLoader
        public ManagedChannel load(String str) {
            return ManagedChannelBuilder.forTarget(str).usePlaintext().build();
        }
    };
    private final RemovalListener<String, ManagedChannel> channelRemovalListener = new RemovalListener<String, ManagedChannel>() { // from class: crawlercommons.urlfrontier.service.cluster.DistributedFrontierService.2
        @Override // com.google.common.cache.RemovalListener
        public void onRemoval(RemovalNotification<String, ManagedChannel> removalNotification) {
            removalNotification.getValue().shutdownNow();
        }
    };
    private LoadingCache<String, ManagedChannel> channelCache = CacheBuilder.newBuilder().removalListener(this.channelRemovalListener).build(this.channelLoader);
    private final CacheLoader<Integer, StreamObserver<Urlfrontier.URLItem>> observerloader = new CacheLoader<Integer, StreamObserver<Urlfrontier.URLItem>>() { // from class: crawlercommons.urlfrontier.service.cluster.DistributedFrontierService.3
        @Override // com.google.common.cache.CacheLoader
        public StreamObserver<Urlfrontier.URLItem> load(final Integer num) {
            return URLFrontierGrpc.newStub(DistributedFrontierService.this.channelCache.getUnchecked((String) DistributedFrontierService.this.getNodes().get(num.intValue()))).putURLs(new StreamObserver<Urlfrontier.AckMessage>() { // from class: crawlercommons.urlfrontier.service.cluster.DistributedFrontierService.3.1
                @Override // io.grpc.stub.StreamObserver
                public void onNext(Urlfrontier.AckMessage ackMessage) {
                    StreamObserver<Urlfrontier.AckMessage> ifPresent = DistributedFrontierService.this.inprocesscache.getIfPresent(ackMessage.getID());
                    if (ifPresent != null) {
                        DistributedFrontierService.LOG.debug("Got stream to ack back for {} with status {}", ackMessage.getID(), ackMessage.getStatus());
                        try {
                            ifPresent.onNext(ackMessage);
                        } catch (Exception e) {
                            DistributedFrontierService.LOG.error("Error while communicating back with the client: {} ", e.getLocalizedMessage());
                        }
                    } else {
                        DistributedFrontierService.LOG.error("No stream found to ack back for {} with status {}", ackMessage.getID(), ackMessage.getStatus());
                    }
                    DistributedFrontierService.this.inprocesscache.invalidate(ackMessage.getID());
                }

                @Override // io.grpc.stub.StreamObserver
                public void onError(Throwable th) {
                    DistributedFrontierService.this.observercache.invalidate(num);
                    if ((th instanceof StatusRuntimeException) && ((StatusRuntimeException) th).getStatus().getCode().equals(Status.Code.CANCELLED)) {
                        return;
                    }
                    DistributedFrontierService.LOG.error("Caught throwable when forwarding request to shard {}: {}", num, th.getLocalizedMessage());
                }

                @Override // io.grpc.stub.StreamObserver
                public void onCompleted() {
                    DistributedFrontierService.this.observercache.invalidate(num);
                }
            });
        }
    };
    private final RemovalListener<String, StreamObserver<Urlfrontier.URLItem>> observerlistener = new RemovalListener<String, StreamObserver<Urlfrontier.URLItem>>() { // from class: crawlercommons.urlfrontier.service.cluster.DistributedFrontierService.4
        @Override // com.google.common.cache.RemovalListener
        public void onRemoval(RemovalNotification<String, StreamObserver<Urlfrontier.URLItem>> removalNotification) {
            DistributedFrontierService.LOG.info("Removed StreamObserver {} with key {}", removalNotification.getValue(), removalNotification.getKey());
        }
    };
    private LoadingCache<Integer, StreamObserver<Urlfrontier.URLItem>> observercache = CacheBuilder.newBuilder().removalListener(this.observerlistener).expireAfterAccess(1, TimeUnit.MINUTES).build(this.observerloader);
    private final RemovalListener<String, StreamObserver<Urlfrontier.AckMessage>> inProcessRemovalListener = new RemovalListener<String, StreamObserver<Urlfrontier.AckMessage>>() { // from class: crawlercommons.urlfrontier.service.cluster.DistributedFrontierService.5
        @Override // com.google.common.cache.RemovalListener
        public void onRemoval(RemovalNotification<String, StreamObserver<Urlfrontier.AckMessage>> removalNotification) {
            if (removalNotification.wasEvicted()) {
                String key = removalNotification.getKey();
                DistributedFrontierService.LOG.debug("Trying to notify original stream about eviction of {}", key);
                StreamObserver<Urlfrontier.AckMessage> value = removalNotification.getValue();
                if (value != null) {
                    try {
                        value.onNext(Urlfrontier.AckMessage.newBuilder().setID(key).setStatus(Urlfrontier.AckMessage.Status.FAIL).build());
                    } catch (Exception e) {
                        DistributedFrontierService.LOG.error("Error while communicating back with the client: {} ", e.getLocalizedMessage());
                    }
                }
            }
        }
    };
    private Cache<String, StreamObserver<Urlfrontier.AckMessage>> inprocesscache = CacheBuilder.newBuilder().expireAfterAccess(1, TimeUnit.MINUTES).removalListener(this.inProcessRemovalListener).build();

    private URLFrontierGrpc.URLFrontierBlockingStub getFrontier(String str) {
        return URLFrontierGrpc.newBlockingStub(this.channelCache.getUnchecked(str));
    }

    @Override // crawlercommons.urlfrontier.service.AbstractFrontierService, crawlercommons.urlfrontier.URLFrontierGrpc.URLFrontierImplBase
    public void deleteQueue(Urlfrontier.QueueWithinCrawlParams queueWithinCrawlParams, StreamObserver<Urlfrontier.Long> streamObserver) {
        QueueWithinCrawl queueWithinCrawl = QueueWithinCrawl.get(queueWithinCrawlParams.getKey(), queueWithinCrawlParams.getCrawlID());
        int i = 0;
        if (!queueWithinCrawlParams.getLocal() && this.clusterMode) {
            for (String str : getNodes()) {
                if (!str.equals(this.address)) {
                    i = (int) (i + getFrontier(str).deleteQueue(Urlfrontier.QueueWithinCrawlParams.newBuilder(queueWithinCrawlParams).setLocal(true).build()).getValue());
                }
            }
        }
        streamObserver.onNext(Urlfrontier.Long.newBuilder().setValue(i + deleteLocalQueue(queueWithinCrawl)).build());
        streamObserver.onCompleted();
    }

    protected abstract int deleteLocalQueue(QueueWithinCrawl queueWithinCrawl);

    @Override // crawlercommons.urlfrontier.service.AbstractFrontierService, crawlercommons.urlfrontier.URLFrontierGrpc.URLFrontierImplBase
    public void deleteCrawl(Urlfrontier.DeleteCrawlMessage deleteCrawlMessage, StreamObserver<Urlfrontier.Long> streamObserver) {
        if (!this.clusterMode) {
            super.deleteCrawl(deleteCrawlMessage, streamObserver);
            return;
        }
        long j = 0;
        String normaliseCrawlID = CrawlID.normaliseCrawlID(deleteCrawlMessage.getValue());
        if (!deleteCrawlMessage.getLocal()) {
            Urlfrontier.DeleteCrawlMessage build = Urlfrontier.DeleteCrawlMessage.newBuilder().setLocal(true).setValue(deleteCrawlMessage.getValue()).build();
            for (String str : getNodes()) {
                if (!str.equals(this.address)) {
                    j += getFrontier(str).deleteCrawl(build).getValue();
                }
            }
        }
        streamObserver.onNext(Urlfrontier.Long.newBuilder().setValue(j + deleteLocalCrawl(normaliseCrawlID)).build());
        streamObserver.onCompleted();
    }

    protected abstract long deleteLocalCrawl(String str);

    @Override // crawlercommons.urlfrontier.service.AbstractFrontierService, crawlercommons.urlfrontier.URLFrontierGrpc.URLFrontierImplBase
    public void getStats(Urlfrontier.QueueWithinCrawlParams queueWithinCrawlParams, StreamObserver<Urlfrontier.Stats> streamObserver) {
        LOG.info("Received stats request");
        if (queueWithinCrawlParams.getLocal() || !this.clusterMode) {
            super.getStats(queueWithinCrawlParams, streamObserver);
            return;
        }
        String normaliseCrawlID = CrawlID.normaliseCrawlID(queueWithinCrawlParams.getCrawlID());
        long j = 0;
        long j2 = 0;
        int i = 0;
        HashMap hashMap = new HashMap();
        Urlfrontier.QueueWithinCrawlParams build = Urlfrontier.QueueWithinCrawlParams.newBuilder(queueWithinCrawlParams).setLocal(true).build();
        Iterator<String> it = getNodes().iterator();
        while (it.hasNext()) {
            Urlfrontier.Stats stats = getFrontier(it.next()).getStats(build);
            j += stats.getNumberOfQueues();
            j2 += stats.getSize();
            i += stats.getInProcess();
            for (Map.Entry<String, Long> entry : stats.getCountsMap().entrySet()) {
                hashMap.compute(entry.getKey(), (str, l) -> {
                    return Long.valueOf(l != null ? l.longValue() + ((Long) entry.getValue()).longValue() : ((Long) entry.getValue()).longValue());
                });
            }
        }
        streamObserver.onNext(Urlfrontier.Stats.newBuilder().setNumberOfQueues(j).setSize(j2).setInProcess(i).putAllCounts(hashMap).setCrawlID(normaliseCrawlID).build());
        streamObserver.onCompleted();
    }

    @Override // crawlercommons.urlfrontier.service.AbstractFrontierService, crawlercommons.urlfrontier.URLFrontierGrpc.URLFrontierImplBase
    public void setLogLevel(Urlfrontier.LogLevelParams logLevelParams, StreamObserver<Urlfrontier.Empty> streamObserver) {
        if (!logLevelParams.getLocal() && this.clusterMode) {
            Urlfrontier.LogLevelParams build = Urlfrontier.LogLevelParams.newBuilder(logLevelParams).setLocal(true).build();
            for (String str : getNodes()) {
                if (!str.equals(this.address)) {
                    getFrontier(str).setLogLevel(build);
                }
            }
        }
        super.setLogLevel(logLevelParams, streamObserver);
    }

    @Override // crawlercommons.urlfrontier.service.AbstractFrontierService, crawlercommons.urlfrontier.URLFrontierGrpc.URLFrontierImplBase
    public void listCrawls(Urlfrontier.Local local, StreamObserver<Urlfrontier.StringList> streamObserver) {
        HashSet hashSet = new HashSet();
        if (!local.getLocal() && this.clusterMode) {
            Urlfrontier.Local build = Urlfrontier.Local.newBuilder().setLocal(true).build();
            for (String str : getNodes()) {
                if (!str.equals(this.address)) {
                    Iterator<String> it = getFrontier(str).listCrawls(build).getValuesList().iterator();
                    while (it.hasNext()) {
                        hashSet.add(it.next());
                    }
                }
            }
        }
        synchronized (getQueues()) {
            Iterator<Map.Entry<QueueWithinCrawl, QueueInterface>> it2 = getQueues().entrySet().iterator();
            while (it2.hasNext()) {
                hashSet.add(it2.next().getKey().getCrawlid());
            }
        }
        streamObserver.onNext(Urlfrontier.StringList.newBuilder().addAllValues(hashSet).build());
        streamObserver.onCompleted();
    }

    @Override // crawlercommons.urlfrontier.service.AbstractFrontierService, crawlercommons.urlfrontier.URLFrontierGrpc.URLFrontierImplBase
    public void listQueues(Urlfrontier.Pagination pagination, StreamObserver<Urlfrontier.QueueList> streamObserver) {
        if (pagination.getLocal() || !this.clusterMode) {
            super.listQueues(pagination, streamObserver);
            return;
        }
        HashSet hashSet = new HashSet();
        Urlfrontier.Pagination build = Urlfrontier.Pagination.newBuilder(pagination).setLocal(true).build();
        Iterator<String> it = getNodes().iterator();
        while (it.hasNext()) {
            Iterator<String> it2 = getFrontier(it.next()).listQueues(build).getValuesList().iterator();
            while (it2.hasNext()) {
                hashSet.add(it2.next());
            }
        }
        Urlfrontier.QueueList.Builder newBuilder = Urlfrontier.QueueList.newBuilder();
        newBuilder.addAllValues(hashSet);
        streamObserver.onNext(newBuilder.build());
        streamObserver.onCompleted();
    }

    @Override // crawlercommons.urlfrontier.service.AbstractFrontierService, java.io.Closeable, java.lang.AutoCloseable
    public void close() throws IOException {
        super.close();
        this.channelCache.invalidateAll();
    }

    @Override // crawlercommons.urlfrontier.service.AbstractFrontierService, crawlercommons.urlfrontier.URLFrontierGrpc.URLFrontierImplBase
    public StreamObserver<Urlfrontier.URLItem> putURLs(StreamObserver<Urlfrontier.AckMessage> streamObserver) {
        putURLs_calls.inc();
        ((ServerCallStreamObserver) streamObserver).setOnCancelHandler(() -> {
            LOG.error("Client cancelled");
        });
        final StreamObserver wrapping = SynchronizedStreamObserver.wrapping(streamObserver, -1);
        return new StreamObserver<Urlfrontier.URLItem>() { // from class: crawlercommons.urlfrontier.service.cluster.DistributedFrontierService.6
            final AtomicInteger unacked = new AtomicInteger();

            @Override // io.grpc.stub.StreamObserver
            public void onNext(Urlfrontier.URLItem uRLItem) {
                Urlfrontier.URLInfo info = uRLItem.hasDiscovered() ? uRLItem.getDiscovered().getInfo() : uRLItem.getKnown().getInfo();
                String key = info.getKey();
                String url = info.getUrl();
                String normaliseCrawlID = CrawlID.normaliseCrawlID(info.getCrawlID());
                Urlfrontier.AckMessage.Builder newBuilder = Urlfrontier.AckMessage.newBuilder();
                if (uRLItem.getID() == null || uRLItem.getID().isEmpty()) {
                    newBuilder.setID(url);
                } else {
                    newBuilder.setID(uRLItem.getID());
                }
                if (key.equals("")) {
                    DistributedFrontierService.LOG.debug("key missing for {}", url);
                    key = DistributedFrontierService.this.provideMissingKey(url);
                    if (key == null) {
                        DistributedFrontierService.LOG.error("Malformed URL {}", url);
                        wrapping.onNext(newBuilder.setStatus(Urlfrontier.AckMessage.Status.SKIPPED).build());
                        return;
                    }
                    Urlfrontier.URLInfo.newBuilder(info).setKey(key).setCrawlID(normaliseCrawlID).build();
                }
                DistributedFrontierService.LOG.debug("Received {} with queue {} and crawlid {}", url, key, normaliseCrawlID);
                int abs = Math.abs(QueueWithinCrawl.get(key, normaliseCrawlID).toString().hashCode() % DistributedFrontierService.this.getNodes().size());
                int indexOf = DistributedFrontierService.this.getNodes().indexOf(DistributedFrontierService.this.address);
                if (indexOf == -1) {
                    throw new RuntimeException("ShardedRocksDBService found conf 'nodes' but current node's address not set");
                }
                DistributedFrontierService.LOG.trace("LocalNodeIndex {}", Integer.valueOf(indexOf));
                if (abs == indexOf) {
                    ExecutorService executorService = DistributedFrontierService.this.writeExecutorService;
                    StreamObserver streamObserver2 = wrapping;
                    executorService.execute(() -> {
                        this.unacked.incrementAndGet();
                        Urlfrontier.AckMessage.Status putURLItem = DistributedFrontierService.this.putURLItem(uRLItem);
                        DistributedFrontierService.LOG.debug("Local putURL -> {} got status {}", url, putURLItem);
                        streamObserver2.onNext(newBuilder.setStatus(putURLItem).build());
                        this.unacked.decrementAndGet();
                    });
                } else {
                    DistributedFrontierService.LOG.debug("Sending {} to partition {} -> {}", url, Integer.valueOf(abs), DistributedFrontierService.this.getNodes().get(abs));
                    while (DistributedFrontierService.this.inprocesscache.getIfPresent(newBuilder.getID()) != null) {
                        try {
                            Thread.sleep(50L);
                        } catch (InterruptedException e) {
                            Thread.currentThread().interrupt();
                        }
                    }
                    DistributedFrontierService.this.inprocesscache.put(newBuilder.getID(), wrapping);
                    DistributedFrontierService.this.observercache.getUnchecked(Integer.valueOf(abs)).onNext(uRLItem);
                }
            }

            @Override // io.grpc.stub.StreamObserver
            public void onError(Throwable th) {
                if ((th instanceof StatusRuntimeException) && ((StatusRuntimeException) th).getStatus().getCode().equals(Status.Code.CANCELLED)) {
                    return;
                }
                DistributedFrontierService.LOG.error("Throwable caught", th.getLocalizedMessage());
            }

            @Override // io.grpc.stub.StreamObserver
            public void onCompleted() {
                while (true) {
                    if (this.unacked.get() == 0 && !DistributedFrontierService.this.inprocesscache.asMap().containsValue(wrapping)) {
                        wrapping.onCompleted();
                        return;
                    } else {
                        try {
                            Thread.sleep(10L);
                        } catch (InterruptedException e) {
                            Thread.currentThread().interrupt();
                        }
                    }
                }
            }
        };
    }

    @Override // crawlercommons.urlfrontier.service.AbstractFrontierService
    protected abstract Urlfrontier.AckMessage.Status putURLItem(Urlfrontier.URLItem uRLItem);
}
