package io.github.quickmsg.core.protocol;

import io.github.quickmsg.common.channel.MqttChannel;
import io.github.quickmsg.common.context.ReceiveContext;
import io.github.quickmsg.common.message.MessageRegistry;
import io.github.quickmsg.common.message.MqttMessageBuilder;
import io.github.quickmsg.common.message.SmqttMessage;
import io.github.quickmsg.common.protocol.Protocol;
import io.github.quickmsg.common.topic.SubscribeTopic;
import io.github.quickmsg.common.topic.TopicRegistry;
import io.netty.handler.codec.mqtt.MqttMessageType;
import io.netty.handler.codec.mqtt.MqttSubscribeMessage;
import java.util.ArrayList;
import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;
import reactor.core.publisher.Mono;
import reactor.util.context.ContextView;

/* loaded from: input_file:io/github/quickmsg/core/protocol/SubscribeProtocol.class */
public class SubscribeProtocol implements Protocol<MqttSubscribeMessage> {
    private static List<MqttMessageType> MESSAGE_TYPE_LIST = new ArrayList();

    public Mono<Void> parseProtocol(SmqttMessage<MqttSubscribeMessage> smqttMessage, MqttChannel mqttChannel, ContextView contextView) {
        MqttSubscribeMessage message = smqttMessage.getMessage();
        return Mono.fromRunnable(() -> {
            ReceiveContext receiveContext = (ReceiveContext) contextView.get(ReceiveContext.class);
            TopicRegistry topicRegistry = receiveContext.getTopicRegistry();
            MessageRegistry messageRegistry = receiveContext.getMessageRegistry();
            topicRegistry.registrySubscribesTopic((Set) message.payload().topicSubscriptions().stream().peek(mqttTopicSubscription -> {
                loadRetainMessage(messageRegistry, mqttChannel, mqttTopicSubscription.topicName());
            }).map(mqttTopicSubscription2 -> {
                return new SubscribeTopic(mqttTopicSubscription2.topicName(), mqttTopicSubscription2.qualityOfService(), mqttChannel);
            }).collect(Collectors.toSet()));
        }).then(mqttChannel.write(MqttMessageBuilder.buildSubAck(message.variableHeader().messageId(), (List) message.payload().topicSubscriptions().stream().map(mqttTopicSubscription -> {
            return Integer.valueOf(mqttTopicSubscription.qualityOfService().value());
        }).collect(Collectors.toList())), false));
    }

    private void loadRetainMessage(MessageRegistry messageRegistry, MqttChannel mqttChannel, String str) {
        messageRegistry.getRetainMessage(str).forEach(retainMessage -> {
            mqttChannel.write(retainMessage.toPublishMessage(mqttChannel), retainMessage.getQos() > 0).subscribe();
        });
    }

    public List<MqttMessageType> getMqttMessageTypes() {
        return MESSAGE_TYPE_LIST;
    }

    static {
        MESSAGE_TYPE_LIST.add(MqttMessageType.SUBSCRIBE);
    }
}
