package io.github.quickmsg.core;

import io.github.quickmsg.common.channel.MqttChannel;
import io.github.quickmsg.common.config.Configuration;
import io.github.quickmsg.common.context.ReceiveContext;
import io.github.quickmsg.common.protocol.Protocol;
import io.github.quickmsg.common.protocol.ProtocolAdaptor;
import io.github.quickmsg.common.spi.DynamicLoader;
import io.github.quickmsg.common.utils.MessageUtils;
import io.netty.handler.codec.mqtt.MqttMessage;
import io.netty.handler.codec.mqtt.MqttMessageType;
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.scheduler.Schedulers;

/* loaded from: input_file:io/github/quickmsg/core/DefaultProtocolAdaptor.class */
/**
 * {@link ProtocolAdaptor} that dispatches an incoming {@link MqttMessage} to the
 * {@link Protocol} registered for the message's {@link MqttMessageType}.
 *
 * <p>The type-to-protocol table is built once in the constructor from every
 * {@code Protocol} returned by {@link DynamicLoader#findAll(Class)} and is only
 * read afterwards.
 */
public class DefaultProtocolAdaptor implements ProtocolAdaptor {
    private static final Logger log = LoggerFactory.getLogger(DefaultProtocolAdaptor.class);

    /** Lookup table from MQTT message type to the protocol handler that declared it. */
    private final Map<MqttMessageType, Protocol<MqttMessage>> types = new HashMap<>();

    /**
     * Registers every discovered {@link Protocol} under each message type it reports
     * through {@code getMqttMessageTypes()}. If two protocols report the same type,
     * the one registered last replaces the earlier one (plain {@code Map.put}).
     */
    public DefaultProtocolAdaptor() {
        DynamicLoader.findAll(Protocol.class).forEach(protocol -> {
            protocol.getMqttMessageTypes().forEach(messageType -> {
                this.types.put((MqttMessageType) messageType, protocol);
            });
        });
    }

    /**
     * Looks up the protocol registered for the message's type and, if one exists,
     * subscribes to its {@code doParseProtocol} pipeline on the parallel scheduler.
     *
     * <p>The given {@code receiveContext} is written into the Reactor context under the
     * key {@code ReceiveContext.class}. When the pipeline terminates, either with an
     * error or by completing, {@link MessageUtils#safeRelease} is invoked on the message.
     * When no protocol is registered for the message type, the message is not processed.
     *
     * @param mqttChannel    channel the message arrived on; passed to the protocol handler
     * @param mqttMessage    message to dispatch; its fixed header supplies the message type
     * @param receiveContext context made available to the pipeline via the Reactor context
     * @param <C>            configuration type of the receive context
     */
    public <C extends Configuration> void chooseProtocol(MqttChannel mqttChannel, MqttMessage mqttMessage, ReceiveContext<C> receiveContext) {
        MqttMessageType messageType = mqttMessage.fixedHeader().messageType();
        Protocol<MqttMessage> protocol = this.types.get(messageType);
        if (protocol == null) {
            // Previously dropped without any trace; leave a breadcrumb for diagnosis.
            log.debug("channel {} no protocol registered for message type {}", mqttChannel, messageType);
            return;
        }
        protocol.doParseProtocol(mqttMessage, mqttChannel)
                .contextWrite(context -> context.putNonNull(ReceiveContext.class, receiveContext))
                .subscribeOn(Schedulers.parallel())
                .subscribe(ignored -> {
                    // Emitted values are not used; only the terminal signals matter here.
                }, error -> {
                    log.error("channel {} chooseProtocol:", mqttChannel, error);
                    MessageUtils.safeRelease(mqttMessage);
                }, () -> MessageUtils.safeRelease(mqttMessage));
    }
}
