package org.tinygroup.cepcore.impl;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import org.tinygroup.cepcore.CEPCore;
import org.tinygroup.cepcore.CEPCoreNodeManager;
import org.tinygroup.cepcore.exception.CEPConnectException;
import org.tinygroup.cepcore.util.CEPCoreUtil;
import org.tinygroup.cepcore.util.NodeServiceContainer2;
import org.tinygroup.cepcore.util.SCStatus;
import org.tinygroup.context.impl.ContextImpl;
import org.tinygroup.event.Event;
import org.tinygroup.event.ServiceRequest;
import org.tinygroup.event.central.Node;
import org.tinygroup.logger.LogLevel;
import org.tinygroup.logger.Logger;
import org.tinygroup.logger.LoggerFactory;
import org.tinygroup.parser.filter.NameFilter;
import org.tinygroup.xmlparser.node.XmlNode;

/* loaded from: input_file:org/tinygroup/cepcore/impl/CEPCoreNodeImpl.class */
public class CEPCoreNodeImpl extends AbstractCEPCoreOp {
    /** Logger shared by this class and its inner polling threads. */
    private static final Logger logger = LoggerFactory.getLogger(CEPCoreNodeImpl.class);
    /** Default value for both polling intervals (the threads multiply it by 1000 before sleeping, i.e. seconds). */
    private static final int DEFAULT_INTERVAL = 20;
    /** Container of remote CEP nodes; also used as a monitor, so the reference must never change. */
    private final NodeServiceContainer2 serviceContainer = new NodeServiceContainer2();
    /** Known service-center nodes, keyed by {@code CEPCoreUtil.getNodeKey(node)}. */
    private final Map<String, Node> centralNodesMap = new HashMap<String, Node>();
    /** Last known status of each service center; also used as a monitor, so the reference must never change. */
    private final Map<Node, SCStatus> centralNodesStatusMap = new HashMap<Node, SCStatus>();
    /** Node key -> keys of the service centers it is associated with; also used as a monitor. */
    private final Map<String, List<String>> cepCentralRelationMap = new HashMap<String, List<String>>();
    /** Running heartbeat thread, or {@code null} when none has been started / it was stopped. */
    private BreathThread breathThread = null;
    /** Running "retry DOWN service centers" thread, or {@code null}. */
    private CheckDownThread checkThread = null;
    /** Heartbeat interval in seconds; overridable through the {@code node-sc-breath} config node. */
    private int breath_interval = DEFAULT_INTERVAL;
    /** Retry interval in seconds for DOWN service centers; overridable through {@code node-sc-check}. */
    private int sc_check_interval = DEFAULT_INTERVAL;
    /** Bean name of the node selection strategy handed to the service container. */
    private String nodeStrategy = CEPCoreNodeManager.DEFAULT_NODE_STRATEGY_BEAN;

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:org/tinygroup/cepcore/impl/CEPCoreNodeImpl$BreathThread.class */
    /**
     * Background thread that calls {@link CEPCoreNodeImpl#breath(CEPCore)} once per interval
     * until it is interrupted or its stop flag is set.
     */
    public class BreathThread extends Thread {
        // long, so that interval * 1000 is computed in 64 bits and cannot overflow int.
        private static final long MILLISECOND_PER_SECOND = 1000L;
        private final CEPCore cep;
        // Written by the thread that calls stopBreathThread() and read by run():
        // volatile guarantees the update is visible to the loop.
        private volatile boolean stop = false;
        private final int breathInterval;

        /**
         * @param cEPCore CEP core handed to every heartbeat round
         * @param i       pause between two heartbeat rounds, in seconds
         */
        public BreathThread(CEPCore cEPCore, int i) {
            this.cep = cEPCore;
            this.breathInterval = i;
        }

        /** Sleeps for the configured interval, then runs one heartbeat round; repeats until stopped. */
        @Override // java.lang.Thread, java.lang.Runnable
        public void run() {
            while (!this.stop) {
                try {
                    sleep(this.breathInterval * MILLISECOND_PER_SECOND);
                    CEPCoreNodeImpl.this.breath(this.cep);
                } catch (InterruptedException e) {
                    // Interruption is the shutdown signal; the thread ends right after the loop,
                    // so there is no caller left to propagate the interrupt status to.
                    CEPCoreNodeImpl.logger.logMessage(LogLevel.INFO, "对SC心跳停止");
                    this.stop = true;
                }
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:org/tinygroup/cepcore/impl/CEPCoreNodeImpl$CheckDownThread.class */
    /**
     * Background thread that calls {@link CEPCoreNodeImpl#checkDown(CEPCore)} once per interval
     * until it is interrupted or its stop flag is set.
     */
    public class CheckDownThread extends Thread {
        // long, so that interval * 1000 is computed in 64 bits and cannot overflow int.
        private static final long MILLISECOND_PER_SECOND = 1000L;
        private final CEPCore cep;
        // Written by the thread that calls stopCheckThread() and read by run():
        // volatile guarantees the update is visible to the loop.
        private volatile boolean stop = false;
        private final int checkInterval;

        /**
         * @param cEPCore CEP core handed to every check round
         * @param i       pause between two check rounds, in seconds
         */
        public CheckDownThread(CEPCore cEPCore, int i) {
            this.cep = cEPCore;
            this.checkInterval = i;
        }

        /** Sleeps for the configured interval, then retries the DOWN service centers; repeats until stopped. */
        @Override // java.lang.Thread, java.lang.Runnable
        public void run() {
            while (!this.stop) {
                try {
                    sleep(this.checkInterval * MILLISECOND_PER_SECOND);
                    CEPCoreNodeImpl.this.checkDown(this.cep);
                } catch (InterruptedException e) {
                    // Interruption is the shutdown signal; the thread ends right after the loop,
                    // so there is no caller left to propagate the interrupt status to.
                    CEPCoreNodeImpl.logger.logMessage(LogLevel.INFO, "对已关闭SC定时轮询停止");
                    this.stop = true;
                }
            }
        }
    }

    /**
     * Prepares the request context of {@code event} from the service info registered in the
     * service container, then dispatches the event to a remote node.
     *
     * @param event event whose service request identifies the target service
     * @return the event returned by the remote call
     */
    @Override // org.tinygroup.cepcore.CEPCoreNodeManager
    public Event remoteProcess(Event event) {
        ServiceRequest request = event.getServiceRequest();
        event.getServiceRequest().setContext(
                CEPCoreUtil.getContext(
                        event,
                        this.serviceContainer.getServiceInfo(
                                request.getServiceId(),
                                request.getFullServiceName())));
        return remoteProcessWithPreparedEvent(event);
    }

    /**
     * Asks the service container for a node able to serve the event's request and sends
     * the (already prepared) event to it.
     *
     * @param event event whose request context has already been set
     * @return the event returned by the remote call
     */
    private Event remoteProcessWithPreparedEvent(Event event) {
        ServiceRequest request = event.getServiceRequest();
        Node target = this.serviceContainer.getNode(
                request.getServiceId(), request.getFullServiceName(), event);
        return remoteprocess(event, target);
    }

    /**
     * Reads this node's settings from the configuration tree, after letting the superclass
     * read its own:
     * <ul>
     *   <li>{@code node-sc-breath/@interval} - heartbeat interval</li>
     *   <li>{@code node-sc-check/@interval} - retry interval for DOWN service centers</li>
     *   <li>{@code node-strategy/@bean} - node strategy bean, default used when empty</li>
     * </ul>
     * A non-numeric interval makes {@link Integer#parseInt(String)} throw.
     *
     * @param xmlNode configuration root
     */
    @Override // org.tinygroup.cepcore.impl.AbstractCEPCoreOp, org.tinygroup.cepcore.CEPCoreNodeManager
    public void setConfig(XmlNode xmlNode) {
        super.setConfig(xmlNode);
        NameFilter filter = new NameFilter(xmlNode);

        XmlNode breathNode = filter.findNode("node-sc-breath");
        String breathValue = breathNode == null ? null : breathNode.getAttribute("interval");
        if (breathNode != null && !CEPCoreUtil.isNull(breathValue)) {
            this.breath_interval = Integer.parseInt(breathValue);
        }

        XmlNode checkNode = filter.findNode("node-sc-check");
        String checkValue = checkNode == null ? null : checkNode.getAttribute("interval");
        if (checkNode != null && !CEPCoreUtil.isNull(checkValue)) {
            this.sc_check_interval = Integer.parseInt(checkValue);
        }

        // Keep the current strategy when the config has no node-strategy element;
        // fall back to the default whenever the resulting name is empty.
        XmlNode strategyNode = filter.findNode("node-strategy");
        String strategy = strategyNode == null ? this.nodeStrategy : strategyNode.getAttribute("bean");
        if (CEPCoreUtil.isNull(strategy)) {
            strategy = CEPCoreNodeManager.DEFAULT_NODE_STRATEGY_BEAN;
        }
        this.nodeStrategy = strategy;
        this.serviceContainer.setNodeStrategyBean(strategy);
    }

    /**
     * Unregisters a single remote node: service centers and ordinary CEP nodes are handled
     * by their dedicated routines, then the connection to the node is removed.
     *
     * @param node node to unregister
     */
    @Override // org.tinygroup.cepcore.CEPCoreNodeManager
    public void unregisteNode(Node node) {
        logger.logMessage(LogLevel.INFO, "开始注销节点{0}", new Object[]{node.toString()});
        boolean central = "centralnode".equals(node.getType());
        if (!central) {
            unregisteCepNode(node);
        } else {
            unregisteCentralNode(node);
        }
        removeConnect(node);
        logger.logMessage(LogLevel.INFO, "注销节点{0}完成", new Object[]{node.toString()});
    }

    /**
     * Registers a single remote node: drops any existing connection to it, records its
     * association with {@code node2}, then registers it as a service center or a CEP node
     * depending on its type.
     *
     * @param node  node being registered
     * @param node2 service center the node is associated with
     */
    @Override // org.tinygroup.cepcore.CEPCoreNodeManager
    public void registeNode(Node node, Node node2) {
        logger.logMessage(LogLevel.INFO, "开始注册节点{0}", new Object[]{node.toString()});
        removeConnect(node);
        addCepCentralRelation(node, node2);
        boolean central = "centralnode".equals(node.getType());
        if (!central) {
            registeCepNode(node);
        } else {
            registeCentralNode(node);
        }
        logger.logMessage(LogLevel.INFO, "注册节点{0}完成", new Object[]{node.toString()});
    }

    /**
     * Removes {@code node} from the service container while holding the container's monitor.
     *
     * @param node CEP node to remove
     */
    private void removeNodeService(Node node) {
        final NodeServiceContainer2 container = this.serviceContainer;
        synchronized (container) {
            container.removeCepNode(node);
        }
    }

    /**
     * Unregisters an ordinary CEP node: removes it from the service container and
     * forgets its node-to-service-center associations.
     *
     * @param node CEP node to unregister
     */
    private void unregisteCepNode(Node node) {
        logger.logMessage(LogLevel.INFO, "开始注销CEP节点");
        removeNodeService(node);

        logger.logMessage(LogLevel.INFO, "移除Node:{0}与SC的关联", new Object[]{node.toString()});
        String key = CEPCoreUtil.getNodeKey(node);
        synchronized (this.cepCentralRelationMap) {
            this.cepCentralRelationMap.remove(key);
        }
        logger.logMessage(LogLevel.INFO, "移除Node:{0}与SC的关联完成", new Object[]{node.toString()});

        logger.logMessage(LogLevel.INFO, "注销CEP节点完成");
    }

    /**
     * Registers every node of {@code list}, in list order, against the same service center.
     *
     * @param list nodes to register
     * @param node service center the nodes are associated with
     */
    @Override // org.tinygroup.cepcore.CEPCoreNodeManager
    public void registeNode(List<Node> list, Node node) {
        logger.logMessage(LogLevel.INFO, "开始注册远程节点列表");
        for (Node remote : list) {
            registeNode(remote, node);
        }
        logger.logMessage(LogLevel.INFO, "注册远程节点列表完成");
    }

    /**
     * Unregisters every node of {@code list}, in list order.
     *
     * @param list nodes to unregister
     */
    @Override // org.tinygroup.cepcore.CEPCoreNodeManager
    public void unregisteNode(List<Node> list) {
        logger.logMessage(LogLevel.INFO, "开始注销远程节点列表");
        for (Node remote : list) {
            unregisteNode(remote);
        }
        logger.logMessage(LogLevel.INFO, "注销远程节点列表完成");
    }

    /**
     * Registers a service-center node; simply delegates to {@link #addCentralNode(Node)}.
     *
     * @param node service-center node to register
     */
    private void registeCentralNode(Node node) {
        addCentralNode(node);
    }

    /**
     * Unregisters a service center. The node stays in the list of known service centers;
     * only its status is switched to {@code DOWN}. Unknown nodes are just logged.
     *
     * @param node service-center node to unregister
     */
    private void unregisteCentralNode(Node node) {
        logger.logMessage(LogLevel.INFO, "开始注销服务中心节点");
        String key = CEPCoreUtil.getNodeKey(node);
        if (!this.centralNodesMap.containsKey(key)) {
            logger.logMessage(LogLevel.INFO, "服务中心节点不存在");
        } else {
            // Use the instance stored at registration time as the status-map key.
            Node known = this.centralNodesMap.get(key);
            synchronized (this.centralNodesStatusMap) {
                this.centralNodesStatusMap.put(known, SCStatus.DOWN);
            }
        }
        logger.logMessage(LogLevel.INFO, "注销服务中心节点完成");
    }

    /**
     * Registers a CEP node in the service container, replacing any previous registration
     * of the same node. Remove and add happen under the container's monitor.
     *
     * @param node CEP node to register
     */
    private void registeCepNode(Node node) {
        final NodeServiceContainer2 container = this.serviceContainer;
        synchronized (container) {
            // Remove first so a re-registration does not leave stale entries behind.
            container.removeCepNode(node);
            container.addCepNode(node);
        }
    }

    /**
     * Returns the live map of all known service centers (not a copy), keyed by node key.
     *
     * @return the internal service-center map
     */
    private Map<String, Node> getAllCentralNodes() {
        return this.centralNodesMap;
    }

    /**
     * Collects the service centers whose recorded status is {@code ALIVE}.
     *
     * @return a new map (node key to node) of the alive service centers
     */
    private Map<String, Node> getAliveCentralNodes() {
        Map<String, Node> alive = new HashMap<String, Node>();
        for (Map.Entry<String, Node> entry : this.centralNodesMap.entrySet()) {
            Node center = entry.getValue();
            boolean isAlive;
            synchronized (this.centralNodesStatusMap) {
                isAlive = this.centralNodesStatusMap.get(center) == SCStatus.ALIVE;
            }
            if (isAlive) {
                alive.put(entry.getKey(), center);
            }
        }
        return alive;
    }

    /**
     * Collects the service centers that are not marked {@code DOWN}; centers without any
     * recorded status are included.
     *
     * @return a new map (node key to node) of the service centers not marked DOWN
     */
    private Map<String, Node> getNotDownCentralNodes() {
        Map<String, Node> notDown = new HashMap<String, Node>();
        for (Map.Entry<String, Node> entry : this.centralNodesMap.entrySet()) {
            Node center = entry.getValue();
            boolean isDown;
            synchronized (this.centralNodesStatusMap) {
                // A missing entry yields null, which is not DOWN, so unknown centers count as "not down".
                isDown = this.centralNodesStatusMap.get(center) == SCStatus.DOWN;
            }
            if (!isDown) {
                notDown.put(entry.getKey(), center);
            }
        }
        return notDown;
    }

    /**
     * Collects the service centers whose recorded status is {@code DOWN}.
     *
     * @return a new map (node key to node) of the DOWN service centers
     */
    private Map<String, Node> getDownCentralNodes() {
        Map<String, Node> down = new HashMap<String, Node>();
        for (Map.Entry<String, Node> entry : this.centralNodesMap.entrySet()) {
            Node center = entry.getValue();
            boolean isDown;
            synchronized (this.centralNodesStatusMap) {
                isDown = this.centralNodesStatusMap.get(center) == SCStatus.DOWN;
            }
            if (isDown) {
                down.put(entry.getKey(), center);
            }
        }
        return down;
    }

    /**
     * Adds every node of {@code list} to the known service centers, in list order.
     *
     * @param list service-center nodes to add
     */
    @Override // org.tinygroup.cepcore.CEPCoreNodeManager
    public void addCentralNodes(List<Node> list) {
        for (Node center : list) {
            addCentralNode(center);
        }
    }

    /**
     * Adds a service center to the known list unless a node with the same key is already there.
     *
     * @param node service-center node to add
     */
    @Override // org.tinygroup.cepcore.CEPCoreNodeManager
    public void addCentralNode(Node node) {
        String key = CEPCoreUtil.getNodeKey(node);
        if (!this.centralNodesMap.containsKey(key)) {
            this.centralNodesMap.put(key, node);
            logger.logMessage(LogLevel.INFO, "服务中心节点不存在,加入服务中心节点列表");
            return;
        }
        logger.logMessage(LogLevel.INFO, "服务中心节点已存在,无需添加");
    }

    /**
     * Starts this node: after the superclass start-up, registers the local node (with the
     * CEP core's service infos added to it) at every known service center, then launches
     * the heartbeat thread and the DOWN-service-center retry thread.
     *
     * @param cEPCore CEP core being started
     */
    @Override // org.tinygroup.cepcore.impl.AbstractCEPCoreOp, org.tinygroup.cepcore.CEPCoreNodeOp
    public void startCEPCore(CEPCore cEPCore) {
        super.startCEPCore(cEPCore);

        Node self = getNode();
        self.getServiceInfos().addAll(cEPCore.getServiceInfos());

        logger.logMessage(LogLevel.INFO, "开始注册当前节点至服务中心");
        registeProcess(self);
        logger.logMessage(LogLevel.INFO, "注册当前节点至服务中心完成");

        startBreathThread(cEPCore);
        startCheckThread(cEPCore);
    }

    /**
     * Starts the heartbeat thread using the configured {@code breath_interval}.
     * If a heartbeat thread was already started, it is stopped first: overwriting the field
     * would otherwise leave the old thread running with no reference left to stop it.
     *
     * @param cEPCore CEP core handed to every heartbeat round
     */
    public void startBreathThread(CEPCore cEPCore) {
        if (this.breathThread != null) {
            stopBreathThread();
        }
        logger.logMessage(LogLevel.INFO, "开始启动SC定时轮询,间隔{interval}秒", new Object[]{Integer.valueOf(this.breath_interval)});
        this.breathThread = new BreathThread(cEPCore, this.breath_interval);
        this.breathThread.start();
        logger.logMessage(LogLevel.INFO, "启动SC定时轮询完毕,间隔{interval}秒", new Object[]{Integer.valueOf(this.breath_interval)});
    }

    /**
     * Starts the thread that periodically retries DOWN service centers, using the configured
     * {@code sc_check_interval}. If such a thread was already started, it is stopped first:
     * overwriting the field would otherwise leave the old thread running with no reference
     * left to stop it.
     *
     * @param cEPCore CEP core handed to every check round
     */
    public void startCheckThread(CEPCore cEPCore) {
        if (this.checkThread != null) {
            stopCheckThread();
        }
        logger.logMessage(LogLevel.INFO, "开始启动已关闭SC定时轮询,间隔{interval}秒", new Object[]{Integer.valueOf(this.sc_check_interval)});
        this.checkThread = new CheckDownThread(cEPCore, this.sc_check_interval);
        this.checkThread.start();
        logger.logMessage(LogLevel.INFO, "启动已关闭SC定时轮询完毕,间隔{interval}秒", new Object[]{Integer.valueOf(this.sc_check_interval)});
    }

    /**
     * Runs one heartbeat round against every service center not marked {@code DOWN}.
     * For each center the breath-check service is called; the local node is re-registered
     * when the reply's {@code "success"} value is missing or false, or when the center was
     * recorded as {@code OUT}. A failure on one center is logged and does not stop the round.
     * Called by {@link BreathThread}.
     *
     * @param cEPCore CEP core whose service infos are added to the local node before the round
     */
    public void breath(CEPCore cEPCore) {
        Node self = getNode();
        // NOTE(review): if getNode() returns a shared instance, this addAll repeats on every round - confirm.
        self.getServiceInfos().addAll(cEPCore.getServiceInfos());
        logger.logMessage(LogLevel.INFO, "开始向SC发起心跳检测");
        for (Node center : getNotDownCentralNodes().values()) {
            logger.logMessage(LogLevel.INFO, "尝试连接SC[{0}]", new Object[]{center.toString()});
            ContextImpl params = new ContextImpl();
            params.put("node", self);
            try {
                Event reply = remoteprocess(getEvent(CEPCoreNodeManager.BREATH_CHECK_SERVICE, params), center);
                logger.logMessage(LogLevel.INFO, "连接SC[{0}]成功", new Object[]{center.toString()});
                Boolean success = (Boolean) reply.getServiceRequest().getContext().get("success");
                logger.logMessage(LogLevel.INFO, "心跳处理结果:{checkResult}", new Object[]{success});

                // Missing or negative answer: the local node must be registered again.
                boolean reRegister = success == null || !success.booleanValue();
                synchronized (this.centralNodesStatusMap) {
                    if (this.centralNodesStatusMap.containsKey(center)) {
                        if (this.centralNodesStatusMap.get(center) == SCStatus.OUT) {
                            // The center was unreachable earlier and has just answered again.
                            reRegister = true;
                        }
                    } else {
                        this.centralNodesStatusMap.put(center, SCStatus.ALIVE);
                    }
                }

                if (reRegister) {
                    logger.logMessage(LogLevel.INFO, "向SC[{0}]重新注册", new Object[]{center.toString()});
                    registeProcess(self, center);
                    synchronized (this.centralNodesStatusMap) {
                        this.centralNodesStatusMap.put(center, SCStatus.ALIVE);
                    }
                    logger.logMessage(LogLevel.INFO, "向SC[{0}]重新注册完毕", new Object[]{center.toString()});
                }
            } catch (Exception e) {
                logger.logMessage(LogLevel.ERROR, "连接SC[{0}]时发生异常:{1}", new Object[]{center.toString(), e.getMessage()});
            }
        }
        logger.logMessage(LogLevel.INFO, "向SC发起心跳检测完成");
    }

    /**
     * Runs one retry round against every service center marked {@code DOWN}.
     * For each center the breath-check service is called; when the call succeeds and the
     * center is still recorded as {@code DOWN}, the local node is re-registered there and
     * the center is marked {@code ALIVE}. A failure on one center is logged and does not
     * stop the round. Called by {@link CheckDownThread}.
     *
     * @param cEPCore CEP core whose service infos are added to the local node before the round
     */
    public void checkDown(CEPCore cEPCore) {
        logger.logMessage(LogLevel.INFO, "开始尝试连接已关闭的SC");
        Node self = getNode();
        self.getServiceInfos().addAll(cEPCore.getServiceInfos());
        for (Node center : getDownCentralNodes().values()) {
            logger.logMessage(LogLevel.INFO, "尝试连接已关闭的SC[{0}]", new Object[]{center.toString()});
            ContextImpl params = new ContextImpl();
            params.put("node", self);
            try {
                remoteprocess(getEvent(CEPCoreNodeManager.BREATH_CHECK_SERVICE, params), center);
                logger.logMessage(LogLevel.INFO, "连接SC[{0}]成功", new Object[]{center.toString()});
                // The re-registration below runs while the status-map monitor is held.
                synchronized (this.centralNodesStatusMap) {
                    if (this.centralNodesStatusMap.containsKey(center)) {
                        if (this.centralNodesStatusMap.get(center) == SCStatus.DOWN) {
                            logger.logMessage(LogLevel.INFO, "向SC[{0}]重新注册", new Object[]{center.toString()});
                            registeProcess(self, center);
                            this.centralNodesStatusMap.put(center, SCStatus.ALIVE);
                            logger.logMessage(LogLevel.INFO, "向SC[{0}]重新注册完毕", new Object[]{center.toString()});
                        }
                    } else {
                        this.centralNodesStatusMap.put(center, SCStatus.ALIVE);
                    }
                }
            } catch (Exception e) {
                logger.logMessage(LogLevel.ERROR, "连接已关闭的SC[{0}]时发生异常:{1}", new Object[]{center.toString(), e.getMessage()});
            }
        }
        logger.logMessage(LogLevel.INFO, "尝试连接已关闭的SC完成");
    }

    /**
     * Stops this node: after the superclass shutdown, stops both polling threads and then
     * unregisters the local node from its alive service centers.
     *
     * @param cEPCore CEP core being stopped
     */
    @Override // org.tinygroup.cepcore.impl.AbstractCEPCoreOp, org.tinygroup.cepcore.CEPCoreNodeOp
    public void stopCEPCore(CEPCore cEPCore) {
        super.stopCEPCore(cEPCore);

        Node self = getNode();
        self.getServiceInfos().addAll(cEPCore.getServiceInfos());

        // Stop polling before unregistering, so no heartbeat runs during the unregistration.
        stopBreathThread();
        stopCheckThread();
        unregisteProcess(self);
    }

    /**
     * Stops the heartbeat thread if one is alive: raises its stop flag, clears the field
     * and interrupts the thread so that a pending sleep ends immediately.
     */
    public void stopBreathThread() {
        logger.logMessage(LogLevel.INFO, "开始停止SC定时轮询");
        BreathThread running = this.breathThread;
        if (running != null && running.isAlive()) {
            running.stop = true;
            this.breathThread = null;
            running.interrupt();
            logger.logMessage(LogLevel.INFO, "设置停止标识符为true");
        }
        logger.logMessage(LogLevel.INFO, "停止SC定时轮询完成");
    }

    /**
     * Stops the DOWN-service-center retry thread if one is alive: raises its stop flag,
     * clears the field and interrupts the thread so that a pending sleep ends immediately.
     */
    public void stopCheckThread() {
        logger.logMessage(LogLevel.INFO, "开始停止checkThread SC定时轮询");
        CheckDownThread running = this.checkThread;
        if (running != null && running.isAlive()) {
            running.stop = true;
            this.checkThread = null;
            running.interrupt();
        }
        logger.logMessage(LogLevel.INFO, "停止checkThread SC定时轮询完成");
    }

    /**
     * Registers {@code node} at every known service center, whatever its recorded status.
     *
     * @param node node to register
     */
    private void registeProcess(Node node) {
        logger.logMessage(LogLevel.INFO, "向服务中心注册节点{0}", new Object[]{node.toString()});
        for (Node center : getAllCentralNodes().values()) {
            registeProcess(node, center);
        }
        logger.logMessage(LogLevel.INFO, "向服务中心注册节点{0}完成", new Object[]{node.toString()});
    }

    /**
     * Registers {@code node} at the service center {@code node2}: records the association,
     * calls the remote registration service and stores the resulting status of the center
     * ({@code ALIVE} on success, {@code OUT} on any exception). Failures are logged, not rethrown.
     *
     * @param node  node to register
     * @param node2 service center to register at
     */
    private void registeProcess(Node node, Node node2) {
        ContextImpl params = new ContextImpl();
        params.put("node", node);
        logger.logMessage(LogLevel.INFO, "开始向{0}注册{1}", new Object[]{node2.toString(), node.toString()});
        addCepCentralRelation(node, node2);
        // NOTE(review): the remote call runs while the status-map monitor is held.
        synchronized (this.centralNodesStatusMap) {
            SCStatus outcome;
            try {
                remoteprocess(getEvent(CEPCoreNodeManager.REG_NODE_SERVICE, params), node2);
                outcome = SCStatus.ALIVE;
            } catch (Exception e) {
                logger.logMessage(LogLevel.ERROR, "向{0}注册{1}时失败:{2}", new Object[]{node2.toString(), node.toString(), e.getMessage()});
                outcome = SCStatus.OUT;
            }
            this.centralNodesStatusMap.put(node2, outcome);
        }
        logger.logMessage(LogLevel.INFO, "向{0}注册{1}完成", new Object[]{node2.toString(), node.toString()});
    }

    /**
     * Unregisters {@code node} from the service centers it is associated with, restricted
     * to the centers currently recorded as {@code ALIVE}. Does nothing except logging when
     * the node has no recorded association.
     *
     * @param node node to unregister
     */
    private void unregisteProcess(Node node) {
        List<String> centerKeys;
        String nodeKey = CEPCoreUtil.getNodeKey(node);
        synchronized (this.cepCentralRelationMap) {
            centerKeys = this.cepCentralRelationMap.get(nodeKey);
        }
        if (centerKeys == null) {
            logger.logMessage(LogLevel.INFO, "node:{0}无对应的SC，该节点或已被注销，退出注销", new Object[]{node.toString()});
            return;
        }
        List<Node> targets = new ArrayList<Node>();
        for (Node center : getAliveCentralNodes().values()) {
            if (centerKeys.contains(CEPCoreUtil.getNodeKey(center))) {
                targets.add(center);
            }
        }
        // Placeholder index fixed ({2} -> {1}) so it matches the two supplied arguments.
        logger.logMessage(LogLevel.INFO, "node:{0}对应的SC列表长度:{1}", new Object[]{node.toString(), Integer.valueOf(targets.size())});
        doUnregisteProcess(node, targets);
    }

    /**
     * Calls the remote unregistration service on every service center of {@code list}.
     * A failure on one center is logged and the remaining centers are still processed.
     *
     * @param node node to unregister
     * @param list service centers to unregister the node from
     */
    private void doUnregisteProcess(Node node, List<Node> list) {
        for (Node center : list) {
            ContextImpl params = new ContextImpl();
            params.put("node", node);
            logger.logMessage(LogLevel.INFO, "开始向{0}注销{1}", new Object[]{center.toString(), node.toString()});
            try {
                remoteprocess(getEvent(CEPCoreNodeManager.UNREG_NODE_SERVICE, params), center);
            } catch (Exception e) {
                // Third placeholder fixed ({1} -> {2}) so the exception message has its own slot,
                // matching the registration error log.
                logger.logMessage(LogLevel.ERROR, "向{0}注销{1}时失败:{2}", new Object[]{center.toString(), node.toString(), e.getMessage()});
            }
            logger.logMessage(LogLevel.INFO, "向{0}注销{1}完成", new Object[]{center.toString(), node.toString()});
        }
    }

    /**
     * Returns the type identifier of this implementation.
     *
     * @return the constant {@code "cepnode"}
     */
    @Override // org.tinygroup.cepcore.impl.AbstractCEPCoreOp
    protected String getType() {
        return "cepnode";
    }

    /**
     * Sends {@code event} to {@code node} and reacts to connection failures:
     * <ul>
     *   <li>{@code cepnode} target: the node is removed from the service container and
     *       unregistered from the service centers, then the event is dispatched again
     *       through the service container;</li>
     *   <li>{@code centralnode} target: the center is marked {@code OUT} and its connection
     *       removed (only if it had no status or was {@code ALIVE}), then the exception is
     *       rethrown;</li>
     *   <li>any other (or null) type: the exception is rethrown unchanged.</li>
     * </ul>
     *
     * @param event event to send
     * @param node  target node
     * @return the event returned by the remote call
     * @throws CEPConnectException when the connection fails and no retry applies
     */
    @Override // org.tinygroup.cepcore.impl.AbstractCEPCoreOp, org.tinygroup.cepcore.CEPCoreNodeOp
    public Event remoteprocess(Event event, Node node) {
        try {
            return super.remoteprocess(event, node);
        } catch (CEPConnectException e) {
            // Constant-first equals: a null type must not raise a NullPointerException
            // that would hide the original connection failure.
            String type = node.getType();
            if ("cepnode".equals(type)) {
                removeNodeService(node);
                unregisteProcess(node);
                return remoteProcessWithPreparedEvent(event);
            }
            if ("centralnode".equals(type)) {
                logger.logMessage(LogLevel.INFO, "连接SC节点{0}时出错:{1}", new Object[]{node.toString(), e.getMessage()});
                synchronized (this.centralNodesStatusMap) {
                    if (!this.centralNodesStatusMap.containsKey(node) || this.centralNodesStatusMap.get(node) == SCStatus.ALIVE) {
                        this.centralNodesStatusMap.put(node, SCStatus.OUT);
                        removeConnect(node);
                    }
                }
            }
            throw e;
        }
    }

    /**
     * Records that {@code node} is associated with the service center {@code node2}.
     * The association is stored by node key and is added at most once per pair.
     *
     * @param node  node being associated
     * @param node2 service center the node is associated with
     */
    private void addCepCentralRelation(Node node, Node node2) {
        logger.logMessage(LogLevel.INFO, "添加Node:{0}与SC:{1}的关联", new Object[]{node.toString(), node2.toString()});
        String nodeKey = CEPCoreUtil.getNodeKey(node);
        String centerKey = CEPCoreUtil.getNodeKey(node2);
        synchronized (this.cepCentralRelationMap) {
            // Declared as List<String>: the map stores List values, so an ArrayList-typed
            // local cannot receive them.
            List<String> centerKeys = this.cepCentralRelationMap.get(nodeKey);
            if (centerKeys == null) {
                centerKeys = new ArrayList<String>();
                this.cepCentralRelationMap.put(nodeKey, centerKeys);
            }
            if (!centerKeys.contains(centerKey)) {
                centerKeys.add(centerKey);
                logger.logMessage(LogLevel.INFO, "添加Node:{0}与SC:{1}的关联", new Object[]{node.toString(), node2.toString()});
            }
        }
        // This message has no placeholders, so no arguments are passed.
        logger.logMessage(LogLevel.INFO, "添加Node与SC的关联完成");
    }

    /**
     * Node check hook; this implementation does not inspect the node.
     *
     * @param node node to check (unused)
     * @return always {@code true}
     */
    @Override // org.tinygroup.cepcore.CEPCoreNodeManager
    public boolean check(Node node) {
        return true;
    }
}
