blob: 97ad4ae1506f311e513f6ceb4e595ce92cc392fc [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
/*
* So, when a task comes in, it happens via createTask (the RPC backend).
* This starts a CreateTask on the main state change thread, and waits for it.
* That task checks the main task hash map, and returns back the existing task spec
* if there is something there. If there is nothing there, it creates
* something new, and returns null.
* It also schedules a RunTask some time in the future on the main state change thread.
* We save the future from this in case we need to cancel it later, in a StopTask.
* If we can't create the TaskController for the task, we transition to DONE with an
* appropriate error message.
*
* RunTask actually starts the task which was created earlier. This could
* happen an arbitrary amount of time after task creation (it is based on the
* task spec). RunTask must operate only on PENDING tasks... if the task has been
* stopped, then we have nothing to do here.
* RunTask asks the TaskController for a list of all the names of nodes
* affected by this task.
* If this list contains nodes we don't know about, or zero nodes, we
* transition directly to DONE state with an appropriate error set.
* RunTask schedules CreateWorker Callables on all the affected worker nodes.
* These callables run in the context of the relevant NodeManager.
*
* CreateWorker calls the RPC of the same name for the agent.
* There is some complexity here due to retries.
*/
package org.apache.kafka.trogdor.coordinator;
import org.apache.kafka.trogdor.agent.AgentClient;
import org.apache.kafka.trogdor.common.Node;
import org.apache.kafka.trogdor.common.ThreadUtils;
import org.apache.kafka.trogdor.rest.AgentStatusResponse;
import org.apache.kafka.trogdor.rest.CreateWorkerRequest;
import org.apache.kafka.trogdor.rest.StopWorkerRequest;
import org.apache.kafka.trogdor.rest.WorkerDone;
import org.apache.kafka.trogdor.rest.WorkerReceiving;
import org.apache.kafka.trogdor.rest.WorkerRunning;
import org.apache.kafka.trogdor.rest.WorkerStarting;
import org.apache.kafka.trogdor.rest.WorkerState;
import org.apache.kafka.trogdor.rest.WorkerStopping;
import org.apache.kafka.trogdor.task.TaskSpec;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.net.ConnectException;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.Callable;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
/**
 * The NodeManager handles communicating with a specific agent node.
 * Each NodeManager has its own ScheduledExecutorService which runs in a dedicated thread.
 * All access to the {@link #workers} map happens on that thread (the heartbeat runnable
 * and the Create/Stop/DestroyWorker callables are all submitted to {@link #executor}),
 * which is why the plain HashMap needs no additional synchronization.
 */
public final class NodeManager {
    private static final Logger log = LoggerFactory.getLogger(NodeManager.class);

    /**
     * The normal delay, in milliseconds, between heartbeats sent to the agent.
     */
    private static final long HEARTBEAT_DELAY_MS = 1000L;

    /**
     * Coordinator-side bookkeeping for a single worker on this node.
     *
     * Tracks both the desired state ({@link #shouldRun}) and the last state the
     * agent reported ({@link #state}). The periodic heartbeat reconciles the two
     * by issuing createWorker / stopWorker RPCs as needed.
     */
    class ManagedWorker {
        // Unique ID of this worker.
        private final long workerId;
        // ID of the task this worker executes.
        private final String taskId;
        // Specification of the task this worker executes.
        private final TaskSpec spec;
        // True if this worker should be running on the agent; false if it should be stopped.
        private boolean shouldRun;
        // The most recent worker state reported by the agent (or assumed locally).
        private WorkerState state;

        ManagedWorker(long workerId, String taskId, TaskSpec spec,
            boolean shouldRun, WorkerState state) {
            this.workerId = workerId;
            this.taskId = taskId;
            this.spec = spec;
            this.shouldRun = shouldRun;
            this.state = state;
        }

        /**
         * Best-effort attempt to create this worker on the agent via RPC.
         * Failures are logged and swallowed: as long as the worker remains absent
         * from the agent's status response, handleMissingWorkers() will call this
         * again on the next heartbeat, so no explicit retry logic is needed here.
         */
        void tryCreate() {
            try {
                client.createWorker(new CreateWorkerRequest(workerId, taskId, spec));
            } catch (Throwable e) {
                log.error("{}: error creating worker {}.", node.name(), this, e);
            }
        }

        /**
         * Best-effort attempt to stop this worker on the agent via RPC.
         * Failures are logged and swallowed: while the agent still reports the
         * worker as starting or running, handlePresentWorkers() will call this
         * again on the next heartbeat.
         */
        void tryStop() {
            try {
                client.stopWorker(new StopWorkerRequest(workerId));
            } catch (Throwable e) {
                log.error("{}: error stopping worker {}.", node.name(), this, e);
            }
        }

        @Override
        public String toString() {
            // e.g. "myTask_123" -- used in log messages throughout this class.
            return String.format("%s_%d", taskId, workerId);
        }
    }

    /**
     * The node which we are managing.
     */
    private final Node node;

    /**
     * The task manager, which is notified whenever a worker's state changes.
     */
    private final TaskManager taskManager;

    /**
     * A client for the Node's Agent.
     */
    private final AgentClient client;

    /**
     * Maps worker IDs to worker structures.
     */
    private final Map<Long, ManagedWorker> workers;

    /**
     * An executor service which manages the thread dedicated to this node.
     */
    private final ScheduledExecutorService executor;

    /**
     * The heartbeat runnable.
     */
    private final NodeHeartbeat heartbeat;

    /**
     * A future which can be used to cancel the periodic heartbeat task.
     */
    private ScheduledFuture<?> heartbeatFuture;

    NodeManager(Node node, TaskManager taskManager) {
        this.node = node;
        this.taskManager = taskManager;
        // maxTries(1): retries are handled by the heartbeat loop itself, so the
        // client does not need to retry individual RPCs.
        this.client = new AgentClient.Builder().
            maxTries(1).
            target(node.hostname(), Node.Util.getTrogdorAgentPort(node)).
            build();
        this.workers = new HashMap<>();
        this.executor = Executors.newSingleThreadScheduledExecutor(
            ThreadUtils.createThreadFactory("NodeManager(" + node.name() + ")",
                false));
        this.heartbeat = new NodeHeartbeat();
        rescheduleNextHeartbeat(HEARTBEAT_DELAY_MS);
    }

    /**
     * Reschedule the heartbeat runnable.
     *
     * Cancels any previously scheduled heartbeat (without interrupting one that
     * is already running -- cancel(false)) and starts a fresh fixed-rate schedule.
     * Called with an initial delay of 0 to trigger an immediate heartbeat after
     * a worker is created, stopped, or destroyed.
     *
     * @param initialDelayMs        The initial delay to use.
     */
    void rescheduleNextHeartbeat(long initialDelayMs) {
        if (this.heartbeatFuture != null) {
            this.heartbeatFuture.cancel(false);
        }
        this.heartbeatFuture = this.executor.scheduleAtFixedRate(heartbeat,
            initialDelayMs, HEARTBEAT_DELAY_MS, TimeUnit.MILLISECONDS);
    }

    /**
     * The heartbeat runnable.
     *
     * Each run fetches the agent's status and reconciles it against the local
     * {@code workers} map: re-creating workers the agent lost, stopping workers
     * that should not run, and propagating state changes to the TaskManager.
     */
    class NodeHeartbeat implements Runnable {
        @Override
        public void run() {
            // Re-arm the schedule first, so the next heartbeat fires a full
            // HEARTBEAT_DELAY_MS from now even when this run was triggered
            // early via rescheduleNextHeartbeat(0).
            rescheduleNextHeartbeat(HEARTBEAT_DELAY_MS);
            try {
                AgentStatusResponse agentStatus = null;
                try {
                    agentStatus = client.status();
                } catch (ConnectException e) {
                    // Connection failures are logged without a stack trace --
                    // presumably because an unreachable agent is a common,
                    // expected condition rather than a programming error.
                    log.error("{}: failed to get agent status: ConnectException {}", node.name(), e.getMessage());
                    return;
                } catch (Exception e) {
                    log.error("{}: failed to get agent status", node.name(), e);
                    // TODO: eventually think about putting tasks into a bad state as a result of
                    // agents going down?
                    return;
                }
                if (log.isTraceEnabled()) {
                    log.trace("{}: got heartbeat status {}", node.name(), agentStatus);
                }
                handleMissingWorkers(agentStatus);
                handlePresentWorkers(agentStatus);
            } catch (Throwable e) {
                // Catch-all so an unexpected error never kills the periodic heartbeat.
                log.error("{}: Unhandled exception in NodeHeartbeatRunnable", node.name(), e);
            }
        }

        /**
         * Identify workers which we think should be running but do not appear in the agent's response.
         * We need to send createWorker requests for those.
         */
        private void handleMissingWorkers(AgentStatusResponse agentStatus) {
            for (Map.Entry<Long, ManagedWorker> entry : workers.entrySet()) {
                Long workerId = entry.getKey();
                if (!agentStatus.workers().containsKey(workerId)) {
                    ManagedWorker worker = entry.getValue();
                    if (worker.shouldRun) {
                        worker.tryCreate();
                    }
                }
            }
        }

        /**
         * Reconcile the workers the agent reports against our local bookkeeping:
         * adopt unknown workers (marked for stopping), stop workers that should
         * not run, and notify the TaskManager of any state changes.
         */
        private void handlePresentWorkers(AgentStatusResponse agentStatus) {
            for (Map.Entry<Long, WorkerState> entry : agentStatus.workers().entrySet()) {
                long workerId = entry.getKey();
                WorkerState state = entry.getValue();
                ManagedWorker worker = workers.get(workerId);
                if (worker == null) {
                    // Identify tasks which are running, but which we don't know about.
                    // Add these to the NodeManager as tasks that should not be running.
                    log.warn("{}: scheduling unknown worker with ID {} for stopping.", node.name(), workerId);
                    workers.put(workerId, new ManagedWorker(workerId, state.taskId(),
                        state.spec(), false, state));
                } else {
                    // Handle workers which need to be stopped.
                    if (state instanceof WorkerStarting || state instanceof WorkerRunning) {
                        if (!worker.shouldRun) {
                            worker.tryStop();
                        }
                    }
                    // Notify the TaskManager if the worker state has changed.
                    if (worker.state.equals(state)) {
                        log.debug("{}: worker state is still {}", node.name(), worker.state);
                    } else {
                        log.info("{}: worker state changed from {} to {}", node.name(), worker.state, state);
                        // A worker that is done or stopping must not be re-created
                        // by handleMissingWorkers() once it disappears.
                        if (state instanceof WorkerDone || state instanceof WorkerStopping)
                            worker.shouldRun = false;
                        worker.state = state;
                        taskManager.updateWorkerState(node.name(), worker.workerId, state);
                    }
                }
            }
        }
    }

    /**
     * Create a new worker. Asynchronous: submits a CreateWorker callable to this
     * node's executor and returns immediately.
     *
     * @param workerId      The new worker id.
     * @param taskId        The new task id.
     * @param spec          The task specification to use with the new worker.
     */
    public void createWorker(long workerId, String taskId, TaskSpec spec) {
        executor.submit(new CreateWorker(workerId, taskId, spec));
    }

    /**
     * Starts a worker: registers it locally (in WorkerReceiving state, marked
     * shouldRun) and triggers an immediate heartbeat, which performs the actual
     * createWorker RPC via handleMissingWorkers().
     */
    class CreateWorker implements Callable<Void> {
        private final long workerId;
        private final String taskId;
        private final TaskSpec spec;

        CreateWorker(long workerId, String taskId, TaskSpec spec) {
            this.workerId = workerId;
            this.taskId = taskId;
            this.spec = spec;
        }

        @Override
        public Void call() throws Exception {
            ManagedWorker worker = workers.get(workerId);
            if (worker != null) {
                // Duplicate worker IDs are logged and ignored rather than thrown,
                // since this runs asynchronously with no caller to catch.
                log.error("{}: there is already a worker {} with ID {}.",
                    node.name(), worker, workerId);
                return null;
            }
            worker = new ManagedWorker(workerId, taskId, spec, true, new WorkerReceiving(taskId, spec));
            log.info("{}: scheduling worker {} to start.", node.name(), worker);
            workers.put(workerId, worker);
            rescheduleNextHeartbeat(0);
            return null;
        }
    }

    /**
     * Stop a worker. Asynchronous: submits a StopWorker callable to this node's
     * executor and returns immediately.
     *
     * @param workerId      The id of the worker to stop.
     */
    public void stopWorker(long workerId) {
        executor.submit(new StopWorker(workerId));
    }

    /**
     * Stops a worker: marks it as shouldRun = false and triggers an immediate
     * heartbeat, which performs the actual stopWorker RPC via handlePresentWorkers().
     */
    class StopWorker implements Callable<Void> {
        private final long workerId;

        StopWorker(long workerId) {
            this.workerId = workerId;
        }

        @Override
        public Void call() throws Exception {
            ManagedWorker worker = workers.get(workerId);
            if (worker == null) {
                log.error("{}: unable to locate worker to stop with ID {}.", node.name(), workerId);
                return null;
            }
            if (!worker.shouldRun) {
                log.error("{}: Worker {} is already scheduled to stop.",
                    node.name(), worker);
                return null;
            }
            log.info("{}: scheduling worker {} to stop.", node.name(), worker);
            worker.shouldRun = false;
            rescheduleNextHeartbeat(0);
            return null;
        }
    }

    /**
     * Destroy a worker. Asynchronous: submits a DestroyWorker callable to this
     * node's executor and returns immediately.
     *
     * @param workerId      The id of the worker to destroy.
     */
    public void destroyWorker(long workerId) {
        executor.submit(new DestroyWorker(workerId));
    }

    /**
     * Destroys a worker: drops our local bookkeeping for it without sending an
     * RPC here. NOTE(review): if the agent still reports this worker afterwards,
     * the next heartbeat will treat it as unknown and schedule it for stopping
     * via handlePresentWorkers() -- presumably the intended cleanup path; confirm.
     */
    class DestroyWorker implements Callable<Void> {
        private final long workerId;

        DestroyWorker(long workerId) {
            this.workerId = workerId;
        }

        @Override
        public Void call() throws Exception {
            ManagedWorker worker = workers.remove(workerId);
            if (worker == null) {
                log.error("{}: unable to locate worker to destroy with ID {}.", node.name(), workerId);
                return null;
            }
            rescheduleNextHeartbeat(0);
            return null;
        }
    }

    /**
     * Begin shutting down this NodeManager. Halts the executor immediately
     * (cancelling any pending heartbeats or worker callables) and, if requested,
     * makes a best-effort attempt to shut down the agent itself.
     *
     * @param stopNode      True if the agent on the node should also be shut down.
     */
    public void beginShutdown(boolean stopNode) {
        executor.shutdownNow();
        if (stopNode) {
            try {
                client.invokeShutdown();
            } catch (Exception e) {
                log.error("{}: Failed to send shutdown request", node.name(), e);
            }
        }
    }

    /**
     * Block until the executor has terminated (up to an effectively unbounded
     * timeout of one day). Call beginShutdown() first.
     */
    public void waitForShutdown() throws InterruptedException {
        executor.awaitTermination(1, TimeUnit.DAYS);
    }
};