package io.github.quickmsg.core.protocol;

import io.github.quickmsg.common.auth.PasswordAuthentication;
import io.github.quickmsg.common.channel.ChannelRegistry;
import io.github.quickmsg.common.channel.MqttChannel;
import io.github.quickmsg.common.context.ReceiveContext;
import io.github.quickmsg.common.enums.ChannelStatus;
import io.github.quickmsg.common.message.MessageRegistry;
import io.github.quickmsg.common.message.MqttMessageBuilder;
import io.github.quickmsg.common.message.RecipientRegistry;
import io.github.quickmsg.common.protocol.Protocol;
import io.github.quickmsg.common.topic.SubscribeTopic;
import io.github.quickmsg.common.topic.TopicRegistry;
import io.github.quickmsg.core.mqtt.MqttReceiveContext;
import io.github.quickmsg.metric.counter.WindowMetric;
import io.netty.buffer.Unpooled;
import io.netty.handler.codec.mqtt.MqttConnectMessage;
import io.netty.handler.codec.mqtt.MqttConnectPayload;
import io.netty.handler.codec.mqtt.MqttConnectReturnCode;
import io.netty.handler.codec.mqtt.MqttConnectVariableHeader;
import io.netty.handler.codec.mqtt.MqttMessageType;
import io.netty.handler.codec.mqtt.MqttQoS;
import io.netty.handler.codec.mqtt.MqttVersion;
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
import java.util.Set;
import java.util.stream.Collectors;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Schedulers;
import reactor.util.context.ContextView;

/* loaded from: input_file:io/github/quickmsg/core/protocol/ConnectProtocol.class */
public class ConnectProtocol implements Protocol<MqttConnectMessage> {
    private static final Logger log = LoggerFactory.getLogger(ConnectProtocol.class);
    private static List<MqttMessageType> MESSAGE_TYPE_LIST = new ArrayList();
    private static final int MILLI_SECOND_PERIOD = 1000;

    private static void accept(MqttChannel mqttChannel) {
        WindowMetric.WINDOW_METRIC_INSTANCE.recordConnect(-1);
    }

    public Mono<Void> parseProtocol(MqttConnectMessage mqttConnectMessage, MqttChannel mqttChannel, ContextView contextView) {
        MqttReceiveContext mqttReceiveContext = (MqttReceiveContext) contextView.get(ReceiveContext.class);
        RecipientRegistry recipientRegistry = mqttReceiveContext.getRecipientRegistry();
        MqttConnectVariableHeader variableHeader = mqttConnectMessage.variableHeader();
        MqttConnectPayload payload = mqttConnectMessage.payload();
        String clientIdentifier = payload.clientIdentifier();
        ChannelRegistry channelRegistry = mqttReceiveContext.getChannelRegistry();
        TopicRegistry topicRegistry = mqttReceiveContext.getTopicRegistry();
        PasswordAuthentication passwordAuthentication = mqttReceiveContext.getPasswordAuthentication();
        if (channelRegistry.exists(clientIdentifier)) {
            return mqttChannel.write(MqttMessageBuilder.buildConnectAck(MqttConnectReturnCode.CONNECTION_REFUSED_IDENTIFIER_REJECTED), false).then(mqttChannel.close());
        }
        if (MqttVersion.MQTT_3_1_1.protocolLevel() != ((byte) variableHeader.version())) {
            return mqttChannel.write(MqttMessageBuilder.buildConnectAck(MqttConnectReturnCode.CONNECTION_REFUSED_UNACCEPTABLE_PROTOCOL_VERSION), false).then(mqttChannel.close());
        }
        if (!passwordAuthentication.auth(payload.userName(), payload.passwordInBytes())) {
            return mqttChannel.write(MqttMessageBuilder.buildConnectAck(MqttConnectReturnCode.CONNECTION_REFUSED_BAD_USER_NAME_OR_PASSWORD), false).then(mqttChannel.close());
        }
        mqttChannel.getCloseDisposable().dispose();
        mqttChannel.setClientIdentifier(payload.clientIdentifier());
        if (variableHeader.isWillFlag()) {
            mqttChannel.setWill(MqttChannel.Will.builder().isRetain(variableHeader.isWillRetain()).willTopic(payload.willTopic()).willMessage(payload.willMessageInBytes()).mqttQoS(MqttQoS.valueOf(variableHeader.willQos())).build());
        }
        mqttChannel.setAuthTime(System.currentTimeMillis());
        mqttChannel.setKeepalive(variableHeader.keepAliveTimeSeconds());
        mqttChannel.setSessionPersistent(!variableHeader.isCleanSession());
        mqttChannel.setStatus(ChannelStatus.ONLINE);
        mqttChannel.setUsername(payload.userName());
        mqttChannel.getConnection().onReadIdle((variableHeader.keepAliveTimeSeconds() * MILLI_SECOND_PERIOD) << 1, () -> {
            close(mqttChannel, mqttReceiveContext);
        });
        mqttChannel.registryClose(mqttChannel2 -> {
            Optional.ofNullable(mqttChannel.getWill()).ifPresent(will -> {
                topicRegistry.getSubscribesByTopic(will.getWillTopic(), will.getMqttQoS()).forEach(subscribeTopic -> {
                    MqttChannel mqttChannel2 = subscribeTopic.getMqttChannel();
                    mqttChannel2.write(MqttMessageBuilder.buildPub(false, subscribeTopic.getQoS(), subscribeTopic.getQoS() == MqttQoS.AT_MOST_ONCE ? 0 : mqttChannel2.generateMessageId(), will.getWillTopic(), Unpooled.wrappedBuffer(will.getWillMessage())), subscribeTopic.getQoS().value() > 0).subscribe();
                });
            });
        });
        Optional.ofNullable(channelRegistry.get(clientIdentifier)).ifPresent(mqttChannel3 -> {
            doSession(mqttChannel3, mqttChannel, channelRegistry, topicRegistry, mqttReceiveContext.getMessageRegistry());
        });
        mqttChannel.registryClose(mqttChannel4 -> {
            close(mqttChannel, mqttReceiveContext);
        });
        channelRegistry.registry(clientIdentifier, mqttChannel);
        WindowMetric.WINDOW_METRIC_INSTANCE.recordConnect(1);
        mqttChannel.registryClose(ConnectProtocol::accept);
        recipientRegistry.channelStatus(mqttChannel, mqttChannel.getStatus());
        mqttChannel.registryClose(mqttChannel5 -> {
            recipientRegistry.channelStatus(mqttChannel5, ChannelStatus.OFFLINE);
        });
        return mqttChannel.write(MqttMessageBuilder.buildConnectAck(MqttConnectReturnCode.CONNECTION_ACCEPTED), false);
    }

    private void close(MqttChannel mqttChannel, MqttReceiveContext mqttReceiveContext) {
        if (mqttChannel.isSessionPersistent()) {
            mqttChannel.setStatus(ChannelStatus.OFFLINE);
            mqttChannel.close().subscribe();
        } else {
            mqttReceiveContext.getTopicRegistry().clear(mqttChannel);
            mqttReceiveContext.getChannelRegistry().close(mqttChannel);
        }
    }

    private void doSession(MqttChannel mqttChannel, MqttChannel mqttChannel2, ChannelRegistry channelRegistry, TopicRegistry topicRegistry, MessageRegistry messageRegistry) {
        Set set = (Set) mqttChannel.getTopics().stream().peek(subscribeTopic -> {
            new SubscribeTopic(subscribeTopic.getTopicFilter(), subscribeTopic.getQoS(), subscribeTopic.getMqttChannel());
        }).collect(Collectors.toSet());
        topicRegistry.clear(mqttChannel);
        set.forEach(subscribeTopic2 -> {
            topicRegistry.registrySubscribesTopic(set);
        });
        channelRegistry.close(mqttChannel);
        Optional.ofNullable(messageRegistry.getSessionMessage(mqttChannel2.getClientIdentifier())).ifPresent(list -> {
            list.forEach(sessionMessage -> {
                mqttChannel2.write(sessionMessage.toPublishMessage(mqttChannel2), sessionMessage.getQos() > 0).subscribeOn(Schedulers.single()).subscribe();
            });
        });
    }

    public List<MqttMessageType> getMqttMessageTypes() {
        return MESSAGE_TYPE_LIST;
    }

    static {
        MESSAGE_TYPE_LIST.add(MqttMessageType.CONNECT);
    }
}
