package org.apache.rocketmq.mqtt.cs.session.loop;

import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import io.netty.channel.Channel;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import javax.annotation.PostConstruct;
import javax.annotation.Resource;
import org.apache.commons.lang3.StringUtils;
import org.apache.rocketmq.common.ThreadFactoryImpl;
import org.apache.rocketmq.mqtt.common.facade.LmqOffsetStore;
import org.apache.rocketmq.mqtt.common.facade.LmqQueueStore;
import org.apache.rocketmq.mqtt.common.facade.SubscriptionPersistManager;
import org.apache.rocketmq.mqtt.common.facade.WillMsgPersistManager;
import org.apache.rocketmq.mqtt.common.meta.IpUtil;
import org.apache.rocketmq.mqtt.common.model.PullResult;
import org.apache.rocketmq.mqtt.common.model.Queue;
import org.apache.rocketmq.mqtt.common.model.QueueOffset;
import org.apache.rocketmq.mqtt.common.model.Subscription;
import org.apache.rocketmq.mqtt.common.model.WillMessage;
import org.apache.rocketmq.mqtt.common.util.SpringUtils;
import org.apache.rocketmq.mqtt.cs.channel.ChannelInfo;
import org.apache.rocketmq.mqtt.cs.channel.ChannelManager;
import org.apache.rocketmq.mqtt.cs.config.ConnectConf;
import org.apache.rocketmq.mqtt.cs.session.QueueFresh;
import org.apache.rocketmq.mqtt.cs.session.Session;
import org.apache.rocketmq.mqtt.cs.session.infly.InFlyCache;
import org.apache.rocketmq.mqtt.cs.session.infly.MqttMsgId;
import org.apache.rocketmq.mqtt.cs.session.infly.PushAction;
import org.apache.rocketmq.mqtt.cs.session.match.MatchAction;
import org.apache.rocketmq.mqtt.ds.upstream.processor.PublishProcessor;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Component;

@Component
/* loaded from: input_file:org/apache/rocketmq/mqtt/cs/session/loop/SessionLoopImpl.class */
public class SessionLoopImpl implements SessionLoop {
    private static Logger logger = LoggerFactory.getLogger(SessionLoopImpl.class);

    @Resource
    private PushAction pushAction;

    @Resource
    private MatchAction matchAction;

    @Resource
    private ConnectConf connectConf;

    @Resource
    private InFlyCache inFlyCache;

    @Resource
    private QueueCache queueCache;

    @Resource
    private LmqQueueStore lmqQueueStore;

    @Resource
    private LmqOffsetStore lmqOffsetStore;

    @Resource
    private QueueFresh queueFresh;

    @Resource
    private WillMsgPersistManager willMsgPersistManager;

    @Resource
    private MqttMsgId mqttMsgId;

    @Resource
    private PublishProcessor publishProcessor;
    private ChannelManager channelManager;
    private ScheduledThreadPoolExecutor pullService;
    private ScheduledThreadPoolExecutor scheduler;
    private ScheduledThreadPoolExecutor persistOffsetScheduler;
    private SubscriptionPersistManager subscriptionPersistManager;
    private Map<String, Session> sessionMap = new ConcurrentHashMap(1024);
    private Map<String, Map<String, Session>> clientMap = new ConcurrentHashMap(1024);
    private Map<String, PullEvent> pullEventMap = new ConcurrentHashMap(1024);
    private Map<String, Boolean> pullStatus = new ConcurrentHashMap(1024);
    private AtomicLong rid = new AtomicLong();
    private long pullIntervalMillis = 10;

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:org/apache/rocketmq/mqtt/cs/session/loop/SessionLoopImpl$PullEvent.class */
    public class PullEvent {
        private Session session;
        private Subscription subscription;
        private Queue queue;
        private long id;

        public PullEvent(Session session, Subscription subscription, Queue queue) {
            this.id = SessionLoopImpl.this.rid.getAndIncrement();
            this.session = session;
            this.subscription = subscription;
            this.queue = queue;
        }

        public boolean equals(Object obj) {
            if (this == obj) {
                return true;
            }
            return obj != null && getClass() == obj.getClass() && this.id == ((PullEvent) obj).id;
        }

        public int hashCode() {
            return Objects.hash(Long.valueOf(this.id));
        }
    }

    @PostConstruct
    public void init() {
        this.pullService = new ScheduledThreadPoolExecutor(1, (ThreadFactory) new ThreadFactoryImpl("pull_message_thread_"));
        this.scheduler = new ScheduledThreadPoolExecutor(2, (ThreadFactory) new ThreadFactoryImpl("loop_scheduler_"));
        this.persistOffsetScheduler = new ScheduledThreadPoolExecutor(1, (ThreadFactory) new ThreadFactoryImpl("persistOffset_scheduler_"));
        this.persistOffsetScheduler.scheduleWithFixedDelay(() -> {
            persistAllOffset(true);
        }, 5000L, 5000L, TimeUnit.MILLISECONDS);
        this.pullService.scheduleWithFixedDelay(() -> {
            pullLoop();
        }, this.pullIntervalMillis, this.pullIntervalMillis, TimeUnit.MILLISECONDS);
    }

    private void pullLoop() {
        try {
            for (Map.Entry<String, PullEvent> entry : this.pullEventMap.entrySet()) {
                PullEvent value = entry.getValue();
                Session session = value.session;
                if (!session.getChannel().isActive()) {
                    this.pullStatus.remove(eventQueueKey(session, value.queue));
                    this.pullEventMap.remove(entry.getKey());
                } else if (!Boolean.TRUE.equals(this.pullStatus.get(eventQueueKey(session, value.queue))) && session.getChannel().isWritable()) {
                    doPull(value);
                }
            }
        } catch (Exception e) {
            logger.error("", e);
        }
    }

    @Override // org.apache.rocketmq.mqtt.cs.session.loop.SessionLoop
    public void setChannelManager(ChannelManager channelManager) {
        this.channelManager = channelManager;
    }

    @Override // org.apache.rocketmq.mqtt.cs.session.loop.SessionLoop
    public void loadSession(String str, Channel channel) {
        if (!StringUtils.isBlank(str) && channel.isActive()) {
            String id = ChannelInfo.getId(channel);
            if (this.sessionMap.containsKey(id)) {
                return;
            }
            Session session = new Session();
            session.setClientId(str);
            session.setChannelId(id);
            session.setChannel(channel);
            addSubscriptionAndInit(session, new HashSet(Arrays.asList(Subscription.newP2pSubscription(str), Subscription.newRetrySubscription(str))), ChannelInfo.getFuture(channel, ChannelInfo.FUTURE_CONNECT));
            synchronized (this) {
                this.sessionMap.put(id, session);
                if (!this.clientMap.containsKey(str)) {
                    this.clientMap.putIfAbsent(str, new ConcurrentHashMap(2));
                }
                this.clientMap.get(str).put(id, session);
            }
            if (!channel.isActive()) {
                unloadSession(str, id);
            } else {
                if (session.isClean()) {
                    return;
                }
                notifyPullMessage(session, null, null);
            }
        }
    }

    @Override // org.apache.rocketmq.mqtt.cs.session.loop.SessionLoop
    public Session unloadSession(String str, String str2) {
        Session session = null;
        try {
            try {
                synchronized (this) {
                    session = this.sessionMap.remove(str2);
                    if (str == null && session != null) {
                        str = session.getClientId();
                    }
                    if (str != null && this.clientMap.containsKey(str)) {
                        this.clientMap.get(str).remove(str2);
                        if (this.clientMap.get(str).isEmpty()) {
                            this.clientMap.remove(str);
                        }
                    }
                }
                this.inFlyCache.cleanResource(str, str2);
                if (session != null) {
                    this.matchAction.removeSubscription(session, session.subscriptionSnapshot());
                    persistOffset(session);
                }
                if (session != null) {
                    session.destroy();
                }
            } catch (Exception e) {
                logger.error("unloadSession fail:{},{}", new Object[]{str, str2, e});
                if (0 != 0) {
                    session.destroy();
                }
            }
            return session;
        } catch (Throwable th) {
            if (0 != 0) {
                session.destroy();
            }
            throw th;
        }
    }

    @Override // org.apache.rocketmq.mqtt.cs.session.loop.SessionLoop
    public Session getSession(String str) {
        return this.sessionMap.get(str);
    }

    @Override // org.apache.rocketmq.mqtt.cs.session.loop.SessionLoop
    public List<Session> getSessionList(String str) {
        ArrayList arrayList = new ArrayList();
        Map<String, Session> map = this.clientMap.get(str);
        if (map != null && !map.isEmpty()) {
            for (Session session : map.values()) {
                if (session.isDestroyed()) {
                    logger.error("the session was destroyed,{}", str);
                    map.remove(session.getChannelId());
                } else {
                    arrayList.add(session);
                }
            }
        }
        return arrayList;
    }

    @Override // org.apache.rocketmq.mqtt.cs.session.loop.SessionLoop
    public void addSubscription(String str, Set<Subscription> set) {
        Session session;
        if (set == null || set.isEmpty() || (session = getSession(str)) == null) {
            return;
        }
        addSubscriptionAndInit(session, set, ChannelInfo.getFuture(session.getChannel(), ChannelInfo.FUTURE_SUBSCRIBE));
        this.matchAction.addSubscription(session, set);
        if (session.isClean()) {
            return;
        }
        Iterator<Subscription> it = set.iterator();
        while (it.hasNext()) {
            notifyPullMessage(session, it.next(), null);
        }
    }

    @Override // org.apache.rocketmq.mqtt.cs.session.loop.SessionLoop
    public void removeSubscription(String str, Set<Subscription> set) {
        Session session;
        if (set == null || set.isEmpty() || (session = getSession(str)) == null) {
            return;
        }
        Iterator<Subscription> it = set.iterator();
        while (it.hasNext()) {
            session.removeSubscription(it.next());
        }
        this.matchAction.removeSubscription(session, set);
    }

    private void addSubscriptionAndInit(Session session, Set<Subscription> set, CompletableFuture<Void> completableFuture) {
        if (session == null || set == null) {
            return;
        }
        session.addSubscription(set);
        AtomicInteger atomicInteger = new AtomicInteger(0);
        for (Subscription subscription : set) {
            this.queueFresh.freshQueue(session, subscription);
            Map<Queue, QueueOffset> queueOffset = session.getQueueOffset(subscription);
            if (queueOffset != null) {
                atomicInteger.addAndGet(queueOffset.size());
            }
        }
        for (Subscription subscription2 : set) {
            Map<Queue, QueueOffset> queueOffset2 = session.getQueueOffset(subscription2);
            if (queueOffset2 != null) {
                for (Map.Entry<Queue, QueueOffset> entry : queueOffset2.entrySet()) {
                    initOffset(session, subscription2, entry.getKey(), entry.getValue(), completableFuture, atomicInteger);
                }
            }
        }
    }

    private void futureDone(CompletableFuture<Void> completableFuture, AtomicInteger atomicInteger) {
        if (completableFuture == null || atomicInteger == null || atomicInteger.decrementAndGet() > 0) {
            return;
        }
        completableFuture.complete(null);
    }

    private void initOffset(Session session, Subscription subscription, Queue queue, QueueOffset queueOffset, CompletableFuture<Void> completableFuture, AtomicInteger atomicInteger) {
        if (queueOffset.isInitialized()) {
            futureDone(completableFuture, atomicInteger);
        } else {
            if (queueOffset.isInitializing()) {
                return;
            }
            queueOffset.setInitializing();
            this.lmqQueueStore.queryQueueMaxOffset(queue).whenComplete((l, th) -> {
                if (th == null) {
                    QueueOffset queueOffset2 = session.getQueueOffset(subscription, queue);
                    if (queueOffset2 != null) {
                        if (!queueOffset2.isInitialized()) {
                            queueOffset2.setOffset(l.longValue());
                        }
                        queueOffset2.setInitialized();
                    }
                    futureDone(completableFuture, atomicInteger);
                    return;
                }
                logger.error("queryQueueMaxId onException {}", queue.getQueueName(), th);
                QueueOffset queueOffset3 = session.getQueueOffset(subscription, queue);
                if (queueOffset3 != null) {
                    if (!queueOffset3.isInitialized()) {
                        queueOffset3.setOffset(Long.MAX_VALUE);
                    }
                    queueOffset3.setInitialized();
                }
                futureDone(completableFuture, atomicInteger);
            });
        }
    }

    @Override // org.apache.rocketmq.mqtt.cs.session.loop.SessionLoop
    public void notifyPullMessage(Session session, Subscription subscription, Queue queue) {
        if (session == null || session.isDestroyed()) {
            return;
        }
        if (this.subscriptionPersistManager == null) {
            this.subscriptionPersistManager = (SubscriptionPersistManager) SpringUtils.getBean(SubscriptionPersistManager.class);
        }
        if (this.subscriptionPersistManager != null && !session.isClean() && !session.isLoaded()) {
            if (session.isLoading()) {
                return;
            }
            session.setLoading();
            this.subscriptionPersistManager.loadSubscriptions(session.getClientId()).whenComplete((set, th) -> {
                if (th != null) {
                    logger.error("", th);
                    this.scheduler.schedule(() -> {
                        session.resetLoad();
                        notifyPullMessage(session, subscription, queue);
                    }, 3L, TimeUnit.SECONDS);
                } else {
                    session.addSubscription((Set<Subscription>) set);
                    this.matchAction.addSubscription(session, set);
                    session.setLoaded();
                    notifyPullMessage(session, subscription, queue);
                }
            });
            return;
        }
        if (queue != null) {
            if (subscription == null) {
                throw new RuntimeException("invalid notifyPullMessage, subscription is null, but queue is not null," + session.getClientId());
            }
            this.queueFresh.freshQueue(session, subscription);
            pullMessage(session, subscription, queue);
            return;
        }
        for (Subscription subscription2 : session.subscriptionSnapshot()) {
            if (subscription == null || subscription2.equals(subscription)) {
                this.queueFresh.freshQueue(session, subscription2);
                Map<Queue, QueueOffset> queueOffset = session.getQueueOffset(subscription2);
                if (queueOffset != null) {
                    Iterator<Map.Entry<Queue, QueueOffset>> it = queueOffset.entrySet().iterator();
                    while (it.hasNext()) {
                        pullMessage(session, subscription2, it.next().getKey());
                    }
                }
            }
        }
    }

    @Override // org.apache.rocketmq.mqtt.cs.session.loop.SessionLoop
    public void addWillMessage(Channel channel, WillMessage willMessage) {
        Session session = getSession(ChannelInfo.getId(channel));
        String clientId = ChannelInfo.getClientId(channel);
        String localAddressCompatible = IpUtil.getLocalAddressCompatible();
        if (session == null || willMessage == null) {
            return;
        }
        String jSONString = JSON.toJSONString(willMessage);
        String str = localAddressCompatible + 1 + clientId;
        this.willMsgPersistManager.put(str, jSONString).whenComplete((bool, th) -> {
            if (bool.booleanValue() && th == null) {
                logger.debug("put will message key {} value {} successfully", str, jSONString);
            } else {
                logger.error("fail to put will message key {} value {}", str, willMessage);
            }
        });
    }

    private String eventQueueKey(Session session, Queue queue) {
        return ChannelInfo.getId(session.getChannel()) + "-" + queue.getQueueId() + "-" + queue.getQueueName() + "-" + queue.getBrokerName();
    }

    private boolean needLoadPersistedOffset(Session session, Subscription subscription, Queue queue) {
        if (session.isClean()) {
            return false;
        }
        Integer num = session.getLoadStatusMap().get(subscription);
        if (num != null && num.intValue() == 1) {
            return false;
        }
        if (num != null && num.intValue() == 0) {
            return true;
        }
        session.getLoadStatusMap().put(subscription, 0);
        this.lmqOffsetStore.getOffset(session.getClientId(), subscription).whenComplete((map, th) -> {
            if (th != null) {
                this.scheduler.schedule(() -> {
                    session.getLoadStatusMap().put(subscription, -1);
                    pullMessage(session, subscription, queue);
                }, 3L, TimeUnit.SECONDS);
                return;
            }
            session.addOffset(subscription.toQueueName(), map);
            session.getLoadStatusMap().put(subscription, 1);
            pullMessage(session, subscription, queue);
        });
        return true;
    }

    private void pullMessage(Session session, Subscription subscription, Queue queue) {
        if (queue == null || session == null || session.isDestroyed() || needLoadPersistedOffset(session, subscription, queue)) {
            return;
        }
        if (!session.sendingMessageIsEmpty(subscription, queue)) {
            this.scheduler.schedule(() -> {
                pullMessage(session, subscription, queue);
            }, this.pullIntervalMillis, TimeUnit.MILLISECONDS);
        } else {
            this.pullEventMap.put(eventQueueKey(session, queue), new PullEvent(session, subscription, queue));
        }
    }

    private void doPull(PullEvent pullEvent) {
        Session session = pullEvent.session;
        Subscription subscription = pullEvent.subscription;
        Queue queue = pullEvent.queue;
        QueueOffset queueOffset = session.getQueueOffset(subscription, queue);
        if (session.isDestroyed() || queueOffset == null) {
            clearPullStatus(session, queue, pullEvent);
            return;
        }
        if (!queueOffset.isInitialized()) {
            initOffset(session, subscription, queue, queueOffset, null, null);
            this.scheduler.schedule(() -> {
                pullMessage(session, subscription, queue);
            }, this.pullIntervalMillis, TimeUnit.MILLISECONDS);
            return;
        }
        this.pullStatus.put(eventQueueKey(session, queue), true);
        int pullSize = session.getPullSize() > 0 ? session.getPullSize() : this.connectConf.getPullBatchSize();
        CompletableFuture<PullResult> completableFuture = new CompletableFuture<>();
        completableFuture.whenComplete((pullResult, th) -> {
            if (th != null) {
                clearPullStatus(session, queue, pullEvent);
                logger.error("{}", session.getClientId(), th);
                if (session.isDestroyed()) {
                    return;
                }
                this.scheduler.schedule(() -> {
                    pullMessage(session, subscription, queue);
                }, 1L, TimeUnit.SECONDS);
                return;
            }
            try {
                if (session.isDestroyed()) {
                    return;
                }
                if (301 == pullResult.getCode()) {
                    if (pullResult.getMessageList() != null && pullResult.getMessageList().size() >= pullSize) {
                        this.scheduler.schedule(() -> {
                            pullMessage(session, subscription, queue);
                        }, this.pullIntervalMillis, TimeUnit.MILLISECONDS);
                    }
                    if (session.addSendingMessages(subscription, queue, pullResult.getMessageList())) {
                        this.pushAction.messageArrive(session, subscription, queue);
                    }
                } else if (302 == pullResult.getCode()) {
                    queueOffset.setOffset(pullResult.getNextQueueOffset().getOffset());
                    queueOffset.setOffset(pullResult.getNextQueueOffset().getOffset());
                    session.markPersistOffsetFlag(true);
                    pullMessage(session, subscription, queue);
                } else {
                    logger.error("response:{},{}", session.getClientId(), JSONObject.toJSONString(pullResult));
                }
                clearPullStatus(session, queue, pullEvent);
            } finally {
                clearPullStatus(session, queue, pullEvent);
            }
        });
        if (PullResultStatus.LATER.equals(this.queueCache.pullMessage(session, subscription, queue, queueOffset, pullSize, completableFuture))) {
            clearPullStatus(session, queue, pullEvent);
            this.scheduler.schedule(() -> {
                pullMessage(session, subscription, queue);
            }, this.pullIntervalMillis, TimeUnit.MILLISECONDS);
        }
    }

    private void clearPullStatus(Session session, Queue queue, PullEvent pullEvent) {
        this.pullEventMap.remove(eventQueueKey(session, queue), pullEvent);
        this.pullStatus.remove(eventQueueKey(session, queue));
    }

    private void persistAllOffset(boolean z) {
        try {
            for (Session session : this.sessionMap.values()) {
                if (!session.isClean()) {
                    if (persistOffset(session) && z) {
                        Thread.sleep(5L);
                    }
                }
            }
        } catch (Exception e) {
            logger.error("", e);
        }
    }

    private boolean persistOffset(Session session) {
        try {
            if (session.isClean()) {
                return true;
            }
            if (!session.getPersistOffsetFlag()) {
                return false;
            }
            this.lmqOffsetStore.save(session.getClientId(), session.offsetMapSnapshot());
            return true;
        } catch (Exception e) {
            logger.error("{}", session.getClientId(), e);
            return true;
        }
    }
}
