package io.cassandrareaper.service;

import com.codahale.metrics.InstrumentedScheduledExecutorService;
import com.fasterxml.jackson.core.JsonFactory;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.base.Preconditions;
import com.google.common.collect.MultimapBuilder;
import com.google.common.collect.SetMultimap;
import io.cassandrareaper.AppContext;
import io.cassandrareaper.ReaperException;
import io.cassandrareaper.core.Cluster;
import io.cassandrareaper.core.DiagEventSubscription;
import io.cassandrareaper.core.Node;
import io.cassandrareaper.jmx.DiagnosticProxy;
import io.cassandrareaper.jmx.JmxProxy;
import io.cassandrareaper.resources.view.DiagnosticEvent;
import java.io.IOException;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Consumer;
import java.util.function.Predicate;
import java.util.stream.Collectors;
import javax.management.Notification;
import javax.management.ReflectionException;
import javax.ws.rs.core.MediaType;
import org.apache.http.client.HttpClient;
import org.apache.http.client.methods.HttpPost;
import org.apache.http.entity.ContentType;
import org.apache.http.entity.StringEntity;
import org.apache.http.util.EntityUtils;
import org.glassfish.jersey.media.sse.EventOutput;
import org.glassfish.jersey.media.sse.OutboundEvent;
import org.glassfish.jersey.media.sse.SseBroadcaster;
import org.glassfish.jersey.server.ChunkedOutput;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:io/cassandrareaper/service/DiagEventSubscriptionService.class */
public final class DiagEventSubscriptionService {
    // Class-wide logger; assigned in the static initializer at the bottom of the class.
    private static final Logger LOG;
    // One SSE broadcaster per subscription; populated lazily by subscribe().
    private static final Map<DiagEventSubscription, Broadcaster> BROADCASTERS;
    // One event poller per node; entries are created by createPoller() and removed
    // when a node no longer has any active events.
    private static final Map<Node, DiagEventPoller> POLLERS_BY_NODE;
    // Serializes diagnostic events to JSON in onEvent().
    private static final ObjectMapper JSON_MAPPER;
    // Source of the ids attached to outbound SSE events (diagnostic events and pings).
    private static final AtomicLong ID_COUNTER;
    private final AppContext context;
    // Used to POST events to a subscription's HTTP export endpoint.
    private final HttpClient httpClient;
    // Runs the periodic refresh/ping jobs as well as per-node update tasks.
    private final ScheduledExecutorService scheduler;
    // Subscriptions that export to a file logger or an HTTP endpoint; recomputed on
    // every updateEnabledEvents(Set) call. Null until the first such call.
    private Set<DiagEventSubscription> subsAlwaysActive;
    // Decompiler-exposed copy of the compiler-generated assertion flag; set in the
    // static initializer from Class.desiredAssertionStatus().
    static final /* synthetic */ boolean $assertionsDisabled;
    // JMX notification listener registered for each node.
    private final Map<Node, NotificationListener> listenerByNode = new ConcurrentHashMap();
    // Wall-clock millis of the last periodic refresh performed by updateEnabledEvents().
    private final AtomicLong lastUpdateCheck = new AtomicLong(0);

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:io/cassandrareaper/service/DiagEventSubscriptionService$Broadcaster.class */
    /**
     * SSE broadcaster dedicated to a single {@link DiagEventSubscription}.
     *
     * <p>Keeps a count of the outputs that were added and not yet closed so the service can tell
     * whether any client is still listening to the subscription.
     */
    public static final class Broadcaster extends SseBroadcaster {
        /** Number of outputs added via {@link #add} minus those reported closed. */
        private final AtomicLong outputs;
        /** Subscription this broadcaster serves; sent as payload of ping events. */
        private final DiagEventSubscription sub;

        private Broadcaster(DiagEventSubscription diagEventSubscription) {
            this.outputs = new AtomicLong();
            this.sub = diagEventSubscription;
        }

        @Override
        public void onException(ChunkedOutput<OutboundEvent> chunkedOutput, Exception exc) {
            super.onException(chunkedOutput, exc);
            // Pass the exception as the trailing argument so SLF4J records its stack trace
            // instead of silently dropping it.
            DiagEventSubscriptionService.LOG.debug("[{}] SSE exception", this, exc);
        }

        @Override
        public <OUT extends ChunkedOutput<OutboundEvent>> boolean add(OUT out) {
            DiagEventSubscriptionService.LOG.debug("[{}] Adding SSE channel", this);
            this.outputs.incrementAndGet();
            // The output is handed to the parent as-is; it is not (and cannot be) a Broadcaster.
            return super.add(out);
        }

        @Override
        public void onClose(ChunkedOutput<OutboundEvent> chunkedOutput) {
            super.onClose(chunkedOutput);
            DiagEventSubscriptionService.LOG.debug("[{}] SSE channel closed", this);
            this.outputs.decrementAndGet();
        }

        /**
         * @return {@code true} while at least one added output has not been reported closed
         */
        boolean isActive() {
            return this.outputs.get() > 0;
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:io/cassandrareaper/service/DiagEventSubscriptionService$NotificationListener.class */
    public static class NotificationListener implements javax.management.NotificationListener {
        private final Node node;
        private final Consumer<Notification> onConnectionClosed;
        private final Consumer<Map<String, Comparable>> onSummary;
        private final ExecutorService taskExecutor;

        NotificationListener(Node node, Consumer<Notification> consumer, Consumer<Map<String, Comparable>> consumer2, ExecutorService executorService) {
            this.node = node;
            this.onConnectionClosed = consumer;
            this.onSummary = consumer2;
            this.taskExecutor = executorService;
        }

        public void handleNotification(Notification notification, Object obj) {
            this.taskExecutor.submit(() -> {
                String name = Thread.currentThread().getName();
                try {
                    try {
                        Thread.currentThread().setName(this.node.getHostname());
                        String type = notification.getType();
                        boolean z = -1;
                        switch (type.hashCode()) {
                            case -739658258:
                                if (type.equals("jmx.remote.connection.notifs.lost")) {
                                    z = 2;
                                    break;
                                }
                                break;
                            case -411860211:
                                if (type.equals("jmx.remote.connection.closed")) {
                                    z = false;
                                    break;
                                }
                                break;
                            case -336316962:
                                if (type.equals("jmx.remote.connection.failed")) {
                                    z = true;
                                    break;
                                }
                                break;
                            case 880266790:
                                if (type.equals("event_last_id_summary")) {
                                    z = 3;
                                    break;
                                }
                                break;
                        }
                        switch (z) {
                            case false:
                            case true:
                                DiagEventSubscriptionService.LOG.debug("JMX connection closed");
                                if (this.onConnectionClosed != null) {
                                    this.onConnectionClosed.accept(notification);
                                    break;
                                }
                                break;
                            case true:
                                DiagEventSubscriptionService.LOG.warn("Lost JMX notifications");
                                break;
                            case true:
                                DiagEventSubscriptionService.LOG.debug("Received event summary: {}", notification);
                                if (this.onSummary != null) {
                                    this.onSummary.accept((Map) notification.getUserData());
                                    break;
                                }
                                break;
                        }
                        Thread.currentThread().setName(name);
                    } catch (RuntimeException e) {
                        DiagEventSubscriptionService.LOG.error("Error while handling JMX notification", (Throwable) e);
                        Thread.currentThread().setName(name);
                    }
                } catch (Throwable th) {
                    Thread.currentThread().setName(name);
                    throw th;
                }
            });
        }

        public boolean equals(Object obj) {
            if (this == obj) {
                return true;
            }
            if (obj == null || getClass() != obj.getClass()) {
                return false;
            }
            return Objects.equals(this.node, ((NotificationListener) obj).node);
        }

        public int hashCode() {
            return Objects.hash(this.node);
        }
    }

    /**
     * Wraps the given scheduler in a metrics-instrumented one and registers the two recurring
     * jobs: refreshing enabled events (fixed rate) and pinging SSE clients (fixed delay), both
     * every five seconds after an initial five second delay.
     */
    private DiagEventSubscriptionService(AppContext appContext, HttpClient httpClient, ScheduledExecutorService scheduledExecutorService) {
        ScheduledExecutorService instrumented =
                new InstrumentedScheduledExecutorService(scheduledExecutorService, appContext.metricRegistry);
        this.context = appContext;
        this.httpClient = httpClient;
        this.scheduler = instrumented;
        instrumented.scheduleAtFixedRate(this::updateEnabledEvents, 5L, 5L, TimeUnit.SECONDS);
        instrumented.scheduleWithFixedDelay(this::pingSseClients, 5L, 5L, TimeUnit.SECONDS);
    }

    /**
     * Static factory for the service.
     *
     * @param appContext application context providing storage, configuration and JMX access
     * @param httpClient client used for HTTP event export
     * @param scheduledExecutorService executor that will run the service's background work
     * @return a newly constructed service whose periodic jobs are already scheduled
     */
    public static DiagEventSubscriptionService create(AppContext appContext, HttpClient httpClient, ScheduledExecutorService scheduledExecutorService) {
        DiagEventSubscriptionService service =
                new DiagEventSubscriptionService(appContext, httpClient, scheduledExecutorService);
        return service;
    }

    /**
     * Looks up a subscription in storage.
     *
     * @param uuid id of the subscription
     * @return whatever the storage layer returns for that id
     */
    public DiagEventSubscription getEventSubscription(UUID uuid) {
        DiagEventSubscription stored = this.context.storage.getEventSubscription(uuid);
        return stored;
    }

    /**
     * Deletes a subscription from storage and, if it existed and was deleted, refreshes the
     * enabled events on the nodes it covered.
     *
     * @param uuid id of the subscription to delete
     */
    public void deleteEventSubscription(UUID uuid) {
        // Read the subscription first: its node list is needed after the delete.
        DiagEventSubscription existing = this.context.storage.getEventSubscription(uuid);
        boolean deleted = this.context.storage.deleteEventSubscription(uuid);
        if (deleted && existing != null) {
            updateEnabledEvents(new HashSet<>(existing.getNodes()));
        }
    }

    /**
     * Stores a subscription, assigning a random id when it has none.
     *
     * <p>When assertions are enabled for this class, an {@link AssertionError} is thrown if the
     * cluster already has a subscription with the same nodes and events.
     *
     * @param diagEventSubscription the subscription to store
     * @return the subscription as returned by the storage layer
     */
    public DiagEventSubscription addEventSubscription(DiagEventSubscription diagEventSubscription) {
        if (!$assertionsDisabled) {
            boolean duplicate = this.context.storage.getEventSubscriptions(diagEventSubscription.getCluster()).stream()
                    .anyMatch(existing -> Objects.equals(existing.getNodes(), diagEventSubscription.getNodes())
                            && Objects.equals(existing.getEvents(), diagEventSubscription.getEvents()));
            if (duplicate) {
                throw new AssertionError("Duplicate event subscription for cluster " + diagEventSubscription.getCluster());
            }
        }
        // orElseGet: only generate a random UUID when the subscription really lacks an id.
        UUID id = diagEventSubscription.getId().orElseGet(UUID::randomUUID);
        return this.context.storage.addEventSubscription(diagEventSubscription.withId(id));
    }

    /**
     * Attaches an SSE output to the broadcaster of the given subscription (creating the
     * broadcaster on first use) and refreshes the enabled events on the subscription's nodes.
     *
     * @param diagEventSubscription subscription the client wants to listen to
     * @param eventOutput SSE channel of the client
     * @param str client identifier, used for logging only
     */
    public void subscribe(DiagEventSubscription diagEventSubscription, EventOutput eventOutput, String str) {
        Broadcaster broadcaster = BROADCASTERS.computeIfAbsent(diagEventSubscription, Broadcaster::new);
        LOG.debug("Using SSE broadcaster for subscription {} and new client {}", diagEventSubscription.getId(), str);
        // The output itself is registered; no cast is needed (or possible) here.
        broadcaster.add(eventOutput);
        updateEnabledEvents(new HashSet<>(diagEventSubscription.getNodes()));
    }

    /**
     * Periodic refresh entry point, run by the scheduler at a fixed rate.
     *
     * <p>Does nothing unless the context is in distributed mode. In distributed mode a full
     * refresh is triggered when there are broadcasters, when the always-active subscriptions are
     * unknown or non-empty, or when more than a minute has passed since the last refresh.
     *
     * <p>Runtime exceptions are caught and logged: a periodic task that throws is never run
     * again by a {@link ScheduledExecutorService}, which would silently stop all refreshes.
     */
    private synchronized void updateEnabledEvents() {
        try {
            if (!this.context.isDistributed.get()) {
                return;
            }
            boolean hasSubscribers = !BROADCASTERS.isEmpty();
            boolean alwaysActiveUnknownOrPresent = null == this.subsAlwaysActive || !this.subsAlwaysActive.isEmpty();
            boolean refreshOverdue =
                    System.currentTimeMillis() - this.lastUpdateCheck.get() > TimeUnit.MINUTES.toMillis(1L);
            if (hasSubscribers || alwaysActiveUnknownOrPresent || refreshOverdue) {
                this.lastUpdateCheck.set(System.currentTimeMillis());
                updateEnabledEvents(Collections.emptySet());
            }
        } catch (RuntimeException e) {
            LOG.error("Failed to refresh enabled diagnostic events", e);
        }
    }

    /**
     * Reconciles the events enabled on Cassandra nodes with the current subscriptions.
     *
     * <p>Subscriptions are loaded from storage and grouped by node. A subscription counts as
     * active when it exports to a file logger or HTTP endpoint, or when its broadcaster has at
     * least one open output. One task per node is submitted to the scheduler; this method then
     * waits up to the configured JMX connection timeout for them to finish. On timeout, tasks
     * that have not started yet are skipped.
     *
     * @param set hostnames to restrict the update to; an empty set means all nodes
     */
    private synchronized void updateEnabledEvents(Set<String> set) {
        LOG.debug("Checking current event subscriptions");
        Collection<DiagEventSubscription> allSubs = this.context.storage.getEventSubscriptions();

        this.subsAlwaysActive = allSubs.stream()
                .filter(sub -> sub.getExportFileLogger() != null || sub.getExportHttpEndpoint() != null)
                .collect(Collectors.toSet());
        Set<DiagEventSubscription> subsWithClients = allSubs.stream()
                .filter(sub -> !this.subsAlwaysActive.contains(sub))
                .filter(sub -> BROADCASTERS.containsKey(sub) && BROADCASTERS.get(sub).isActive())
                .collect(Collectors.toSet());

        SetMultimap<Node, DiagEventSubscription> subsByNode = MultimapBuilder.hashKeys().hashSetValues().build();
        for (DiagEventSubscription sub : allSubs) {
            Cluster cluster = Cluster.builder().withName(sub.getCluster()).withSeedHosts(sub.getNodes()).build();
            for (String host : sub.getNodes()) {
                if (set.isEmpty() || set.contains(host)) {
                    subsByNode.put(Node.builder().withCluster(cluster).withHostname(host).build(), sub);
                }
            }
        }

        Predicate<DiagEventSubscription> isActive =
                sub -> this.subsAlwaysActive.contains(sub) || subsWithClients.contains(sub);
        Map<Node, Collection<DiagEventSubscription>> subsPerNode = subsByNode.asMap();
        int nodeCount = subsPerNode.size();
        CountDownLatch pending = new CountDownLatch(nodeCount);
        // Cleared on timeout so that tasks still queued do not start late.
        AtomicBoolean keepGoing = new AtomicBoolean(true);

        LOG.debug("Updating event subscriptions for {} nodes", nodeCount);
        for (Map.Entry<Node, Collection<DiagEventSubscription>> entry : subsPerNode.entrySet()) {
            Node node = entry.getKey();
            Collection<DiagEventSubscription> nodeSubs = entry.getValue();
            LOG.debug("{}: {}", node, nodeSubs);
            this.scheduler.submit(() -> {
                if (!keepGoing.get()) {
                    return;
                }
                String previousName = Thread.currentThread().getName();
                try {
                    Thread.currentThread().setName(node.getHostname());
                    LOG.debug("Starting to update event subscriptions for {}", node);
                    applySubscriptions(node, nodeSubs, isActive);
                } catch (ReaperException e) {
                    LOG.warn("Failed to acquire JMX connection {}", e.getMessage());
                } finally {
                    // Always restore the thread name and release the waiter, whatever happened.
                    Thread.currentThread().setName(previousName);
                    pending.countDown();
                    LOG.debug("Finished handling event subscriptions for {} ({}/{})", node, pending.getCount(), nodeCount);
                }
            });
        }

        try {
            if (!pending.await(this.context.config.getJmxConnectionTimeoutInSeconds(), TimeUnit.SECONDS)) {
                keepGoing.set(false);
                LOG.warn("Timeout while handling ({}/{}) remaining event subscriptions for some nodes", pending.getCount(), nodeCount);
            }
        } catch (InterruptedException e) {
            // Preserve the interrupt for whoever owns this thread instead of swallowing it.
            Thread.currentThread().interrupt();
            LOG.warn("Interrupted while waiting for event subscription updates");
        }
    }

    /**
     * Applies the subscriptions of a single node over JMX: enables the events of active
     * subscriptions (and starts polling), or, when nothing is active, stops the node's poller,
     * unsubscribes from notifications and disables the remaining events.
     *
     * @throws ReaperException if no JMX connection to the node can be obtained
     */
    private void applySubscriptions(Node node, Collection<DiagEventSubscription> subs, Predicate<DiagEventSubscription> isActive) throws ReaperException {
        JmxProxy jmx = this.context.jmxConnectionFactory.connectAny(Collections.singleton(node));
        Set<String> activeEvents = subs.stream()
                .filter(isActive)
                .flatMap(sub -> sub.getEvents().stream())
                .collect(Collectors.toSet());
        // Events requested by some subscription of this node but by none of the active ones.
        Set<String> inactiveEvents = subs.stream()
                .flatMap(sub -> sub.getEvents().stream())
                .filter(event -> !activeEvents.contains(event))
                .collect(Collectors.toSet());

        if (!activeEvents.isEmpty()) {
            if (!inactiveEvents.isEmpty()) {
                LOG.debug("Disabling events for inactive subscriptions");
                enableEvents(node, inactiveEvents, false, jmx);
            }
            LOG.debug("Enabling events for active subscriptions");
            enableEvents(node, activeEvents, true, jmx);
            subscribeNotifications(node, jmx, createPoller(node, jmx, activeEvents, true));
            return;
        }

        LOG.debug("No active events subscriptions");
        Set<String> toDisable = new HashSet<>(inactiveEvents);
        DiagEventPoller poller = POLLERS_BY_NODE.remove(node);
        if (poller != null) {
            LOG.debug("Stopping existing event poller");
            toDisable.addAll(poller.getEnabledEvents());
            poller.stop();
        }
        unsubscribeNotifications(node, jmx);
        if (!toDisable.isEmpty()) {
            enableEvents(node, toDisable, false, jmx);
        }
    }

    /**
     * Enables or disables persistence of the given events on a node via JMX.
     *
     * <p>Failures are handled per event, for both enabling and disabling, so that one failing
     * event neither prevents the remaining events from being processed nor aborts the caller.
     *
     * @param node node the events belong to (used for logging)
     * @param set event names to enable or disable
     * @param z {@code true} to enable, {@code false} to disable
     * @param jmxProxy JMX connection to the node
     */
    private void enableEvents(Node node, Set<String> set, boolean z, JmxProxy jmxProxy) {
        String action = z ? "Enabling" : "Disabling";
        for (String event : set) {
            LOG.debug("{} {} for {}", action, event, node);
            try {
                if (z) {
                    DiagnosticProxy.create(jmxProxy).enableEventPersistence(event);
                } else {
                    DiagnosticProxy.create(jmxProxy).disableEventPersistence(event);
                }
            } catch (RuntimeException e) {
                if (e.getCause() instanceof ClassNotFoundException) {
                    LOG.warn("Event not supported on server: {}", event);
                } else if (e.getCause() instanceof ReflectionException) {
                    LOG.warn(String.format("Failed to manage events via JMX for %s: incompatible Cassandra version (>=4.0 required)", node));
                } else {
                    LOG.error(String.format("Failed to enable/disable event %s via JMX for %s", event, node), e);
                }
            }
        }
    }

    /**
     * Returns the poller registered for the node, creating and registering one if needed,
     * updates its enabled events and then starts or stops it.
     *
     * @param node node to poll
     * @param jmxProxy JMX connection handed to a newly created poller
     * @param set events the poller should handle
     * @param z {@code true} to start the poller, {@code false} to stop it
     * @return the poller for the node
     */
    private DiagEventPoller createPoller(Node node, JmxProxy jmxProxy, Set<String> set, boolean z) {
        DiagEventPoller poller = POLLERS_BY_NODE.computeIfAbsent(
                node, key -> new DiagEventPoller(key, jmxProxy, this::onEvent, this.scheduler));
        poller.setEnabledEvents(set);
        if (!z) {
            poller.stop();
            return poller;
        }
        poller.start();
        return poller;
    }

    /**
     * Registers a JMX notification listener for the node unless one is already registered.
     *
     * <p>The listener re-runs the event update for the node when the connection closes or
     * fails, and forwards event summaries to the given poller.
     */
    private void subscribeNotifications(Node node, JmxProxy jmxProxy, DiagEventPoller diagEventPoller) {
        if (!this.listenerByNode.containsKey(node)) {
            LOG.debug("Subscribing to notifications on {} ({})", jmxProxy.getHost(), jmxProxy.getClusterName());
            Consumer<Notification> onClosed =
                    closed -> updateEnabledEvents(Collections.singleton(node.getHostname()));
            NotificationListener listener =
                    new NotificationListener(node, onClosed, diagEventPoller::onSummary, this.scheduler);
            // putIfAbsent guards against a concurrent registration for the same node.
            if (this.listenerByNode.putIfAbsent(node, listener) == null) {
                DiagnosticProxy.create(jmxProxy).subscribeNotifications(listener);
            }
        }
    }

    /**
     * Removes the node's notification listener, if any, and unsubscribes it over JMX.
     *
     * <p>A node that never had active events has no listener. That is not an error: the method
     * simply returns so the caller can go on to disable the node's remaining events.
     */
    private void unsubscribeNotifications(Node node, JmxProxy jmxProxy) {
        LOG.debug("Unsubscribing from notifications on {} ({})", jmxProxy.getHost(), jmxProxy.getClusterName());
        NotificationListener listener = this.listenerByNode.remove(node);
        if (listener == null) {
            LOG.debug("Notification listener not found for {}", node);
            return;
        }
        DiagnosticProxy.create(jmxProxy).unsubscribeNotifications(listener);
    }

    /**
     * Delivers a diagnostic event to everyone interested in it.
     *
     * <p>The event is serialized to JSON once. It is then exported through the file logger
     * and/or HTTP endpoint of each matching always-active subscription, and finally broadcast
     * over SSE through the broadcaster of each matching subscription.
     *
     * @throws IllegalStateException if the event cannot be serialized to JSON
     */
    private void onEvent(DiagnosticEvent diagnosticEvent) {
        final String json;
        try {
            json = JSON_MAPPER.writeValueAsString(diagnosticEvent);
        } catch (JsonProcessingException serializationFailure) {
            throw new IllegalStateException("Failed to serialize diagnostic event as JSON", serializationFailure);
        }

        for (DiagEventSubscription exporting : this.subsAlwaysActive) {
            try {
                if (!isSubscribedTo(exporting, diagnosticEvent)) {
                    continue;
                }
                if (exporting.getExportFileLogger() != null) {
                    Logger fileLogger = LoggerFactory.getLogger(exporting.getExportFileLogger());
                    if (fileLogger != null) {
                        fileLogger.info(json);
                    } else {
                        LOG.error("Failed to get logger: {}", exporting.getExportFileLogger());
                    }
                }
                if (exporting.getExportHttpEndpoint() != null) {
                    HttpPost request = new HttpPost(exporting.getExportHttpEndpoint());
                    request.setEntity(new StringEntity(json, ContentType.APPLICATION_JSON));
                    try {
                        EntityUtils.consumeQuietly(this.httpClient.execute(request).getEntity());
                    } catch (IOException postFailure) {
                        LOG.error("Failed to post event to endpoint: " + exporting.getExportHttpEndpoint(), postFailure);
                    }
                }
            } catch (RuntimeException failure) {
                // One broken subscription must not keep the others from receiving the event.
                LOG.error("Error while checking subscription: " + exporting, failure);
            }
        }

        OutboundEvent sseEvent = new OutboundEvent.Builder()
                .id(String.valueOf(ID_COUNTER.getAndIncrement()))
                .mediaType(MediaType.APPLICATION_JSON_TYPE)
                .data(json)
                .build();
        for (Map.Entry<DiagEventSubscription, Broadcaster> registration : BROADCASTERS.entrySet()) {
            if (isSubscribedTo(registration.getKey(), diagnosticEvent)) {
                Broadcaster target = registration.getValue();
                LOG.debug("[{}] Broadcasting diagnostic event {}/{} from {}", target, diagnosticEvent.getEventClass(), diagnosticEvent.getEventType(), diagnosticEvent.getNode());
                target.broadcast(sseEvent);
            }
        }
    }

    /** Tells whether the subscription covers the event's cluster, node and event class. */
    private static boolean isSubscribedTo(DiagEventSubscription subscription, DiagnosticEvent event) {
        return subscription.getCluster().equals(event.getCluster())
                && subscription.getNodes().contains(event.getNode())
                && subscription.getEvents().contains(event.getEventClass());
    }

    /**
     * Sends a "ping" SSE event to every broadcaster, carrying the broadcaster's subscription as
     * data. All pings of one round share the same event id.
     */
    private void pingSseClients() {
        String pingId = String.valueOf(ID_COUNTER.getAndIncrement());
        OutboundEvent.Builder pingBuilder = new OutboundEvent.Builder()
                .id(pingId)
                .mediaType(MediaType.APPLICATION_JSON_TYPE)
                .name("ping");
        for (Broadcaster target : BROADCASTERS.values()) {
            OutboundEvent ping = pingBuilder.data(target.sub).build();
            target.broadcast(ping);
        }
    }

    static {
        // True when assertions are disabled for this class; consulted by addEventSubscription().
        $assertionsDisabled = !DiagEventSubscriptionService.class.desiredAssertionStatus();
        LOG = LoggerFactory.getLogger(DiagEventSubscriptionService.class);
        // Both registries are mutated and iterated from several threads (HTTP request threads,
        // scheduler tasks, event pollers), so they must be concurrent maps; a plain HashMap can
        // be corrupted or throw ConcurrentModificationException under such access.
        BROADCASTERS = new ConcurrentHashMap<>();
        POLLERS_BY_NODE = new ConcurrentHashMap<>();
        JSON_MAPPER = new ObjectMapper(new JsonFactory());
        ID_COUNTER = new AtomicLong(0L);
    }
}
