package stream.runtime;

import java.net.InetAddress;
import java.net.URL;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.UUID;
import javax.xml.parsers.DocumentBuilderFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.w3c.dom.Document;
import org.w3c.dom.Element;
import org.w3c.dom.Node;
import org.w3c.dom.NodeList;
import stream.Data;
import stream.Process;
import stream.ProcessContext;
import stream.container.IContainer;
import stream.data.DataFactory;
import stream.io.Queue;
import stream.io.Sink;
import stream.io.Source;
import stream.run;
import stream.runtime.rpc.RMINamingService;
import stream.runtime.setup.ObjectCreator;
import stream.runtime.setup.factory.ObjectFactory;
import stream.runtime.setup.factory.ProcessorFactory;
import stream.runtime.setup.handler.ContainerRefElementHandler;
import stream.runtime.setup.handler.DocumentHandler;
import stream.runtime.setup.handler.LibrariesElementHandler;
import stream.runtime.setup.handler.MonitorElementHandler;
import stream.runtime.setup.handler.ProcessElementHandler;
import stream.runtime.setup.handler.PropertiesHandler;
import stream.runtime.setup.handler.QueueElementHandler;
import stream.runtime.setup.handler.ServiceElementHandler;
import stream.runtime.setup.handler.SinkElementHandler;
import stream.runtime.setup.handler.StreamElementHandler;
import stream.runtime.setup.handler.SystemPropertiesHandler;
import stream.service.NamingService;
import stream.service.Service;
import stream.util.Variables;
import stream.util.XIncluder;
import stream.util.XMLUtils;
import stream.utils.PrintGraph;
import streams.application.ComputeGraph;
import streams.runtime.Hook;
import streams.runtime.Signals;
import streams.runtime.Supervisor;

/* loaded from: input_file:stream/runtime/ProcessContainer.class */
public class ProcessContainer implements IContainer, Runnable {
    /** Logger shared by this class and by the anonymous inner classes declared in it. */
    static Logger log = LoggerFactory.getLogger(ProcessContainer.class);
    /** Every container constructed in this JVM; the shutdown hook walks this list and calls {@link #shutdown()} on each entry. */
    static final List<ProcessContainer> container = new ArrayList();
    /** Factory used to create configured objects; handed to most element handlers. */
    protected final ObjectFactory objectFactory;
    /** Processor factory built on top of {@link #objectFactory}. */
    protected final ProcessorFactory processorFactory;
    /** Compute graph of this container; exposed through {@link #computeGraph()} and handed to the supervisor. */
    protected final ComputeGraph depGraph;
    /** Collects dependencies while the document is handled; resolved at the end of {@code init}. */
    protected final DependencyInjection dependencyInjection;
    /** Container name: the root element's {@code id} attribute if present, otherwise derived from the local host name. */
    protected String name;
    /** Context created in the constructor from the container id, name and naming service. */
    protected final ContainerContext context;

    /* renamed from: streams, reason: collision with root package name */
    /** Registered sources by id (see {@link #registerStream}); each one is initialized in {@link #execute()}. */
    protected final Map<String, Source> f0streams;
    /** Registered sinks by id (see {@link #registerSink}); each one is initialized in {@link #execute()}. */
    protected final Map<String, Sink> sinks;
    /** Queues registered with the listener flag set; {@link #dataArrived} writes incoming items into them. */
    protected final Map<String, Queue> listeners;
    /** Processes that {@link #execute()} wraps into threads and starts; nothing in this file adds to it directly. */
    protected final List<Process> processes;
    /** Per-process contexts; {@link #execute()} creates a default one for any process without an entry. */
    protected final Map<Process, ProcessContext> processContexts;
    /** Threads signalled by {@link #shutdown()}. NOTE(review): nothing in this file adds to this list - confirm who populates it. */
    protected final List<ProcessThread> worker;
    /** Service references, exposed through {@link #getServiceRefs()}. */
    protected final List<ServiceReference> serviceRefs;
    /** Element handlers by key; during {@code init} every handler is asked about every child element of the root. */
    protected final Map<String, ElementHandler> elementHandler;
    /** Handlers that see the whole document before the individual elements are processed. */
    protected final List<DocumentHandler> documentHandler;
    /** Naming service chosen in the constructor (custom class, RMI based, or local default). */
    protected NamingService namingService;
    /** Objects whose {@code init(context)} is called by {@link #execute()} before the processes start. */
    protected final List<LifeCycle> lifeCyleObjects;
    /** Parsed from the root element's {@code server} attribute; when false, {@link #execute()} requires at least one stream or listener. */
    boolean server;
    /** Value returned by the last {@link #execute()} call made through {@link #run()}, in milliseconds. */
    protected long runtime;
    /** Wall-clock time (ms) at which {@link #execute()} was entered; 0 until then. */
    protected Long startTime;
    /** Container-level variables, seeded from the variables map passed to the constructor. */
    protected Variables containerVariables;
    /** Exception reported by a failing process; set by the supervisor callback and rethrown by {@link #execute()}. */
    private Exception failFastReason;
    /** Class names of optional extensions that the static initializer tries to register as object creators. */
    static final String[] extensions;
    /** Optional element handlers discovered by the static initializer, keyed by {@code ElementHandler.getKey()}. */
    static final Map<String, ElementHandler> autoHandlers;
    /** Supervisor installed by {@link #execute()}; {@code null} before that, which makes {@link #shutdown()} a no-op. */
    Supervisor supervisor;

    /**
     * Parses the XML document found at the given URL using a namespace-aware DOM parser.
     *
     * <p>The stream opened on the URL is always closed before this method returns,
     * whether or not parsing succeeds.
     *
     * @param url location of the XML document; must not be {@code null}
     * @return the parsed DOM document
     * @throws Exception if the URL cannot be opened or the content is not well-formed XML
     */
    public static Document parseDocument(URL url) throws Exception {
        DocumentBuilderFactory factory = DocumentBuilderFactory.newInstance();
        factory.setNamespaceAware(true);
        java.io.InputStream in = url.openStream();
        try {
            return factory.newDocumentBuilder().parse(in);
        } finally {
            try {
                in.close();
            } catch (java.io.IOException ignored) {
                // Best effort: a failure to close must not mask the parse result or its exception.
            }
        }
    }

    /**
     * Creates a container from the XML configuration at the given URL, without
     * extra element handlers or variables.
     *
     * @param url location of the container XML
     * @throws Exception if the document cannot be parsed or the container cannot be set up
     */
    public ProcessContainer(URL url) throws Exception {
        this(url, null);
    }

    /**
     * Creates a container from the XML configuration at the given URL with
     * additional element handlers and no extra variables.
     *
     * @param url location of the container XML
     * @param map additional element handlers by key, or {@code null} for none
     * @throws Exception if the document cannot be parsed or the container cannot be set up
     */
    public ProcessContainer(URL url, Map<String, ElementHandler> map) throws Exception {
        // The cast selects the (Document, Map, Map) constructor unambiguously.
        this(parseDocument(url), map, (Map<String, String>) null);
    }

    /**
     * Creates a container from the XML configuration at the given URL with
     * additional element handlers and container variables.
     *
     * @param url  location of the container XML
     * @param map  additional element handlers by key, or {@code null} for none
     * @param map2 variables added to the container variables, or {@code null} for none
     * @throws Exception if the document cannot be parsed or the container cannot be set up
     */
    public ProcessContainer(URL url, Map<String, ElementHandler> map, Map<String, String> map2) throws Exception {
        this(parseDocument(url), map, map2);
    }

    /**
     * Creates a container from an already parsed configuration document.
     *
     * <p>The constructor registers the built-in document and element handlers, runs
     * the XInclude pre-processing, derives the container name, address and port from
     * the root element and system properties, sets up the naming service and the
     * container context, and finally processes the document via {@code init}.
     *
     * @param document the container configuration; its root element must be
     *                 {@code container} or {@code application} (case-insensitive)
     * @param map      additional element handlers by key, or {@code null}; entries
     *                 override built-in handlers registered under the same key
     * @param map2     variables added to the container variables, or {@code null}
     * @throws Exception if the root element is unexpected, the address or port is
     *                   invalid, or naming-service creation or document
     *                   initialization fails (the original failure is attached as cause)
     */
    public ProcessContainer(Document document, Map<String, ElementHandler> map, Map<String, String> map2) throws Exception {
        String uuid;
        this.objectFactory = ObjectFactory.newInstance();
        this.processorFactory = new ProcessorFactory(this.objectFactory);
        this.depGraph = new ComputeGraph();
        this.dependencyInjection = new DependencyInjection();
        this.name = null;
        this.f0streams = new LinkedHashMap<String, Source>();
        this.sinks = new LinkedHashMap<String, Sink>();
        this.listeners = new LinkedHashMap<String, Queue>();
        this.processes = new ArrayList<Process>();
        this.processContexts = new LinkedHashMap<Process, ProcessContext>();
        this.worker = new ArrayList<ProcessThread>();
        this.serviceRefs = new ArrayList<ServiceReference>();
        this.elementHandler = new HashMap<String, ElementHandler>();
        this.documentHandler = new ArrayList<DocumentHandler>();
        this.namingService = null;
        this.lifeCyleObjects = new ArrayList<LifeCycle>();
        this.server = true;
        this.startTime = 0L;
        this.containerVariables = new Variables();
        this.failFastReason = null;
        if (map2 != null) {
            this.containerVariables.addVariables(map2);
        }
        // Registered here so the JVM shutdown hook can reach this container.
        container.add(this);

        // Document-level handlers run before the individual elements are processed.
        LibrariesElementHandler librariesElementHandler = new LibrariesElementHandler(this.objectFactory);
        this.documentHandler.add(new PropertiesHandler());
        this.documentHandler.add(new SystemPropertiesHandler());
        this.documentHandler.add(librariesElementHandler);

        // Auto-discovered handlers first, then the built-in ones, then caller overrides.
        for (Map.Entry<String, ElementHandler> discovered : autoHandlers.entrySet()) {
            log.debug("Adding auto-discovered handler for element '{}': {}", discovered.getKey(), discovered.getValue());
            this.elementHandler.put(discovered.getKey(), discovered.getValue());
        }
        this.elementHandler.put("Container-Ref", new ContainerRefElementHandler(this.objectFactory));
        this.elementHandler.put("Stream", new StreamElementHandler(this.objectFactory));
        this.elementHandler.put("Sink", new SinkElementHandler());
        this.elementHandler.put("Queue", new QueueElementHandler());
        this.elementHandler.put("Monitor", new MonitorElementHandler(this.objectFactory, this.processorFactory));
        this.elementHandler.put("Process", new ProcessElementHandler(this.objectFactory, this.processorFactory));
        this.elementHandler.put("Service", new ServiceElementHandler(this.objectFactory));
        this.elementHandler.put("Libs", librariesElementHandler);
        if (map != null) {
            this.elementHandler.putAll(map);
        }

        Document perform = new XIncluder().perform(document, new Variables(this.containerVariables));
        if (log.isDebugEnabled()) {
            // Serializing the whole document is only worth it when the output is actually logged.
            log.debug(XMLUtils.toString(perform));
        }
        log.debug("XML created and preprocessed.");

        Element documentElement = perform.getDocumentElement();
        Map<String, String> attributes = this.objectFactory.getAttributes(documentElement);
        if (documentElement.hasAttribute("id")) {
            uuid = documentElement.getAttribute("id");
            this.name = uuid;
        } else {
            uuid = UUID.randomUUID().toString();
        }

        // System properties take precedence over the attributes of the root element.
        if (System.getProperty("container.address") != null) {
            attributes.put("address", System.getProperty("container.address"));
        }
        if (System.getProperty("container.port") != null) {
            attributes.put("port", System.getProperty("container.port"));
        }
        try {
            // NOTE(review): a missing "server" attribute parses to false, so the 'true'
            // fallback below only applies if reading the attribute itself throws.
            // Kept as is because execute() depends on the resulting value - confirm intent.
            this.server = Boolean.parseBoolean(attributes.get("server"));
        } catch (Exception e) {
            this.server = true;
        }
        if (!documentElement.getNodeName().equalsIgnoreCase("application") && !documentElement.getNodeName().equalsIgnoreCase("container")) {
            throw new Exception("Expecting root element to be 'container'!");
        }

        String host = "localhost";
        if (System.getProperty("hostname") != null) {
            // NOTE(review): on this path the name is not derived from the host, so it
            // stays null unless the root element carries an "id" attribute - confirm.
            host = System.getProperty("hostname");
        } else {
            try {
                host = InetAddress.getLocalHost().getHostAddress();
                this.name = InetAddress.getLocalHost().getHostName();
                // Use the short host name (everything before the first dot).
                int dot = this.name.indexOf(".");
                if (dot > 0) {
                    this.name = this.name.substring(0, dot);
                }
            } catch (Exception e2) {
                this.name = UUID.randomUUID().toString();
            }
            log.debug("Default hostname is: {}", host);
            if (attributes.containsKey("address") && !attributes.get("address").trim().isEmpty()) {
                host = InetAddress.getByName(attributes.get("address")).getHostAddress();
                log.debug("Container address will be {}", host);
            }
        }

        int port = 0;
        if (attributes.containsKey("port") && !attributes.get("port").trim().isEmpty()) {
            // Trim before parsing so the value is treated the same way as in the blank check above.
            port = Integer.parseInt(attributes.get("port").trim());
            log.debug("Container port will be {}", port);
        }

        // An explicit id always wins over the host-derived name.
        if (documentElement.hasAttribute("id")) {
            this.name = documentElement.getAttribute("id");
        }

        try {
            String namingServiceClass = documentElement.getAttribute("namingService");
            if (namingServiceClass != null && !namingServiceClass.trim().isEmpty()) {
                this.namingService = (NamingService) this.objectFactory.create(namingServiceClass, attributes, ObjectFactory.createConfigDocument(documentElement));
            }
            if (this.namingService == null) {
                if (attributes.containsKey("address")) {
                    log.debug("Creating RMI naming-service...");
                    System.setProperty("java.rmi.server.hostname", host);
                    this.namingService = new RMINamingService(this.name, host, port, true);
                } else {
                    log.debug("No address specified, using local naming-service. Container will not be able to reference other containers!");
                    this.namingService = new DefaultNamingService();
                }
            }
            if (this.namingService instanceof LifeCycle) {
                this.lifeCyleObjects.add((LifeCycle) this.namingService);
            }
            log.debug("Using naming-service {}", this.namingService);
            this.context = new ContainerContext(uuid + "@" + System.currentTimeMillis(), this.name, this.namingService);
            init(perform);
        } catch (Exception e3) {
            // This handler also sees failures from init(); the message is kept for
            // compatibility, but the original exception is now attached as the cause
            // so the real origin of the failure is not lost.
            String requested = documentElement.getAttribute("namingService");
            log.error("Faild to instantiate naming service '{}': {}", requested, e3.getMessage());
            throw new Exception("Faild to instantiate naming service '" + requested + "': " + e3.getMessage(), e3);
        }
    }

    /**
     * Gives access to the compute graph of this container.
     *
     * @return the graph instance created together with the container
     */
    public ComputeGraph computeGraph() {
        return depGraph;
    }

    /**
     * Returns the registered sources in registration order.
     *
     * @return a new set holding the current sources; changing it does not affect
     *         the container
     */
    public Set<Source> getStreams() {
        // Parameterized instead of raw, so no unchecked conversion is needed.
        return new LinkedHashSet<Source>(this.f0streams.values());
    }

    /**
     * @return the current name of this container
     */
    public String getName() {
        return name;
    }

    /**
     * Replaces the name of this container.
     *
     * @param str the new container name
     */
    public void setName(String str) {
        name = str;
    }

    /**
     * @return the live list of registered life-cycle objects (not a copy)
     */
    public List<LifeCycle> lifeCycles() {
        return lifeCyleObjects;
    }

    /* renamed from: getContext, reason: merged with bridge method [inline-methods] */
    /**
     * @return the context created for this container in the constructor
     */
    public ContainerContext m22getContext() {
        return context;
    }

    /**
     * @return the live list of processes that {@code execute()} will start (not a copy)
     */
    public List<Process> getProcesses() {
        return processes;
    }

    /**
     * @return the live list of service references of this container (not a copy)
     */
    public List<ServiceReference> getServiceRefs() {
        return serviceRefs;
    }

    /**
     * Adds a life-cycle object to this container unless it is already registered.
     *
     * @param lifeCycle the object to register
     */
    public void register(LifeCycle lifeCycle) {
        if (!lifeCyleObjects.contains(lifeCycle)) {
            log.debug("Registering new life-cycle object {}", lifeCycle);
            lifeCyleObjects.add(lifeCycle);
        }
    }

    /**
     * Processes the pre-processed configuration document.
     *
     * <p>Steps, in order: register the packages listed in the root element's
     * {@code import} attribute, run all document handlers, publish context
     * properties and container variables to the object factory, optionally install
     * the default {@code DataFactory} named by the {@code container.datafactory}
     * property, offer every child element of the root to every element handler,
     * and finally inject the collected dependencies.
     *
     * @param document the document to process
     * @throws Exception if any handler, the data-factory instantiation or the
     *                   dependency injection fails
     */
    private void init(Document document) throws Exception {
        Element documentElement = document.getDocumentElement();

        // Comma-separated package list; blank entries are skipped.
        String imports = documentElement.getAttribute("import");
        if (imports != null) {
            for (String pkg : imports.split(",")) {
                String trimmed = pkg.trim();
                if (!trimmed.isEmpty()) {
                    this.objectFactory.addPackage(trimmed);
                }
            }
        }

        for (DocumentHandler handler : this.documentHandler) {
            handler.handle(this, document, this.containerVariables, this.dependencyInjection);
        }
        this.objectFactory.addVariables(this.context.getProperties());
        this.objectFactory.addVariables(this.containerVariables);

        NodeList childNodes = documentElement.getChildNodes();
        if (this.context.getProperties().get("container.datafactory") != null) {
            log.debug("Using {} as default DataFactory for this container...", this.context.getProperties().get("container.datafactory"));
            DataFactory.setDefaultDataFactory((DataFactory) Class.forName(this.context.getProperties().get("container.datafactory")).newInstance());
        }

        for (int i = 0; i < childNodes.getLength(); i++) {
            Node item = childNodes.item(i);
            if (item.getNodeType() == Node.ELEMENT_NODE) {
                Element element = (Element) item;
                // Every handler gets a chance; more than one may accept the same element.
                for (ElementHandler handler : this.elementHandler.values()) {
                    if (handler.handlesElement(element)) {
                        handler.handleElement(this, element, this.containerVariables, this.dependencyInjection);
                    }
                }
            }
        }

        try {
            this.dependencyInjection.injectDependencies(this.depGraph, this.namingService);
            this.context.setProperty("xml", XMLUtils.toString(document));
            drawGraph();
            log.debug("ProcessContainer is initialized and ready to start:{}", toString());
        } catch (Exception e) {
            // Report through the logger (with stack trace) instead of stderr, then propagate.
            log.error("Failed to finish container initialization", e);
            throw e;
        }
    }

    /**
     * Logs a textual rendering of the compute graph at info level, unless the
     * rendering is {@code null} or blank.
     */
    private void drawGraph() {
        final String rendered = PrintGraph.print(computeGraph());
        final boolean hasContent = rendered != null && !rendered.trim().isEmpty();
        if (hasContent) {
            run.log.info("Compute graph:\n\n{}", rendered);
        }
    }

    /**
     * Registers a queue under the given id as both a stream and a sink and,
     * if requested, also as a listener for externally arriving data.
     *
     * @param str   id under which the queue is registered
     * @param queue the queue to register
     * @param z     {@code true} to additionally register the queue as a listener
     * @throws Exception declared for callers; the visible body throws none itself
     */
    public void registerQueue(String str, Queue queue, boolean z) throws Exception {
        log.debug("A new queue '{}' is registered for id '{}'", queue, str);

        final boolean asListener = z;
        if (asListener) {
            listeners.put(str, queue);
        }

        // A queue can be read from and written to, so it takes both roles.
        registerStream(str, queue);
        registerSink(str, queue);
    }

    /**
     * Registers a sink, replacing any sink previously stored under the same id.
     *
     * @param str  id of the sink
     * @param sink the sink to register
     */
    public void registerSink(String str, Sink sink) {
        sinks.put(str, sink);
    }

    /**
     * Registers a source, replacing any source previously stored under the same id.
     *
     * @param str    id of the source
     * @param source the source to register
     */
    public void registerStream(String str, Source source) {
        f0streams.put(str, source);
    }

    /**
     * Runs the container via {@link #execute()} and stores the returned duration
     * in {@code runtime}.
     *
     * @throws RuntimeException wrapping any exception thrown by {@code execute()};
     *                          if that exception is an {@link InterruptedException},
     *                          the thread's interrupt status is restored first
     */
    @Override // java.lang.Runnable
    public void run() {
        try {
            this.runtime = execute();
        } catch (InterruptedException e) {
            // Keep the interrupt visible to whoever owns this thread.
            Thread.currentThread().interrupt();
            log.error("Container execution was interrupted", e);
            throw new RuntimeException(e);
        } catch (Exception e) {
            // Report through the logger (with stack trace) instead of stderr.
            log.error("Container execution failed", e);
            throw new RuntimeException(e);
        }
    }

    /**
     * Starts the container and blocks until all of its processes have finished.
     *
     * <p>Order of work: register the container controller and all services of the
     * compute graph with the naming service, initialize streams, sinks and
     * life-cycle objects, install a supervisor, start one thread per process, and
     * wait until the supervisor reports every started process as done.
     *
     * @return milliseconds between the start of process creation and the moment
     *         the supervisor reported all processes as done
     * @throws Exception if the container is not a server and has neither streams
     *                   nor listeners, if any initialization step fails, or the
     *                   exception a process reported to the supervisor
     */
    public long execute() throws Exception {
        // NOTE(review): the constructor already adds this instance to 'container' and
        // nothing in this file removes it, so this branch looks unreachable here - confirm.
        if (!container.contains(this)) {
            log.debug("Registering new container {}", this);
            container.add(this);
            Signals.register(new Hook() { // from class: stream.runtime.ProcessContainer.2
                public void signal(int i) {
                    ProcessContainer.log.info("Handling signal {}", Integer.valueOf(i));
                }
            });
        }
        this.startTime = Long.valueOf(System.currentTimeMillis());
        // The controller is published under the fixed name ".ctrl".
        ContainerController containerController = new ContainerController(this);
        log.debug("Registering container-controller {}", containerController);
        this.namingService.register(".ctrl", containerController);
        // Publish every service of the compute graph under its own name.
        Map services = computeGraph().services();
        for (String str : services.keySet()) {
            log.info("Registering service '{}' => {}", str, services.get(str));
            this.namingService.register(str, (Service) services.get(str));
        }
        // A non-server container without any input has nothing to do.
        if (!this.server && this.f0streams.isEmpty() && this.listeners.isEmpty()) {
            throw new Exception("No data-stream defined!");
        }
        log.debug("Need to handle {} sources: {}", Integer.valueOf(this.f0streams.size()), this.f0streams.keySet());
        log.debug("Experiment contains {} stream processes", Integer.valueOf(this.processes.size()));
        log.debug("Initializing all DataStreams...");
        if (this.f0streams.keySet().isEmpty()) {
            log.debug("No dataStreams to initialize");
        }
        for (String str2 : this.f0streams.keySet()) {
            Source source = this.f0streams.get(str2);
            log.debug("Initializing stream '{}'", str2);
            source.init();
        }
        log.debug("Initializing all Sinks...");
        for (String str3 : this.sinks.keySet()) {
            Sink sink = this.sinks.get(str3);
            log.debug("Initializing sink '{}'", str3);
            sink.init();
        }
        for (LifeCycle lifeCycle : this.lifeCyleObjects) {
            log.info("Initializing life-cycle for {}", lifeCycle);
            lifeCycle.init(this.context);
        }
        // Supervisor with fail-fast behaviour: the first process error is remembered
        // and triggers a shutdown of the whole container.
        Supervisor supervisor = new Supervisor(computeGraph()) { // from class: stream.runtime.ProcessContainer.3
            @Override // streams.runtime.Supervisor, stream.runtime.ProcessListener
            public void processError(Process process, Exception exc) {
                // 'this' is the supervisor instance here, not the enclosing container.
                synchronized (this) {
                    super.processError(process, exc);
                    ProcessContainer.log.error("Process {} signaled an error: {}", process, exc.getMessage());
                    ProcessContainer.log.debug("Forcing fail-fast shutdown of application...");
                    ProcessContainer.this.failFastReason = exc;
                    ProcessContainer.this.shutdown();
                    notifyAll();
                }
            }
        };
        this.supervisor = supervisor;
        log.debug("Creating {} active processes...", Integer.valueOf(this.processes.size()));
        long currentTimeMillis = System.currentTimeMillis();
        // Counts the processes actually started; used as the target for processesDone().
        int i = 0;
        for (Process process : this.processes) {
            // Make sure every process has a context entry before its thread is created.
            if (this.processContexts.get(process) == null) {
                this.processContexts.put(process, new ProcessContextImpl("process:" + process.hashCode(), this.context));
            }
            final ProcessThread processThread = new ProcessThread(process, this.context);
            processThread.addListener(supervisor);
            log.debug("Initializing stream-process [{}]", process);
            processThread.init();
            log.debug("Starting stream-process [{}]", process);
            processThread.start();
            // Forward externally delivered signals to this process thread.
            Signals.register(new Hook() { // from class: stream.runtime.ProcessContainer.4
                public void signal(int i2) {
                    processThread.signal(i2);
                }
            });
            log.debug("Stream-process started.");
            i++;
        }
        // A process may already have failed while the others were being started.
        if (this.failFastReason != null) {
            log.error("Exception occurred: {}", this.failFastReason.getMessage());
            throw this.failFastReason;
        }
        log.debug("{} processes started...", Integer.valueOf(i));
        while (supervisor.processesDone() < i) {
            log.debug("Waiting for processes to finish...");
            supervisor.waitForProcesses();
            log.debug("{} of {} processes finished...", Integer.valueOf(supervisor.processesDone()), Integer.valueOf(i));
        }
        long currentTimeMillis2 = System.currentTimeMillis();
        log.trace("Running processes: {}", this.processes);
        log.debug("ProcessContainer finished all processes after {} ms", Long.valueOf(currentTimeMillis2 - currentTimeMillis));
        // Grace period of up to one second for error handlers. This waits on the
        // container's monitor; the notifyAll() in processError above targets the
        // supervisor's monitor, so nothing visible in this file wakes it early.
        synchronized (this) {
            log.debug("Waiting for error handlers to finish");
            wait(1000L);
        }
        // Surface an error that was reported while the processes were running.
        if (this.failFastReason != null) {
            throw this.failFastReason;
        }
        return currentTimeMillis2 - currentTimeMillis;
    }

    /**
     * @return the container-level variables (the live object, not a copy)
     */
    public Variables getVariables() {
        return containerVariables;
    }

    /**
     * @return the ids of all queues registered as listeners, as a view backed by
     *         the internal listener map
     */
    public Set<String> getStreamListenerNames() {
        return listeners.keySet();
    }

    /**
     * @return the object factory created together with this container
     */
    public ObjectFactory getObjectFactory() {
        return objectFactory;
    }

    /**
     * Delivers an externally arriving data item to the listener queue registered
     * under the given id. Unknown ids are logged as a warning; write failures are
     * logged as an error and not propagated.
     *
     * @param str  id of the target listener queue
     * @param data the item to enqueue
     */
    public void dataArrived(String str, Data data) {
        if (listeners.containsKey(str)) {
            log.debug("Adding item {} into queue {}", data, str);
            try {
                final Queue target = listeners.get(str);
                target.write(data);
            } catch (Exception e) {
                log.error("Failed to inject arriving data item into queue {}: {}", str, e.getMessage());
            }
        } else {
            log.warn("No listener defined for {}", str);
        }
    }

    /**
     * Signals the supervisor and all known worker threads to stop (signal 0) and
     * then polls every 250 ms until the supervisor reports no running processes.
     *
     * <p>Does nothing if no supervisor has been installed yet. If the calling
     * thread is interrupted while waiting, the interrupt status is restored and
     * the method returns without waiting any further.
     */
    public void shutdown() {
        log.debug("shutdown()");
        if (this.supervisor == null) {
            log.info("No supervisor installed; no clean-up!");
            return;
        }
        log.debug("Calling supervisor.signal(0)!");
        this.supervisor.signal(0);
        for (ProcessThread processThread : this.worker) {
            try {
                log.info("Sending signal to {}", processThread);
                processThread.signal(0);
            } catch (Exception e) {
                // One failing worker must not keep the remaining ones from being signalled.
                log.error("Failed to send shutdown signal to a process thread", e);
            }
        }
        log.debug("Waiting for remaining processes...");
        while (this.supervisor.processesRunning() > 0) {
            try {
                log.debug("    waiting for {} process(es) to finish...", Integer.valueOf(this.supervisor.processesRunning()));
                Thread.sleep(250L);
            } catch (InterruptedException e) {
                // Honour the interruption instead of swallowing it and looping on.
                Thread.currentThread().interrupt();
                log.warn("Interrupted while waiting for processes to finish; no longer waiting.");
                return;
            } catch (Exception e2) {
                log.error("Error while waiting for processes to finish", e2);
            }
        }
    }

    /**
     * Associates a context with a process, replacing any earlier association.
     *
     * @param process        the process the context belongs to
     * @param processContext the context to store for that process
     */
    public void setProcessContext(Process process, ProcessContext processContext) {
        processContexts.put(process, processContext);
    }

    /**
     * @return the naming service selected for this container in the constructor
     */
    public NamingService getNamingService() {
        return namingService;
    }

    /*
     * Class initialization:
     *  1. installs a JVM shutdown hook that calls shutdown() on every known container,
     *     unless the system property "container.shutdown-hook" is "disabled";
     *  2. tries to register the optional object-creator extensions by class name;
     *  3. tries to load the optional element handlers by class name.
     * Optional classes that cannot be loaded or instantiated are skipped with a debug message.
     */
    static {
        log.debug("Adding container shutdown-hook");
        Thread thread = new Thread() { // from class: stream.runtime.ProcessContainer.1
            @Override // java.lang.Thread, java.lang.Runnable
            public void run() {
                ProcessContainer.log.debug("Executing shutdown-hook...");
                if ("disabled".equalsIgnoreCase(System.getProperty("container.shutdown-hook"))) {
                    ProcessContainer.log.warn("Shutdown-hook disabled...");
                    return;
                }
                ProcessContainer.log.debug("Running shutdown-hook...");
                for (ProcessContainer processContainer : ProcessContainer.container) {
                    ProcessContainer.log.debug("Sending shutdown signal to {}", processContainer);
                    processContainer.shutdown();
                }
            }
        };
        thread.setDaemon(true);
        Runtime.getRuntime().addShutdownHook(thread);

        extensions = new String[]{"stream.moa.MoaObjectFactory", "stream.script.JavaScriptProcessorFactory"};
        autoHandlers = new LinkedHashMap<String, ElementHandler>();

        for (String extensionClass : extensions) {
            try {
                ObjectFactory.registerObjectCreator((ObjectCreator) Class.forName(extensionClass).newInstance());
                log.debug("Registered extension {}", extensionClass);
            } catch (Exception e) {
                log.debug("Failed to register extension '{}': {}", extensionClass, e.getMessage());
                if (log.isTraceEnabled()) {
                    // Stack trace goes through the logger instead of stderr.
                    log.trace("Details for failed extension registration", e);
                }
            }
        }

        for (String handlerClass : new String[]{"streams.esper.EsperEngineElementHandler"}) {
            try {
                ElementHandler handler = (ElementHandler) Class.forName(handlerClass).newInstance();
                autoHandlers.put(handler.getKey(), handler);
            } catch (Exception e2) {
                log.debug("Failed to register handler {}", handlerClass);
                if (log.isTraceEnabled()) {
                    // Stack trace goes through the logger instead of stderr.
                    log.trace("Details for failed handler registration", e2);
                }
            }
        }
    }
}
