package io.datarouter.client.gcp.pubsub.job;

import io.datarouter.client.gcp.pubsub.GcpPubsubClientType;
import io.datarouter.client.gcp.pubsub.GcpPubsubExecutors;
import io.datarouter.client.gcp.pubsub.TopicAndSubscriptionName;
import io.datarouter.client.gcp.pubsub.client.GcpPubsubClientManager;
import io.datarouter.client.gcp.pubsub.config.DatarouterGcpPubsubPlugin;
import io.datarouter.client.gcp.pubsub.node.GcpPubsubPhysicalNode;
import io.datarouter.instrumentation.task.TaskTracker;
import io.datarouter.job.BaseJob;
import io.datarouter.plugin.PluginInjector;
import io.datarouter.scanner.Scanner;
import io.datarouter.scanner.Threads;
import io.datarouter.storage.client.ClientId;
import io.datarouter.storage.client.ClientInitializationTracker;
import io.datarouter.storage.client.DatarouterClients;
import io.datarouter.storage.config.properties.ServiceName;
import io.datarouter.storage.node.DatarouterNodes;
import io.datarouter.storage.node.NodeTool;
import io.datarouter.storage.util.DatarouterQueueMetrics;
import io.datarouter.util.concurrent.ThreadTool;
import io.datarouter.util.number.RandomTool;
import io.datarouter.util.timer.PhaseTimer;
import javax.inject.Inject;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:io/datarouter/client/gcp/pubsub/job/GcpPubsubQueueMetricMonitoringJob.class */
public class GcpPubsubQueueMetricMonitoringJob extends BaseJob {
    private static final Logger logger = LoggerFactory.getLogger(GcpPubsubQueueMetricMonitoringJob.class);

    @Inject
    private DatarouterClients datarouterClients;

    @Inject
    private DatarouterNodes datarouterNodes;

    @Inject
    private ClientInitializationTracker clientInitializationTracker;

    @Inject
    private DatarouterQueueMetrics metrics;

    @Inject
    private GcpPubsubClientManager clientManager;

    @Inject
    private GcpPubsubExecutors.GcpPubsubQueueLengthMonitoringJobExecutor executor;

    @Inject
    private ServiceName serviceName;

    @Inject
    private PluginInjector pluginInjector;

    public void run(TaskTracker taskTracker) {
        try {
            ThreadTool.sleep(RandomTool.nextPositiveInt(6) * 1000);
            Scanner map = Scanner.of(this.clientInitializationTracker.getInitializedClients()).include(clientId -> {
                return this.datarouterClients.getClientTypeInstance(clientId) instanceof GcpPubsubClientType;
            }).map((v0) -> {
                return v0.getName();
            });
            DatarouterNodes datarouterNodes = this.datarouterNodes;
            datarouterNodes.getClass();
            map.concatIter(datarouterNodes::getPhysicalNodesForClient).map((v0) -> {
                return NodeTool.extractSinglePhysicalNode(v0);
            }).map(physicalNode -> {
                return (GcpPubsubPhysicalNode) physicalNode;
            }).advanceUntil(gcpPubsubPhysicalNode -> {
                return taskTracker.shouldStop();
            }).parallelUnordered(new Threads(this.executor, this.executor.getMaximumPoolSize())).forEach(this::saveQueueLengthMetric);
        } catch (InterruptedException e) {
        }
    }

    private void saveQueueLengthMetric(GcpPubsubPhysicalNode<?, ?, ?> gcpPubsubPhysicalNode) {
        String name = gcpPubsubPhysicalNode.getName();
        logger.debug("starting query for {}", name);
        PhaseTimer phaseTimer = new PhaseTimer(name);
        ClientId clientId = gcpPubsubPhysicalNode.getClientId();
        phaseTimer.add("getClientId");
        TopicAndSubscriptionName topicAndSubscriptionName = gcpPubsubPhysicalNode.getTopicAndSubscriptionName().get();
        phaseTimer.add("getTopicAndSubscriptionName");
        String str = ((DatarouterGcpPubsubPlugin.SharedQueueNameRegistry) this.pluginInjector.getInstance(DatarouterGcpPubsubPlugin.SharedQueueNameRegistry.KEY)).queueOwnerByQueueName.get(topicAndSubscriptionName.topic().getTopic());
        if (str == null || str.equals(this.serviceName.get())) {
            saveQueueLength(topicAndSubscriptionName, clientId, phaseTimer);
            saveOldestAckMessageAge(topicAndSubscriptionName, clientId, phaseTimer);
        }
        logger.info("{}", phaseTimer);
    }

    private void saveOldestAckMessageAge(TopicAndSubscriptionName topicAndSubscriptionName, ClientId clientId, PhaseTimer phaseTimer) {
        GcpPubsubClientManager.GcpPubsubMetricDto gcpMetricDto = this.clientManager.getGcpMetricDto(topicAndSubscriptionName, clientId);
        phaseTimer.add("getMetric");
        if (gcpMetricDto.oldestUnackedMessageAgeS().isPresent()) {
            this.metrics.saveOldestAckMessageAge(gcpMetricDto.queueName(), gcpMetricDto.oldestUnackedMessageAgeS().get().longValue(), GcpPubsubClientType.NAME);
            phaseTimer.add("saveGaugeOldestUnackMessageAge");
        }
    }

    private void saveQueueLength(TopicAndSubscriptionName topicAndSubscriptionName, ClientId clientId, PhaseTimer phaseTimer) {
        GcpPubsubClientManager.GcpPubsubMetricDto gcpMetricDto = this.clientManager.getGcpMetricDto(topicAndSubscriptionName, clientId);
        phaseTimer.add("getMetric");
        if (gcpMetricDto.numUndeliveredMessages().isPresent()) {
            this.metrics.saveQueueLength(gcpMetricDto.queueName(), gcpMetricDto.numUndeliveredMessages().get().longValue(), GcpPubsubClientType.NAME);
            phaseTimer.add("saveGaugeNumUndeliveredMessages");
        }
    }
}
