package io.github.quickmsg.core.protocol;

import io.github.quickmsg.common.acl.AclAction;
import io.github.quickmsg.common.channel.MqttChannel;
import io.github.quickmsg.common.context.ReceiveContext;
import io.github.quickmsg.common.integrate.SubscribeTopic;
import io.github.quickmsg.common.integrate.cluster.IntegrateCluster;
import io.github.quickmsg.common.integrate.msg.IntegrateMessages;
import io.github.quickmsg.common.integrate.topic.IntegrateTopics;
import io.github.quickmsg.common.message.RetainMessage;
import io.github.quickmsg.common.message.mqtt.ClusterMessage;
import io.github.quickmsg.common.message.mqtt.PublishMessage;
import io.github.quickmsg.common.protocol.Protocol;
import io.github.quickmsg.common.utils.JacksonUtil;
import io.netty.handler.codec.mqtt.MqttQoS;
import java.util.Set;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.publisher.Mono;
import reactor.util.context.ContextView;

/* loaded from: input_file:io/github/quickmsg/core/protocol/PublishProtocol.class */
/**
 * {@link Protocol} handler for {@link PublishMessage}.
 *
 * <p>For each incoming publish it checks the {@link AclAction#PUBLISH} permission, stores the
 * message as a retain message when it is flagged as retained, and hands it to the cluster
 * integration under its own topic and under every topic returned by
 * {@code IntegrateTopics#getWildcardTopics}.
 */
public class PublishProtocol implements Protocol<PublishMessage> {
    private static final Logger log = LoggerFactory.getLogger(PublishProtocol.class);

    /**
     * Handles one publish message received on {@code mqttChannel}.
     *
     * <p>Nothing is stored or forwarded when the ACL manager rejects the publish; in that case
     * only a warning is logged.
     *
     * @param publishMessage the publish message to process
     * @param mqttChannel    the channel the message arrived on; used for the ACL check
     * @param contextView    reactor context that must contain a {@link ReceiveContext} entry
     */
    public void parseProtocol(PublishMessage publishMessage, MqttChannel mqttChannel, ContextView contextView) {
        ReceiveContext receiveContext = (ReceiveContext) contextView.get(ReceiveContext.class);
        // A routine publish is not an error: log it at debug level and only pay for the JSON
        // serialization when debug logging is actually enabled.
        if (log.isDebugEnabled()) {
            log.debug("publish:{}", JacksonUtil.bean2Json(publishMessage));
        }
        IntegrateTopics topics = receiveContext.getIntegrate().getTopics();
        IntegrateCluster cluster = receiveContext.getIntegrate().getCluster();
        IntegrateMessages messages = receiveContext.getIntegrate().getMessages();
        if (!receiveContext.getAclManager().check(mqttChannel, publishMessage.getTopic(), AclAction.PUBLISH)) {
            log.warn("mqtt【{}】publish topic 【{}】 acl not authorized ", mqttChannel.getConnectCache(), publishMessage.getTopic());
            return;
        }
        if (publishMessage.isRetain()) {
            messages.saveRetainMessage(RetainMessage.of(publishMessage));
        }
        ClusterMessage clusterMessage = new ClusterMessage(publishMessage);
        cluster.sendCluster(clusterMessage.getTopic(), clusterMessage);
        // Typed instead of a raw Set so the loop variable is a String rather than Object.
        // Assumes the returned elements are topic strings (they are passed to setTopic and
        // sendCluster below) - TODO confirm against IntegrateTopics#getWildcardTopics.
        @SuppressWarnings("unchecked")
        Set<String> wildcardTopics = topics.getWildcardTopics(clusterMessage.getTopic());
        if (wildcardTopics == null || wildcardTopics.isEmpty()) {
            return;
        }
        for (String wildcardTopic : wildcardTopics) {
            // NOTE(review): the same ClusterMessage instance is re-targeted and re-sent on each
            // iteration; if sendCluster keeps the reference for asynchronous delivery, earlier
            // sends would observe the changed topic - confirm.
            clusterMessage.setTopic(wildcardTopic);
            cluster.sendCluster(wildcardTopic, clusterMessage);
        }
    }

    /**
     * Returns the message class this protocol handles.
     *
     * @return {@code PublishMessage.class}
     */
    public Class<PublishMessage> getClassType() {
        return PublishMessage.class;
    }

    /**
     * Sends {@code publishMessage} to the channel of every subscription in {@code set}, using
     * the QoS returned by {@code SubscribeTopic#minQos} for the message's QoS, then subscribes
     * to {@code mono}.
     *
     * <p>Not called from within this class at present.
     *
     * @param set            subscriptions to deliver the message to
     * @param publishMessage the message to deliver
     * @param mono           follow-up work that is subscribed after the sends have been issued
     */
    private void send(Set<SubscribeTopic> set, PublishMessage publishMessage, Mono<Void> mono) {
        set.forEach(subscribeTopic -> {
            subscribeTopic.getMqttChannel().sendPublish(subscribeTopic.minQos(MqttQoS.valueOf(publishMessage.getQos())), publishMessage);
        });
        mono.subscribe();
    }

    /**
     * Builds a {@link Mono} that, when subscribed, saves {@code publishMessage} as a retain
     * message if it is flagged as retained.
     *
     * <p>Not called from within this class at present.
     *
     * @param publishMessage    the message to inspect
     * @param integrateMessages the message store used to save the retain message
     * @return a {@code Mono} wrapping the save step
     */
    private Mono<Void> filterRetainMessage(PublishMessage publishMessage, IntegrateMessages integrateMessages) {
        return Mono.fromRunnable(() -> {
            if (publishMessage.isRetain()) {
                integrateMessages.saveRetainMessage(RetainMessage.of(publishMessage));
            }
        });
    }
}
