package io.github.quickmsg.common.interceptor;

import io.github.quickmsg.common.channel.MqttChannel;
import io.github.quickmsg.common.context.ReceiveContext;
import io.github.quickmsg.common.message.HeapMqttMessage;
import io.github.quickmsg.common.message.SmqttMessage;
import io.github.quickmsg.common.protocol.ProtocolAdaptor;
import io.github.quickmsg.common.rule.DslExecutor;
import io.github.quickmsg.common.spi.DynamicLoader;
import io.github.quickmsg.common.utils.JacksonUtil;
import io.github.quickmsg.common.utils.MessageUtils;
import io.netty.handler.codec.mqtt.MqttFixedHeader;
import io.netty.handler.codec.mqtt.MqttMessage;
import io.netty.handler.codec.mqtt.MqttPublishMessage;
import io.netty.handler.codec.mqtt.MqttPublishVariableHeader;
import java.nio.charset.StandardCharsets;
import java.util.Comparator;
import java.util.Iterator;
import java.util.List;
import java.util.stream.Collectors;
import reactor.core.scheduler.Schedulers;
import reactor.netty.ReactorNetty;

/* loaded from: input_file:io/github/quickmsg/common/interceptor/MessageProxy.class */
public class MessageProxy {
    private List<Interceptor> interceptors = (List) DynamicLoader.findAll(Interceptor.class).sorted(Comparator.comparing((v0) -> {
        return v0.sort();
    })).collect(Collectors.toList());

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:io/github/quickmsg/common/interceptor/MessageProxy$HeadIntercept.class */
    public static class HeadIntercept implements Interceptor {
        HeadIntercept() {
        }

        @Override // io.github.quickmsg.common.interceptor.Interceptor
        public Object intercept(Invocation invocation) {
            SmqttMessage smqttMessage = (SmqttMessage) invocation.getArgs()[1];
            MqttPublishMessage message = smqttMessage.getMessage();
            try {
                if (message instanceof MqttPublishMessage) {
                    message.retain();
                }
                Object proceed = invocation.proceed();
                if (smqttMessage.getIsCluster().booleanValue()) {
                    ReactorNetty.safeRelease(message.payload());
                }
                return proceed;
            } catch (Throwable th) {
                if (smqttMessage.getIsCluster().booleanValue()) {
                    ReactorNetty.safeRelease(message.payload());
                }
                throw th;
            }
        }

        @Override // io.github.quickmsg.common.interceptor.Interceptor
        public int sort() {
            return 0;
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:io/github/quickmsg/common/interceptor/MessageProxy$TailIntercept.class */
    public static class TailIntercept implements Interceptor {
        TailIntercept() {
        }

        @Override // io.github.quickmsg.common.interceptor.Interceptor
        public Object intercept(Invocation invocation) {
            MqttChannel mqttChannel = (MqttChannel) invocation.getArgs()[0];
            SmqttMessage smqttMessage = (SmqttMessage) invocation.getArgs()[1];
            ReceiveContext receiveContext = (ReceiveContext) invocation.getArgs()[2];
            DslExecutor dslExecutor = receiveContext.getDslExecutor();
            MqttMessage message = smqttMessage.getMessage();
            if (!smqttMessage.getIsCluster().booleanValue() && (message instanceof MqttPublishMessage)) {
                HeapMqttMessage clusterMessage = clusterMessage((MqttPublishMessage) message, mqttChannel, smqttMessage.getTimestamp());
                if (receiveContext.getConfiguration().getClusterConfig().isEnable()) {
                    receiveContext.getClusterRegistry().spreadPublishMessage(clusterMessage).subscribeOn(Schedulers.boundedElastic()).subscribe();
                }
                if (dslExecutor.isExecute().booleanValue()) {
                    dslExecutor.executeRule(mqttChannel, clusterMessage, receiveContext);
                }
            }
            return invocation.proceed();
        }

        private HeapMqttMessage clusterMessage(MqttPublishMessage mqttPublishMessage, MqttChannel mqttChannel, long j) {
            MqttPublishVariableHeader variableHeader = mqttPublishMessage.variableHeader();
            MqttFixedHeader fixedHeader = mqttPublishMessage.fixedHeader();
            return HeapMqttMessage.builder().timestamp(j).clientIdentifier(mqttChannel.getClientIdentifier()).message(JacksonUtil.dynamic(new String(MessageUtils.copyReleaseByteBuf(mqttPublishMessage.payload()), StandardCharsets.UTF_8))).topic(variableHeader.topicName()).retain(fixedHeader.isRetain()).qos(fixedHeader.qosLevel().value()).properties(variableHeader.properties()).build();
        }

        @Override // io.github.quickmsg.common.interceptor.Interceptor
        public int sort() {
            return 0;
        }
    }

    public ProtocolAdaptor proxy(ProtocolAdaptor protocolAdaptor) {
        ProtocolAdaptor proxyProtocol = new TailIntercept().proxyProtocol(protocolAdaptor);
        Iterator<Interceptor> it = this.interceptors.iterator();
        while (it.hasNext()) {
            proxyProtocol = it.next().proxyProtocol(proxyProtocol);
        }
        return new HeadIntercept().proxyProtocol(proxyProtocol);
    }
}
