package io.github.quickmsg.core.topic;

import io.github.quickmsg.common.channel.MqttChannel;
import io.github.quickmsg.common.metric.CounterType;
import io.github.quickmsg.common.metric.MetricManagerHolder;
import io.github.quickmsg.common.topic.SubscribeTopic;
import io.netty.handler.codec.mqtt.MqttQoS;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArraySet;
import java.util.concurrent.atomic.LongAdder;
import java.util.stream.Collectors;

/* loaded from: input_file:io/github/quickmsg/core/topic/FixedTopicFilter.class */
public class FixedTopicFilter implements TopicFilter {
    private final LongAdder subscribeNumber = new LongAdder();
    private final Map<String, CopyOnWriteArraySet<SubscribeTopic>> topicChannels = new ConcurrentHashMap();

    @Override // io.github.quickmsg.core.topic.TopicFilter
    public Set<SubscribeTopic> getSubscribeByTopic(String str, MqttQoS mqttQoS) {
        return (Set) this.topicChannels.computeIfAbsent(str, str2 -> {
            return new CopyOnWriteArraySet();
        }).stream().map(subscribeTopic -> {
            return subscribeTopic.compareQos(mqttQoS);
        }).collect(Collectors.toSet());
    }

    @Override // io.github.quickmsg.core.topic.TopicFilter
    public void addSubscribeTopic(String str, MqttChannel mqttChannel, MqttQoS mqttQoS) {
        addSubscribeTopic(new SubscribeTopic(str, mqttQoS, mqttChannel));
    }

    @Override // io.github.quickmsg.core.topic.TopicFilter
    public void addSubscribeTopic(SubscribeTopic subscribeTopic) {
        if (this.topicChannels.computeIfAbsent(subscribeTopic.getTopicFilter(), str -> {
            return new CopyOnWriteArraySet();
        }).add(subscribeTopic)) {
            this.subscribeNumber.add(1L);
            subscribeTopic.linkSubscribe();
            MetricManagerHolder.metricManager.getMetricRegistry().getMetricCounter(CounterType.SUBSCRIBE).increment();
        }
    }

    @Override // io.github.quickmsg.core.topic.TopicFilter
    public void removeSubscribeTopic(SubscribeTopic subscribeTopic) {
        if (this.topicChannels.computeIfAbsent(subscribeTopic.getTopicFilter(), str -> {
            return new CopyOnWriteArraySet();
        }).remove(subscribeTopic)) {
            this.subscribeNumber.add(-1L);
            subscribeTopic.unLinkSubscribe();
            MetricManagerHolder.metricManager.getMetricRegistry().getMetricCounter(CounterType.SUBSCRIBE).decrement();
        }
    }

    @Override // io.github.quickmsg.core.topic.TopicFilter
    public int count() {
        return (int) this.subscribeNumber.sum();
    }

    @Override // io.github.quickmsg.core.topic.TopicFilter
    public Set<SubscribeTopic> getAllSubscribesTopic() {
        return (Set) this.topicChannels.values().stream().flatMap((v0) -> {
            return v0.stream();
        }).collect(Collectors.toSet());
    }
}
