package org.apache.kafka.trogdor.coordinator;

import java.net.ConnectException;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.Callable;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import org.apache.commons.configuration.tree.DefaultExpressionEngine;
import org.apache.kafka.trogdor.agent.AgentClient;
import org.apache.kafka.trogdor.common.Node;
import org.apache.kafka.trogdor.common.ThreadUtils;
import org.apache.kafka.trogdor.rest.AgentStatusResponse;
import org.apache.kafka.trogdor.rest.CreateWorkerRequest;
import org.apache.kafka.trogdor.rest.StopWorkerRequest;
import org.apache.kafka.trogdor.rest.WorkerDone;
import org.apache.kafka.trogdor.rest.WorkerReceiving;
import org.apache.kafka.trogdor.rest.WorkerRunning;
import org.apache.kafka.trogdor.rest.WorkerStarting;
import org.apache.kafka.trogdor.rest.WorkerState;
import org.apache.kafka.trogdor.rest.WorkerStopping;
import org.apache.kafka.trogdor.task.TaskSpec;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* JADX WARN: Classes with same name are omitted:
  input_file:META-INF/bundled-dependencies/kafka-tools-2.3.0.jar:org/apache/kafka/trogdor/coordinator/NodeManager.class
 */
/* loaded from: input_file:META-INF/bundled-dependencies/pulsar-io-kafka-connect-adaptor-2.7.2.9.jar:META-INF/bundled-dependencies/kafka-tools-2.3.0.jar:org/apache/kafka/trogdor/coordinator/NodeManager.class */
public final class NodeManager {
    /** Logger shared by this manager and its inner task classes. */
    private static final Logger log = LoggerFactory.getLogger(NodeManager.class);

    /** Heartbeat delay in milliseconds; matches the 1000 ms used when heartbeats are scheduled. */
    private static final long HEARTBEAT_DELAY_MS = 1000;

    /** The node being managed; its name prefixes every log message emitted here. */
    private final Node node;

    /** Notified whenever the heartbeat observes a worker state change. */
    private final TaskManager taskManager;

    /** Client used to query and control the agent on the managed node. */
    private final AgentClient client;

    /**
     * Single-threaded scheduled executor that runs the heartbeat as well as every
     * create/stop/destroy worker request submitted through this manager.
     */
    private final ScheduledExecutorService executor;

    /** Handle of the most recently scheduled heartbeat; {@code null} until the first scheduling. */
    private ScheduledFuture<?> heartbeatFuture;

    /** Workers tracked for this node, keyed by worker ID. */
    private final Map<Long, ManagedWorker> workers = new HashMap<>();

    /** The heartbeat task instance that is (re)scheduled on the executor. */
    private final NodeHeartbeat heartbeat = new NodeHeartbeat();

    /* JADX INFO: Access modifiers changed from: package-private */
    /* JADX WARN: Classes with same name are omitted:
      input_file:META-INF/bundled-dependencies/kafka-tools-2.3.0.jar:org/apache/kafka/trogdor/coordinator/NodeManager$CreateWorker.class
     */
    /* loaded from: input_file:META-INF/bundled-dependencies/pulsar-io-kafka-connect-adaptor-2.7.2.9.jar:META-INF/bundled-dependencies/kafka-tools-2.3.0.jar:org/apache/kafka/trogdor/coordinator/NodeManager$CreateWorker.class */
    public class CreateWorker implements Callable<Void> {
        private final long workerId;
        private final String taskId;
        private final TaskSpec spec;

        CreateWorker(long j, String str, TaskSpec taskSpec) {
            this.workerId = j;
            this.taskId = str;
            this.spec = taskSpec;
        }

        /* JADX WARN: Can't rename method to resolve collision */
        @Override // java.util.concurrent.Callable
        public Void call() throws Exception {
            ManagedWorker managedWorker = (ManagedWorker) NodeManager.this.workers.get(Long.valueOf(this.workerId));
            if (managedWorker != null) {
                NodeManager.log.error("{}: there is already a worker {} with ID {}.", NodeManager.this.node.name(), managedWorker, Long.valueOf(this.workerId));
                return null;
            }
            ManagedWorker managedWorker2 = new ManagedWorker(this.workerId, this.taskId, this.spec, true, new WorkerReceiving(this.taskId, this.spec));
            NodeManager.log.info("{}: scheduling worker {} to start.", NodeManager.this.node.name(), managedWorker2);
            NodeManager.this.workers.put(Long.valueOf(this.workerId), managedWorker2);
            NodeManager.this.rescheduleNextHeartbeat(0L);
            return null;
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /* JADX WARN: Classes with same name are omitted:
      input_file:META-INF/bundled-dependencies/kafka-tools-2.3.0.jar:org/apache/kafka/trogdor/coordinator/NodeManager$DestroyWorker.class
     */
    /* loaded from: input_file:META-INF/bundled-dependencies/pulsar-io-kafka-connect-adaptor-2.7.2.9.jar:META-INF/bundled-dependencies/kafka-tools-2.3.0.jar:org/apache/kafka/trogdor/coordinator/NodeManager$DestroyWorker.class */
    public class DestroyWorker implements Callable<Void> {
        private final long workerId;

        DestroyWorker(long j) {
            this.workerId = j;
        }

        /* JADX WARN: Can't rename method to resolve collision */
        @Override // java.util.concurrent.Callable
        public Void call() throws Exception {
            if (((ManagedWorker) NodeManager.this.workers.remove(Long.valueOf(this.workerId))) == null) {
                NodeManager.log.error("{}: unable to locate worker to destroy with ID {}.", NodeManager.this.node.name(), Long.valueOf(this.workerId));
                return null;
            }
            NodeManager.this.rescheduleNextHeartbeat(0L);
            return null;
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /* JADX WARN: Classes with same name are omitted:
      input_file:META-INF/bundled-dependencies/kafka-tools-2.3.0.jar:org/apache/kafka/trogdor/coordinator/NodeManager$ManagedWorker.class
     */
    /* loaded from: input_file:META-INF/bundled-dependencies/pulsar-io-kafka-connect-adaptor-2.7.2.9.jar:META-INF/bundled-dependencies/kafka-tools-2.3.0.jar:org/apache/kafka/trogdor/coordinator/NodeManager$ManagedWorker.class */
    public class ManagedWorker {
        private final long workerId;
        private final String taskId;
        private final TaskSpec spec;
        private boolean shouldRun;
        private WorkerState state;

        ManagedWorker(long j, String str, TaskSpec taskSpec, boolean z, WorkerState workerState) {
            this.workerId = j;
            this.taskId = str;
            this.spec = taskSpec;
            this.shouldRun = z;
            this.state = workerState;
        }

        void tryCreate() {
            try {
                NodeManager.this.client.createWorker(new CreateWorkerRequest(this.workerId, this.taskId, this.spec));
            } catch (Throwable th) {
                NodeManager.log.error("{}: error creating worker {}.", NodeManager.this.node.name(), this, th);
            }
        }

        void tryStop() {
            try {
                NodeManager.this.client.stopWorker(new StopWorkerRequest(this.workerId));
            } catch (Throwable th) {
                NodeManager.log.error("{}: error stopping worker {}.", NodeManager.this.node.name(), this, th);
            }
        }

        public String toString() {
            return String.format("%s_%d", this.taskId, Long.valueOf(this.workerId));
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /* JADX WARN: Classes with same name are omitted:
      input_file:META-INF/bundled-dependencies/kafka-tools-2.3.0.jar:org/apache/kafka/trogdor/coordinator/NodeManager$NodeHeartbeat.class
     */
    /* loaded from: input_file:META-INF/bundled-dependencies/pulsar-io-kafka-connect-adaptor-2.7.2.9.jar:META-INF/bundled-dependencies/kafka-tools-2.3.0.jar:org/apache/kafka/trogdor/coordinator/NodeManager$NodeHeartbeat.class */
    public class NodeHeartbeat implements Runnable {
        NodeHeartbeat() {
        }

        @Override // java.lang.Runnable
        public void run() {
            NodeManager.this.rescheduleNextHeartbeat(1000L);
            try {
                try {
                    try {
                        AgentStatusResponse status = NodeManager.this.client.status();
                        if (NodeManager.log.isTraceEnabled()) {
                            NodeManager.log.trace("{}: got heartbeat status {}", NodeManager.this.node.name(), status);
                        }
                        handleMissingWorkers(status);
                        handlePresentWorkers(status);
                    } catch (ConnectException e) {
                        NodeManager.log.error("{}: failed to get agent status: ConnectException {}", NodeManager.this.node.name(), e.getMessage());
                    }
                } catch (Exception e2) {
                    NodeManager.log.error("{}: failed to get agent status", NodeManager.this.node.name(), e2);
                }
            } catch (Throwable th) {
                NodeManager.log.error("{}: Unhandled exception in NodeHeartbeatRunnable", NodeManager.this.node.name(), th);
            }
        }

        private void handleMissingWorkers(AgentStatusResponse agentStatusResponse) {
            for (Map.Entry entry : NodeManager.this.workers.entrySet()) {
                if (!agentStatusResponse.workers().containsKey((Long) entry.getKey())) {
                    ManagedWorker managedWorker = (ManagedWorker) entry.getValue();
                    if (managedWorker.shouldRun) {
                        managedWorker.tryCreate();
                    }
                }
            }
        }

        private void handlePresentWorkers(AgentStatusResponse agentStatusResponse) {
            for (Map.Entry<Long, WorkerState> entry : agentStatusResponse.workers().entrySet()) {
                long longValue = entry.getKey().longValue();
                WorkerState value = entry.getValue();
                ManagedWorker managedWorker = (ManagedWorker) NodeManager.this.workers.get(Long.valueOf(longValue));
                if (managedWorker == null) {
                    NodeManager.log.warn("{}: scheduling unknown worker with ID {} for stopping.", NodeManager.this.node.name(), Long.valueOf(longValue));
                    NodeManager.this.workers.put(Long.valueOf(longValue), new ManagedWorker(longValue, value.taskId(), value.spec(), false, value));
                } else {
                    if (((value instanceof WorkerStarting) || (value instanceof WorkerRunning)) && !managedWorker.shouldRun) {
                        managedWorker.tryStop();
                    }
                    if (managedWorker.state.equals(value)) {
                        NodeManager.log.debug("{}: worker state is still {}", NodeManager.this.node.name(), managedWorker.state);
                    } else {
                        NodeManager.log.info("{}: worker state changed from {} to {}", NodeManager.this.node.name(), managedWorker.state, value);
                        if ((value instanceof WorkerDone) || (value instanceof WorkerStopping)) {
                            managedWorker.shouldRun = false;
                        }
                        managedWorker.state = value;
                        NodeManager.this.taskManager.updateWorkerState(NodeManager.this.node.name(), managedWorker.workerId, value);
                    }
                }
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /* JADX WARN: Classes with same name are omitted:
      input_file:META-INF/bundled-dependencies/kafka-tools-2.3.0.jar:org/apache/kafka/trogdor/coordinator/NodeManager$StopWorker.class
     */
    /* loaded from: input_file:META-INF/bundled-dependencies/pulsar-io-kafka-connect-adaptor-2.7.2.9.jar:META-INF/bundled-dependencies/kafka-tools-2.3.0.jar:org/apache/kafka/trogdor/coordinator/NodeManager$StopWorker.class */
    public class StopWorker implements Callable<Void> {
        private final long workerId;

        StopWorker(long j) {
            this.workerId = j;
        }

        /* JADX WARN: Can't rename method to resolve collision */
        @Override // java.util.concurrent.Callable
        public Void call() throws Exception {
            ManagedWorker managedWorker = (ManagedWorker) NodeManager.this.workers.get(Long.valueOf(this.workerId));
            if (managedWorker == null) {
                NodeManager.log.error("{}: unable to locate worker to stop with ID {}.", NodeManager.this.node.name(), Long.valueOf(this.workerId));
                return null;
            }
            if (!managedWorker.shouldRun) {
                NodeManager.log.error("{}: Worker {} is already scheduled to stop.", NodeManager.this.node.name(), managedWorker);
                return null;
            }
            NodeManager.log.info("{}: scheduling worker {} to stop.", NodeManager.this.node.name(), managedWorker);
            managedWorker.shouldRun = false;
            NodeManager.this.rescheduleNextHeartbeat(0L);
            return null;
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public NodeManager(Node node, TaskManager taskManager) {
        this.node = node;
        this.taskManager = taskManager;
        this.client = new AgentClient.Builder().maxTries(1).target(node.hostname(), Node.Util.getTrogdorAgentPort(node)).build();
        this.executor = Executors.newSingleThreadScheduledExecutor(ThreadUtils.createThreadFactory("NodeManager(" + node.name() + DefaultExpressionEngine.DEFAULT_INDEX_END, false));
        rescheduleNextHeartbeat(1000L);
    }

    /**
     * Cancels the pending heartbeat, if any, and schedules a new one.
     *
     * <p>The previous heartbeat is cancelled without interruption, so a heartbeat that
     * is currently executing is allowed to finish.
     *
     * @param j delay in milliseconds before the next heartbeat runs; {@code 0} requests
     *          one as soon as the executor is free
     */
    void rescheduleNextHeartbeat(long j) {
        ScheduledFuture<?> previous = heartbeatFuture;
        if (previous != null) {
            previous.cancel(false);
        }
        // Use the shared constant for the period so it cannot drift from the declared delay.
        heartbeatFuture = executor.scheduleAtFixedRate(heartbeat, j, HEARTBEAT_DELAY_MS, TimeUnit.MILLISECONDS);
    }

    /**
     * Queues a request on the executor to start tracking, and eventually create, a worker.
     *
     * @param j        ID of the worker to create
     * @param str      ID of the task the worker belongs to
     * @param taskSpec specification of the task the worker should run
     */
    public void createWorker(long j, String str, TaskSpec taskSpec) {
        Callable<Void> request = new CreateWorker(j, str, taskSpec);
        executor.submit(request);
    }

    /**
     * Queues a request on the executor to stop a tracked worker.
     *
     * @param j ID of the worker to stop
     */
    public void stopWorker(long j) {
        Callable<Void> request = new StopWorker(j);
        executor.submit(request);
    }

    /**
     * Queues a request on the executor to remove a worker from the tracked set.
     *
     * @param j ID of the worker to remove
     */
    public void destroyWorker(long j) {
        Callable<Void> request = new DestroyWorker(j);
        executor.submit(request);
    }

    /**
     * Starts shutting this manager down by stopping its executor immediately, and
     * optionally asks the agent to shut down as well.
     *
     * @param z when {@code true}, also send a shutdown request to the agent; a failure
     *          of that request is logged and otherwise ignored
     */
    public void beginShutdown(boolean z) {
        executor.shutdownNow();
        if (!z) {
            return;
        }
        try {
            client.invokeShutdown();
        } catch (Exception e) {
            log.error("{}: Failed to send shutdown request", node.name(), e);
        }
    }

    /**
     * Blocks until the executor has terminated, waiting at most one day.
     *
     * @throws InterruptedException if the calling thread is interrupted while waiting
     */
    public void waitForShutdown() throws InterruptedException {
        // The boolean result (terminated vs. timed out) is discarded.
        executor.awaitTermination(1, TimeUnit.DAYS);
    }
}
