package io.github.quickmsg.core.spi;

import io.github.quickmsg.common.channel.MqttChannel;
import io.github.quickmsg.common.topic.SubscribeTopic;
import io.github.quickmsg.common.topic.TopicRegistry;
import io.github.quickmsg.core.topic.FixedTopicFilter;
import io.github.quickmsg.core.topic.TopicFilter;
import io.github.quickmsg.core.topic.TreeTopicFilter;
import io.netty.handler.codec.mqtt.MqttQoS;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:io/github/quickmsg/core/spi/DefaultTopicRegistry.class */
public class DefaultTopicRegistry implements TopicRegistry {
    private static final Logger log = LoggerFactory.getLogger(DefaultTopicRegistry.class);
    private static final String ONE_SYMBOL = "+";
    private static final String MORE_SYMBOL = "#";
    private TopicFilter fixedTopicFilter = new FixedTopicFilter();
    private TopicFilter treeTopicFilter = new TreeTopicFilter();

    public void registrySubscribeTopic(String str, MqttChannel mqttChannel, MqttQoS mqttQoS) {
        registrySubscribeTopic(new SubscribeTopic(str, mqttQoS, mqttChannel));
    }

    public void registrySubscribeTopic(SubscribeTopic subscribeTopic) {
        if (subscribeTopic.getTopicFilter().contains(ONE_SYMBOL) || subscribeTopic.getTopicFilter().contains(MORE_SYMBOL)) {
            this.treeTopicFilter.addSubscribeTopic(subscribeTopic);
        } else {
            this.fixedTopicFilter.addSubscribeTopic(subscribeTopic);
        }
    }

    public void clear(MqttChannel mqttChannel) {
        Set topics = mqttChannel.getTopics();
        if (log.isDebugEnabled()) {
            log.info("mqttChannel channel {} clear topics {}", mqttChannel, topics);
        }
        topics.forEach(this::removeSubscribeTopic);
    }

    public void removeSubscribeTopic(SubscribeTopic subscribeTopic) {
        if (subscribeTopic.getTopicFilter().contains(ONE_SYMBOL) || subscribeTopic.getTopicFilter().contains(MORE_SYMBOL)) {
            this.treeTopicFilter.removeSubscribeTopic(subscribeTopic);
        } else {
            this.fixedTopicFilter.removeSubscribeTopic(subscribeTopic);
        }
    }

    public Set<SubscribeTopic> getSubscribesByTopic(String str, MqttQoS mqttQoS) {
        Set<SubscribeTopic> subscribeByTopic = this.fixedTopicFilter.getSubscribeByTopic(str, mqttQoS);
        subscribeByTopic.addAll(this.treeTopicFilter.getSubscribeByTopic(str, mqttQoS));
        return subscribeByTopic;
    }

    public void registrySubscribesTopic(Set<SubscribeTopic> set) {
        set.forEach(this::registrySubscribeTopic);
    }

    public Map<String, Set<MqttChannel>> getAllTopics() {
        Set<SubscribeTopic> allSubscribesTopic = this.fixedTopicFilter.getAllSubscribesTopic();
        allSubscribesTopic.addAll(this.treeTopicFilter.getAllSubscribesTopic());
        return (Map) allSubscribesTopic.stream().collect(Collectors.groupingBy((v0) -> {
            return v0.getTopicFilter();
        }, Collectors.mapping((v0) -> {
            return v0.getMqttChannel();
        }, Collectors.toSet())));
    }

    public Integer counts() {
        return Integer.valueOf(this.fixedTopicFilter.count() + this.treeTopicFilter.count());
    }
}
