package io.github.quickmsg.core.mqtt;

import io.github.quickmsg.common.channel.MqttChannel;
import io.github.quickmsg.common.transport.Transport;
import io.github.quickmsg.core.cluster.ClusterReceiver;
import io.github.quickmsg.core.cluster.ClusterSender;
import io.netty.handler.codec.mqtt.MqttMessage;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.scheduler.Schedulers;

/* loaded from: input_file:io/github/quickmsg/core/mqtt/MqttReceiveContext.class */
/**
 * Receive context for MQTT channels.
 *
 * <p>On construction it creates a {@link ClusterSender} and a {@link ClusterReceiver} bound to
 * this context. {@link #apply(MqttChannel)} subscribes to the inbound objects of a channel's
 * connection, passes each {@link MqttMessage} through the cluster sender and then hands it to
 * {@link #accept(MqttChannel, MqttMessage)}, which delegates to the protocol adaptor.
 */
public class MqttReceiveContext extends AbstractReceiveContext<MqttConfiguration> {
    private static final Logger log = LoggerFactory.getLogger(MqttReceiveContext.class);

    private final ClusterSender clusterSender;
    private final ClusterReceiver clusterReceiver;

    /**
     * Creates the context and its cluster sender/receiver pair.
     *
     * @param mqttConfiguration configuration forwarded to the superclass
     * @param transport         transport forwarded to the superclass
     */
    public MqttReceiveContext(MqttConfiguration mqttConfiguration, Transport<MqttConfiguration> transport) {
        super(mqttConfiguration, transport);
        // NOTE(review): 'this' is handed to both collaborators before the constructor returns;
        // confirm neither of them uses the context until construction has completed.
        this.clusterSender = new ClusterSender(Schedulers.newParallel("cluster-transport"), this, getRecipientRegistry());
        this.clusterReceiver = new ClusterReceiver(this);
        this.clusterReceiver.registry();
    }

    /**
     * Subscribes to the inbound objects of the given channel's connection.
     *
     * <p>Each received object is cast to {@link MqttMessage}, mapped through
     * {@link ClusterSender#apply}, and the result is passed to
     * {@link #accept(MqttChannel, MqttMessage)}. A failure anywhere in that pipeline terminates
     * the subscription and is logged together with the channel's connection.
     *
     * @param mqttChannel channel whose connection is read from; {@code registryDelayTcpClose()}
     *                    is invoked on it first and the pipeline is built on its return value
     */
    public void apply(MqttChannel mqttChannel) {
        mqttChannel.registryDelayTcpClose()
                .getConnection()
                .inbound()
                .receiveObject()
                .cast(MqttMessage.class)
                .map(message -> this.clusterSender.apply(mqttChannel, message))
                .subscribe(
                        message -> accept(mqttChannel, message),
                        // Without an error consumer the failure only surfaces through Reactor's
                        // generic dropped-error reporting, with no hint of which channel broke.
                        error -> log.error("receive pipeline failed for channel {}",
                                mqttChannel.getConnection(), error));
    }

    /**
     * Logs the incoming message and hands it to the protocol adaptor.
     *
     * @param mqttChannel channel the message arrived on
     * @param mqttMessage message to dispatch
     */
    public void accept(MqttChannel mqttChannel, MqttMessage mqttMessage) {
        log.info("accept channel] {} message {}", mqttChannel.getConnection(), mqttMessage);
        getProtocolAdaptor().chooseProtocol(mqttChannel, mqttMessage, this);
    }

    /**
     * @return the cluster sender created in the constructor
     */
    public ClusterSender getClusterSender() {
        return this.clusterSender;
    }

    /**
     * @return the cluster receiver created in the constructor
     */
    public ClusterReceiver getClusterReceiver() {
        return this.clusterReceiver;
    }
}
