package streams.net;

import java.io.DataInputStream;
import java.io.Serializable;
import java.net.ServerSocket;
import java.net.Socket;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.concurrent.LinkedBlockingQueue;
import javax.net.ssl.SSLServerSocket;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import stream.Data;
import stream.util.MultiSet;
import streams.codec.Codec;
import streams.codec.DefaultCodec;
import streams.io.BobCodec;
import streams.performance.PerformanceTree;
import streams.performance.ProcessorStatistics;

/**
 * Server that accepts connections from clients reporting performance
 * statistics and aggregates them into one {@link PerformanceTree} per
 * application.
 *
 * <p>Threads involved:
 * <ul>
 *   <li>the accept loop ({@link #run()}), which spawns one {@link Receiver}
 *       per client connection;</li>
 *   <li>{@link Receiver} threads, which decode incoming blocks and push
 *       {@link Update}s onto the shared {@link #updates} queue;</li>
 *   <li>a single {@link Updater} thread, which drains the queue and applies the
 *       updates to {@link #performanceTrees};</li>
 *   <li>{@link Dump}, which prints all trees (used as JVM shutdown hook and
 *       when a client closes its connection).</li>
 * </ul>
 *
 * <p>Access to {@link #performanceTrees} from code in this class is guarded by
 * an internal lock, because the map is written by the updater thread while it
 * may be read by dump/receiver threads.
 */
public class PerformanceReceiver extends Thread {
    /** Listening socket; created in the constructor, closed when {@link #run()} exits. */
    final ServerSocket server;
    static Logger log = LoggerFactory.getLogger(PerformanceReceiver.class);
    /** Performance tree per application id, in order of first appearance. */
    static Map<String, PerformanceTree> performanceTrees = new LinkedHashMap<>();
    /** Number of updates applied per application id; only touched by the updater thread. */
    static MultiSet<String> updateCount = new MultiSet<>();
    /** Hand-off queue between the receiver threads and the updater thread. */
    static LinkedBlockingQueue<Update> updates = new LinkedBlockingQueue<>();
    /** Port used by {@link #main(String[])}; may be overridden by the first command line argument. */
    static int port = 6001;

    /** Guards structural access to {@link #performanceTrees} within this class. */
    private static final Object TREES_LOCK = new Object();

    /**
     * Prints every known performance tree to standard output.
     */
    public static class Dump extends Thread {
        /**
         * Prints a block of blank lines followed by one framed section per
         * application. Works on a snapshot of the tree map so that a concurrent
         * insertion by the updater cannot break the iteration.
         */
        @Override
        public void run() {
            Map<String, PerformanceTree> snapshot;
            synchronized (PerformanceReceiver.TREES_LOCK) {
                snapshot = new LinkedHashMap<>(PerformanceReceiver.performanceTrees);
            }
            System.out.println("\n\n\n\n\n\n\n\n\n\n\n\n\n\n\n\n");
            for (Map.Entry<String, PerformanceTree> entry : snapshot.entrySet()) {
                System.out.println("------------------------ Application " + entry.getKey() + " ------------------------");
                entry.getValue().print();
                System.out.println("------------------------ -------------- ------------------------");
            }
        }
    }

    /**
     * Handles a single client connection: reads encoded blocks, decodes them
     * and turns them into {@link Update}s on the shared queue.
     */
    public static class Receiver extends Thread {
        final Socket socket;
        final Codec<Data> codec = new DefaultCodec();
        final PerformanceReceiver parent;

        /**
         * Creates a daemon thread serving the given client connection.
         *
         * @param socket              the accepted client socket; closed when {@link #run()} exits
         * @param performanceReceiver the receiver that accepted the connection
         */
        public Receiver(Socket socket, PerformanceReceiver performanceReceiver) {
            this.socket = socket;
            this.parent = performanceReceiver;
            setDaemon(true);
        }

        /**
         * Reads blocks from the client until a {@code null} block is returned
         * (in which case all trees are dumped) or an exception occurs. The
         * socket is closed on every exit path.
         */
        @Override
        public void run() {
            // try-with-resources guarantees the connection is released no matter
            // how the read loop terminates.
            try (Socket connection = this.socket;
                    DataInputStream in = new DataInputStream(connection.getInputStream())) {
                while (true) {
                    byte[] block = BobCodec.readBlock(in);
                    if (block == null) {
                        PerformanceReceiver.log.debug("Received null block - exiting receiver...");
                        new Dump().run();
                        return;
                    }
                    enqueueUpdates((Data) this.codec.decode(block));
                }
            } catch (Exception e) {
                // Pass the exception as throwable so the stack trace is not lost.
                PerformanceReceiver.log.error("Receiver thread has been stopped or was interrupted by some exception:" + e, e);
            }
        }

        /**
         * Queues one update for the overall statistics of the item plus one
         * update per processor. Items are ignored unless they carry a
         * {@code performance.id}, a {@code ProcessorStatistics[]} under
         * {@code processors} and a {@code ProcessorStatistics} under
         * {@code performance.stats}.
         */
        private void enqueueUpdates(Data data) {
            Object id = data.get("performance.id");
            if (id == null) {
                return;
            }
            Object processors = data.get("processors");
            // Exact class match: only a genuine ProcessorStatistics[] is accepted.
            if (processors == null || processors.getClass() != ProcessorStatistics[].class) {
                return;
            }
            Object overall = data.get("performance.stats");
            if (!(overall instanceof ProcessorStatistics)) {
                return;
            }

            ProcessorStatistics[] perProcessor = (ProcessorStatistics[]) processors;
            String application = id.toString();
            PerformanceReceiver.updates.add(new Update(application, (ProcessorStatistics) overall));
            for (int i = 0; i < perProcessor.length; i++) {
                String path = application + "/processor:" + i + ":" + perProcessor[i].className;
                PerformanceReceiver.updates.add(new Update(path, perProcessor[i]));
            }
        }

        /**
         * Logs a human readable performance report at INFO level.
         *
         * @param processorStatistics    the overall statistics
         * @param processorStatisticsArr the statistics of the individual processors
         */
        public void report(ProcessorStatistics processorStatistics, ProcessorStatistics[] processorStatisticsArr) {
            long elapsed = processorStatistics.end() - processorStatistics.start();
            PerformanceReceiver.log.info("+------------------------- Performance Report ------------------------------");
            PerformanceReceiver.log.info("|");
            PerformanceReceiver.log.info("| Performance recorded based on {} events processed in {} ms", processorStatistics.itemsProcessed(), elapsed);
            PerformanceReceiver.log.info("|");
            PerformanceReceiver.log.info("| The following {} processes have been measured:", processorStatisticsArr.length);
            PerformanceReceiver.log.info("|");
            for (int i = 0; i < processorStatisticsArr.length; i++) {
                PerformanceReceiver.log.info("|     [{}]  {}", i, processorStatisticsArr[i]);
            }
            PerformanceReceiver.log.info("|");
            PerformanceReceiver.log.info("| streams.performance.Performance statistics:");
            PerformanceReceiver.log.info("|    {}", processorStatistics);
            PerformanceReceiver.log.info("+---------------------------------------------------------------------------");
        }
    }

    /**
     * A pending statistics update: a {@code /}-separated path whose first
     * segment is the application id, and the statistics to apply there.
     */
    public static class Update {
        final String path;
        final ProcessorStatistics stats;

        /**
         * @param str                 the {@code /}-separated path of the update
         * @param processorStatistics the statistics to apply at that path
         */
        public Update(String str, ProcessorStatistics processorStatistics) {
            this.path = str;
            this.stats = processorStatistics;
        }
    }

    /**
     * Drains the {@link PerformanceReceiver#updates} queue and applies each
     * update to the performance tree of its application.
     */
    public static class Updater extends Thread {
        /**
         * Processes updates until the thread is interrupted. Other exceptions
         * raised while applying a single update are logged and the loop
         * continues with the next update.
         */
        @Override
        public void run() {
            while (true) {
                try {
                    apply(PerformanceReceiver.updates.take());
                } catch (InterruptedException e) {
                    // Interruption is the stop signal: restore the flag and leave
                    // instead of swallowing it and looping forever.
                    Thread.currentThread().interrupt();
                    PerformanceReceiver.log.error("Updater thread has been stopped or was interrupted by some exception:" + e, e);
                    return;
                } catch (Exception e) {
                    PerformanceReceiver.log.error("Updater thread has been stopped or was interrupted by some exception:" + e, e);
                }
            }
        }

        /**
         * Applies one update, creating the application's tree on first use, and
         * prints the tree after every tenth update for that application.
         */
        private void apply(Update update) {
            String[] segments = update.path.split("/");
            String application = segments[0];

            PerformanceTree tree;
            synchronized (PerformanceReceiver.TREES_LOCK) {
                tree = PerformanceReceiver.performanceTrees.get(application);
                if (tree == null) {
                    tree = new PerformanceTree("", null);
                    PerformanceReceiver.performanceTrees.put(application, tree);
                    PerformanceReceiver.log.debug("Creating new performance tree for application '{}'", application);
                }
            }

            tree.update(segments, update.stats);
            PerformanceReceiver.updateCount.add(application);
            if (PerformanceReceiver.updateCount.count(application) % 10 == 0) {
                tree.print();
            }
        }
    }

    /**
     * Opens the server socket via {@code SecureConnect.openServer} and
     * configures it to request (but not require) client authentication.
     *
     * @param i the port to listen on
     * @throws Exception if the server socket cannot be opened
     */
    public PerformanceReceiver(int i) throws Exception {
        SSLServerSocket openServer = SecureConnect.openServer(i);
        openServer.setWantClientAuth(true);
        this.server = openServer;
        setDaemon(true);
    }

    /**
     * Starts the updater thread and then accepts client connections forever,
     * handing each one to a new {@link Receiver}. When the loop terminates with
     * an exception, the server socket is closed and the error is logged.
     */
    @Override
    public void run() {
        // Binding the server socket to the try block releases the port once the
        // accept loop fails.
        try (ServerSocket listener = this.server) {
            Updater updater = new Updater();
            updater.setDaemon(true);
            updater.start();
            while (true) {
                Socket client = listener.accept();
                log.info("client connection from {}", client);
                new Receiver(client, this).start();
            }
        } catch (Exception e) {
            log.error("Performance thread has been stopped or was interrupted by some exception:" + e, e);
        }
    }

    /**
     * Command line entry point. An optional single argument selects the port;
     * if it is not a valid integer the default port is kept.
     *
     * @param strArr command line arguments
     * @throws Exception if the server socket cannot be opened
     */
    public static void main(String[] strArr) throws Exception {
        if (strArr.length == 1) {
            try {
                port = Integer.parseInt(strArr[0]);
            } catch (NumberFormatException e) {
                log.error("You can only define port number as a parameter.  Using default: {}. Invalid value: '{}'", port, strArr[0]);
            }
        }
        Runtime.getRuntime().addShutdownHook(new Dump());
        PerformanceReceiver performanceReceiver = new PerformanceReceiver(port);
        log.info("Starting performance-receiver on port {}", performanceReceiver.server.getLocalPort());
        // Deliberately run() instead of start(): the receiver is a daemon thread,
        // so the accept loop is executed on the main thread to keep the JVM alive.
        performanceReceiver.run();
    }
}
