package io.github.quickmsg.core.cluster;

import io.github.quickmsg.core.mqtt.MqttReceiveContext;
import io.netty.handler.codec.mqtt.MqttMessage;
import io.netty.handler.codec.mqtt.MqttPublishMessage;
import java.util.function.Function;
import reactor.core.scheduler.Scheduler;

/* loaded from: input_file:io/github/quickmsg/core/cluster/ClusterSender.class */
public class ClusterSender implements Function<MqttMessage, MqttMessage> {
    private final MqttReceiveContext mqttReceiveContext;
    private final Scheduler scheduler;

    public ClusterSender(Scheduler scheduler, MqttReceiveContext mqttReceiveContext) {
        this.scheduler = scheduler;
        this.mqttReceiveContext = mqttReceiveContext;
    }

    @Override // java.util.function.Function
    public MqttMessage apply(MqttMessage mqttMessage) {
        if (mqttMessage instanceof MqttPublishMessage) {
            this.mqttReceiveContext.getClusterRegistry().spreadPublishMessage(((MqttPublishMessage) mqttMessage).copy()).subscribeOn(this.scheduler).subscribe();
            ((MqttPublishMessage) mqttMessage).retain();
        }
        return mqttMessage;
    }
}
