package io.github.quickmsg.core.spi;

import io.github.quickmsg.common.channel.MqttChannel;
import io.github.quickmsg.common.config.Configuration;
import io.github.quickmsg.common.context.ReceiveContext;
import io.github.quickmsg.common.message.SmqttMessage;
import io.github.quickmsg.common.protocol.Protocol;
import io.github.quickmsg.common.protocol.ProtocolAdaptor;
import io.github.quickmsg.common.spi.DynamicLoader;
import io.netty.handler.codec.mqtt.MqttMessage;
import io.netty.handler.codec.mqtt.MqttMessageType;
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.scheduler.Scheduler;
import reactor.core.scheduler.Schedulers;
import reactor.netty.ReactorNetty;

/* loaded from: input_file:io/github/quickmsg/core/spi/DefaultProtocolAdaptor.class */
/**
 * Default {@link ProtocolAdaptor} that routes each inbound MQTT message to the
 * {@link Protocol} registered for the message's {@link MqttMessageType}.
 *
 * <p>The type-to-protocol table is filled once in the constructor and only read afterwards.
 */
public class DefaultProtocolAdaptor implements ProtocolAdaptor {
    private static final Logger log = LoggerFactory.getLogger(DefaultProtocolAdaptor.class);

    /** Lookup table from MQTT message type to the protocol that handles it. */
    private final Map<MqttMessageType, Protocol<MqttMessage>> types = new HashMap<>();

    /** Scheduler every protocol pipeline is subscribed on. */
    private final Scheduler scheduler;

    /**
     * Creates the adaptor and registers every {@link Protocol} returned by
     * {@code DynamicLoader.findAll(Protocol.class)} under each message type it declares via
     * {@code getMqttMessageTypes()}. If two protocols declare the same type, the one
     * registered last replaces the earlier one.
     *
     * @param scheduler scheduler to subscribe protocol pipelines on; when {@code null},
     *                  {@link Schedulers#boundedElastic()} is used instead
     */
    // The loader is queried with the raw Protocol class token and hands back untyped elements
    // (hence the cast below), so this registration is inherently unchecked.
    @SuppressWarnings({"unchecked", "rawtypes"})
    public DefaultProtocolAdaptor(Scheduler scheduler) {
        this.scheduler = scheduler != null ? scheduler : Schedulers.boundedElastic();
        DynamicLoader.findAll(Protocol.class).forEach(protocol -> {
            protocol.getMqttMessageTypes().forEach(obj -> {
                this.types.put((MqttMessageType) obj, protocol);
            });
        });
    }

    /**
     * Dispatches one inbound message to the protocol registered for its message type.
     *
     * <p>Messages that are {@code null}, carry no decoder result, or failed decoding are logged
     * and dropped. Otherwise the matching protocol's pipeline is subscribed on the configured
     * scheduler with {@code receiveContext} stored in the Reactor context under
     * {@code ReceiveContext.class}. The message payload is released when the pipeline
     * completes or terminates with an error.
     *
     * @param mqttChannel    channel the message arrived on
     * @param smqttMessage   wrapper holding the decoded MQTT message
     * @param receiveContext context made available to the protocol through the Reactor context
     * @param <C>            configuration type of the receive context
     */
    public <C extends Configuration> void chooseProtocol(MqttChannel mqttChannel, SmqttMessage<MqttMessage> smqttMessage, ReceiveContext<C> receiveContext) {
        MqttMessage message = smqttMessage.getMessage();
        if (message == null || message.decoderResult() == null || !message.decoderResult().isSuccess()) {
            log.error("chooseProtocol {} error mqttMessage {} ", mqttChannel, message);
            return;
        }

        MqttMessageType messageType = message.fixedHeader().messageType();
        log.info(" 【{}】【{}】 【{}】", Thread.currentThread().getName(), messageType, mqttChannel);

        Protocol<MqttMessage> protocol = this.types.get(messageType);
        if (protocol == null) {
            // Previously dropped without a trace; surface it so unhandled types can be diagnosed.
            // NOTE(review): the payload is not released on this path because its ownership here
            // is not visible from this class - confirm against the caller.
            log.warn("channel {} chooseProtocol: no protocol registered for message type {}", mqttChannel, messageType);
            return;
        }

        protocol.doParseProtocol(smqttMessage, mqttChannel)
                .contextWrite(context -> context.putNonNull(ReceiveContext.class, receiveContext))
                .subscribeOn(this.scheduler)
                // Keep the pipeline alive on per-element failures, but record what was dropped
                // and why instead of discarding the error silently.
                .onErrorContinue((error, element) ->
                        log.error("channel {} chooseProtocol: {} dropped element {} after error", mqttChannel, message, element, error))
                .subscribe(
                        ignored -> { },
                        error -> {
                            // Trailing throwable argument makes SLF4J print the full stack trace.
                            log.error("channel {} chooseProtocol: {} error {}", mqttChannel, message, error.getMessage(), error);
                            ReactorNetty.safeRelease(message.payload());
                        },
                        () -> ReactorNetty.safeRelease(message.payload()));
    }
}
