package io.github.quickmsg.core.cluster;

import io.github.quickmsg.common.channel.MqttChannel;
import io.github.quickmsg.common.message.RecipientRegistry;
import io.github.quickmsg.core.mqtt.MqttReceiveContext;
import io.netty.handler.codec.mqtt.MqttMessage;
import io.netty.handler.codec.mqtt.MqttPublishMessage;
import java.util.function.BiFunction;
import reactor.core.scheduler.Scheduler;

/* loaded from: input_file:io/github/quickmsg/core/cluster/ClusterSender.class */
public class ClusterSender implements BiFunction<MqttChannel, MqttMessage, MqttMessage> {
    private final MqttReceiveContext mqttReceiveContext;
    private final Scheduler scheduler;
    private final RecipientRegistry recipientRegistry;

    public ClusterSender(Scheduler scheduler, MqttReceiveContext mqttReceiveContext, RecipientRegistry recipientRegistry) {
        this.scheduler = scheduler;
        this.mqttReceiveContext = mqttReceiveContext;
        this.recipientRegistry = recipientRegistry;
    }

    @Override // java.util.function.BiFunction
    public MqttMessage apply(MqttChannel mqttChannel, MqttMessage mqttMessage) {
        if (mqttMessage instanceof MqttPublishMessage) {
            MqttPublishMessage mqttPublishMessage = (MqttPublishMessage) mqttMessage;
            mqttPublishMessage.payload().resetReaderIndex();
            this.recipientRegistry.accept(mqttChannel, mqttPublishMessage);
            mqttPublishMessage.retain();
            if (this.mqttReceiveContext.getConfiguration().getClusterConfig().getClustered().booleanValue()) {
                this.mqttReceiveContext.getClusterRegistry().spreadPublishMessage(mqttPublishMessage.copy()).subscribeOn(this.scheduler).subscribe();
            }
        }
        return mqttMessage;
    }
}
