package io.github.quickmsg.core.protocol;

import io.github.quickmsg.common.acl.AclAction;
import io.github.quickmsg.common.acl.AclManager;
import io.github.quickmsg.common.channel.MqttChannel;
import io.github.quickmsg.common.context.ContextHolder;
import io.github.quickmsg.common.context.ReceiveContext;
import io.github.quickmsg.common.integrate.SubscribeTopic;
import io.github.quickmsg.common.integrate.msg.IntegrateMessages;
import io.github.quickmsg.common.integrate.topic.IntegrateTopics;
import io.github.quickmsg.common.log.LogEvent;
import io.github.quickmsg.common.log.LogManager;
import io.github.quickmsg.common.log.LogStatus;
import io.github.quickmsg.common.message.mqtt.SubscribeMessage;
import io.github.quickmsg.common.metric.CounterType;
import io.github.quickmsg.common.protocol.Protocol;
import io.github.quickmsg.common.utils.JacksonUtil;
import io.github.quickmsg.common.utils.MqttMessageUtils;
import io.netty.handler.codec.mqtt.MqttQoS;
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
import java.util.stream.Collectors;
import reactor.util.context.ContextView;

/* loaded from: input_file:io/github/quickmsg/core/protocol/SubscribeProtocol.class */
public class SubscribeProtocol implements Protocol<SubscribeMessage> {
    public void parseProtocol(SubscribeMessage subscribeMessage, MqttChannel mqttChannel, ContextView contextView) {
        ContextHolder.getReceiveContext().getMetricManager().getMetricRegistry().getMetricCounter(CounterType.SUBSCRIBE_EVENT).increment();
        ReceiveContext receiveContext = (ReceiveContext) contextView.get(ReceiveContext.class);
        LogManager logManager = receiveContext.getLogManager();
        IntegrateTopics topics = receiveContext.getIntegrate().getTopics();
        AclManager aclManager = receiveContext.getAclManager();
        IntegrateMessages messages = receiveContext.getIntegrate().getMessages();
        topics.registryTopic(mqttChannel, (List) subscribeMessage.getSubscribeTopics().stream().filter(subscribeTopic -> {
            return aclManager.check(mqttChannel, subscribeTopic.getTopicFilter(), AclAction.SUBSCRIBE);
        }).peek(subscribeTopic2 -> {
            loadRetainMessage(messages, subscribeTopic2);
        }).collect(Collectors.toList()));
        logManager.printInfo(mqttChannel, LogEvent.SUBSCRIBE, LogStatus.SUCCESS, JacksonUtil.bean2Json(subscribeMessage));
        mqttChannel.write(MqttMessageUtils.buildSubAck(subscribeMessage.getMessageId(), (List) subscribeMessage.getSubscribeTopics().stream().map(subscribeTopic3 -> {
            return Integer.valueOf(subscribeTopic3.getQoS().value());
        }).collect(Collectors.toList())));
    }

    public Class<SubscribeMessage> getClassType() {
        return SubscribeMessage.class;
    }

    private void loadRetainMessage(IntegrateMessages integrateMessages, SubscribeTopic subscribeTopic) {
        integrateMessages.getRetainMessage(subscribeTopic.getTopicFilter()).forEach(retainMessage -> {
            Optional.ofNullable(subscribeTopic.getMqttChannel()).ifPresent(mqttChannel -> {
                mqttChannel.sendPublish(subscribeTopic.minQos(MqttQoS.valueOf(retainMessage.getQos())), retainMessage.toPublishMessage());
            });
        });
    }
}
