package io.github.quickmsg.core.mqtt;

import io.github.quickmsg.common.auth.PasswordAuthentication;
import io.github.quickmsg.common.channel.ChannelRegistry;
import io.github.quickmsg.common.channel.MockMqttChannel;
import io.github.quickmsg.common.cluster.ClusterConfig;
import io.github.quickmsg.common.cluster.ClusterMessage;
import io.github.quickmsg.common.cluster.ClusterRegistry;
import io.github.quickmsg.common.config.AbstractConfiguration;
import io.github.quickmsg.common.config.Configuration;
import io.github.quickmsg.common.context.ReceiveContext;
import io.github.quickmsg.common.message.MessageRegistry;
import io.github.quickmsg.common.message.MqttMessageBuilder;
import io.github.quickmsg.common.protocol.ProtocolAdaptor;
import io.github.quickmsg.common.topic.TopicRegistry;
import io.github.quickmsg.common.transport.Transport;
import io.github.quickmsg.core.DefaultChannelRegistry;
import io.github.quickmsg.core.DefaultMessageRegistry;
import io.github.quickmsg.core.DefaultProtocolAdaptor;
import io.github.quickmsg.core.DefaultTopicRegistry;
import io.github.quickmsg.core.cluster.InJvmClusterRegistry;
import io.netty.buffer.PooledByteBufAllocator;
import io.netty.handler.codec.mqtt.MqttPublishMessage;
import io.netty.handler.codec.mqtt.MqttQoS;
import java.time.Duration;
import java.util.Optional;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.publisher.Flux;
import reactor.netty.resources.LoopResources;

/* loaded from: input_file:io/github/quickmsg/core/mqtt/AbstractReceiveContext.class */
public abstract class AbstractReceiveContext<T extends Configuration> implements ReceiveContext<T> {
    private static final Logger log = LoggerFactory.getLogger(AbstractReceiveContext.class);
    private T configuration;
    private LoopResources loopResources;
    private Transport<T> transport;
    private final ProtocolAdaptor protocolAdaptor = protocolAdaptor();
    private final ChannelRegistry channelRegistry = channelRegistry();
    private final TopicRegistry topicRegistry = topicRegistry();
    private final MessageRegistry messageRegistry = messageRegistry();
    private final PasswordAuthentication passwordAuthentication = basicAuthentication();
    private final ClusterRegistry clusterRegistry;

    public AbstractReceiveContext(T t, Transport<T> transport) {
        this.configuration = t;
        this.transport = transport;
        this.loopResources = LoopResources.create("smqtt-cluster-io", t.getBossThreadSize().intValue(), t.getWorkThreadSize().intValue(), true);
        this.clusterRegistry = clusterRegistry(t.getClusterConfig());
    }

    private MessageRegistry messageRegistry() {
        return (MessageRegistry) Optional.ofNullable(MessageRegistry.INSTANCE).orElse(new DefaultMessageRegistry());
    }

    private PasswordAuthentication basicAuthentication() {
        return (PasswordAuthentication) Optional.ofNullable(PasswordAuthentication.INSTANCE).orElse(castConfiguration(this.configuration).getReactivePasswordAuth());
    }

    private ChannelRegistry channelRegistry() {
        return (ChannelRegistry) Optional.ofNullable(ChannelRegistry.INSTANCE).orElse(new DefaultChannelRegistry());
    }

    private TopicRegistry topicRegistry() {
        return (TopicRegistry) Optional.ofNullable(TopicRegistry.INSTANCE).orElse(new DefaultTopicRegistry());
    }

    private ProtocolAdaptor protocolAdaptor() {
        return ((ProtocolAdaptor) Optional.ofNullable(ProtocolAdaptor.INSTANCE).orElse(new DefaultProtocolAdaptor())).proxy();
    }

    private ClusterRegistry clusterRegistry(ClusterConfig clusterConfig) {
        ClusterRegistry clusterRegistry = (ClusterRegistry) Optional.ofNullable(ClusterRegistry.INSTANCE).orElse(new InJvmClusterRegistry());
        if (clusterConfig.getClustered().booleanValue()) {
            if (clusterRegistry instanceof InJvmClusterRegistry) {
                Flux.interval(Duration.ofSeconds(2L)).subscribe(l -> {
                    log.warn("please set  smqtt-registry dependency  ");
                });
            }
            clusterRegistry.registry(clusterConfig);
            clusterRegistry.handlerClusterMessage().subscribe(clusterMessage -> {
                this.protocolAdaptor.chooseProtocol(MockMqttChannel.DEFAULT_MOCK_CHANNEL, getMqttMessage(clusterMessage), this);
            });
        }
        return clusterRegistry;
    }

    private MqttPublishMessage getMqttMessage(ClusterMessage clusterMessage) {
        return MqttMessageBuilder.buildPub(false, MqttQoS.valueOf(clusterMessage.getQos()), 0, clusterMessage.getTopic(), PooledByteBufAllocator.DEFAULT.buffer().writeBytes(clusterMessage.getMessage()));
    }

    private AbstractConfiguration castConfiguration(T t) {
        return (AbstractConfiguration) t;
    }

    public T getConfiguration() {
        return this.configuration;
    }

    public LoopResources getLoopResources() {
        return this.loopResources;
    }

    public Transport<T> getTransport() {
        return this.transport;
    }

    public ProtocolAdaptor getProtocolAdaptor() {
        return this.protocolAdaptor;
    }

    public ChannelRegistry getChannelRegistry() {
        return this.channelRegistry;
    }

    public TopicRegistry getTopicRegistry() {
        return this.topicRegistry;
    }

    public MessageRegistry getMessageRegistry() {
        return this.messageRegistry;
    }

    public PasswordAuthentication getPasswordAuthentication() {
        return this.passwordAuthentication;
    }

    public ClusterRegistry getClusterRegistry() {
        return this.clusterRegistry;
    }

    public void setConfiguration(T t) {
        this.configuration = t;
    }

    public void setLoopResources(LoopResources loopResources) {
        this.loopResources = loopResources;
    }

    public void setTransport(Transport<T> transport) {
        this.transport = transport;
    }
}
