package org.apache.kafka.trogdor.agent;

import com.fasterxml.jackson.databind.node.LongNode;
import com.fasterxml.jackson.databind.node.ObjectNode;
import java.io.PrintStream;
import java.util.Set;
import net.sourceforge.argparse4j.ArgumentParsers;
import net.sourceforge.argparse4j.impl.Arguments;
import net.sourceforge.argparse4j.inf.ArgumentParser;
import net.sourceforge.argparse4j.inf.ArgumentParserException;
import net.sourceforge.argparse4j.inf.Namespace;
import org.apache.kafka.common.KafkaFuture;
import org.apache.kafka.common.utils.Exit;
import org.apache.kafka.common.utils.Scheduler;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.trogdor.common.JsonUtil;
import org.apache.kafka.trogdor.common.Node;
import org.apache.kafka.trogdor.common.Platform;
import org.apache.kafka.trogdor.rest.AgentStatusResponse;
import org.apache.kafka.trogdor.rest.CreateWorkerRequest;
import org.apache.kafka.trogdor.rest.DestroyWorkerRequest;
import org.apache.kafka.trogdor.rest.JsonRestServer;
import org.apache.kafka.trogdor.rest.StopWorkerRequest;
import org.apache.kafka.trogdor.rest.UptimeResponse;
import org.apache.kafka.trogdor.task.TaskSpec;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/kafka/trogdor/agent/Agent.class */
public final class Agent {
    private static final Logger log = LoggerFactory.getLogger(Agent.class);
    public static final int DEFAULT_PORT = 8888;
    private static final long EXEC_WORKER_ID = 1;
    private static final String EXEC_TASK_ID = "task0";
    private final Platform platform;
    private final long serverStartMs;
    private final WorkerManager workerManager;
    private final JsonRestServer restServer;
    private final Time time;

    public Agent(Platform platform, Scheduler scheduler, JsonRestServer jsonRestServer, AgentRestResource agentRestResource) {
        this.platform = platform;
        this.time = scheduler.time();
        this.serverStartMs = this.time.milliseconds();
        this.workerManager = new WorkerManager(platform, scheduler);
        this.restServer = jsonRestServer;
        agentRestResource.setAgent(this);
    }

    public int port() {
        return this.restServer.port();
    }

    public void beginShutdown() throws Exception {
        this.restServer.beginShutdown();
        this.workerManager.beginShutdown();
    }

    public void waitForShutdown() throws Exception {
        this.restServer.waitForShutdown();
        this.workerManager.waitForShutdown();
    }

    public AgentStatusResponse status() throws Exception {
        return new AgentStatusResponse(this.serverStartMs, this.workerManager.workerStates());
    }

    public UptimeResponse uptime() {
        return new UptimeResponse(this.serverStartMs, this.time.milliseconds());
    }

    public void createWorker(CreateWorkerRequest createWorkerRequest) throws Throwable {
        this.workerManager.createWorker(createWorkerRequest.workerId(), createWorkerRequest.taskId(), createWorkerRequest.spec());
    }

    public void stopWorker(StopWorkerRequest stopWorkerRequest) throws Throwable {
        this.workerManager.stopWorker(stopWorkerRequest.workerId(), false);
    }

    public void destroyWorker(DestroyWorkerRequest destroyWorkerRequest) throws Throwable {
        this.workerManager.stopWorker(destroyWorkerRequest.workerId(), true);
    }

    TaskSpec rebaseTaskSpecTime(TaskSpec taskSpec) throws Exception {
        ObjectNode objectNode = (ObjectNode) JsonUtil.JSON_SERDE.valueToTree(taskSpec);
        objectNode.set("startMs", new LongNode(Math.max(this.time.milliseconds(), taskSpec.startMs())));
        return (TaskSpec) JsonUtil.JSON_SERDE.treeToValue(objectNode, TaskSpec.class);
    }

    boolean exec(TaskSpec taskSpec, PrintStream printStream) throws Exception {
        try {
            Set<String> targetNodes = taskSpec.newController(EXEC_TASK_ID).targetNodes(this.platform.topology());
            if (!targetNodes.contains(this.platform.curNode().name())) {
                printStream.println("This task is not configured to run on this node.  It runs on node(s): " + Utils.join(targetNodes, ", ") + ", whereas this node is " + this.platform.curNode().name());
                return false;
            }
            try {
                KafkaFuture<String> createWorker = this.workerManager.createWorker(EXEC_WORKER_ID, EXEC_TASK_ID, taskSpec);
                printStream.println("Waiting for completion of task:" + JsonUtil.toPrettyJsonString(taskSpec));
                String str = createWorker.get();
                if (str == null || str.isEmpty()) {
                    printStream.println("Task succeeded with status " + JsonUtil.toPrettyJsonString(this.workerManager.workerStates().get(Long.valueOf(EXEC_WORKER_ID)).status()));
                    return true;
                }
                printStream.println("Task failed with status " + JsonUtil.toPrettyJsonString(this.workerManager.workerStates().get(Long.valueOf(EXEC_WORKER_ID)).status()) + " and error " + str);
                return false;
            } catch (Throwable th) {
                printStream.println("createWorker failed");
                th.printStackTrace(printStream);
                return false;
            }
        } catch (Exception e) {
            printStream.println("Unable to create the task controller.");
            e.printStackTrace(printStream);
            return false;
        }
    }

    public static void main(String[] strArr) throws Exception {
        ArgumentParser description = ArgumentParsers.newArgumentParser("trogdor-agent").defaultHelp(true).description("The Trogdor fault injection agent");
        description.addArgument("--agent.config", "-c").action(Arguments.store()).required(true).type(String.class).dest("config").metavar("CONFIG").help("The configuration file to use.");
        description.addArgument("--node-name", "-n").action(Arguments.store()).required(true).type(String.class).dest("node_name").metavar("NODE_NAME").help("The name of this node.");
        description.addArgument("--exec", "-e").action(Arguments.store()).type(String.class).dest("task_spec").metavar("TASK_SPEC").help("Execute a single task spec and then exit.  The argument is the task spec to load when starting up, or a path to it.");
        Namespace namespace = null;
        try {
            namespace = description.parseArgs(strArr);
        } catch (ArgumentParserException e) {
            if (strArr.length == 0) {
                description.printHelp();
                Exit.exit(0);
            } else {
                description.handleError(e);
                Exit.exit(1);
            }
        }
        String string = namespace.getString("config");
        String string2 = namespace.getString("node_name");
        String string3 = namespace.getString("task_spec");
        Platform parse = Platform.Config.parse(string2, string);
        JsonRestServer jsonRestServer = new JsonRestServer(Node.Util.getTrogdorAgentPort(parse.curNode()));
        AgentRestResource agentRestResource = new AgentRestResource();
        log.info("Starting agent process.");
        Agent agent = new Agent(parse, Scheduler.SYSTEM, jsonRestServer, agentRestResource);
        jsonRestServer.start(agentRestResource);
        Exit.addShutdownHook("agent-shutdown-hook", () -> {
            log.warn("Running agent shutdown hook.");
            try {
                agent.beginShutdown();
                agent.waitForShutdown();
            } catch (Exception e2) {
                log.error("Got exception while running agent shutdown hook.", e2);
            }
        });
        if (string3 != null) {
            TaskSpec taskSpec = null;
            try {
                taskSpec = (TaskSpec) JsonUtil.objectFromCommandLineArgument(string3, TaskSpec.class);
            } catch (Exception e2) {
                System.out.println("Unable to parse the supplied task spec.");
                e2.printStackTrace();
                Exit.exit(1);
            }
            Exit.exit(agent.exec(agent.rebaseTaskSpecTime(taskSpec), System.out) ? 0 : 1);
        }
        agent.waitForShutdown();
    }
}
