package org.apache.rocketmq.mqtt.cs.protocol.mqtt.handler;

import com.alipay.sofa.jraft.error.RemotingException;
import io.netty.channel.Channel;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.mqtt.MqttSubAckMessage;
import io.netty.handler.codec.mqtt.MqttSubscribeMessage;
import io.netty.handler.codec.mqtt.MqttTopicSubscription;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import javax.annotation.Resource;
import org.apache.rocketmq.common.ThreadFactoryImpl;
import org.apache.rocketmq.mqtt.common.facade.RetainedPersistManager;
import org.apache.rocketmq.mqtt.common.hook.HookResult;
import org.apache.rocketmq.mqtt.common.model.Message;
import org.apache.rocketmq.mqtt.common.model.Subscription;
import org.apache.rocketmq.mqtt.common.util.TopicUtils;
import org.apache.rocketmq.mqtt.cs.channel.ChannelCloseFrom;
import org.apache.rocketmq.mqtt.cs.channel.ChannelInfo;
import org.apache.rocketmq.mqtt.cs.channel.ChannelManager;
import org.apache.rocketmq.mqtt.cs.protocol.mqtt.MqttPacketHandler;
import org.apache.rocketmq.mqtt.cs.protocol.mqtt.facotry.MqttMessageFactory;
import org.apache.rocketmq.mqtt.cs.session.Session;
import org.apache.rocketmq.mqtt.cs.session.infly.PushAction;
import org.apache.rocketmq.mqtt.cs.session.loop.SessionLoop;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Component;

@Component
/* loaded from: input_file:org/apache/rocketmq/mqtt/cs/protocol/mqtt/handler/MqttSubscribeHandler.class */
public class MqttSubscribeHandler implements MqttPacketHandler<MqttSubscribeMessage> {
    private static Logger logger = LoggerFactory.getLogger(MqttSubscribeHandler.class);

    @Resource
    private SessionLoop sessionLoop;

    @Resource
    private ChannelManager channelManager;

    @Resource
    private RetainedPersistManager retainedPersistManager;

    @Resource
    private PushAction pushAction;
    private ScheduledThreadPoolExecutor scheduler = new ScheduledThreadPoolExecutor(1, (ThreadFactory) new ThreadFactoryImpl("check_subscribe_future"));

    @Override // org.apache.rocketmq.mqtt.cs.protocol.mqtt.MqttPacketHandler
    public boolean preHandler(ChannelHandlerContext channelHandlerContext, MqttSubscribeMessage mqttSubscribeMessage) {
        return true;
    }

    @Override // org.apache.rocketmq.mqtt.cs.protocol.mqtt.MqttPacketHandler
    public void doHandler(ChannelHandlerContext channelHandlerContext, MqttSubscribeMessage mqttSubscribeMessage, HookResult hookResult) {
        String clientId = ChannelInfo.getClientId(channelHandlerContext.channel());
        Channel channel = channelHandlerContext.channel();
        if (!hookResult.isSuccess()) {
            this.channelManager.closeConnect(channel, ChannelCloseFrom.SERVER, hookResult.getRemark());
            return;
        }
        CompletableFuture completableFuture = new CompletableFuture();
        ChannelInfo.setFuture(channel, ChannelInfo.FUTURE_SUBSCRIBE, completableFuture);
        this.scheduler.schedule(() -> {
            if (completableFuture.isDone()) {
                return;
            }
            completableFuture.complete(null);
        }, 1L, TimeUnit.SECONDS);
        try {
            List<MqttTopicSubscription> list = mqttSubscribeMessage.payload().topicSubscriptions();
            HashSet hashSet = new HashSet();
            if (list != null && !list.isEmpty()) {
                for (MqttTopicSubscription mqttTopicSubscription : list) {
                    Subscription subscription = new Subscription();
                    subscription.setQos(mqttTopicSubscription.qualityOfService().value());
                    subscription.setTopicFilter(TopicUtils.normalizeTopic(mqttTopicSubscription.topicName()));
                    hashSet.add(subscription);
                }
                this.sessionLoop.addSubscription(ChannelInfo.getId(channelHandlerContext.channel()), hashSet);
            }
            completableFuture.thenAccept(r9 -> {
                if (channel.isActive()) {
                    ChannelInfo.removeFuture(channel, ChannelInfo.FUTURE_SUBSCRIBE);
                    channel.writeAndFlush(getResponse(mqttSubscribeMessage));
                    if (hashSet.isEmpty()) {
                        return;
                    }
                    try {
                        sendRetainMessage(channelHandlerContext, hashSet);
                    } catch (InterruptedException | RemotingException | org.apache.rocketmq.remoting.exception.RemotingException e) {
                        throw new RuntimeException(e);
                    }
                }
            });
        } catch (Exception e) {
            logger.error("Subscribe:{}", clientId, e);
            this.channelManager.closeConnect(channel, ChannelCloseFrom.SERVER, "SubscribeException");
        }
    }

    private MqttSubAckMessage getResponse(MqttSubscribeMessage mqttSubscribeMessage) {
        List list = mqttSubscribeMessage.payload().topicSubscriptions();
        int[] iArr = new int[list.size()];
        int i = 0;
        Iterator it = list.iterator();
        while (it.hasNext()) {
            int i2 = i;
            i++;
            iArr[i2] = ((MqttTopicSubscription) it.next()).qualityOfService().value();
        }
        return MqttMessageFactory.buildSubAckMessage(Integer.valueOf(mqttSubscribeMessage.variableHeader().messageId()), iArr);
    }

    private void sendRetainMessage(ChannelHandlerContext channelHandlerContext, Set<Subscription> set) throws InterruptedException, RemotingException, org.apache.rocketmq.remoting.exception.RemotingException {
        String clientId = ChannelInfo.getClientId(channelHandlerContext.channel());
        Session session = this.sessionLoop.getSession(ChannelInfo.getId(channelHandlerContext.channel()));
        HashSet<Subscription> hashSet = new HashSet();
        HashSet<Subscription> hashSet2 = new HashSet();
        for (Subscription subscription : set) {
            if (TopicUtils.isWildCard(subscription.getTopicFilter())) {
                hashSet2.add(subscription);
            } else {
                hashSet.add(subscription);
            }
        }
        for (Subscription subscription2 : hashSet) {
            this.retainedPersistManager.getRetainedMessage(subscription2.getTopicFilter()).whenComplete((message, th) -> {
                if (message == null) {
                    return;
                }
                this.pushAction._sendMessage(session, clientId, subscription2, message);
            });
        }
        for (Subscription subscription3 : hashSet2) {
            this.retainedPersistManager.getMsgsFromTrie(subscription3).whenComplete((arrayList, th2) -> {
                Message message2;
                Iterator it = arrayList.iterator();
                while (it.hasNext() && (message2 = (Message) it.next()) != null) {
                    this.pushAction._sendMessage(session, clientId, subscription3, message2);
                }
            });
        }
    }
}
