package io.github.quickmsg.core;

import io.github.quickmsg.common.context.ContextHolder;
import io.github.quickmsg.common.context.ReceiveContext;
import io.github.quickmsg.common.message.Message;
import io.github.quickmsg.common.protocol.Protocol;
import io.github.quickmsg.common.protocol.ProtocolAdaptor;
import io.github.quickmsg.common.spi.loader.DynamicLoader;
import io.github.quickmsg.common.utils.RetryFailureHandler;
import io.github.quickmsg.core.mqtt.AbstractReceiveContext;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.publisher.Mono;
import reactor.core.publisher.Sinks;
import reactor.core.scheduler.Schedulers;

/* loaded from: input_file:io/github/quickmsg/core/DefaultProtocolAdaptor.class */
public class DefaultProtocolAdaptor implements ProtocolAdaptor {
    private static final Logger log = LoggerFactory.getLogger(DefaultProtocolAdaptor.class);
    private final Sinks.Many<Message> acceptor;

    public DefaultProtocolAdaptor(Integer num, Integer num2) {
        this.acceptor = Sinks.many().multicast().onBackpressureBuffer(num.intValue());
        DynamicLoader.findAll(Protocol.class).forEach(protocol -> {
            this.acceptor.asFlux().doOnError(th -> {
                log.error("DefaultProtocolAdaptor consumer", th);
            }).onErrorResume(th2 -> {
                return Mono.empty();
            }).ofType(protocol.getClassType()).publishOn(Schedulers.newParallel("message-acceptor", num2.intValue())).subscribe(obj -> {
                Message message = (Message) obj;
                ReceiveContext receiveContext = ContextHolder.getReceiveContext();
                protocol.doParseProtocol(message, message.getMqttChannel()).contextWrite(context -> {
                    return context.putNonNull(ReceiveContext.class, ContextHolder.getReceiveContext());
                }).onErrorContinue((th3, obj) -> {
                    log.error("DefaultProtocolAdaptor", th3);
                }).subscribe();
                ((AbstractReceiveContext) receiveContext).getRuleDslExecutor().executeRule(message);
            });
        });
    }

    public void chooseProtocol(Message message) {
        try {
            this.acceptor.emitNext(message, RetryFailureHandler.RETRY_NON_SERIALIZED);
        } catch (Exception e) {
            log.error("protocol emitNext error", e);
        }
    }
}
