/*
* Druid - a distributed column store.
* Copyright (C) 2012 Metamarkets Group Inc.
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License
* as published by the Free Software Foundation; either version 2
* of the License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*/
package com.metamx.druid.merger.coordinator;
import com.google.common.base.Function;
import com.google.common.base.Joiner;
import com.google.common.base.Predicate;
import com.google.common.base.Throwables;
import com.google.common.collect.Lists;
import com.google.common.collect.MinMaxPriorityQueue;
import com.google.common.collect.Sets;
import com.google.common.primitives.Ints;
import com.metamx.common.ISE;
import com.metamx.common.concurrent.ScheduledExecutors;
import com.metamx.common.guava.FunctionalIterable;
import com.metamx.common.lifecycle.LifecycleStart;
import com.metamx.common.lifecycle.LifecycleStop;
import com.metamx.druid.PeriodGranularity;
import com.metamx.druid.merger.common.TaskHolder;
import com.metamx.druid.merger.common.TaskStatus;
import com.metamx.druid.merger.common.task.Task;
import com.metamx.druid.merger.coordinator.config.RemoteTaskRunnerConfig;
import com.metamx.druid.merger.coordinator.scaling.AutoScalingData;
import com.metamx.druid.merger.coordinator.scaling.ScalingStrategy;
import com.metamx.druid.merger.coordinator.setup.WorkerSetupManager;
import com.metamx.druid.merger.worker.Worker;
import com.metamx.emitter.EmittingLogger;
import com.netflix.curator.framework.CuratorFramework;
import com.netflix.curator.framework.recipes.cache.PathChildrenCache;
import com.netflix.curator.framework.recipes.cache.PathChildrenCacheEvent;
import com.netflix.curator.framework.recipes.cache.PathChildrenCacheListener;
import com.netflix.curator.utils.ZKPaths;
import org.apache.zookeeper.CreateMode;
import org.codehaus.jackson.map.ObjectMapper;
import org.joda.time.DateTime;
import org.joda.time.Duration;
import org.joda.time.Period;
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentSkipListSet;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
/**
* The RemoteTaskRunner encapsulates all interactions with Zookeeper and keeps track of which workers
* are running which tasks. The RemoteTaskRunner is event driven and updates state according to ephemeral node
* changes in ZK.
* <p/>
* The RemoteTaskRunner assigns tasks to worker nodes until the nodes hit capacity. RemoteTaskRunners use scaling
* strategies to decide when to provision new worker nodes or terminate idle ones. When a task is assigned to the
* remote task runner and no worker has capacity to handle it, new workers are provisioned according to the strategy.
* The remote task runner also periodically checks whether any worker nodes have been idle for longer than a
* configured period of time; if so, those worker nodes are terminated.
* <p/>
* If a worker node becomes inexplicably disconnected from ZK, the RemoteTaskRunner will automatically retry any tasks
* that were associated with the node.
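* <p/>
* A rough usage sketch (illustrative only; the variable names are placeholders for dependencies wired up elsewhere):
* <pre>
* RemoteTaskRunner runner = new RemoteTaskRunner(
*     jsonMapper, config, curatorFramework, workerPathCache,
*     scheduledExec, retryPolicyFactory, scalingStrategy, workerSetupManager
* );
* runner.start();                          // register ZK listeners and the periodic termination check
* runner.run(task, taskContext, callback); // assign the task to a worker, retrying if none has capacity
* runner.stop();                           // close per-worker resources
* </pre>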
*/
public class RemoteTaskRunner implements TaskRunner
{
private static final EmittingLogger log = new EmittingLogger(RemoteTaskRunner.class);
private static final Joiner JOINER = Joiner.on("/");
private final ObjectMapper jsonMapper;
private final RemoteTaskRunnerConfig config;
private final CuratorFramework cf;
private final PathChildrenCache workerPathCache;
private final ScheduledExecutorService scheduledExec;
private final RetryPolicyFactory retryPolicyFactory;
private final ScalingStrategy strategy;
private final WorkerSetupManager workerSetupManager;
// all workers that exist in ZK
private final Map<String, WorkerWrapper> zkWorkers = new ConcurrentHashMap<String, WorkerWrapper>();
// all tasks that are assigned or need to be assigned
private final Map<String, TaskWrapper> tasks = new ConcurrentHashMap<String, TaskWrapper>();
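// node IDs returned by the scaling strategy for workers that have been provisioned but have not yet appeared in ZK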
private final ConcurrentSkipListSet<String> currentlyProvisioning = new ConcurrentSkipListSet<String>();
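// node IDs returned by the scaling strategy for workers that are being terminated but have not yet disappeared from ZK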
private final ConcurrentSkipListSet<String> currentlyTerminating = new ConcurrentSkipListSet<String>();
private final Object statusLock = new Object();
private volatile DateTime lastProvisionTime = new DateTime();
private volatile DateTime lastTerminateTime = new DateTime();
private volatile boolean started = false;
public RemoteTaskRunner(
ObjectMapper jsonMapper,
RemoteTaskRunnerConfig config,
CuratorFramework cf,
PathChildrenCache workerPathCache,
ScheduledExecutorService scheduledExec,
RetryPolicyFactory retryPolicyFactory,
ScalingStrategy strategy,
WorkerSetupManager workerSetupManager
)
{
this.jsonMapper = jsonMapper;
this.config = config;
this.cf = cf;
this.workerPathCache = workerPathCache;
this.scheduledExec = scheduledExec;
this.retryPolicyFactory = retryPolicyFactory;
this.strategy = strategy;
this.workerSetupManager = workerSetupManager;
}
@LifecycleStart
public void start()
{
try {
workerPathCache.getListenable().addListener(
new PathChildrenCacheListener()
{
@Override
public void childEvent(CuratorFramework client, final PathChildrenCacheEvent event) throws Exception
{
if (event.getType().equals(PathChildrenCacheEvent.Type.CHILD_ADDED)) {
final Worker worker = jsonMapper.readValue(
event.getData().getData(),
Worker.class
);
log.info("New worker[%s] found!", worker.getHost());
addWorker(worker);
} else if (event.getType().equals(PathChildrenCacheEvent.Type.CHILD_REMOVED)) {
final Worker worker = jsonMapper.readValue(
event.getData().getData(),
Worker.class
);
log.info("Worker[%s] removed!", worker.getHost());
removeWorker(worker);
}
}
}
);
workerPathCache.start();
// Schedule termination of worker nodes periodically
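// The first run is aligned to the next period boundary relative to the configured origin; subsequent runs
// happen every terminateResourcesDuration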
Period period = new Period(config.getTerminateResourcesDuration());
PeriodGranularity granularity = new PeriodGranularity(period, config.getTerminateResourcesOriginDateTime(), null);
final long startTime = granularity.next(granularity.truncate(new DateTime().getMillis()));
ScheduledExecutors.scheduleAtFixedRate(
scheduledExec,
new Duration(
System.currentTimeMillis(),
startTime
),
config.getTerminateResourcesDuration(),
new Runnable()
{
@Override
public void run()
{
if (currentlyTerminating.isEmpty()) {
if (zkWorkers.size() <= workerSetupManager.getWorkerSetupData().getMinNumWorkers()) {
return;
}
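// Find workers beyond the configured minimum that are idle and have not completed a task for longer
// than the allowed idle time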
int workerCount = 0;
List<WorkerWrapper> thoseLazyWorkers = Lists.newArrayList();
for (WorkerWrapper workerWrapper : zkWorkers.values()) {
workerCount++;
if (workerCount > workerSetupManager.getWorkerSetupData().getMinNumWorkers() &&
workerWrapper.getRunningTasks().isEmpty() &&
System.currentTimeMillis() - workerWrapper.getLastCompletedTaskTime().getMillis()
> config.getMaxWorkerIdleTimeMillisBeforeDeletion()
) {
thoseLazyWorkers.add(workerWrapper);
}
}
AutoScalingData terminated = strategy.terminate(
Lists.transform(
thoseLazyWorkers,
new Function<WorkerWrapper, String>()
{
@Override
public String apply(WorkerWrapper input)
{
return input.getWorker().getIp();
}
}
)
);
if (terminated != null) {
currentlyTerminating.addAll(terminated.getNodeIds());
lastTerminateTime = new DateTime();
}
} else {
Duration durSinceLastTerminate = new Duration(lastTerminateTime, new DateTime());
if (durSinceLastTerminate.isLongerThan(config.getMaxScalingDuration())) {
log.makeAlert("Worker node termination taking too long")
.addData("millisSinceLastTerminate", durSinceLastTerminate.getMillis())
.addData("terminatingCount", currentlyTerminating.size())
.emit();
}
log.info(
"%s still terminating. Wait for all nodes to terminate before trying again.",
currentlyTerminating
);
}
}
}
);
started = true;
}
catch (Exception e) {
throw Throwables.propagate(e);
}
}
@LifecycleStop
public void stop()
{
try {
for (WorkerWrapper workerWrapper : zkWorkers.values()) {
workerWrapper.close();
}
}
catch (Exception e) {
throw Throwables.propagate(e);
}
finally {
started = false;
}
}
public boolean hasStarted()
{
return started;
}
public int getNumWorkers()
{
return zkWorkers.size();
}
@Override
public void run(Task task, TaskContext context, TaskCallback callback)
{
if (tasks.containsKey(task.getId())) {
throw new ISE("Assigned a task[%s] that already exists, WTF is happening?!", task.getId());
}
TaskWrapper taskWrapper = new TaskWrapper(
task, context, callback, retryPolicyFactory.makeRetryPolicy()
);
tasks.put(taskWrapper.getTask().getId(), taskWrapper);
assignTask(taskWrapper);
}
private void assignTask(TaskWrapper taskWrapper)
{
WorkerWrapper workerWrapper = findWorkerRunningTask(taskWrapper);
// If the task already exists, we don't need to announce it
if (workerWrapper != null) {
final Worker worker = workerWrapper.getWorker();
try {
log.info("Worker[%s] is already running task[%s].", worker.getHost(), taskWrapper.getTask().getId());
TaskStatus taskStatus = jsonMapper.readValue(
workerWrapper.getStatusCache()
.getCurrentData(
JOINER.join(config.getStatusPath(), worker.getHost(), taskWrapper.getTask().getId())
)
.getData(),
TaskStatus.class
);
if (taskStatus.isComplete()) {
TaskCallback callback = taskWrapper.getCallback();
if (callback != null) {
callback.notify(taskStatus);
}
new CleanupPaths(worker.getHost(), taskWrapper.getTask().getId()).run();
}
}
catch (Exception e) {
log.error(e, "Task exists, but hit exception!");
retryTask(new CleanupPaths(worker.getHost(), taskWrapper.getTask().getId()), taskWrapper);
}
} else {
// Announce the task or retry if there is not enough capacity
workerWrapper = findWorkerForTask();
if (workerWrapper != null) {
announceTask(workerWrapper.getWorker(), taskWrapper);
} else {
retryTask(null, taskWrapper);
}
}
}
/**
* Schedules a retry for a task that has failed or could not be assigned. If the task has exceeded its retry
* threshold, an alert is emitted and no retry is scheduled.
*
* @param pre - a runnable that is executed before the retry occurs
* @param taskWrapper - a container for task properties
*/
private void retryTask(
final Runnable pre,
final TaskWrapper taskWrapper
)
{
final Task task = taskWrapper.getTask();
final RetryPolicy retryPolicy = taskWrapper.getRetryPolicy();
log.info("Registering retry for failed task[%s]", task.getId());
if (retryPolicy.hasExceededRetryThreshold()) {
log.makeAlert("Task exceeded maximum retry count")
.addData("task", task.getId())
.addData("retryCount", retryPolicy.getNumRetries())
.emit();
return;
}
scheduledExec.schedule(
new Runnable()
{
@Override
public void run()
{
if (pre != null) {
pre.run();
}
if (tasks.containsKey(task.getId())) {
log.info("Retry[%d] for task[%s]", retryPolicy.getNumRetries(), task.getId());
assignTask(taskWrapper);
}
}
},
retryPolicy.getAndIncrementRetryDelay().getMillis(),
TimeUnit.MILLISECONDS
);
}
/**
* When a new worker appears, listeners are registered for status changes.
* Status changes indicate the creation or completion of a task.
* The RemoteTaskRunner updates state according to these changes.
*
* @param worker - contains metadata for a worker that has appeared in ZK
*/
private void addWorker(final Worker worker)
{
try {
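// If this worker was provisioned by the scaling strategy, it is no longer pending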
currentlyProvisioning.removeAll(strategy.ipLookup(Arrays.<String>asList(worker.getIp())));
final String workerStatusPath = JOINER.join(config.getStatusPath(), worker.getHost());
final PathChildrenCache statusCache = new PathChildrenCache(cf, workerStatusPath, true);
final WorkerWrapper workerWrapper = new WorkerWrapper(
worker,
statusCache,
jsonMapper
);
// Add status listener to the watcher for status changes
statusCache.getListenable().addListener(
new PathChildrenCacheListener()
{
@Override
public void childEvent(CuratorFramework client, PathChildrenCacheEvent event) throws Exception
{
synchronized (statusLock) {
try {
if (event.getType().equals(PathChildrenCacheEvent.Type.CHILD_ADDED) ||
event.getType().equals(PathChildrenCacheEvent.Type.CHILD_UPDATED)) {
final String taskId = ZKPaths.getNodeFromPath(event.getData().getPath());
final TaskStatus taskStatus;
// This can fail if a worker writes a bogus status. Retry if so.
try {
taskStatus = jsonMapper.readValue(
event.getData().getData(), TaskStatus.class
);
if (!taskStatus.getId().equals(taskId)) {
// Sanity check
throw new ISE(
"Worker[%s] status id does not match payload id: %s != %s",
worker.getHost(),
taskId,
taskStatus.getId()
);
}
}
catch (Exception e) {
log.warn(e, "Worker[%s] wrote bogus status for task: %s", worker.getHost(), taskId);
retryTask(new CleanupPaths(worker.getHost(), taskId), tasks.get(taskId));
throw Throwables.propagate(e);
}
log.info(
"Worker[%s] wrote %s status for task: %s",
worker.getHost(),
taskStatus.getStatusCode(),
taskId
);
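// Wake up announceTask(), which may be waiting for this task to show up on the worker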
statusLock.notify();
if (taskStatus.isComplete()) {
// Worker is done with this task
workerWrapper.setLastCompletedTaskTime(new DateTime());
final TaskWrapper taskWrapper = tasks.get(taskId);
if (taskWrapper == null) {
log.warn(
"WTF?! Worker[%s] completed a task I didn't know about: %s",
worker.getHost(),
taskId
);
} else {
final TaskCallback callback = taskWrapper.getCallback();
// Cleanup
if (callback != null) {
callback.notify(taskStatus);
}
tasks.remove(taskId);
cf.delete().guaranteed().inBackground().forPath(event.getData().getPath());
}
}
}
}
catch (Exception e) {
log.makeAlert(e, "Failed to handle new worker status")
.addData("worker", worker.getHost())
.addData("znode", event.getData().getPath())
.emit();
}
}
}
}
);
zkWorkers.put(worker.getHost(), workerWrapper);
statusCache.start();
}
catch (Exception e) {
throw Throwables.propagate(e);
}
}
private WorkerWrapper findWorkerRunningTask(TaskWrapper taskWrapper)
{
for (WorkerWrapper workerWrapper : zkWorkers.values()) {
if (workerWrapper.getRunningTasks().contains(taskWrapper.getTask().getId())) {
return workerWrapper;
}
}
return null;
}
/**
* When an ephemeral worker node disappears from ZK, we have to make sure there are no tasks still assigned
* to the worker. If tasks remain, they are retried.
*
* @param worker - the removed worker
*/
private void removeWorker(final Worker worker)
{
currentlyTerminating.remove(worker.getHost());
WorkerWrapper workerWrapper = zkWorkers.get(worker.getHost());
if (workerWrapper != null) {
try {
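// Retry both the tasks the worker reported as running and any tasks that were assigned to it in ZK
// but never picked up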
Set<String> tasksToRetry = Sets.newHashSet(workerWrapper.getRunningTasks());
tasksToRetry.addAll(cf.getChildren().forPath(JOINER.join(config.getTaskPath(), worker.getHost())));
for (String taskId : tasksToRetry) {
TaskWrapper taskWrapper = tasks.get(taskId);
if (taskWrapper != null) {
retryTask(new CleanupPaths(worker.getHost(), taskId), tasks.get(taskId));
}
}
workerWrapper.getStatusCache().close();
}
catch (Exception e) {
log.error(e, "Failed to cleanly remove worker[%s]", worker.getHost());
}
}
zkWorkers.remove(worker.getHost());
}
private WorkerWrapper findWorkerForTask()
{
try {
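// Order eligible workers (not at capacity, version >= the configured minimum) so that the most heavily
// loaded one is at the head of the queue; peek() below therefore packs tasks onto as few workers as possible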
final MinMaxPriorityQueue<WorkerWrapper> workerQueue = MinMaxPriorityQueue.<WorkerWrapper>orderedBy(
new Comparator<WorkerWrapper>()
{
@Override
public int compare(WorkerWrapper w1, WorkerWrapper w2)
{
return -Ints.compare(w1.getRunningTasks().size(), w2.getRunningTasks().size());
}
}
).create(
FunctionalIterable.create(zkWorkers.values()).filter(
new Predicate<WorkerWrapper>()
{
@Override
public boolean apply(WorkerWrapper input)
{
return (!input.isAtCapacity() &&
input.getWorker()
.getVersion()
.compareTo(workerSetupManager.getWorkerSetupData().getMinVersion()) >= 0);
}
}
)
);
if (workerQueue.isEmpty()) {
log.info("Worker nodes do not have capacity to run any more tasks!");
if (currentlyProvisioning.isEmpty()) {
AutoScalingData provisioned = strategy.provision();
if (provisioned != null) {
currentlyProvisioning.addAll(provisioned.getNodeIds());
lastProvisionTime = new DateTime();
}
} else {
Duration durSinceLastProvision = new Duration(lastProvisionTime, new DateTime());
if (durSinceLastProvision.isLongerThan(config.getMaxScalingDuration())) {
log.makeAlert("Worker node provisioning taking too long")
.addData("millisSinceLastProvision", durSinceLastProvision.getMillis())
.addData("provisioningCount", currentlyProvisioning.size())
.emit();
}
log.info(
"%s still provisioning. Wait for all provisioned nodes to complete before requesting new worker.",
currentlyProvisioning
);
}
return null;
}
return workerQueue.peek();
}
catch (Exception e) {
throw Throwables.propagate(e);
}
}
/**
* Creates a ZK entry under a specific path associated with a worker. The worker is responsible for
* removing the task ZK entry and creating a task status ZK entry.
*
* @param theWorker The worker the task is assigned to
* @param taskWrapper The task to be assigned
*/
private void announceTask(Worker theWorker, TaskWrapper taskWrapper)
{
synchronized (statusLock) {
final Task task = taskWrapper.getTask();
final TaskContext taskContext = taskWrapper.getTaskContext();
try {
log.info("Coordinator asking Worker[%s] to add task[%s]", theWorker.getHost(), task.getId());
tasks.put(task.getId(), taskWrapper);
byte[] rawBytes = jsonMapper.writeValueAsBytes(new TaskHolder(task, taskContext));
if (rawBytes.length > config.getMaxNumBytes()) {
throw new ISE("Length of raw bytes for task too large[%,d > %,d]", rawBytes.length, config.getMaxNumBytes());
}
cf.create()
.withMode(CreateMode.EPHEMERAL)
.forPath(
JOINER.join(
config.getTaskPath(),
theWorker.getHost(),
task.getId()
),
rawBytes
);
// Syncing state with Zookeeper
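// Wait until the worker lists the task as running; the status listener notifies statusLock when it does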
while (findWorkerRunningTask(taskWrapper) == null) {
statusLock.wait(config.getTaskAssignmentTimeoutDuration().getMillis());
}
}
catch (Exception e) {
log.error(e, "Exception creating task[%s] for worker node[%s]", task.getId(), theWorker.getHost());
throw Throwables.propagate(e);
}
}
}
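/**
* Deletes the status znode and the task znode for a given worker/task pair, used to clean up after completed
* tasks and before retries.
*/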
private class CleanupPaths implements Runnable
{
private final String workerId;
private final String taskId;
private CleanupPaths(String workerId, String taskId)
{
this.workerId = workerId;
this.taskId = taskId;
}
@Override
public void run()
{
try {
final String statusPath = JOINER.join(config.getStatusPath(), workerId, taskId);
cf.delete().guaranteed().forPath(statusPath);
}
catch (Exception e) {
log.warn("Tried to delete a status path that didn't exist! Must've gone away already?");
}
try {
final String taskPath = JOINER.join(config.getTaskPath(), workerId, taskId);
cf.delete().guaranteed().forPath(taskPath);
}
catch (Exception e) {
log.warn("Tried to delete a task path that didn't exist! Must've gone away already?");
}
}
}
}