package io.github.quickmsg.core.cluster;

import io.github.quickmsg.common.channel.MockMqttChannel;
import io.github.quickmsg.common.cluster.ClusterRegistry;
import io.github.quickmsg.common.config.BootstrapConfig;
import io.github.quickmsg.common.message.HeapMqttMessage;
import io.github.quickmsg.common.message.MqttMessageBuilder;
import io.github.quickmsg.common.message.SmqttMessage;
import io.github.quickmsg.common.protocol.ProtocolAdaptor;
import io.github.quickmsg.common.utils.JacksonUtil;
import io.github.quickmsg.core.mqtt.MqttReceiveContext;
import io.netty.buffer.PooledByteBufAllocator;
import io.netty.handler.codec.mqtt.MqttMessage;
import io.netty.handler.codec.mqtt.MqttQoS;
import java.nio.charset.StandardCharsets;
import java.time.Duration;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

/* loaded from: input_file:io/github/quickmsg/core/cluster/ClusterReceiver.class */
/**
 * Subscribes to messages published by the cluster registry and feeds each one into the
 * local {@link ProtocolAdaptor}.
 *
 * <p>Each {@link HeapMqttMessage} received from the cluster is converted into an MQTT publish
 * message and handed to {@link ProtocolAdaptor#chooseProtocol} on a {@link MockMqttChannel}
 * that wraps the message's client identifier.
 */
public class ClusterReceiver {
    private static final Logger log = LoggerFactory.getLogger(ClusterReceiver.class);
    private final MqttReceiveContext mqttReceiveContext;

    /**
     * @param mqttReceiveContext context supplying the configuration, cluster registry and
     *                           protocol adaptor used by {@link #registry()}
     */
    public ClusterReceiver(MqttReceiveContext mqttReceiveContext) {
        this.mqttReceiveContext = mqttReceiveContext;
    }

    /**
     * Registers this node with the cluster and starts consuming cluster messages.
     *
     * <p>Does nothing when clustering is disabled in the configuration. When clustering is
     * enabled but the registry is the {@code InJvmClusterRegistry}, a warning is logged every
     * two seconds instead of registering.
     *
     * <p>An error signalled by the cluster message stream is logged and then replaced by an
     * empty completion. A failure while handling a single message is logged and that message
     * is skipped, so the subscription stays alive for subsequent messages.
     */
    public void registry() {
        BootstrapConfig.ClusterConfig clusterConfig = this.mqttReceiveContext.getConfiguration().getClusterConfig();
        ClusterRegistry clusterRegistry = this.mqttReceiveContext.getClusterRegistry();
        ProtocolAdaptor protocolAdaptor = this.mqttReceiveContext.getProtocolAdaptor();
        if (!clusterConfig.isEnable()) {
            return;
        }
        if (clusterRegistry instanceof InJvmClusterRegistry) {
            // Clustering was requested but only the in-JVM registry is present: keep warning.
            Flux.interval(Duration.ofSeconds(2L))
                    .subscribe(tick -> log.warn("please set  smqtt-registry dependency  "));
            return;
        }
        clusterRegistry.registry(clusterConfig);
        clusterRegistry.handlerClusterMessage()
                .doOnError(th -> log.error("cluster accept", th))
                .onErrorResume(th -> Mono.empty())
                .subscribe(heapMqttMessage -> dispatch(protocolAdaptor, heapMqttMessage));
    }

    /**
     * Converts one cluster message and passes it to the protocol adaptor.
     *
     * <p>Exceptions are caught here on purpose: an exception escaping a {@code subscribe}
     * consumer cancels the subscription, which would stop all further cluster message
     * handling because of a single bad message.
     */
    private void dispatch(ProtocolAdaptor protocolAdaptor, HeapMqttMessage heapMqttMessage) {
        try {
            protocolAdaptor.chooseProtocol(
                    MockMqttChannel.wrapClientIdentifier(heapMqttMessage.getClientIdentifier()),
                    getMqttMessage(heapMqttMessage),
                    this.mqttReceiveContext);
        } catch (Exception e) {
            log.error("cluster message handle error, clientIdentifier={} topic={}",
                    heapMqttMessage.getClientIdentifier(), heapMqttMessage.getTopic(), e);
        }
    }

    /**
     * Builds the MQTT publish message for a cluster message.
     *
     * <p>The payload is the UTF-8 encoding of {@code JacksonUtil.dynamicJson(message)}. The
     * publish is built with a first argument of {@code false} and a message id of {@code 0}
     * (NOTE(review): the first argument is presumably the DUP flag — confirm against
     * {@code MqttMessageBuilder.buildPub}). The wrapper carries the current time and
     * {@code Boolean.TRUE} (presumably a "from cluster" marker — confirm against
     * {@code SmqttMessage}).
     */
    private SmqttMessage<MqttMessage> getMqttMessage(HeapMqttMessage heapMqttMessage) {
        MqttQoS qos = MqttQoS.valueOf(heapMqttMessage.getQos());
        // Serialise before allocating so a conversion failure cannot strand a pooled buffer.
        byte[] payload = JacksonUtil.dynamicJson(heapMqttMessage.getMessage()).getBytes(StandardCharsets.UTF_8);
        MqttMessage publish = MqttMessageBuilder.buildPub(
                false,
                qos,
                0,
                heapMqttMessage.getTopic(),
                PooledByteBufAllocator.DEFAULT.buffer(payload.length).writeBytes(payload),
                heapMqttMessage.getProperties());
        return new SmqttMessage<>(publish, System.currentTimeMillis(), Boolean.TRUE);
    }
}
