blob: 93e19d20958531a229eb9e41f29e344d26ca621d [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hama.bsp;
import java.io.IOException;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.commons.collections.map.MultiValueMap;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.commons.math3.util.Pair;
import org.apache.hama.ipc.GroomProtocol;
import org.apache.mesos.Protos;
import org.apache.mesos.SchedulerDriver;
public class TaskDelegator implements GroomStatusListener {
public static final Log LOG = LogFactory.getLog(MesosScheduler.class);
private Set<TaskInProgress> recoveryTasks;
/**
* Map to hold assignments from groomServerNames to TasksInProgress
*/
private MultiValueMap assignments = new MultiValueMap();
private MultiValueMap jobAssignments = new MultiValueMap();
private AtomicReference<GroomServerManager> groomServerManager;
/**
* Map from Pair of groomServerName and port number to the latest
* GroomServerStatus
*/
private Map<Pair<String, Integer>, GroomServerStatus> groomServers = new HashMap<Pair<String, Integer>, GroomServerStatus>();
private Map<Pair<String, Integer>, Protos.TaskID> groomTaskIDs = new HashMap<Pair<String, Integer>, Protos.TaskID>();
private SchedulerDriver driver;
public TaskDelegator(AtomicReference<GroomServerManager> groomServerManager,
SchedulerDriver driver, Set<TaskInProgress> recoveryTasks) {
this.groomServerManager = groomServerManager;
groomServerManager.get().addJobInProgressListener(
new TaskDelegatorJobListener());
this.driver = driver;
this.recoveryTasks = recoveryTasks;
}
@Override
public void groomServerRegistered(GroomServerStatus status) {
Pair<String, Integer> key = new Pair<String, Integer>(
status.getGroomHostName(), BSPMaster.resolveWorkerAddress(
status.rpcServer).getPort());
LOG.debug("Received Groom From: " + key.getKey() + ":" + key.getValue());
if (assignments.containsKey(key)) {
for (Object tip : assignments.getCollection(key)) {
execute((TaskInProgress) tip, status);
}
} else {
LOG.error("Unexpected host found: " + key.getKey() + ":" + key.getValue());
}
groomServers.put(key, status);
}
/**
* Add a task for execution when the groom server becomes available
*
* @param tip The TaskInProgress to execute
* @param hostName The hostname where the resource reservation was made
*/
public void addTask(TaskInProgress tip, Protos.TaskID taskId,
String hostName, Integer port) {
LOG.trace("Adding Host: " + hostName + ":" + port + " for Task:"
+ tip.getTaskId());
Pair<String, Integer> key = new Pair<String, Integer>(hostName, port);
groomTaskIDs.put(key, taskId);
if (groomServers.containsKey(key)) {
execute(tip, groomServers.get(key));
} else {
assignments.put(key, tip);
jobAssignments.put(tip.getJob(), new Pair<Object, Object>(key, tip));
}
}
private void execute(TaskInProgress tip, GroomServerStatus status) {
Task task = tip.constructTask(status);
GroomServerAction[] actions;
GroomProtocol worker = groomServerManager.get().findGroomServer(status);
if (!recoveryTasks.contains(tip)) {
actions = new GroomServerAction[1];
actions[0] = new LaunchTaskAction(task);
} else {
LOG.trace("Executing a recovery task");
recoveryTasks.remove(tip);
HashMap<String, GroomServerStatus> groomStatuses = new HashMap<String, GroomServerStatus>(
1);
groomStatuses.put(status.hostName, status);
Map<GroomServerStatus, List<GroomServerAction>> actionMap = new HashMap<GroomServerStatus, List<GroomServerAction>>(
2 * groomStatuses.size());
try {
tip.getJob().recoverTasks(groomStatuses, actionMap);
} catch (IOException e) {
LOG.warn("Task recovery failed", e);
}
List<GroomServerAction> actionList = actionMap.get(status);
actions = new GroomServerAction[actionList.size()];
actionList.toArray(actions);
}
Directive d1 = new DispatchTasksDirective(actions);
try {
worker.dispatch(d1);
} catch (IOException ioe) {
LOG.error(
"Fail to dispatch tasks to GroomServer " + status.getGroomName(), ioe);
}
}
@Override
public void taskComplete(GroomServerStatus status, TaskInProgress task) {
Pair<String, Integer> key = new Pair<String, Integer>(
status.getGroomHostName(), BSPMaster.resolveWorkerAddress(
status.rpcServer).getPort());
groomServers.put(key, status);
assignments.remove(key, task);
jobAssignments.remove(task.getJob(), new Pair<Object, Object>(key, task));
if (assignments.getCollection(key) == null) {
groomServers.remove(key);
driver.killTask(groomTaskIDs.get(key));
}
}
private class TaskDelegatorJobListener extends JobInProgressListener {
@Override
public void jobAdded(JobInProgress job) throws IOException {
}
@Override
public void jobRemoved(JobInProgress job) throws IOException {
@SuppressWarnings("unchecked")
Collection<Pair<Object, Object>> remainingTasks = jobAssignments
.getCollection(job);
if (remainingTasks != null) {
for (Pair<Object, Object> taskToRemove : remainingTasks) {
assignments.remove(taskToRemove.getKey(), taskToRemove.getValue());
if (assignments.getCollection(taskToRemove.getKey()) == null) {
groomServers.remove(taskToRemove.getKey());
driver.killTask(groomTaskIDs.get(taskToRemove.getKey()));
}
}
jobAssignments.remove(job);
}
}
@Override
public void recoverTaskInJob(JobInProgress job) throws IOException {
}
}
}