package io.github.quickmsg.core.cluster;

import io.github.quickmsg.common.channel.MockMqttChannel;
import io.github.quickmsg.common.cluster.ClusterConfig;
import io.github.quickmsg.common.cluster.ClusterMessage;
import io.github.quickmsg.common.cluster.ClusterRegistry;
import io.github.quickmsg.common.message.MqttMessageBuilder;
import io.github.quickmsg.common.protocol.ProtocolAdaptor;
import io.github.quickmsg.core.mqtt.MqttReceiveContext;
import io.netty.buffer.PooledByteBufAllocator;
import io.netty.handler.codec.mqtt.MqttPublishMessage;
import io.netty.handler.codec.mqtt.MqttQoS;
import java.time.Duration;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.publisher.Flux;

/* loaded from: input_file:io/github/quickmsg/core/cluster/ClusterReceiver.class */
/**
 * Bridges messages received from the cluster registry into the local protocol pipeline.
 *
 * <p>Each {@link ClusterMessage} delivered by the registry is converted into an MQTT
 * publish message and handed to the {@link ProtocolAdaptor} on behalf of
 * {@link MockMqttChannel#DEFAULT_MOCK_CHANNEL}.
 */
public class ClusterReceiver {
    private static final Logger log = LoggerFactory.getLogger(ClusterReceiver.class);
    private final MqttReceiveContext mqttReceiveContext;

    /**
     * @param mqttReceiveContext context supplying the configuration, the cluster registry
     *                           and the protocol adaptor used by {@link #registry()}
     */
    public ClusterReceiver(MqttReceiveContext mqttReceiveContext) {
        this.mqttReceiveContext = mqttReceiveContext;
    }

    /**
     * Joins the cluster and starts consuming cluster messages, when clustering is enabled.
     *
     * <p>Nothing happens when the clustered flag is {@code false} or unset. When clustering
     * is enabled but the registry is the in-JVM implementation, a warning is logged every
     * two seconds instead of joining a cluster.
     */
    public void registry() {
        ClusterConfig clusterConfig = this.mqttReceiveContext.getConfiguration().getClusterConfig();
        ClusterRegistry clusterRegistry = this.mqttReceiveContext.getClusterRegistry();
        ProtocolAdaptor protocolAdaptor = this.mqttReceiveContext.getProtocolAdaptor();

        // Boolean.TRUE.equals treats an unset (null) flag as "not clustered" instead of
        // failing with a NullPointerException while unboxing.
        if (!Boolean.TRUE.equals(clusterConfig.getClustered())) {
            return;
        }

        if (clusterRegistry instanceof InJvmClusterRegistry) {
            Flux.interval(Duration.ofSeconds(2L)).subscribe(tick -> {
                log.warn("please set  smqtt-registry dependency  ");
            });
            return;
        }

        clusterRegistry.registry(clusterConfig);
        clusterRegistry.handlerClusterMessage().subscribe(
                clusterMessage -> dispatch(protocolAdaptor, clusterMessage),
                // Without an explicit error consumer an upstream failure would only be
                // reported through Reactor's dropped-error hook.
                error -> log.error("cluster message stream terminated", error));
    }

    /**
     * Converts one cluster message and forwards it to the protocol adaptor.
     *
     * <p>Failures are logged and swallowed on purpose: an exception escaping the subscriber
     * callback would cancel the subscription, so a single bad message (for example one
     * carrying a QoS value that cannot be mapped) would stop all further cluster delivery.
     */
    private void dispatch(ProtocolAdaptor protocolAdaptor, ClusterMessage clusterMessage) {
        try {
            protocolAdaptor.chooseProtocol(MockMqttChannel.DEFAULT_MOCK_CHANNEL, getMqttMessage(clusterMessage), this.mqttReceiveContext);
        } catch (Exception e) {
            log.error("failed to handle cluster message for topic {}", clusterMessage.getTopic(), e);
        }
    }

    /**
     * Builds a non-duplicate MQTT publish message (message id 0) from a cluster message,
     * copying the payload bytes into a newly allocated pooled buffer.
     */
    private MqttPublishMessage getMqttMessage(ClusterMessage clusterMessage) {
        return MqttMessageBuilder.buildPub(false, MqttQoS.valueOf(clusterMessage.getQos()), 0, clusterMessage.getTopic(), PooledByteBufAllocator.DEFAULT.buffer().writeBytes(clusterMessage.getMessage()));
    }
}
