package io.github.quickmsg.core.mqtt;

import io.github.quickmsg.common.channel.MqttChannel;
import io.github.quickmsg.common.message.SmqttMessage;
import io.github.quickmsg.common.transport.Transport;
import io.github.quickmsg.core.cluster.ClusterReceiver;
import io.netty.handler.codec.mqtt.MqttMessage;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:io/github/quickmsg/core/mqtt/MqttReceiveContext.class */
/**
 * Receive context for MQTT traffic.
 *
 * <p>Subscribes to the inbound object stream of an {@link MqttChannel}, wraps every
 * received {@link MqttMessage} in a {@link SmqttMessage} and forwards it to the
 * protocol adaptor obtained from {@code getProtocolAdaptor()}. A
 * {@link ClusterReceiver} bound to this context is created and registered at
 * construction time.
 */
public class MqttReceiveContext extends AbstractReceiveContext<MqttConfiguration> {
    private static final Logger log = LoggerFactory.getLogger(MqttReceiveContext.class);

    /** Cluster receiver created for, and bound to, this context. */
    private final ClusterReceiver clusterReceiver;

    /**
     * Creates the context and registers its cluster receiver.
     *
     * @param mqttConfiguration configuration passed through to the superclass
     * @param transport         transport passed through to the superclass
     */
    public MqttReceiveContext(MqttConfiguration mqttConfiguration, Transport<MqttConfiguration> transport) {
        super(mqttConfiguration, transport);
        // NOTE(review): `this` is handed to ClusterReceiver before the constructor returns;
        // presumably registry() hooks the receiver into cluster message delivery — confirm
        // that it does not call back into this context before construction completes.
        this.clusterReceiver = new ClusterReceiver(this);
        this.clusterReceiver.registry();
    }

    /**
     * Starts consuming inbound messages of the given channel.
     *
     * <p>Objects that cannot be cast to {@link MqttMessage} are logged and skipped.
     * A failure while handling one message is logged and does not stop processing of
     * subsequent messages. A terminal error of the inbound stream is logged.
     *
     * @param mqttChannel channel whose connection's inbound stream is subscribed to
     */
    public void apply(MqttChannel mqttChannel) {
        // NOTE(review): registryDelayTcpClose() is invoked before the connection is
        // obtained; its exact effect is not visible from this class.
        mqttChannel.registryDelayTcpClose()
                .getConnection()
                .inbound()
                .receiveObject()
                .cast(MqttMessage.class)
                .onErrorContinue((th, obj) -> log.error("on message error {}", obj, th))
                .subscribe(
                        mqttMessage -> dispatch(mqttChannel, mqttMessage),
                        // Explicit error consumer so a terminal stream error is logged here
                        // instead of being reported as an unimplemented error callback.
                        th -> log.error("mqtt inbound stream terminated with error", th));
    }

    /**
     * Wraps a raw message with the current timestamp and hands it to
     * {@link #accept(MqttChannel, SmqttMessage)}, isolating handler failures.
     */
    private void dispatch(MqttChannel mqttChannel, MqttMessage mqttMessage) {
        try {
            // Boolean.FALSE: presumably flags the message as not coming from the
            // cluster — confirm against SmqttMessage.
            accept(mqttChannel, new SmqttMessage<>(mqttMessage, System.currentTimeMillis(), Boolean.FALSE));
        } catch (RuntimeException e) {
            // onErrorContinue does not cover exceptions thrown by the subscribe consumer;
            // letting one escape would cancel the subscription and stop all further
            // inbound processing for this channel.
            log.error("on message error {}", mqttMessage, e);
        }
    }

    /**
     * Forwards a message to the protocol adaptor for protocol selection and handling.
     *
     * @param mqttChannel  channel the message belongs to
     * @param smqttMessage wrapped MQTT message
     */
    public void accept(MqttChannel mqttChannel, SmqttMessage<MqttMessage> smqttMessage) {
        getProtocolAdaptor().chooseProtocol(mqttChannel, smqttMessage, this);
    }

    /**
     * Returns the cluster receiver created by this context.
     *
     * @return the cluster receiver, never {@code null}
     */
    public ClusterReceiver getClusterReceiver() {
        return this.clusterReceiver;
    }
}
