package io.github.quickmsg.interate;

import io.github.quickmsg.common.channel.MqttChannel;
import io.github.quickmsg.common.integrate.Integrate;
import io.github.quickmsg.common.integrate.SubscribeTopic;
import io.github.quickmsg.common.integrate.topic.IntegrateTopics;
import io.github.quickmsg.common.utils.TopicRegexUtils;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArraySet;
import java.util.stream.Collectors;
import org.apache.ignite.IgniteSet;
import org.apache.ignite.cache.CacheAtomicityMode;
import org.apache.ignite.cache.CacheMode;
import org.apache.ignite.configuration.CollectionConfiguration;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:io/github/quickmsg/interate/IgniteIntegrateTopics.class */
/**
 * {@link IntegrateTopics} implementation that tracks subscriptions per topic filter in a local
 * map and records wildcard filters in an Ignite set named {@code "wildcard"}.
 *
 * <p>For every filter that gains its first distinct subscription on this instance,
 * {@code listenTopic} is invoked on the cluster obtained from the owning
 * {@link IgniteIntegrate}; {@code stopListenTopic} is invoked when a removal finds no
 * subscriptions left for the filter.
 */
public class IgniteIntegrateTopics implements IntegrateTopics<SubscribeTopic> {
    private static final Logger log = LoggerFactory.getLogger(IgniteIntegrateTopics.class);

    /** Single-level wildcard token looked for in topic filters. */
    protected static final String ONE_SYMBOL = "+";

    /** Multi-level wildcard token looked for in topic filters. */
    protected static final String MORE_SYMBOL = "#";

    private final IgniteIntegrate integrate;

    /** Ignite-backed set holding every wildcard filter registered through this class. */
    private final IgniteSet<String> shareCache;

    /** Topic filter -> subscriptions registered for exactly that filter on this instance. */
    private final Map<String, Set<SubscribeTopic>> topicSubscribers = new ConcurrentHashMap<>();

    /**
     * Creates the topic registry and obtains the {@code "wildcard"} Ignite set, configured as
     * partitioned, atomic and collocated.
     *
     * @param igniteIntegrate integrate that supplies the Ignite instance and the cluster
     */
    public IgniteIntegrateTopics(IgniteIntegrate igniteIntegrate) {
        this.integrate = igniteIntegrate;
        CollectionConfiguration wildcardConfig = new CollectionConfiguration()
                .setCacheMode(CacheMode.PARTITIONED)
                .setAtomicityMode(CacheAtomicityMode.ATOMIC)
                .setCollocated(true);
        this.shareCache = igniteIntegrate.getIgnite().set("wildcard", wildcardConfig);
    }

    /**
     * Reports whether the given string contains the single-level wildcard {@code +}.
     *
     * <p>NOTE(review): unlike {@link #isWildcard(String)} this does not look for {@code #};
     * confirm with callers whether that is intentional.
     *
     * @param str string to inspect
     * @return {@code true} if {@code str} contains {@code +}
     */
    public boolean checkFilter(String str) {
        return str.contains(ONE_SYMBOL);
    }

    /**
     * Registers every subscription in {@code list} for the channel, one at a time.
     *
     * @param mqttChannel channel that owns the subscriptions
     * @param list        subscriptions to register
     */
    public void registryTopic(MqttChannel mqttChannel, List<SubscribeTopic> list) {
        list.forEach(topic -> registryTopic(mqttChannel, topic));
    }

    /**
     * Registers a single subscription.
     *
     * <p>If the subscription was not already present for its filter, the cluster is asked to
     * listen on the filter, the subscription is added to the channel's topics, and a wildcard
     * filter is additionally stored in the shared Ignite set. A subscription that is already
     * present causes no further action.
     *
     * @param mqttChannel    channel that owns the subscription
     * @param subscribeTopic subscription to register
     */
    public void registryTopic(MqttChannel mqttChannel, SubscribeTopic subscribeTopic) {
        String topicFilter = subscribeTopic.getTopicFilter();
        Set<SubscribeTopic> subscribers =
                this.topicSubscribers.computeIfAbsent(topicFilter, key -> new CopyOnWriteArraySet<>());
        if (!subscribers.add(subscribeTopic)) {
            // Already registered for this filter: nothing else to do.
            return;
        }
        this.integrate.getCluster().listenTopic(topicFilter);
        mqttChannel.getTopics().add(subscribeTopic);
        if (isWildcard(topicFilter)) {
            this.shareCache.add(topicFilter);
        }
    }

    /**
     * Removes a single subscription.
     *
     * <p>The cluster listener (and, for wildcard filters, the shared-set entry) is released when
     * no subscriber set exists for the filter, when the set is already empty, or when this
     * removal empties it. The subscription is always removed from the channel's topics.
     *
     * <p>NOTE(review): an emptied subscriber set is left in the map rather than dropped, so
     * {@link #getMqttChannelsByTopic(String)} keeps returning an empty set for that filter;
     * confirm whether callers rely on this before changing it.
     *
     * @param mqttChannel    channel that owns the subscription
     * @param subscribeTopic subscription to remove
     */
    public void removeTopic(MqttChannel mqttChannel, SubscribeTopic subscribeTopic) {
        this.topicSubscribers.compute(subscribeTopic.getTopicFilter(), (filter, subscribers) -> {
            // Short-circuit order matters: remove() is only attempted on a non-empty set.
            boolean noSubscribersLeft = subscribers == null
                    || subscribers.isEmpty()
                    || (subscribers.remove(subscribeTopic) && subscribers.isEmpty());
            if (noSubscribersLeft) {
                clearCache(subscribeTopic.getTopicFilter());
            }
            mqttChannel.getTopics().remove(subscribeTopic);
            return subscribers;
        });
    }

    /**
     * Stops the cluster from listening on the filter and, for wildcard filters, removes it from
     * the shared Ignite set.
     *
     * @param str topic filter to release
     */
    private void clearCache(String str) {
        this.integrate.getCluster().stopListenTopic(str);
        if (isWildcard(str)) {
            this.shareCache.remove(str);
        }
    }

    /**
     * Removes every subscription in {@code list} for the channel, one at a time.
     *
     * @param mqttChannel channel that owns the subscriptions
     * @param list        subscriptions to remove
     */
    public void removeTopic(MqttChannel mqttChannel, List<SubscribeTopic> list) {
        // Index-based on purpose: keeps the original iteration semantics should the list be
        // modified while subscriptions are being removed.
        for (int i = 0; i < list.size(); i++) {
            removeTopic(mqttChannel, list.get(i));
        }
    }

    /**
     * Looks up the subscriptions registered for exactly the given filter string.
     *
     * @param str topic filter used as the lookup key (no wildcard matching is performed here)
     * @return the live subscriber set, or {@code null} if nothing was ever registered for it
     */
    public Set<SubscribeTopic> getMqttChannelsByTopic(String str) {
        return this.topicSubscribers.get(str);
    }

    /**
     * Not implemented by this class.
     *
     * <p>NOTE(review): returns {@code null}, which will throw if a caller unboxes the result.
     *
     * @return always {@code null}
     */
    public Long counts() {
        return null;
    }

    /**
     * Reports whether the given string contains {@code +} or {@code #}.
     *
     * @param str topic filter to inspect
     * @return {@code true} if either wildcard token is present
     */
    public boolean isWildcard(String str) {
        return str.contains(ONE_SYMBOL) || str.contains(MORE_SYMBOL);
    }

    /**
     * Collects the wildcard filters from the shared Ignite set that match the given topic.
     *
     * <p>Each stored filter is converted with {@link TopicRegexUtils#regexTopic(String)} and the
     * result is used as a regular expression against {@code str}.
     *
     * @param str concrete topic name to match
     * @return the stored wildcard filters whose regular expression matches {@code str}
     */
    public Set<String> getWildcardTopics(String str) {
        return this.shareCache.stream()
                .filter(wildcardFilter -> str.matches(TopicRegexUtils.regexTopic(wildcardFilter)))
                .collect(Collectors.toSet());
    }

    /**
     * Returns the integrate this registry was created with.
     *
     * @return the owning integrate
     */
    public Integrate getIntegrate() {
        return this.integrate;
    }
}
