package com.twitter.heron.simulator;

import com.twitter.heron.api.Config;
import com.twitter.heron.api.HeronTopology;
import com.twitter.heron.api.generated.TopologyAPI;
import com.twitter.heron.common.basics.SingletonRegistry;
import com.twitter.heron.common.config.SystemConfig;
import com.twitter.heron.proto.system.PhysicalPlans;
import com.twitter.heron.simulator.executors.InstanceExecutor;
import com.twitter.heron.simulator.executors.MetricsExecutor;
import com.twitter.heron.simulator.executors.StreamExecutor;
import com.twitter.heron.simulator.utils.PhysicalPlanUtil;
import java.lang.Thread;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.logging.Handler;
import java.util.logging.Level;
import java.util.logging.Logger;

/* loaded from: input_file:com/twitter/heron/simulator/Simulator.class */
public class Simulator {
    private static final Logger LOG = Logger.getLogger(Simulator.class.getName());
    private final List<InstanceExecutor> instanceExecutors;
    private final ExecutorService threadsPool;
    private SystemConfig systemConfig;
    private StreamExecutor streamExecutor;
    private MetricsExecutor metricsExecutor;

    /* loaded from: input_file:com/twitter/heron/simulator/Simulator$DefaultExceptionHandler.class */
    public class DefaultExceptionHandler implements Thread.UncaughtExceptionHandler {
        public DefaultExceptionHandler() {
        }

        @Override // java.lang.Thread.UncaughtExceptionHandler
        public void uncaughtException(Thread thread, Throwable th) {
            Simulator.LOG.severe("Local Mode Process exiting.");
            Simulator.LOG.log(Level.SEVERE, "Exception caught in thread: " + thread.getName() + " with id: " + thread.getId(), th);
            for (Handler handler : Logger.getLogger("").getHandlers()) {
                handler.close();
            }
            Simulator.this.threadsPool.shutdownNow();
            Runtime.getRuntime().halt(1);
        }
    }

    public Simulator() {
        this(true);
    }

    public Simulator(boolean z) {
        this.instanceExecutors = new LinkedList();
        this.threadsPool = Executors.newCachedThreadPool();
        if (z) {
            init();
        }
    }

    protected void init() {
        this.systemConfig = getSystemConfig();
        synchronized (SingletonRegistry.INSTANCE) {
            if (isSystemConfigExisted()) {
                LOG.info("System config already existed.");
            } else {
                LOG.info("System config not existed. Registering...");
                registerSystemConfig(this.systemConfig);
                LOG.info("System config registered.");
            }
        }
    }

    protected boolean isSystemConfigExisted() {
        return SingletonRegistry.INSTANCE.containsSingleton(SystemConfig.HERON_SYSTEM_CONFIG);
    }

    protected void registerSystemConfig(SystemConfig systemConfig) {
        SingletonRegistry.INSTANCE.registerSingleton(SystemConfig.HERON_SYSTEM_CONFIG, systemConfig);
    }

    public void submitTopology(String str, Config config, HeronTopology heronTopology) {
        PhysicalPlans.PhysicalPlan physicalPlan = PhysicalPlanUtil.getPhysicalPlan(heronTopology.setConfig(config).setName(str).setState(TopologyAPI.TopologyState.RUNNING).getTopology());
        LOG.info("Physical Plan: \n" + physicalPlan);
        this.streamExecutor = new StreamExecutor(physicalPlan);
        this.metricsExecutor = new MetricsExecutor(this.systemConfig);
        Iterator<PhysicalPlans.Instance> it = physicalPlan.getInstancesList().iterator();
        while (it.hasNext()) {
            InstanceExecutor instanceExecutor = new InstanceExecutor(physicalPlan, it.next().getInstanceId());
            this.streamExecutor.addInstanceExecutor(instanceExecutor);
            this.metricsExecutor.addInstanceExecutor(instanceExecutor);
            this.instanceExecutors.add(instanceExecutor);
        }
        Thread.setDefaultUncaughtExceptionHandler(new DefaultExceptionHandler());
        this.threadsPool.execute(this.metricsExecutor);
        this.threadsPool.execute(this.streamExecutor);
        Iterator<InstanceExecutor> it2 = this.instanceExecutors.iterator();
        while (it2.hasNext()) {
            this.threadsPool.execute(it2.next());
        }
    }

    public void killTopology(String str) {
        LOG.info("To kill topology: " + str);
        stop();
        LOG.info("Topology killed successfully");
    }

    public void activate(String str) {
        LOG.info("To activate topology: " + str);
        Iterator<InstanceExecutor> it = this.instanceExecutors.iterator();
        while (it.hasNext()) {
            it.next().activate();
        }
        LOG.info("Activated topology: " + str);
    }

    public void deactivate(String str) {
        LOG.info("To deactivate topology: " + str);
        Iterator<InstanceExecutor> it = this.instanceExecutors.iterator();
        while (it.hasNext()) {
            it.next().deactivate();
        }
        LOG.info("Deactivated topology:" + str);
    }

    public void shutdown() {
        LOG.info("To shutdown thread pool");
        if (this.threadsPool.isShutdown()) {
            this.threadsPool.shutdownNow();
        }
        LOG.info("Heron simulator exited.");
    }

    public void stop() {
        Iterator<InstanceExecutor> it = this.instanceExecutors.iterator();
        while (it.hasNext()) {
            it.next().stop();
        }
        LOG.info("To stop Stream Executor");
        this.streamExecutor.stop();
        LOG.info("To stop Metrics Executor");
        this.metricsExecutor.stop();
        this.threadsPool.shutdown();
    }

    protected SystemConfig getSystemConfig() {
        SystemConfig systemConfig = new SystemConfig();
        systemConfig.put(SystemConfig.INSTANCE_SET_DATA_TUPLE_CAPACITY, 256);
        systemConfig.put(SystemConfig.INSTANCE_SET_CONTROL_TUPLE_CAPACITY, 256);
        systemConfig.put(SystemConfig.HERON_METRICS_EXPORT_INTERVAL_SEC, 60);
        systemConfig.put(SystemConfig.INSTANCE_EXECUTE_BATCH_TIME_MS, 16);
        systemConfig.put(SystemConfig.INSTANCE_EXECUTE_BATCH_SIZE_BYTES, 32768);
        systemConfig.put(SystemConfig.INSTANCE_EMIT_BATCH_TIME_MS, 16);
        systemConfig.put(SystemConfig.INSTANCE_EMIT_BATCH_SIZE_BYTES, 32768);
        systemConfig.put(SystemConfig.INSTANCE_ACK_BATCH_TIME_MS, 128);
        systemConfig.put(SystemConfig.INSTANCE_ACKNOWLEDGEMENT_NBUCKETS, 10);
        return systemConfig;
    }
}
