package io.github.quickmsg.core.protocol;

import io.github.quickmsg.common.channel.MqttChannel;
import io.github.quickmsg.common.context.ReceiveContext;
import io.github.quickmsg.common.enums.ChannelStatus;
import io.github.quickmsg.common.message.MessageRegistry;
import io.github.quickmsg.common.message.MqttMessageBuilder;
import io.github.quickmsg.common.message.RetainMessage;
import io.github.quickmsg.common.message.SessionMessage;
import io.github.quickmsg.common.message.SmqttMessage;
import io.github.quickmsg.common.protocol.Protocol;
import io.github.quickmsg.common.topic.SubscribeTopic;
import io.github.quickmsg.common.topic.TopicRegistry;
import io.github.quickmsg.common.utils.MessageUtils;
import io.netty.handler.codec.mqtt.MqttMessageType;
import io.netty.handler.codec.mqtt.MqttPublishMessage;
import io.netty.handler.codec.mqtt.MqttPublishVariableHeader;
import io.netty.handler.codec.mqtt.MqttQoS;
import java.util.ArrayList;
import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.publisher.Mono;
import reactor.util.context.ContextView;

/* loaded from: input_file:io/github/quickmsg/core/protocol/PublishProtocol.class */
public class PublishProtocol implements Protocol<MqttPublishMessage> {
    private static final Logger log = LoggerFactory.getLogger(PublishProtocol.class);
    private static List<MqttMessageType> MESSAGE_TYPE_LIST = new ArrayList();

    /* renamed from: io.github.quickmsg.core.protocol.PublishProtocol$1, reason: invalid class name */
    /* loaded from: input_file:io/github/quickmsg/core/protocol/PublishProtocol$1.class */
    static /* synthetic */ class AnonymousClass1 {
        static final /* synthetic */ int[] $SwitchMap$io$netty$handler$codec$mqtt$MqttQoS = new int[MqttQoS.values().length];

        static {
            try {
                $SwitchMap$io$netty$handler$codec$mqtt$MqttQoS[MqttQoS.AT_MOST_ONCE.ordinal()] = 1;
            } catch (NoSuchFieldError e) {
            }
            try {
                $SwitchMap$io$netty$handler$codec$mqtt$MqttQoS[MqttQoS.AT_LEAST_ONCE.ordinal()] = 2;
            } catch (NoSuchFieldError e2) {
            }
            try {
                $SwitchMap$io$netty$handler$codec$mqtt$MqttQoS[MqttQoS.EXACTLY_ONCE.ordinal()] = 3;
            } catch (NoSuchFieldError e3) {
            }
        }
    }

    public List<MqttMessageType> getMqttMessageTypes() {
        return MESSAGE_TYPE_LIST;
    }

    public Mono<Void> parseProtocol(SmqttMessage<MqttPublishMessage> smqttMessage, MqttChannel mqttChannel, ContextView contextView) {
        try {
            MqttPublishMessage mqttPublishMessage = (MqttPublishMessage) smqttMessage.getMessage();
            ReceiveContext receiveContext = (ReceiveContext) contextView.get(ReceiveContext.class);
            TopicRegistry topicRegistry = receiveContext.getTopicRegistry();
            MqttPublishVariableHeader variableHeader = mqttPublishMessage.variableHeader();
            MessageRegistry messageRegistry = receiveContext.getMessageRegistry();
            Set<SubscribeTopic> subscribesByTopic = topicRegistry.getSubscribesByTopic(variableHeader.topicName(), mqttPublishMessage.fixedHeader().qosLevel());
            if (mqttChannel.getIsMock().booleanValue()) {
                return send(subscribesByTopic, mqttPublishMessage, messageRegistry, filterRetainMessage(mqttPublishMessage, messageRegistry));
            }
            switch (AnonymousClass1.$SwitchMap$io$netty$handler$codec$mqtt$MqttQoS[mqttPublishMessage.fixedHeader().qosLevel().ordinal()]) {
                case 1:
                    return send(subscribesByTopic, mqttPublishMessage, messageRegistry, filterRetainMessage(mqttPublishMessage, messageRegistry));
                case 2:
                    return send(subscribesByTopic, mqttPublishMessage, messageRegistry, mqttChannel.write(MqttMessageBuilder.buildPublishAck(variableHeader.packetId()), false).then(filterRetainMessage(mqttPublishMessage, messageRegistry)));
                case 3:
                    if (!mqttChannel.existQos2Msg(variableHeader.packetId()).booleanValue()) {
                        return mqttChannel.cacheQos2Msg(variableHeader.packetId(), MessageUtils.wrapPublishMessage(mqttPublishMessage, mqttPublishMessage.fixedHeader().qosLevel(), 0)).then(mqttChannel.write(MqttMessageBuilder.buildPublishRec(variableHeader.packetId()), true));
                    }
                    break;
            }
            return Mono.empty();
        } catch (Exception e) {
            log.error("error ", e);
            return Mono.empty();
        }
    }

    private Mono<Void> send(Set<SubscribeTopic> set, MqttPublishMessage mqttPublishMessage, MessageRegistry messageRegistry, Mono<Void> mono) {
        return Mono.when((Iterable) set.stream().filter(subscribeTopic -> {
            return filterOfflineSession(subscribeTopic.getMqttChannel(), messageRegistry, mqttPublishMessage);
        }).map(subscribeTopic2 -> {
            return subscribeTopic2.getMqttChannel().write(MessageUtils.wrapPublishMessage(mqttPublishMessage, subscribeTopic2.getQoS(), subscribeTopic2.getMqttChannel().generateMessageId()), subscribeTopic2.getQoS().value() > 0);
        }).collect(Collectors.toList())).then(mono);
    }

    private boolean filterOfflineSession(MqttChannel mqttChannel, MessageRegistry messageRegistry, MqttPublishMessage mqttPublishMessage) {
        if (mqttChannel.getStatus() == ChannelStatus.ONLINE) {
            return true;
        }
        messageRegistry.saveSessionMessage(SessionMessage.of(mqttChannel.getClientIdentifier(), mqttPublishMessage));
        return false;
    }

    private Mono<Void> filterRetainMessage(MqttPublishMessage mqttPublishMessage, MessageRegistry messageRegistry) {
        return Mono.fromRunnable(() -> {
            if (mqttPublishMessage.fixedHeader().isRetain()) {
                messageRegistry.saveRetainMessage(RetainMessage.of(mqttPublishMessage));
            }
        });
    }

    static {
        MESSAGE_TYPE_LIST.add(MqttMessageType.PUBLISH);
    }
}
