package org.tinygroup.cepcorenettysc;

import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import org.jboss.netty.channel.ChannelHandlerContext;
import org.jboss.netty.channel.ExceptionEvent;
import org.tinygroup.beancontainer.BeanContainerFactory;
import org.tinygroup.cepcore.CEPCore;
import org.tinygroup.cepcorenettysc.operator.ScUnregTrigger;
import org.tinygroup.cepcorenettysc.remote.EventClientDaemonRunnable;
import org.tinygroup.cepcorenettysc.remote.NettyEventProcessor;
import org.tinygroup.cepcorenettysc.remote.NettyEventProcessorConatiner;
import org.tinygroup.context.Context;
import org.tinygroup.context.impl.ContextImpl;
import org.tinygroup.event.Event;
import org.tinygroup.event.ServiceInfo;
import org.tinygroup.event.central.Node;
import org.tinygroup.logger.LogLevel;
import org.tinygroup.logger.Logger;
import org.tinygroup.logger.LoggerFactory;
import org.tinygroup.net.ServerHandler;
import org.tinygroup.net.daemon.DaemonUtils;
import org.tinygroup.net.exception.InterruptedRuntimeException;

/* loaded from: input_file:WEB-INF/lib/org.tinygroup.cepcorenettysc-1.2.2.jar:org/tinygroup/cepcorenettysc/EventServerHandler.class */
public class EventServerHandler extends ServerHandler {
    Map<String, Event> eventMap = new ConcurrentHashMap();
    Map<String, ChannelHandlerContext> channelHandlerContextMap = new ConcurrentHashMap();
    ExecutorService executorService = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors() * 2);
    static final Logger logger = LoggerFactory.getLogger((Class<?>) EventServerHandler.class);
    static Map<Node, List<ServiceInfo>> nodeServices = new HashMap();
    static Map<String, Node> nodes = new HashMap();
    static Map<String, EventClientDaemonRunnable> clients = new HashMap();

    public void stop() {
        Iterator<EventClientDaemonRunnable> it = clients.values().iterator();
        while (it.hasNext()) {
            it.next().stop();
        }
    }

    private EventClientDaemonRunnable getClient(String str) {
        if (clients.containsKey(str)) {
            return clients.get(str);
        }
        Node node = nodes.get(str);
        EventClientDaemonRunnable eventClientDaemonRunnable = new EventClientDaemonRunnable(node.getIp(), Integer.parseInt(node.getPort()), false);
        eventClientDaemonRunnable.addPostEventTrigger(new ScUnregTrigger());
        DaemonUtils.daemon(node.toString(), eventClientDaemonRunnable);
        clients.put(str, eventClientDaemonRunnable);
        return eventClientDaemonRunnable;
    }

    @Override // org.tinygroup.net.ServerHandler
    protected void processObject(Object obj, ChannelHandlerContext channelHandlerContext) {
        Event event = (Event) obj;
        String eventId = event.getEventId();
        this.eventMap.put(eventId, event);
        this.channelHandlerContextMap.put(eventId, channelHandlerContext);
        String serviceId = HandlerUtil.getServiceId(event);
        logger.logMessage(LogLevel.INFO, "接收到请求服务id:{}", serviceId);
        if (NettyCepCoreUtil.AR_TO_SC.equals(serviceId)) {
            logger.logMessage(LogLevel.INFO, "请求{}由普通节点发向服务中心", serviceId);
            Context context = event.getServiceRequest().getContext();
            logger.logMessage(LogLevel.INFO, "发起节点:{}", ((Node) context.get(NettyCepCoreUtil.NODE_KEY)).toString());
            String str = (String) context.get(NettyCepCoreUtil.TYPE_KEY);
            if (NettyCepCoreUtil.REG_KEY.equals(str)) {
                logger.logMessage(LogLevel.INFO, "请求为注册请求");
                arRegToSc(context);
            } else if (NettyCepCoreUtil.UNREG_KEY.equals(str)) {
                logger.logMessage(LogLevel.INFO, "请求为注销请求");
                arUnregToSc(context);
            }
            clearRequest(event.getEventId());
            event.setType(2);
            channelHandlerContext.getChannel().write(event);
            return;
        }
        if (!NettyCepCoreUtil.SC_TO_AR.equals(serviceId)) {
            run(event);
            return;
        }
        logger.logMessage(LogLevel.INFO, "请求{}由服务中心发向普通节点", serviceId);
        Context context2 = event.getServiceRequest().getContext();
        String str2 = (String) context2.get(NettyCepCoreUtil.TYPE_KEY);
        if (NettyCepCoreUtil.REG_KEY.equals(str2)) {
            logger.logMessage(LogLevel.INFO, "请求为注册请求");
            scRegToAr(context2);
        } else if (NettyCepCoreUtil.UNREG_KEY.equals(str2)) {
            logger.logMessage(LogLevel.INFO, "请求为注销请求");
            scUnregToAr(context2);
        }
        clearRequest(event.getEventId());
        event.setType(2);
        channelHandlerContext.getChannel().write(event);
    }

    private void scRegToAr(Context context) {
        Node node = (Node) context.get(NettyCepCoreUtil.NODE_KEY);
        List list = (List) context.get(NettyCepCoreUtil.SC_TO_AR_SERVICE_KEY);
        logger.logMessage(LogLevel.INFO, "开始注册节点:{},为节点创建服务处理器", node.toString());
        NettyEventProcessorConatiner.add(node.toString(), new NettyEventProcessor(node, list), (CEPCore) BeanContainerFactory.getBeanContainer(getClass().getClassLoader()).getBean(CEPCore.CEP_CORE_BEAN));
        logger.logMessage(LogLevel.INFO, "为节点:{}创建服务处理器完成", node.toString());
    }

    private void scUnregToAr(Context context) {
        Node node = (Node) context.get(NettyCepCoreUtil.NODE_KEY);
        logger.logMessage(LogLevel.INFO, "开始注销节点:{}", node.toString());
        NettyEventProcessorConatiner.remove(node.toString(), (CEPCore) BeanContainerFactory.getBeanContainer(getClass().getClassLoader()).getBean(CEPCore.CEP_CORE_BEAN));
        logger.logMessage(LogLevel.INFO, "注销节点:{}完成", node.toString());
    }

    private void arRegToSc(Context context) {
        logger.logMessage(LogLevel.INFO, "开始处理节点向服务中心发起的注册请求");
        List<ServiceInfo> list = (List) context.get(NettyCepCoreUtil.AR_TO_SC_SERVICE_KEY);
        Node node = (Node) context.get(NettyCepCoreUtil.NODE_KEY);
        String node2 = node.toString();
        if (nodes.containsKey(node2)) {
            nodeServices.remove(nodes.get(node2));
            nodes.remove(node2);
        }
        logger.logMessage(LogLevel.INFO, "开始将{0}节点注册至已有节点列表", node2);
        for (String str : nodes.keySet()) {
            logger.logMessage(LogLevel.INFO, "开始将{0}节点注册至已有节点:{1}", node2, str);
            EventClientDaemonRunnable client = getClient(str);
            ContextImpl contextImpl = new ContextImpl();
            contextImpl.put(NettyCepCoreUtil.NODE_KEY, node);
            contextImpl.put(NettyCepCoreUtil.TYPE_KEY, NettyCepCoreUtil.REG_KEY);
            contextImpl.put(NettyCepCoreUtil.SC_TO_AR_SERVICE_KEY, list);
            try {
                NettyCepCoreUtil.sendEvent(client.getClient(), Event.createEvent(NettyCepCoreUtil.SC_TO_AR, contextImpl));
            } catch (InterruptedRuntimeException e) {
                logger.errorMessage("向{0}注册{1}时网络异常", e, str, node2);
            }
            logger.logMessage(LogLevel.INFO, "将{0}节点注册至已有节点:{1}完成", node2, str);
        }
        logger.logMessage(LogLevel.INFO, "将{0}节点注册至已有节点列表完成", node2);
        context.put(NettyCepCoreUtil.SC_TO_AR_SERVICE_KEY, copy());
        nodes.put(node2, node);
        nodeServices.put(node, list);
        getClient(node2);
        logger.logMessage(LogLevel.INFO, "处理节点向服务中心发起的注册请求完成");
    }

    private void arUnregToSc(Context context) {
        logger.logMessage(LogLevel.INFO, "开始处理节点向服务中心发起的注销请求");
        String node = ((Node) context.get(NettyCepCoreUtil.NODE_KEY)).toString();
        Node remove = nodes.remove(node);
        nodeServices.remove(remove);
        clients.remove(node).stop();
        context.put(NettyCepCoreUtil.NODES_KEY, nodes);
        logger.logMessage(LogLevel.INFO, "开始将注销请求发送至已有节点列表");
        for (String str : clients.keySet()) {
            logger.logMessage(LogLevel.INFO, "向{0}注销{1}", str, node);
            EventClient client = getClient(str).getClient();
            ContextImpl contextImpl = new ContextImpl();
            contextImpl.put(NettyCepCoreUtil.NODE_KEY, remove);
            contextImpl.put(NettyCepCoreUtil.TYPE_KEY, NettyCepCoreUtil.UNREG_KEY);
            try {
                NettyCepCoreUtil.sendEvent(client, Event.createEvent(NettyCepCoreUtil.SC_TO_AR, contextImpl));
            } catch (InterruptedRuntimeException e) {
                logger.errorMessage("向{0}注销{1}时网络异常", e, str, node);
            }
            logger.logMessage(LogLevel.INFO, "向{0}注销{1}完成", str, node);
        }
        logger.logMessage(LogLevel.INFO, "将注销请求发送至已有节点列表完成");
        logger.logMessage(LogLevel.INFO, "处理节点向服务中心发起的注销请求完成");
    }

    private Map<Node, List<ServiceInfo>> copy() {
        HashMap hashMap = new HashMap();
        for (Node node : nodeServices.keySet()) {
            hashMap.put(node, nodeServices.get(node));
        }
        return hashMap;
    }

    public void clearRequest(String str) {
        this.eventMap.remove(str);
        this.channelHandlerContextMap.remove(str);
    }

    public void run(Event event) {
        ChannelHandlerContext channelHandlerContext = this.channelHandlerContextMap.get(event.getEventId());
        if (channelHandlerContext != null) {
            if (event.getMode() == 2) {
                event.setType(2);
                channelHandlerContext.getChannel().write(getAsynchronousResponseEvent(event));
            }
            ((CEPCore) BeanContainerFactory.getBeanContainer(getClass().getClassLoader()).getBean(CEPCore.CEP_CORE_BEAN)).process(event);
            ChannelHandlerContext channelHandlerContext2 = this.channelHandlerContextMap.get(event.getEventId());
            if (event.getMode() == 1 && channelHandlerContext2 != null) {
                event.setType(2);
                channelHandlerContext2.getChannel().write(event);
            }
        }
        clearRequest(event.getEventId());
    }

    @Override // org.tinygroup.net.ServerHandler, org.jboss.netty.channel.SimpleChannelUpstreamHandler
    public void exceptionCaught(ChannelHandlerContext channelHandlerContext, ExceptionEvent exceptionEvent) {
        logger.errorMessage("发生错误", exceptionEvent.getCause());
        for (String str : this.channelHandlerContextMap.keySet()) {
            if (this.channelHandlerContextMap.get(str) == channelHandlerContext) {
                Event event = this.eventMap.get(str);
                event.setType(2);
                event.setThrowable(exceptionEvent.getCause());
                channelHandlerContext.getChannel().write(event);
                clearRequest(str);
            }
        }
    }

    private Event getAsynchronousResponseEvent(Event event) {
        Event event2 = new Event();
        event2.setEventId(event.getEventId());
        event2.setMode(2);
        event2.setType(2);
        return event2;
    }

    public static void remove(EventClientDaemonRunnable eventClientDaemonRunnable) {
        logger.logMessage(LogLevel.INFO, "开始移除连接");
        String str = null;
        Iterator<String> it = clients.keySet().iterator();
        while (true) {
            if (!it.hasNext()) {
                break;
            }
            String next = it.next();
            if (eventClientDaemonRunnable == clients.get(next)) {
                str = next;
                break;
            }
        }
        logger.logMessage(LogLevel.INFO, "连接的节点字符串为:{}", str);
        if (str == null) {
            logger.logMessage(LogLevel.INFO, "无需移除");
            return;
        }
        Node remove = nodes.remove(str);
        clients.remove(str);
        nodeServices.remove(remove);
        logger.logMessage(LogLevel.INFO, "移除连接完成");
    }
}
