package org.apache.rocketmq.mqtt.cs.session.loop;

import com.alibaba.fastjson.JSON;
import io.netty.handler.codec.mqtt.MqttPublishMessage;
import java.text.SimpleDateFormat;
import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import javax.annotation.PostConstruct;
import javax.annotation.Resource;
import org.apache.rocketmq.common.ThreadFactoryImpl;
import org.apache.rocketmq.mqtt.common.facade.WillMsgPersistManager;
import org.apache.rocketmq.mqtt.common.meta.IpUtil;
import org.apache.rocketmq.mqtt.common.model.MqttMessageUpContext;
import org.apache.rocketmq.mqtt.common.model.WillMessage;
import org.apache.rocketmq.mqtt.common.util.MessageUtil;
import org.apache.rocketmq.mqtt.cs.session.infly.MqttMsgId;
import org.apache.rocketmq.mqtt.ds.upstream.processor.PublishProcessor;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Component;

@Component
/* loaded from: input_file:org/apache/rocketmq/mqtt/cs/session/loop/WillLoop.class */
public class WillLoop {
    private static Logger logger = LoggerFactory.getLogger(WillLoop.class);
    private ScheduledThreadPoolExecutor aliveService = new ScheduledThreadPoolExecutor(2, (ThreadFactory) new ThreadFactoryImpl("check_alive_thread_"));
    private long checkAliveIntervalMillis = 5000;
    private ThreadPoolExecutor executor;

    @Resource
    private WillMsgPersistManager willMsgPersistManager;

    @Resource
    private MqttMsgId mqttMsgId;

    @Resource
    private PublishProcessor publishProcessor;

    @PostConstruct
    public void init() {
        this.aliveService.scheduleWithFixedDelay(() -> {
            csLoop();
        }, 15000L, 10000L, TimeUnit.MILLISECONDS);
        this.aliveService.scheduleWithFixedDelay(() -> {
            masterLoop();
        }, 10000L, 10000L, TimeUnit.MILLISECONDS);
        this.executor = new ThreadPoolExecutor(1, 1, 1L, TimeUnit.MINUTES, (BlockingQueue<Runnable>) new LinkedBlockingQueue(5000), (ThreadFactory) new ThreadFactoryImpl("DispatchWillToMQ_ "));
    }

    private void csLoop() {
        try {
            String localAddressCompatible = IpUtil.getLocalAddressCompatible();
            String str = "alive1" + localAddressCompatible;
            String str2 = "master";
            long currentTimeMillis = System.currentTimeMillis();
            this.willMsgPersistManager.put(str, String.valueOf(currentTimeMillis)).whenComplete((bool, th) -> {
                if (bool == null || th != null) {
                    logger.error("{} fail to put csKey", str, th);
                }
            });
            this.willMsgPersistManager.get("master").whenComplete((bArr, th2) -> {
                String str3 = new String(bArr);
                if ("NOT_FOUND".equals(str3) || masterHasDown(str3)) {
                    this.willMsgPersistManager.compareAndPut(str2, str3, localAddressCompatible + ":" + currentTimeMillis).whenComplete((bool2, th2) -> {
                        if (bool2.booleanValue() && th2 == null) {
                            logger.info("{} update master successfully", localAddressCompatible);
                        } else {
                            logger.error("{} fail to update master", localAddressCompatible, th2);
                        }
                    });
                }
            });
        } catch (Exception e) {
            logger.error("", e);
        }
    }

    private boolean masterHasDown(String str) {
        String[] split = str.split(":");
        if (split.length >= 2) {
            return System.currentTimeMillis() - Long.parseLong(split[1]) > 10 * this.checkAliveIntervalMillis;
        }
        logger.error("master ip:updateTime split error, len < 2");
        return true;
    }

    private void masterLoop() {
        try {
            String localAddressCompatible = IpUtil.getLocalAddressCompatible();
            if (localAddressCompatible == null) {
                logger.error("can not get local ip");
            } else {
                this.willMsgPersistManager.get("master").whenComplete((bArr, th) -> {
                    if (bArr == null || th != null) {
                        logger.error("fail to get CS_MASTER", th);
                        return;
                    }
                    String str = new String(bArr);
                    if (!"NOT_FOUND".equals(str) && str.startsWith(localAddressCompatible)) {
                        this.willMsgPersistManager.compareAndPut("master", str, localAddressCompatible + ":" + System.currentTimeMillis()).whenComplete((bool, th) -> {
                            if (bool.booleanValue() && th == null) {
                                return;
                            }
                            logger.error("{} fail to update master", localAddressCompatible, th);
                        });
                        this.willMsgPersistManager.scan("alive0", "alive2").whenComplete((map, th2) -> {
                            if (map == null || th2 != null) {
                                logger.error("{} master fail to scan cs", localAddressCompatible, th2);
                                return;
                            }
                            if (map.size() == 0) {
                                logger.info("master scanned 0 cs");
                                return;
                            }
                            for (Map.Entry entry : map.entrySet()) {
                                String str2 = (String) entry.getKey();
                                String str3 = (String) entry.getValue();
                                logger.info("master:{} scan cs:{}, heart:{} {}", new Object[]{localAddressCompatible, str2, str3, new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(Long.valueOf(Long.parseLong(str3)))});
                                if (System.currentTimeMillis() - Long.parseLong(str3) > 10 * this.checkAliveIntervalMillis) {
                                    handleShutDownCS(str2.substring("alive1".length()));
                                    this.willMsgPersistManager.delete(str2).whenComplete((bool2, th2) -> {
                                        if (bool2.booleanValue() && th2 == null) {
                                            logger.debug("delete shutDown cs:{} in db successfully", str2);
                                        } else {
                                            logger.error("fail to delete shutDown cs:{} in db", str2);
                                        }
                                    });
                                }
                            }
                        });
                    }
                });
            }
        } catch (Exception e) {
            logger.error("", e);
        }
    }

    private void handleShutDownCS(String str) {
        this.willMsgPersistManager.scan(str + 0, str + 2).whenComplete((map, th) -> {
            if (map == null || th != null) {
                logger.error("{} master fail to scan cs", str, th);
                return;
            }
            if (map.size() == 0) {
                logger.info("the cs:{} has no will", str);
                return;
            }
            for (Map.Entry entry : map.entrySet()) {
                logger.info("master handle will {} {}", entry.getKey(), entry.getValue());
                final String str2 = (String) entry.getKey();
                String substring = ((String) entry.getKey()).substring((str + 1).length());
                WillMessage willMessage = (WillMessage) JSON.parseObject((String) entry.getValue(), WillMessage.class);
                final MqttPublishMessage mqttMessage = MessageUtil.toMqttMessage(willMessage.getWillTopic(), willMessage.getBody(), willMessage.getQos(), this.mqttMsgId.nextId(substring), willMessage.isRetain());
                this.executor.submit(new Runnable() { // from class: org.apache.rocketmq.mqtt.cs.session.loop.WillLoop.1
                    @Override // java.lang.Runnable
                    public void run() {
                        CompletableFuture process = WillLoop.this.publishProcessor.process(new MqttMessageUpContext(), mqttMessage);
                        String str3 = str2;
                        process.whenComplete((hookResult, th) -> {
                            try {
                                if (hookResult.isSuccess()) {
                                    WillLoop.this.willMsgPersistManager.delete(str3).whenComplete((bool, th) -> {
                                        if (bool.booleanValue() && th == null) {
                                            WillLoop.logger.info("delete will message key {} successfully", str3);
                                        } else {
                                            WillLoop.logger.error("fail to delete will message key:{}", str3);
                                        }
                                    });
                                } else {
                                    WillLoop.this.executor.submit(this);
                                }
                            } catch (Throwable th2) {
                                WillLoop.logger.error("", th2);
                            }
                        });
                    }
                });
            }
        });
    }
}
