package io.github.quickmsg.core.protocol;

import io.github.quickmsg.common.channel.MqttChannel;
import io.github.quickmsg.common.context.ReceiveContext;
import io.github.quickmsg.common.enums.ChannelStatus;
import io.github.quickmsg.common.message.MessageRegistry;
import io.github.quickmsg.common.message.MqttMessageBuilder;
import io.github.quickmsg.common.message.SessionMessage;
import io.github.quickmsg.common.message.SmqttMessage;
import io.github.quickmsg.common.metric.CounterType;
import io.github.quickmsg.common.metric.MetricManagerHolder;
import io.github.quickmsg.common.protocol.Protocol;
import io.github.quickmsg.common.topic.TopicRegistry;
import io.github.quickmsg.common.utils.MessageUtils;
import io.netty.handler.codec.mqtt.MqttMessage;
import io.netty.handler.codec.mqtt.MqttMessageIdVariableHeader;
import io.netty.handler.codec.mqtt.MqttMessageType;
import io.netty.handler.codec.mqtt.MqttPublishMessage;
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
import java.util.stream.Collectors;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.publisher.Mono;
import reactor.netty.Connection;
import reactor.util.context.ContextView;

/* loaded from: input_file:io/github/quickmsg/core/protocol/CommonProtocol.class */
/**
 * {@link Protocol} implementation for the MQTT control packets listed by
 * {@link #getMqttMessageTypes()}: PINGREQ, PINGRESP, DISCONNECT, PUBREC, PUBREL and PUBCOMP.
 */
public class CommonProtocol implements Protocol<MqttMessage> {
    private static final Logger log = LoggerFactory.getLogger(CommonProtocol.class);

    /** Message types this handler reports; populated once in the static initializer below. */
    private static final List<MqttMessageType> MESSAGE_TYPE_LIST = new ArrayList<>();

    static {
        MESSAGE_TYPE_LIST.add(MqttMessageType.PINGRESP);
        MESSAGE_TYPE_LIST.add(MqttMessageType.PINGREQ);
        MESSAGE_TYPE_LIST.add(MqttMessageType.DISCONNECT);
        MESSAGE_TYPE_LIST.add(MqttMessageType.PUBCOMP);
        MESSAGE_TYPE_LIST.add(MqttMessageType.PUBREC);
        MESSAGE_TYPE_LIST.add(MqttMessageType.PUBREL);
    }

    /**
     * Dispatches an incoming message on the message type of its fixed header.
     *
     * <ul>
     *   <li>PINGREQ: writes the message built by {@code MqttMessageBuilder.buildPongMessage()}.</li>
     *   <li>DISCONNECT: increments the disconnect counter, clears the will and disposes the connection.</li>
     *   <li>PUBREC: stops the pending PUBLISH ack for the message id, then writes a PUBREL.</li>
     *   <li>PUBREL: forwards the cached QoS 2 message (if any) to subscribers, stops the pending
     *       PUBREC ack, then writes a PUBCOMP.</li>
     *   <li>PUBCOMP: stops the pending PUBREL ack for the message id.</li>
     *   <li>PINGRESP and anything else: completes without doing anything.</li>
     * </ul>
     *
     * @param smqttMessage wrapper holding the received MQTT message
     * @param mqttChannel  channel the message arrived on; responses are written to it
     * @param contextView  Reactor context from which the {@link ReceiveContext} is read
     * @return a {@link Mono} that performs the handling described above
     */
    public Mono<Void> parseProtocol(SmqttMessage<MqttMessage> smqttMessage, MqttChannel mqttChannel, ContextView contextView) {
        // Resolved eagerly for every message type, including those that never use it.
        ReceiveContext receiveContext = contextView.get(ReceiveContext.class);
        MqttMessage message = smqttMessage.getMessage();
        switch (message.fixedHeader().messageType()) {
            case PINGREQ:
                // NOTE(review): the boolean is presumably the "retry" flag of MqttChannel.write - confirm.
                return mqttChannel.write(MqttMessageBuilder.buildPongMessage(), false);
            case DISCONNECT:
                return disconnect(mqttChannel);
            case PUBREC:
                return onPubRec(receiveContext, mqttChannel, messageIdOf(message));
            case PUBREL:
                return onPubRel(receiveContext, mqttChannel, messageIdOf(message));
            case PUBCOMP:
                return onPubComp(receiveContext, mqttChannel, messageIdOf(message));
            case PINGRESP:
            default:
                return Mono.empty();
        }
    }

    /**
     * Returns the message types handled by this protocol.
     *
     * @return a fresh copy of the supported types, so callers cannot alter the shared static list
     */
    public List<MqttMessageType> getMqttMessageTypes() {
        return new ArrayList<>(MESSAGE_TYPE_LIST);
    }

    /** Reads the message id from a message whose variable header is a {@link MqttMessageIdVariableHeader}. */
    private static int messageIdOf(MqttMessage message) {
        return ((MqttMessageIdVariableHeader) message.variableHeader()).messageId();
    }

    /** Builds the deferred DISCONNECT handling: count the event, drop the will, dispose the connection. */
    private static Mono<Void> disconnect(MqttChannel mqttChannel) {
        return Mono.fromRunnable(() -> {
            MetricManagerHolder.metricManager.getMetricRegistry().getMetricCounter(CounterType.DIS_CONNECT_EVENT).increment();
            // Presumably cleared so that an explicit DISCONNECT does not trigger the will - confirm against MqttChannel.
            mqttChannel.setWill((MqttChannel.Will) null);
            Connection connection = mqttChannel.getConnection();
            if (!connection.isDisposed()) {
                connection.dispose();
            }
        });
    }

    /** PUBREC: stop the ack registered for the PUBLISH with this id, then answer with PUBREL. */
    private static Mono<Void> onPubRec(ReceiveContext receiveContext, MqttChannel mqttChannel, int messageId) {
        return Mono.fromRunnable(() -> stopAck(receiveContext, mqttChannel, MqttMessageType.PUBLISH, messageId))
                .then(mqttChannel.write(MqttMessageBuilder.buildPublishRel(messageId), true));
    }

    /**
     * PUBREL: if the channel still holds a QoS 2 message for this id, forward it to the subscribers and
     * stop the ack registered for PUBREC; in every case answer with PUBCOMP.
     */
    private Mono<Void> onPubRel(ReceiveContext receiveContext, MqttChannel mqttChannel, int messageId) {
        return mqttChannel.removeQos2Msg(messageId)
                .map(cached -> forwardToSubscribers(receiveContext, cached)
                        .then(Mono.fromRunnable(() -> stopAck(receiveContext, mqttChannel, MqttMessageType.PUBREC, messageId)))
                        .then(mqttChannel.write(MqttMessageBuilder.buildPublishComp(messageId), false)))
                .orElseGet(() -> mqttChannel.write(MqttMessageBuilder.buildPublishComp(messageId), false));
    }

    /** PUBCOMP: stop the ack registered for the PUBREL with this id. */
    private static Mono<Void> onPubComp(ReceiveContext receiveContext, MqttChannel mqttChannel, int messageId) {
        return Mono.fromRunnable(() -> stopAck(receiveContext, mqttChannel, MqttMessageType.PUBREL, messageId));
    }

    /**
     * Writes the publish message to every subscriber of its topic that is ONLINE; for the others the
     * message is handed to {@link #filterOfflineSession} to be saved as a session message.
     *
     * @return a {@link Mono} combining all the individual writes
     */
    private Mono<Void> forwardToSubscribers(ReceiveContext receiveContext, MqttPublishMessage publishMessage) {
        TopicRegistry topicRegistry = receiveContext.getTopicRegistry();
        MessageRegistry messageRegistry = receiveContext.getMessageRegistry();
        return Mono.when(
                topicRegistry.getSubscribesByTopic(publishMessage.variableHeader().topicName(), publishMessage.fixedHeader().qosLevel())
                        .stream()
                        .filter(subscribeTopic -> {
                            MqttChannel subscriber = subscribeTopic.getMqttChannel();
                            // Wrap the message (and take a message id) only when it is going to be stored;
                            // online subscribers get their own wrapped copy in the map step below.
                            return subscriber.getStatus() == ChannelStatus.ONLINE
                                    || filterOfflineSession(subscriber, messageRegistry,
                                            MessageUtils.wrapPublishMessage(publishMessage, subscribeTopic.getQoS(), subscriber.generateMessageId()));
                        })
                        .map(subscribeTopic -> subscribeTopic.getMqttChannel().write(
                                MessageUtils.wrapPublishMessage(publishMessage, subscribeTopic.getQoS(), subscribeTopic.getMqttChannel().generateMessageId()),
                                subscribeTopic.getQoS().value() > 0))
                        .collect(Collectors.toList()));
    }

    /**
     * Stops the ack, if one is registered, whose id is generated by the channel from the given
     * message type and message id.
     */
    private static void stopAck(ReceiveContext receiveContext, MqttChannel mqttChannel, MqttMessageType ackedType, int messageId) {
        Optional.ofNullable(receiveContext.getTimeAckManager().getAck(Long.valueOf(mqttChannel.generateId(ackedType, messageId))))
                .ifPresent(ack -> ack.stop());
    }

    /**
     * Decides whether a message may be written to the channel right now.
     *
     * @param mqttChannel        subscriber channel
     * @param messageRegistry    registry that receives the message when the channel is not online
     * @param mqttPublishMessage message destined for the subscriber
     * @return {@code true} if the channel is ONLINE; otherwise the message is saved as a session
     *         message for the channel's client identifier and {@code false} is returned
     */
    private boolean filterOfflineSession(MqttChannel mqttChannel, MessageRegistry messageRegistry, MqttPublishMessage mqttPublishMessage) {
        if (mqttChannel.getStatus() == ChannelStatus.ONLINE) {
            return true;
        }
        messageRegistry.saveSessionMessage(SessionMessage.of(mqttChannel.getClientIdentifier(), mqttPublishMessage));
        return false;
    }
}
