package io.github.quickmsg.core.protocol;

import io.github.quickmsg.common.auth.AuthManager;
import io.github.quickmsg.common.channel.MqttChannel;
import io.github.quickmsg.common.context.ReceiveContext;
import io.github.quickmsg.common.integrate.Integrate;
import io.github.quickmsg.common.integrate.channel.IntegrateChannels;
import io.github.quickmsg.common.integrate.topic.IntegrateTopics;
import io.github.quickmsg.common.log.LogEvent;
import io.github.quickmsg.common.log.LogManager;
import io.github.quickmsg.common.log.LogStatus;
import io.github.quickmsg.common.message.mqtt.ConnectMessage;
import io.github.quickmsg.common.metric.CounterType;
import io.github.quickmsg.common.protocol.Protocol;
import io.github.quickmsg.common.utils.JacksonUtil;
import io.github.quickmsg.common.utils.MqttMessageUtils;
import io.github.quickmsg.core.mqtt.MqttReceiveContext;
import io.netty.handler.codec.mqtt.MqttConnectReturnCode;
import io.netty.handler.codec.mqtt.MqttVersion;
import java.util.ArrayList;
import java.util.Date;
import java.util.Optional;
import org.apache.commons.lang3.time.DateFormatUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.util.context.ContextView;

/* loaded from: input_file:io/github/quickmsg/core/protocol/ConnectProtocol.class */
/**
 * Protocol handler for MQTT CONNECT messages.
 *
 * <p>For a supported protocol version the client credentials are handed to the
 * {@link AuthManager}. On success the channel is registered, an idle-read callback and a close
 * hook are installed, and a CONNACK "accepted" is written. On failure a CONNACK "bad user name or
 * password" is written. Unsupported protocol versions are answered with a CONNACK
 * "unacceptable protocol version".
 */
public class ConnectProtocol implements Protocol<ConnectMessage> {
    private static final Logger log = LoggerFactory.getLogger(ConnectProtocol.class);

    /** Factor applied to the keep-alive value before it is doubled and used as the read-idle timeout. */
    private static final int MILLI_SECOND_PERIOD = 1000;

    /**
     * Pattern used for the channel's auth timestamp. {@code MM} is month-of-year and {@code HH}
     * is the 24-hour clock; lower-case {@code mm}/{@code hh} would mean minutes and 12-hour clock.
     */
    private static final String AUTH_TIME_PATTERN = "yyyy-MM-dd HH:mm:ss";

    /**
     * Processes a CONNECT message received on {@code mqttChannel}.
     *
     * @param connectMessage the received CONNECT message
     * @param mqttChannel    the channel the message arrived on
     * @param contextView    reactor context; must hold the receive context under
     *                       {@code ReceiveContext.class}, and that context must be an
     *                       {@link MqttReceiveContext}
     */
    public void parseProtocol(ConnectMessage connectMessage, MqttChannel mqttChannel, ContextView contextView) {
        // Single lookup: the same context object is used through both of its types.
        ReceiveContext receiveContext = (ReceiveContext) contextView.get(ReceiveContext.class);
        MqttReceiveContext mqttReceiveContext = (MqttReceiveContext) receiveContext;

        if (!isSupportedVersion(connectMessage.getVersion())) {
            mqttChannel.write(MqttMessageUtils.buildConnectAck(MqttConnectReturnCode.CONNECTION_REFUSED_UNACCEPTABLE_PROTOCOL_VERSION));
            return;
        }

        // A missing auth section yields a null user name and an empty password.
        String username = (String) Optional.ofNullable(connectMessage.getAuth())
                .map(auth -> auth.getUsername())
                .orElse(null);
        byte[] password = (byte[]) Optional.ofNullable(connectMessage.getAuth())
                .map(auth -> auth.getPassword())
                .orElseGet(() -> new byte[0]);

        AuthManager authManager = mqttReceiveContext.getAuthManager();
        authManager.auth(username, password, mqttChannel.getClientId()).subscribe(authenticated -> {
            if (authenticated.booleanValue()) {
                acceptConnection(connectMessage, mqttChannel, mqttReceiveContext);
            } else {
                rejectConnection(connectMessage, mqttChannel, mqttReceiveContext);
            }
        });
    }

    /** Returns whether the given protocol version is one of MQTT 3.1, 3.1.1 or 5. */
    private static boolean isSupportedVersion(MqttVersion version) {
        return MqttVersion.MQTT_3_1_1 == version
                || MqttVersion.MQTT_3_1 == version
                || MqttVersion.MQTT_5 == version;
    }

    /** Logs the failed connect attempt and answers with a "bad user name or password" CONNACK. */
    private void rejectConnection(ConnectMessage connectMessage, MqttChannel mqttChannel, MqttReceiveContext context) {
        context.getLogManager().printInfo(mqttChannel, LogEvent.CONNECT, LogStatus.FAILED,
                JacksonUtil.bean2Json(connectMessage.getCache(context.getIntegrate().getCluster().getLocalNode())));
        mqttChannel.write(MqttMessageUtils.buildConnectAck(MqttConnectReturnCode.CONNECTION_REFUSED_BAD_USER_NAME_OR_PASSWORD));
    }

    /**
     * Completes a successful connect: stores the connect cache, logs, stamps the auth time,
     * installs the idle-read and close hooks, registers the channel, acknowledges the client and
     * bumps the connect metrics. The order of these steps mirrors the original implementation.
     */
    private void acceptConnection(ConnectMessage connectMessage, MqttChannel mqttChannel, MqttReceiveContext context) {
        LogManager logManager = context.getLogManager();
        IntegrateChannels channels = context.getIntegrate().getChannels();

        mqttChannel.setConnectCache(connectMessage.getCache(context.getIntegrate().getCluster().getLocalNode()));
        logManager.printInfo(mqttChannel, LogEvent.CONNECT, LogStatus.SUCCESS,
                JacksonUtil.bean2Json(connectMessage.getCache(context.getIntegrate().getCluster().getLocalNode())));
        mqttChannel.setAuthTime(DateFormatUtils.format(new Date(), AUTH_TIME_PATTERN));

        // Read-idle timeout is twice the keep-alive value scaled by MILLI_SECOND_PERIOD
        // (presumably keep-alive is in seconds, as in the MQTT spec -- confirm against ConnectMessage).
        mqttChannel.getConnection().onReadIdle((connectMessage.getKeepalive() * MILLI_SECOND_PERIOD) << 1,
                () -> logHeartClose(logManager, mqttChannel));

        channels.add(mqttChannel.getClientId(), mqttChannel);
        // The hook's own argument is ignored; cleanup always targets the channel captured here.
        mqttChannel.registryClose(closedChannel -> close(mqttChannel, context));
        mqttChannel.write(MqttMessageUtils.buildConnectAck(MqttConnectReturnCode.CONNECTION_ACCEPTED));

        context.getMetricManager().getMetricRegistry().getMetricCounter(CounterType.CONNECT).increment();
        context.getMetricManager().getMetricRegistry().getMetricCounter(CounterType.CONNECT_EVENT).increment();
    }

    /** Logs a heart-timeout event for the channel together with its stored connect cache. */
    private void logHeartClose(LogManager logManager, MqttChannel mqttChannel) {
        logManager.printInfo(mqttChannel, LogEvent.HEART_TIMEOUT, LogStatus.SUCCESS, JacksonUtil.bean2Json(mqttChannel.getConnectCache()));
    }

    /**
     * Returns the message type this protocol handles.
     *
     * @return {@code ConnectMessage.class}
     */
    public Class<ConnectMessage> getClassType() {
        return ConnectMessage.class;
    }

    /**
     * Close hook for an accepted channel: removes the channel from the channel registry, removes
     * its topic subscriptions, clears its retries and, when the connect cache carries a will,
     * publishes that will to every subscriber of the will topic.
     */
    private void close(MqttChannel mqttChannel, MqttReceiveContext mqttReceiveContext) {
        mqttReceiveContext.getIntegrate().getChannels().remove(mqttChannel);
        IntegrateTopics topics = mqttReceiveContext.getIntegrate().getTopics();
        // Pass a copy of the channel's topics rather than the channel's own collection.
        topics.removeTopic(mqttChannel, new ArrayList<>(mqttChannel.getTopics()));
        mqttReceiveContext.getRetryManager().clearRetry(mqttChannel);
        Optional.ofNullable(mqttChannel.getConnectCache().getWill()).ifPresent(will -> {
            Optional.ofNullable(topics.getMqttChannelsByTopic(will.getWillTopic())).ifPresent(subscribers -> {
                subscribers.forEach(subscribeTopic -> {
                    subscribeTopic.getMqttChannel().sendPublish(subscribeTopic.minQos(will.getMqttQoS()), will.toPublishMessage());
                });
            });
        });
    }
}
