package io.github.quickmsg.core.protocol;

import io.github.quickmsg.common.channel.MqttChannel;
import io.github.quickmsg.common.context.ReceiveContext;
import io.github.quickmsg.common.integrate.cluster.IntegrateCluster;
import io.github.quickmsg.common.integrate.topic.IntegrateTopics;
import io.github.quickmsg.common.log.LogEvent;
import io.github.quickmsg.common.log.LogManager;
import io.github.quickmsg.common.log.LogStatus;
import io.github.quickmsg.common.message.mqtt.ClusterMessage;
import io.github.quickmsg.common.message.mqtt.PublishMessage;
import io.github.quickmsg.common.message.mqtt.PublishRelMessage;
import io.github.quickmsg.common.metric.CounterType;
import io.github.quickmsg.common.protocol.Protocol;
import io.github.quickmsg.common.utils.JacksonUtil;
import io.github.quickmsg.common.utils.MqttMessageUtils;
import java.util.Set;
import reactor.util.context.ContextView;

/* loaded from: input_file:io/github/quickmsg/core/protocol/PublishRelProtocol.class */
/**
 * Protocol handler for {@link PublishRelMessage}.
 *
 * <p>On every message it logs the event and writes back the message produced by
 * {@link MqttMessageUtils#buildPublishComp}. If the channel returns a cached
 * {@link PublishMessage} for the same message id, that message is wrapped in a
 * {@link ClusterMessage} and sent to the cluster under its own topic and, when
 * applicable, under each topic returned by
 * {@code IntegrateTopics#getWildcardTopics}.
 */
public class PublishRelProtocol implements Protocol<PublishRelMessage> {
    /**
     * Processes a single {@link PublishRelMessage} received on {@code mqttChannel}.
     *
     * @param publishRelMessage the received message; its message id is used to look up the cached publish
     * @param mqttChannel       the channel the message arrived on and the reply is written to
     * @param contextView       reactor context; must contain a {@link ReceiveContext} keyed by its class
     */
    public void parseProtocol(PublishRelMessage publishRelMessage, MqttChannel mqttChannel, ContextView contextView) {
        ReceiveContext receiveContext = (ReceiveContext) contextView.get(ReceiveContext.class);
        LogManager logManager = receiveContext.getLogManager();
        logManager.printInfo(mqttChannel, LogEvent.PUBLISH_REL, LogStatus.SUCCESS, JacksonUtil.bean2Json(publishRelMessage));

        // Look up the cached publish first, then always answer the sender, whether or not
        // a cached message was found.
        PublishMessage cachedPublish = mqttChannel.sendQos2Cache(Integer.valueOf(publishRelMessage.getMessageId()));
        mqttChannel.write(MqttMessageUtils.buildPublishComp(publishRelMessage.getMessageId()));
        if (cachedPublish == null) {
            // Nothing cached for this message id: there is nothing to forward.
            return;
        }

        receiveContext.getMetricManager().getMetricRegistry().getMetricCounter(CounterType.PUBLISH_EVENT).increment();
        // NOTE(review): this PUBLISH log entry serialises the PUBREL message, not the cached
        // publish message — confirm whether cachedPublish was intended here.
        logManager.printInfo(mqttChannel, LogEvent.PUBLISH, LogStatus.SUCCESS, JacksonUtil.bean2Json(publishRelMessage));

        ClusterMessage clusterMessage = new ClusterMessage(cachedPublish);
        IntegrateCluster cluster = receiveContext.getIntegrate().getCluster();
        // Capture the topic once, before the loop below starts overwriting it on the message.
        String topic = clusterMessage.getTopic();
        cluster.sendCluster(topic, clusterMessage);

        IntegrateTopics topics = receiveContext.getIntegrate().getTopics();
        if (!topics.isWildcard(topic)) {
            return;
        }
        Set<String> wildcardTopics = topics.getWildcardTopics(topic);
        if (wildcardTopics == null || wildcardTopics.isEmpty()) {
            return;
        }
        // The same ClusterMessage instance is re-targeted for every wildcard topic.
        // NOTE(review): if sendCluster keeps a reference and sends later, the topic may be
        // overwritten by a subsequent iteration — confirm against the cluster implementation.
        for (String wildcardTopic : wildcardTopics) {
            clusterMessage.setTopic(wildcardTopic);
            cluster.sendCluster(wildcardTopic, clusterMessage);
        }
    }

    /**
     * Returns the message class this handler is written for.
     *
     * @return {@code PublishRelMessage.class}
     */
    public Class<PublishRelMessage> getClassType() {
        return PublishRelMessage.class;
    }
}
