package com.github.netty.protocol;

import com.github.netty.core.AbstractNettyServer;
import com.github.netty.core.AbstractProtocol;
import com.github.netty.core.util.LoggerFactoryX;
import com.github.netty.core.util.LoggerX;
import com.github.netty.protocol.mqtt.MemoryQueueRepository;
import com.github.netty.protocol.mqtt.MemoryRetainedRepository;
import com.github.netty.protocol.mqtt.MemorySubscriptionsRepository;
import com.github.netty.protocol.mqtt.MqttAutoFlushChannelHandler;
import com.github.netty.protocol.mqtt.MqttIdleTimeoutChannelHandler;
import com.github.netty.protocol.mqtt.MqttLoggerChannelHandler;
import com.github.netty.protocol.mqtt.MqttPostOffice;
import com.github.netty.protocol.mqtt.MqttServerChannelHandler;
import com.github.netty.protocol.mqtt.MqttSessionRegistry;
import com.github.netty.protocol.mqtt.config.BrokerConfiguration;
import com.github.netty.protocol.mqtt.config.BrokerConstants;
import com.github.netty.protocol.mqtt.config.FileResourceLoader;
import com.github.netty.protocol.mqtt.interception.BrokerInterceptor;
import com.github.netty.protocol.mqtt.interception.InterceptHandler;
import com.github.netty.protocol.mqtt.security.ACLFileParser;
import com.github.netty.protocol.mqtt.security.AcceptAllAuthenticator;
import com.github.netty.protocol.mqtt.security.DenyAllAuthorizatorPolicy;
import com.github.netty.protocol.mqtt.security.IAuthorizatorPolicy;
import com.github.netty.protocol.mqtt.security.PermitAllAuthorizatorPolicy;
import com.github.netty.protocol.mqtt.subscriptions.CTrieSubscriptionDirectory;
import io.netty.buffer.ByteBuf;
import io.netty.channel.Channel;
import io.netty.channel.ChannelPipeline;
import io.netty.handler.codec.mqtt.MqttDecoder;
import io.netty.handler.codec.mqtt.MqttEncoder;
import io.netty.handler.codec.mqtt.MqttPublishMessage;
import io.netty.handler.timeout.IdleStateHandler;
import java.text.ParseException;
import java.util.concurrent.TimeUnit;

/* loaded from: input_file:com/github/netty/protocol/MqttProtocol.class */
/**
 * MQTT protocol support: recognises MQTT connections from their first bytes and installs
 * the MQTT channel handlers on the connection's pipeline.
 *
 * <p>The broker objects (subscription directory, session registry, post office and the
 * server channel handler) are created in {@link #onServerStart(AbstractNettyServer)};
 * {@link #addPipeline(Channel)} and
 * {@link #internalPublish(MqttPublishMessage, String)} require that it has run first.
 */
public class MqttProtocol extends AbstractProtocol {
    /** Protocol name carried in the variable header of an MQTT 3.1.1 / 5 CONNECT packet. */
    private static final byte[] PROTOCOL_NAME = {'M', 'Q', 'T', 'T'};
    /** Fewest readable bytes needed before detection is attempted. */
    private static final int MIN_DETECT_BYTES = 9;
    /** The MQTT "Remaining Length" field occupies between one and four bytes. */
    private static final int MAX_REMAINING_LENGTH_BYTES = 4;
    /** High bit of a "Remaining Length" byte: set when another length byte follows. */
    private static final int CONTINUATION_BIT = 0x80;
    /** Size of the big-endian length prefix that precedes the protocol name. */
    private static final int NAME_LENGTH_PREFIX_BYTES = 2;
    /** Reader idle timeout (seconds) used by the no-argument constructor. */
    private static final int DEFAULT_READER_IDLE_SECONDS = 10;
    /** Value returned by {@link #getOrder()}. */
    private static final int ORDER = 300;

    private final LoggerX logger = LoggerFactoryX.getLogger(MqttProtocol.class);
    private final int messageMaxLength;
    private final int nettyReaderIdleTimeSeconds;
    private int autoFlushIdleTime;

    // The metrics settings below are only stored and exposed through accessors;
    // nothing in this class reads them.
    private boolean enableMetrics = false;
    private String metricsLibratoEmail;
    private String metricsLibratoToken;
    private String metricsLibratoSource;

    // NOTE(review): these two handler instances are added to every channel's pipeline,
    // which requires them to be sharable handlers - confirm against their declarations.
    private final MqttIdleTimeoutChannelHandler timeoutHandler = new MqttIdleTimeoutChannelHandler();
    private final MqttLoggerChannelHandler mqttMessageLoggerChannelHandler = new MqttLoggerChannelHandler();
    private final BrokerInterceptor interceptor = new BrokerInterceptor(1);

    // Both are created in onServerStart and remain null until then.
    private MqttServerChannelHandler mqttServerChannelHandler;
    private MqttPostOffice mqttPostOffice;

    /**
     * Creates the protocol with {@link BrokerConstants#DEFAULT_NETTY_MAX_BYTES_IN_MESSAGE}
     * as the message size limit, a 10 second reader idle timeout and auto flush disabled.
     */
    public MqttProtocol() {
        this(BrokerConstants.DEFAULT_NETTY_MAX_BYTES_IN_MESSAGE, DEFAULT_READER_IDLE_SECONDS, 0);
    }

    /**
     * @param messageMaxLength           maximum message size handed to the {@link MqttDecoder}
     * @param nettyReaderIdleTimeSeconds reader idle time, in seconds, for the {@link IdleStateHandler}
     * @param autoFlushIdleTime          idle time, in seconds, for the auto flush handler;
     *                                   the handler is only installed when this is greater than zero
     */
    public MqttProtocol(int messageMaxLength, int nettyReaderIdleTimeSeconds, int autoFlushIdleTime) {
        this.messageMaxLength = messageMaxLength;
        this.nettyReaderIdleTimeSeconds = nettyReaderIdleTimeSeconds;
        this.autoFlushIdleTime = autoFlushIdleTime;
    }

    /** @return the fixed protocol name {@code "mqtt"} */
    @Override
    public String getProtocolName() {
        return "mqtt";
    }

    /**
     * Tells whether the buffer starts with an MQTT CONNECT packet that names the protocol
     * {@code "MQTT"}.
     *
     * <p>Layout inspected, relative to the buffer's reader index: one fixed-header byte,
     * a one to four byte "Remaining Length" field, a two byte name length, the four name
     * bytes, and the protocol level byte. All reads are non-consuming.
     *
     * @param byteBuf first bytes received on the connection; indices are left untouched
     * @return {@code true} when the protocol name {@code "MQTT"} is found at the expected offset
     */
    @Override
    public boolean canSupport(ByteBuf byteBuf) {
        int readable = byteBuf.readableBytes();
        if (readable < MIN_DETECT_BYTES) {
            return false;
        }
        // getByte takes absolute indices, while readableBytes is relative to the reader index.
        int start = byteBuf.readerIndex();

        // Count the "Remaining Length" bytes: each byte with the high bit set announces another.
        // The minimum size check above guarantees that offsets 1..4 are readable.
        int remainingLengthBytes = 1;
        while (remainingLengthBytes < MAX_REMAINING_LENGTH_BYTES
                && (byteBuf.getByte(start + remainingLengthBytes) & CONTINUATION_BIT) != 0) {
            remainingLengthBytes++;
        }

        int nameOffset = 1 + remainingLengthBytes + NAME_LENGTH_PREFIX_BYTES;
        // Require the name plus the protocol level byte that follows it (9 bytes in the
        // shortest case, matching the minimum size above).
        if (readable < nameOffset + PROTOCOL_NAME.length + 1) {
            return false;
        }
        for (int i = 0; i < PROTOCOL_NAME.length; i++) {
            if (byteBuf.getByte(start + nameOffset + i) != PROTOCOL_NAME[i]) {
                return false;
            }
        }
        return true;
    }

    /**
     * Installs the MQTT handlers on the channel's pipeline: idle detection first, then the
     * optional auto flush handler, the MQTT decoder and encoder, the message logger and
     * finally the server handler.
     *
     * @param channel channel whose pipeline is configured
     * @throws IllegalStateException if {@link #onServerStart(AbstractNettyServer)} has not
     *                               created the server handler yet; the pipeline is left unmodified
     */
    @Override
    public void addPipeline(Channel channel) throws Exception {
        // Fail before touching the pipeline, instead of half-building it and then
        // hitting a NullPointerException inside Netty.
        MqttServerChannelHandler serverHandler = this.mqttServerChannelHandler;
        if (serverHandler == null) {
            throw new IllegalStateException(
                    "MQTT server handler is not initialized; onServerStart must be called before addPipeline");
        }

        ChannelPipeline pipeline = channel.pipeline();
        pipeline.addFirst("idleStateHandler", new IdleStateHandler(this.nettyReaderIdleTimeSeconds, 0, 0));
        pipeline.addAfter("idleStateHandler", "idleEventHandler", this.timeoutHandler);
        if (this.autoFlushIdleTime > 0) {
            pipeline.addLast("autoflush", new MqttAutoFlushChannelHandler(this.autoFlushIdleTime, TimeUnit.SECONDS));
        }
        pipeline.addLast("decoder", new MqttDecoder(this.messageMaxLength));
        pipeline.addLast("encoder", MqttEncoder.INSTANCE);
        pipeline.addLast("messageLogger", this.mqttMessageLoggerChannelHandler);
        pipeline.addLast("handler", serverHandler);
    }

    /** @return the fixed order value {@code 300} */
    @Override
    public int getOrder() {
        return ORDER;
    }

    /**
     * Builds the in-memory broker objects and the server channel handler used by
     * {@link #addPipeline(Channel)}.
     *
     * @param server the server being started (not used here)
     */
    @Override
    public <T extends AbstractNettyServer> void onServerStart(T server) throws Exception {
        IAuthorizatorPolicy authorizatorPolicy = initializeAuthorizatorPolicy();
        CTrieSubscriptionDirectory subscriptions = new CTrieSubscriptionDirectory(new MemorySubscriptionsRepository());
        MqttSessionRegistry sessions = new MqttSessionRegistry(subscriptions, new MemoryQueueRepository());
        this.mqttPostOffice = new MqttPostOffice(
                subscriptions, authorizatorPolicy, new MemoryRetainedRepository(), sessions, this.interceptor);
        this.mqttServerChannelHandler = new MqttServerChannelHandler(
                new BrokerConfiguration(), new AcceptAllAuthenticator(), sessions, this.mqttPostOffice);
    }

    /**
     * Stops the message interceptor.
     *
     * @param server the server being stopped (not used here)
     */
    @Override
    public <T extends AbstractNettyServer> void onServerStop(T server) throws Exception {
        this.interceptor.stop();
    }

    /**
     * Creates the authorization policy handed to the post office.
     *
     * <p>No ACL file path is configured in this class, so the result is always a
     * {@link PermitAllAuthorizatorPolicy}. Subclasses may override this method to supply
     * a different policy.
     *
     * @return the authorization policy to use
     */
    protected IAuthorizatorPolicy initializeAuthorizatorPolicy() {
        return new PermitAllAuthorizatorPolicy();
    }

    /**
     * Hands a publish message to the post office's internal publish entry point.
     *
     * @param mqttPublishMessage message to publish
     * @param str                client id, used only in the trace log line
     * @throws IllegalStateException if {@link #onServerStart(AbstractNettyServer)} has not
     *                               created the post office yet
     */
    public void internalPublish(MqttPublishMessage mqttPublishMessage, String str) {
        MqttPostOffice postOffice = this.mqttPostOffice;
        if (postOffice == null) {
            throw new IllegalStateException(
                    "MQTT post office is not initialized; onServerStart must be called before internalPublish");
        }
        this.logger.trace("Internal publishing message CId: {}, messageId: {}", str,
                Integer.valueOf(mqttPublishMessage.variableHeader().packetId()));
        postOffice.internalPublish(mqttPublishMessage);
    }

    /**
     * Registers a message interceptor.
     *
     * @param interceptHandler handler to add
     */
    public void addInterceptHandler(InterceptHandler interceptHandler) {
        this.logger.info("Adding MQTT message interceptor. InterceptorId={}", interceptHandler.getID());
        this.interceptor.addInterceptHandler(interceptHandler);
    }

    /**
     * Unregisters a message interceptor.
     *
     * @param interceptHandler handler to remove
     */
    public void removeInterceptHandler(InterceptHandler interceptHandler) {
        this.logger.info("Removing MQTT message interceptor. InterceptorId={}", interceptHandler.getID());
        this.interceptor.removeInterceptHandler(interceptHandler);
    }

    /** @return the stored metrics flag */
    public boolean isEnableMetrics() {
        return this.enableMetrics;
    }

    /** @param enableMetrics new value of the metrics flag */
    public void setEnableMetrics(boolean enableMetrics) {
        this.enableMetrics = enableMetrics;
    }

    /** @return the stored Librato e-mail setting, possibly {@code null} */
    public String getMetricsLibratoEmail() {
        return this.metricsLibratoEmail;
    }

    /** @param metricsLibratoEmail new Librato e-mail setting */
    public void setMetricsLibratoEmail(String metricsLibratoEmail) {
        this.metricsLibratoEmail = metricsLibratoEmail;
    }

    /** @return the stored Librato token setting, possibly {@code null} */
    public String getMetricsLibratoToken() {
        return this.metricsLibratoToken;
    }

    /** @param metricsLibratoToken new Librato token setting */
    public void setMetricsLibratoToken(String metricsLibratoToken) {
        this.metricsLibratoToken = metricsLibratoToken;
    }

    /** @return the stored Librato source setting, possibly {@code null} */
    public String getMetricsLibratoSource() {
        return this.metricsLibratoSource;
    }

    /** @param metricsLibratoSource new Librato source setting */
    public void setMetricsLibratoSource(String metricsLibratoSource) {
        this.metricsLibratoSource = metricsLibratoSource;
    }

    /** @return the auto flush idle time in seconds; values of zero or less disable auto flush */
    public int getAutoFlushIdleTime() {
        return this.autoFlushIdleTime;
    }

    /**
     * @param autoFlushIdleTime auto flush idle time in seconds; read each time
     *                          {@link #addPipeline(Channel)} configures a channel
     */
    public void setAutoFlushIdleTime(int autoFlushIdleTime) {
        this.autoFlushIdleTime = autoFlushIdleTime;
    }
}
