package io.github.quickmsg.core.protocol;

import io.github.quickmsg.common.channel.MqttChannel;
import io.github.quickmsg.common.context.ReceiveContext;
import io.github.quickmsg.common.message.MqttMessageBuilder;
import io.github.quickmsg.common.message.SmqttMessage;
import io.github.quickmsg.common.metric.CounterType;
import io.github.quickmsg.common.metric.MetricManagerHolder;
import io.github.quickmsg.common.protocol.Protocol;
import io.github.quickmsg.common.topic.SubscribeTopic;
import io.github.quickmsg.common.topic.TopicRegistry;
import io.netty.handler.codec.mqtt.MqttMessageType;
import io.netty.handler.codec.mqtt.MqttQoS;
import io.netty.handler.codec.mqtt.MqttUnsubscribeMessage;
import java.util.ArrayList;
import java.util.List;
import java.util.stream.Stream;
import reactor.core.publisher.Mono;
import reactor.util.context.ContextView;

/* loaded from: input_file:io/github/quickmsg/core/protocol/UnSubscribeProtocol.class */
/**
 * Protocol handler for MQTT UNSUBSCRIBE messages.
 *
 * <p>For each topic listed in the message payload, the handler asks the
 * {@link TopicRegistry} obtained from the {@link ReceiveContext} to remove the
 * corresponding subscription of the sending channel, and then writes an UNSUBACK
 * that carries the message id of the request.
 */
public class UnSubscribeProtocol implements Protocol<MqttUnsubscribeMessage> {
    /** Message types reported by {@link #getMqttMessageTypes()}; holds only UNSUBSCRIBE. */
    private static final List<MqttMessageType> MESSAGE_TYPE_LIST = new ArrayList<>();

    static {
        MESSAGE_TYPE_LIST.add(MqttMessageType.UNSUBSCRIBE);
    }

    /**
     * Handles a single UNSUBSCRIBE message.
     *
     * <p>The returned {@link Mono} first runs the bookkeeping step (metric increment and
     * subscription removal) and, once that step completes, continues with the UNSUBACK
     * write. Nothing in the bookkeeping step runs until the returned {@link Mono} is
     * subscribed.
     *
     * @param smqttMessage wrapper holding the UNSUBSCRIBE message
     * @param mqttChannel  channel the message arrived on; its subscriptions are removed and
     *                     the acknowledgement is written to it
     * @param contextView  Reactor context that must contain a {@link ReceiveContext} under the
     *                     key {@code ReceiveContext.class}
     * @return a {@link Mono} that completes when the acknowledgement write completes
     */
    public Mono<Void> parseProtocol(SmqttMessage<MqttUnsubscribeMessage> smqttMessage, MqttChannel mqttChannel, ContextView contextView) {
        MqttUnsubscribeMessage message = smqttMessage.getMessage();
        return Mono.fromRunnable(() -> {
            MetricManagerHolder.metricManager.getMetricRegistry().getMetricCounter(CounterType.UN_SUBSCRIBE_EVENT).increment();
            // ContextView.get(Class) already returns the requested type, so no cast is needed.
            TopicRegistry topicRegistry = contextView.get(ReceiveContext.class).getTopicRegistry();
            // NOTE(review): AT_MOST_ONCE looks like a placeholder QoS for the lookup key;
            // confirm that SubscribeTopic matching in the registry ignores QoS.
            message.payload().topics().stream()
                    .map(topic -> new SubscribeTopic(topic, MqttQoS.AT_MOST_ONCE, mqttChannel))
                    .forEach(topicRegistry::removeSubscribeTopic);
        }).then(mqttChannel.write(MqttMessageBuilder.buildUnsubAck(message.variableHeader().messageId()), false));
        // NOTE(review): the meaning of the 'false' flag passed to write(...) is defined by
        // MqttChannel, which is not visible here; presumably it disables retry, so verify.
    }

    /**
     * Returns the MQTT message types handled by this protocol.
     *
     * @return the shared list containing {@link MqttMessageType#UNSUBSCRIBE}
     */
    public List<MqttMessageType> getMqttMessageTypes() {
        return MESSAGE_TYPE_LIST;
    }
}
