blob: 03be1a4a7500dde62cd5b5eff3f7bf788abc1f5a [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hama.bsp;
import java.io.ByteArrayOutputStream;
import java.io.File;
import java.io.IOException;
import java.io.PrintStream;
import java.lang.reflect.Constructor;
import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.Calendar;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSError;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hama.ipc.RPC;
import org.apache.hama.ipc.RemoteException;
import org.apache.hama.ipc.Server;
import org.apache.hadoop.net.DNS;
import org.apache.hadoop.util.DiskChecker;
import org.apache.hadoop.util.DiskChecker.DiskErrorException;
import org.apache.hadoop.util.RunJar;
import org.apache.hadoop.util.StringUtils;
import org.apache.hama.Constants;
import org.apache.hama.HamaConfiguration;
import org.apache.hama.bsp.sync.SyncException;
import org.apache.hama.http.HttpServer;
import org.apache.hama.ipc.BSPPeerProtocol;
import org.apache.hama.ipc.GroomProtocol;
import org.apache.hama.ipc.HamaRPCProtocolVersion;
import org.apache.hama.ipc.MasterProtocol;
import org.apache.hama.monitor.Monitor;
import org.apache.hama.monitor.fd.FDProvider;
import org.apache.hama.monitor.fd.Sensor;
import org.apache.hama.monitor.fd.UDPSensor;
import org.apache.hama.util.BSPNetUtils;
import org.apache.hama.zookeeper.QuorumPeer;
import org.apache.log4j.LogManager;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooKeeper;
/**
* A Groom Server (referred to as a groom for short) is a process that performs
* BSP tasks assigned by the BSPMaster. Each groom contacts the BSPMaster, takes
* the tasks assigned to it, and reports its status by means of periodic
* piggybacks with the BSPMaster. Each groom is designed to run with HDFS or
* another distributed storage system. Basically, a groom server and a data
* node should be run on the same physical node.
*/
public class GroomServer implements Runnable, GroomProtocol, BSPPeerProtocol,
Watcher {
// Logger shared by the groom server and its nested classes.
public static final Log LOG = LogFactory.getLog(GroomServer.class);
// Name of the sub-directory used below the configured local directories.
static final String SUBDIR = "groomServer";
// Pause used between status reports in offerService(), in milliseconds.
private volatile static int REPORT_INTERVAL = 1 * 1000;
// Configuration handed to the constructor; initialize() also writes to it.
final Configuration conf;
// Constants
// Of these values, offerService() returns NORMAL, STALE or DENIED.
static enum State {
NORMAL, COMPUTE, SYNC, BARRIER, STALE, INTERRUPTED, DENIED
}
// HTTP server started in initialize() (serves the task log servlet).
private HttpServer server;
// ZooKeeper client created in the constructor; stays null if creation fails.
private ZooKeeper zk = null;
// Running States and its related things
volatile boolean initialized = false;
volatile boolean running = true;
volatile boolean shuttingDown = false;
// True until offerService() has fetched the system directory from the master.
boolean justInited = true;
GroomServerStatus status = null;
// Attributes
String groomServerName;
String localHostname;
String groomHostName;
InetSocketAddress bspMasterAddr;
// Worker thread that executes directives queued through dispatch().
private Instructor instructor;
// Filesystem
// private LocalDirAllocator localDirAllocator;
Path systemDirectory = null;
FileSystem systemFS = null;
// Job
// Incremented by TaskInProgress.killAndCleanup(true).
private int failures;
// Read from Constants.MAX_TASKS_PER_GROOM in initialize().
private int maxCurrentTasks;
// Every task known to this groom, keyed by attempt id.
Map<TaskAttemptID, TaskInProgress> tasks = new HashMap<TaskAttemptID, TaskInProgress>();
/** Map from taskId -> TaskInProgress. */
Map<TaskAttemptID, TaskInProgress> runningTasks = null;
// Tasks moved out of runningTasks by updateTaskStatuses().
Map<TaskAttemptID, TaskInProgress> finishedTasks = null;
// Port picked for each task attempt by the dispatch handler.
Map<TaskAttemptID, Integer> assignedPeerNames = null;
Map<BSPJobID, RunningJob> runningJobs = null;
// new nexus between GroomServer and BSPMaster
// holds/ manage all tasks
// List<TaskInProgress> tasksList = new
// CopyOnWriteArrayList<TaskInProgress>();
// "host:port" of the worker RPC server, set in initialize().
private String rpcServer;
private Server workerServer;
// RPC proxy to the BSPMaster, obtained in initialize().
MasterProtocol masterClient;
InetSocketAddress taskReportAddress;
Server taskReportServer = null;
// private BlockingQueue<GroomServerAction> tasksToCleanup = new
// LinkedBlockingQueue<GroomServerAction>();
// Schedule Heartbeats to GroomServer
// Runs BSPTasksMonitor periodically; created in initialize(), stopped in close().
private ScheduledExecutorService taskMonitorService;
// Failure-detector sensor, only set when "bsp.monitor.fd.enabled" is true.
private final AtomicReference<Sensor> sensor = new AtomicReference<Sensor>();
private class DispatchTasksHandler implements DirectiveHandler {
@Override
public void handle(Directive directive) throws DirectiveException {
GroomServerAction[] actions = ((DispatchTasksDirective) directive)
.getActions();
if (LOG.isDebugEnabled()) {
LOG.debug("Got Response from BSPMaster with "
+ ((actions != null) ? actions.length : 0) + " actions");
}
if (actions != null) {
// assignedPeerNames = new HashMap<TaskAttemptID, Integer>();
int prevPort = Constants.DEFAULT_PEER_PORT;
for (GroomServerAction action : actions) {
if (action instanceof LaunchTaskAction) {
Task t = ((LaunchTaskAction) action).getTask();
synchronized (assignedPeerNames) {
prevPort = BSPNetUtils.getNextAvailable(prevPort);
assignedPeerNames.put(t.getTaskID(), prevPort);
}
LOG.info("Launch " + actions.length + " tasks.");
startNewTask((LaunchTaskAction) action);
} else if (action instanceof KillTaskAction) {
// TODO Use the cleanup thread
// tasksToCleanup.put(action);
LOG.info("Kill " + actions.length + " tasks.");
KillTaskAction killAction = (KillTaskAction) action;
if (tasks.containsKey(killAction.getTaskID())) {
TaskInProgress tip = tasks.get(killAction.getTaskID());
tip.taskStatus.setRunState(TaskStatus.State.FAILED);
try {
tip.killAndCleanup(false);
tasks.remove(killAction.getTaskID());
runningTasks.remove(killAction.getTaskID());
} catch (IOException ioe) {
throw new DirectiveException("Error when killing a "
+ "TaskInProgress.", ioe);
}
}
} else if (action instanceof RecoverTaskAction) {
LOG.info("Recovery action task.");
RecoverTaskAction recoverAction = (RecoverTaskAction) action;
Task t = recoverAction.getTask();
LOG.info("Recovery action task." + t.getTaskID());
synchronized (assignedPeerNames) {
prevPort = BSPNetUtils.getNextAvailable(prevPort);
assignedPeerNames.put(t.getTaskID(), prevPort);
}
try {
startRecoveryTask(recoverAction);
} catch (IOException e) {
throw new DirectiveException(new StringBuffer()
.append("Error starting the recovery task")
.append(t.getTaskID()).toString(), e);
}
}
}
}
}
}
private class Instructor extends Thread {
final BlockingQueue<Directive> buffer = new LinkedBlockingQueue<Directive>();
final ConcurrentMap<Class<? extends Directive>, DirectiveHandler> handlers = new ConcurrentHashMap<Class<? extends Directive>, DirectiveHandler>();
public void bind(Class<? extends Directive> instruction,
DirectiveHandler handler) {
handlers.putIfAbsent(instruction, handler);
}
public void put(Directive directive) {
try {
buffer.put(directive);
} catch (InterruptedException ie) {
LOG.error("Unable to put directive into queue.", ie);
Thread.currentThread().interrupt();
}
}
@Override
public void run() {
while (true) {
try {
Directive directive = buffer.take();
handlers.get(directive.getClass()).handle(directive);
} catch (InterruptedException ie) {
LOG.error("Unable to retrieve directive from the queue.", ie);
Thread.currentThread().interrupt();
} catch (Exception e) {
LOG.error("Fail to execute directive.", e);
}
}
}
}
/*
* This thread is responsible for monitoring the pings from peers. If any peer
* fails to ping this groom for a pre-defined period, the task is purged from
* the records.
*/
private class BSPTasksMonitor extends Thread {
private List<TaskInProgress> outOfContactTasks = new ArrayList<GroomServer.TaskInProgress>(
conf.getInt(Constants.MAX_TASKS_PER_GROOM, 3));
private BSPTasksMonitor() {
outOfContactTasks = new ArrayList<GroomServer.TaskInProgress>(
conf.getInt(Constants.MAX_TASKS_PER_GROOM, 3));
}
@Override
public void run() {
getObliviousTasks(outOfContactTasks);
if (outOfContactTasks.size() > 0) {
LOG.debug("Got " + outOfContactTasks.size() + " oblivious tasks");
}
for (TaskInProgress tip : outOfContactTasks) {
try {
LOG.debug("Purging task " + tip);
purgeTask(tip, true);
} catch (Exception e) {
LOG.error(
new StringBuilder("Error while removing a timed-out task - ")
.append(tip.toString()), e);
}
}
outOfContactTasks.clear();
}
}
/**
 * Creates a groom server for the given configuration and opens a ZooKeeper
 * client with this groom as watcher. No servers are started here; that is
 * done by {@link #initialize()}.
 * <p>
 * If no BSPMaster address can be resolved from the configuration, the local
 * mode message is printed and the JVM exits with status 0.
 *
 * @param conf the configuration to use
 * @throws IOException declared for callers; a failure to create the
 *           ZooKeeper client is only logged and leaves the client unset
 */
public GroomServer(Configuration conf) throws IOException {
  LOG.info("groom start");
  this.conf = conf;
  bspMasterAddr = BSPMaster.getAddress(conf);
  if (bspMasterAddr == null) {
    System.out.println(BSPMaster.localModeMessage);
    LOG.info(BSPMaster.localModeMessage);
    System.exit(0);
  }
  try {
    zk = new ZooKeeper(QuorumPeer.getZKQuorumServersString(conf),
        conf.getInt(Constants.ZOOKEEPER_SESSION_TIMEOUT, 1200000), this);
  } catch (IOException e) {
    // Best effort: the groom keeps going without a ZooKeeper client.
    LOG.error("Unable to create the ZooKeeper client.", e);
  }
}
/**
 * Brings this groom into service: resolves the local host name, checks and
 * clears the local directories, resets the task tables, starts the worker
 * RPC server, the HTTP server and the task report server, registers with the
 * BSPMaster, and starts the instructor thread and the optional monitors.
 *
 * @throws IOException if the BSPMaster refuses the registration, or if one of
 *           the calls below fails with an I/O error
 * @throws IllegalArgumentException if the RPC host or port is unset
 */
public synchronized void initialize() throws IOException {
// An explicitly configured peer host wins over DNS resolution.
if (this.conf.get(Constants.PEER_HOST) != null) {
this.localHostname = conf.get(Constants.PEER_HOST);
}
if (localHostname == null) {
this.localHostname = DNS.getDefaultHost(
conf.get("bsp.dns.interface", "default"),
conf.get("bsp.dns.nameserver", "default"));
}
// check local disk
checkLocalDirs(getLocalDirs());
deleteLocalFiles(SUBDIR);
// Clear out state tables
this.tasks.clear();
this.runningJobs = new TreeMap<BSPJobID, RunningJob>();
this.runningTasks = new ConcurrentHashMap<TaskAttemptID, TaskInProgress>();
this.finishedTasks = new LinkedHashMap<TaskAttemptID, TaskInProgress>();
// Publish the resolved host name so the lookups below pick it up.
this.conf.set(Constants.PEER_HOST, localHostname);
this.conf.set(Constants.GROOM_RPC_HOST, localHostname);
this.maxCurrentTasks = conf.getInt(Constants.MAX_TASKS_PER_GROOM, 3);
this.assignedPeerNames = new HashMap<TaskAttemptID, Integer>(
2 * this.maxCurrentTasks);
int rpcPort = -1;
String rpcAddr = null;
// The worker RPC server is only created while 'initialized' is false.
if (false == this.initialized) {
rpcAddr = conf.get(Constants.GROOM_RPC_HOST,
Constants.DEFAULT_GROOM_RPC_HOST);
rpcPort = conf.getInt(Constants.GROOM_RPC_PORT,
Constants.DEFAULT_GROOM_RPC_PORT);
if (-1 == rpcPort || null == rpcAddr)
throw new IllegalArgumentException("Error rpc address " + rpcAddr
+ " port" + rpcPort);
this.workerServer = RPC.getServer(this, rpcAddr, rpcPort, conf);
this.workerServer.start();
this.rpcServer = rpcAddr + ":" + rpcPort;
LOG.info("Worker rpc server --> " + rpcServer);
}
// NOTE(review): when 'initialized' is already true, rpcAddr is still null
// and rpcPort still -1 from here on, so the check before register() below
// throws IllegalArgumentException after the servers have been started.
server = new HttpServer("groomserver", rpcAddr, conf.getInt(
"bsp.http.groomserver.port", Constants.DEFAULT_GROOM_INFO_SERVER),
true, conf);
FileSystem local = FileSystem.getLocal(conf);
server.setAttribute("groom.server", this);
server.setAttribute("local.file.system", local);
server.setAttribute("conf", conf);
server.setAttribute("log", LOG);
server.addServlet("taskLog", "/tasklog", TaskLogServlet.class);
LOG.info("starting webserver: " + rpcAddr);
server.start();
String address = BSPNetUtils.getServerAddress(conf,
"bsp.groom.report.bindAddress", "bsp.groom.report.port",
"bsp.groom.report.address");
InetSocketAddress socAddr = BSPNetUtils.createSocketAddr(address);
String bindAddress = socAddr.getHostName();
int tmpPort = socAddr.getPort();
// RPC initialization
// TODO numHandlers should be a ..
this.taskReportServer = RPC.getServer(this, bindAddress, tmpPort, 10,
false, this.conf);
this.taskReportServer.start();
// get the assigned address
this.taskReportAddress = taskReportServer.getListenerAddress();
// Write the address actually bound back into the configuration.
this.conf.set("bsp.groom.report.address", taskReportAddress.getHostName()
+ ":" + taskReportAddress.getPort());
LOG.info("GroomServer up at: " + this.taskReportAddress);
this.groomHostName = rpcAddr;
this.groomServerName = "groomd_" + this.rpcServer.replace(':', '_');
LOG.info("Starting groom: " + this.rpcServer);
// establish the communication link to bsp master
this.masterClient = (MasterProtocol) RPC.waitForProxy(MasterProtocol.class,
HamaRPCProtocolVersion.versionID, bspMasterAddr, conf);
// enroll in bsp master
if (-1 == rpcPort || null == rpcAddr)
throw new IllegalArgumentException("Error rpc address " + rpcAddr
+ " port" + rpcPort);
if (!this.masterClient.register(new GroomServerStatus(groomServerName,
cloneAndResetRunningTaskStatuses(), failures, maxCurrentTasks,
this.rpcServer, groomHostName))) {
LOG.error("There is a problem in establishing communication"
+ " link with BSPMaster");
throw new IOException("There is a problem in establishing"
+ " communication link with BSPMaster.");
}
// A fresh instructor thread is created and started on every call.
this.instructor = new Instructor();
this.instructor.bind(DispatchTasksDirective.class,
new DispatchTasksHandler());
instructor.start();
// The ping monitor is only scheduled when the period is positive.
if (this.taskMonitorService == null) {
this.taskMonitorService = Executors.newScheduledThreadPool(1);
long monitorPeriod = this.conf.getLong(Constants.GROOM_PING_PERIOD,
Constants.DEFAULT_GROOM_PING_PERIOD);
if (monitorPeriod > 0) {
this.taskMonitorService.scheduleWithFixedDelay(new BSPTasksMonitor(),
1000, monitorPeriod, TimeUnit.MILLISECONDS);
}
}
if (conf.getBoolean("bsp.monitor.enabled", false)) {
new Monitor(conf, zk, this.groomServerName).start();
}
if (conf.getBoolean("bsp.monitor.fd.enabled", false)) {
// NOTE(review): the cast assumes conf is a HamaConfiguration - confirm.
this.sensor.set(FDProvider.createSensor(conf.getClass(
"bsp.monitor.fd.sensor.class", UDPSensor.class, Sensor.class),
(HamaConfiguration) conf));
this.sensor.get().start();
}
this.running = true;
this.initialized = true;
// FIXME
}
/**
 * Returns the address of the task report server.
 *
 * @return the listener address recorded by {@link #initialize()}
 */
public synchronized InetSocketAddress getTaskTrackerReportAddress() {
  final InetSocketAddress reportAddress = this.taskReportAddress;
  return reportAddress;
}
/**
 * Queues a directive for execution by the instructor thread.
 *
 * @param directive the directive to execute
 * @throws IOException if the instructor thread has not been started yet or
 *           is no longer alive
 */
@Override
public void dispatch(Directive directive) throws IOException {
  // Read the field once: initialize() replaces the instructor instance.
  final Instructor worker = this.instructor;
  if (worker == null || !worker.isAlive()) {
    throw new IOException("Groom server " + groomServerName
        + " cannot accept directives: instructor thread is not running.");
  }
  worker.put(directive);
}
/**
 * Verifies that at least one of the given local directories passes
 * {@link DiskChecker#checkDir(File)}. Directories that fail are logged at
 * warn level and skipped.
 *
 * @param localDirs the directories to check; may be {@code null}
 * @throws DiskErrorException if {@code localDirs} is {@code null} or empty,
 *           or if no directory passes the check
 */
private static void checkLocalDirs(String[] localDirs)
    throws DiskErrorException {
  if (LOG.isDebugEnabled()) {
    // Log the directory names, not the array's identity hash.
    LOG.debug(java.util.Arrays.toString(localDirs));
  }
  boolean writable = false;
  if (localDirs != null) {
    for (String localDir : localDirs) {
      try {
        LOG.info(localDir);
        DiskChecker.checkDir(new File(localDir));
        writable = true;
      } catch (DiskErrorException e) {
        LOG.warn("BSP Processor local " + e.getMessage());
      }
    }
  }
  if (!writable) {
    throw new DiskErrorException("all local directories are not writable");
  }
}
/**
 * Returns the local directories listed under the "bsp.local.dir" property.
 *
 * @return whatever {@code conf.getStrings("bsp.local.dir")} yields
 */
public String[] getLocalDirs() {
  final String[] configuredDirs = this.conf.getStrings("bsp.local.dir");
  return configuredDirs;
}
/**
 * Recursively deletes every configured local directory. Does nothing when no
 * local directories are configured.
 *
 * @throws IOException if the local file system cannot be obtained or a
 *           delete fails
 */
public void deleteLocalFiles() throws IOException {
  String[] localDirs = getLocalDirs();
  if (localDirs == null) {
    // Nothing configured: avoid the NullPointerException that would
    // otherwise abort close() before the servers are stopped.
    return;
  }
  FileSystem localFs = FileSystem.getLocal(this.conf);
  for (String localDir : localDirs) {
    localFs.delete(new Path(localDir), true);
  }
}
/**
 * Recursively deletes the given sub-directory below every configured local
 * directory. Does nothing (apart from an info log) when no local directories
 * are configured.
 *
 * @param subdir name of the sub-directory to delete
 * @throws IOException if the local file system cannot be obtained or a
 *           delete fails
 */
public void deleteLocalFiles(String subdir) throws IOException {
  String[] localDirs = getLocalDirs();
  if (localDirs == null) {
    // Explicit check instead of catching NullPointerException, which could
    // also hide unrelated bugs.
    LOG.info("No local directories configured; nothing to delete for "
        + subdir);
    return;
  }
  FileSystem localFs = FileSystem.getLocal(this.conf);
  for (String localDir : localDirs) {
    localFs.delete(new Path(localDir, subdir), true);
  }
}
/**
 * Removes this groom's local storage by delegating to
 * {@link #deleteLocalFiles()}.
 *
 * @throws IOException if the deletion fails
 */
public void cleanupStorage() throws IOException {
  this.deleteLocalFiles();
}
/**
 * Placeholder invoked by {@link #run()} right after {@link #initialize()};
 * the body is currently empty, so no cleanup threads are started.
 */
private void startCleanupThreads() throws IOException {
}
/**
 * Main service loop: while the groom is running and not shutting down,
 * report the status of the running tasks to the BSPMaster and, on the first
 * successful pass, look up the system directory.
 *
 * @return {@code STALE} on a disk error, {@code DENIED} on a
 *         {@link RemoteException}, {@code NORMAL} when the loop ends
 * @throws Exception if the sleep at the end of an iteration is interrupted
 */
public State offerService() throws Exception {
  while (running && !shuttingDown) {
    try {
      // Snapshot the status objects of all running tasks and report them.
      final List<TaskStatus> snapshot = new ArrayList<TaskStatus>();
      for (TaskInProgress tip : runningTasks.values()) {
        snapshot.add(tip.getStatus());
      }
      doReport(snapshot);
      Thread.sleep(REPORT_INTERVAL);
    } catch (InterruptedException ignored) {
      // Deliberately ignored: the loop simply carries on.
    }

    try {
      if (justInited) {
        final String systemDir = masterClient.getSystemDir();
        if (systemDir == null) {
          LOG.error("Fail to get system directory.");
          throw new IOException("Fail to get system directory.");
        }
        systemDirectory = new Path(systemDir);
        systemFS = systemDirectory.getFileSystem(conf);
      }
      justInited = false;
    } catch (DiskErrorException de) {
      LOG.error("Exiting groom server for disk error:\n"
          + StringUtils.stringifyException(de));
      return State.STALE;
    } catch (RemoteException re) {
      return State.DENIED;
    } catch (Exception except) {
      LOG.error("Caught exception: "
          + StringUtils.stringifyException(except));
    }

    Thread.sleep(REPORT_INTERVAL);
  }
  return State.NORMAL;
}
/**
 * Registers a new task in the task tables and localizes/launches it. If
 * localization fails the task is killed and marked as a failure; it stays in
 * the task tables.
 *
 * @param action the launch action carrying the task to start
 */
private void startNewTask(LaunchTaskAction action) {
  Task t = action.getTask();
  BSPJob jobConf = null;
  try {
    jobConf = new BSPJob(t.getJobID(), t.getJobFile());
  } catch (IOException e1) {
    // Keep going with a null job conf: launchTaskForJob() installs the
    // localized one later. Log with context and the stack trace.
    LOG.error("Unable to load job file " + t.getJobFile() + " for task "
        + t.getTaskID(), e1);
  }
  TaskInProgress tip = new TaskInProgress(t, jobConf, this.groomServerName);
  synchronized (this) {
    tasks.put(t.getTaskID(), tip);
    runningTasks.put(t.getTaskID(), tip);
  }
  try {
    localizeJob(tip);
  } catch (Throwable e) {
    String msg = ("Error initializing " + tip.getTask().getTaskID() + ":\n" + StringUtils
        .stringifyException(e));
    LOG.warn(msg);
    try {
      tip.killAndCleanup(true);
    } catch (IOException ie2) {
      LOG.info("Error cleaning up " + tip.getTask().getTaskID() + ":\n"
          + StringUtils.stringifyException(ie2));
    }
  }
}
/**
 * Starts a task in recovery mode. Any runner still attached to the same
 * attempt is killed, all attempts of the same task are dropped from the task
 * tables, and the new attempt is registered, localized and launched.
 *
 * @param action the recovery action carrying the task and the superstep
 *          count to resume from
 * @throws IOException if the job file cannot be loaded, the old runner cannot
 *           be killed, or localizing the job fails
 */
private void startRecoveryTask(RecoverTaskAction action) throws IOException {
  Task t = action.getTask();
  BSPJob jobConf = null;
  try {
    jobConf = new BSPJob(t.getJobID(), t.getJobFile());
  } catch (IOException e1) {
    LOG.error("Unable to load job file " + t.getJobFile() + " for task "
        + t.getTaskID(), e1);
    throw e1;
  }
  TaskInProgress tip = new TaskInProgress(t, jobConf, this.groomServerName);
  tip.markAsRecoveryTask(action.getSuperstepCount());
  synchronized (this) {
    TaskInProgress oldTip = tasks.get(t.getTaskID());
    if (oldTip != null) {
      try {
        oldTip.killRunner();
      } catch (IOException e) {
        LOG.error("Error killing the current process for " + t.getTaskID(), e);
        throw e;
      }
    }
    // Drop every attempt that belongs to the same task as the new one.
    Iterator<TaskAttemptID> taskIterator = tasks.keySet().iterator();
    while (taskIterator.hasNext()) {
      TaskAttemptID taskAttId = taskIterator.next();
      if (taskAttId.getTaskID().equals(t.getTaskID().getTaskID())) {
        if (LOG.isDebugEnabled()) {
          LOG.debug("Removing tasks with id = " + t.getTaskID().getTaskID());
        }
        taskIterator.remove();
        runningTasks.remove(taskAttId);
      }
    }
    tasks.put(t.getTaskID(), tip);
    runningTasks.put(t.getTaskID(), tip);
  }
  try {
    localizeJob(tip);
  } catch (Throwable e) {
    String msg = ("Error initializing " + tip.getTask().getTaskID() + ":\n" + StringUtils
        .stringifyException(e));
    LOG.warn(msg);
    try {
      tip.killAndCleanup(true);
    } catch (IOException ie2) {
      LOG.info("Error cleaning up " + tip.getTask().getTaskID() + ":\n"
          + StringUtils.stringifyException(ie2));
    }
    throw new IOException("Error localizing the job.", e);
  }
}
/**
 * Builds a fresh {@link GroomServerStatus} from the given task statuses and
 * sends it to the BSPMaster. A rejected report is logged as a warning, a
 * communication failure as an error; neither is propagated.
 *
 * @param taskStatuses statuses of the tasks to report
 */
public void doReport(List<TaskStatus> taskStatuses) {
  final List<TaskStatus> reportable = updateTaskStatuses(taskStatuses);
  final GroomServerStatus groomStatus = new GroomServerStatus(groomServerName,
      reportable, failures, maxCurrentTasks, rpcServer, groomHostName);
  try {
    final ReportGroomStatusDirective report = new ReportGroomStatusDirective(
        groomStatus);
    if (masterClient.report(report)) {
      return;
    }
    LOG.warn("Fail to renew BSPMaster's GroomServerStatus. "
        + " groom name: " + groomStatus.getGroomName() + " rpc server:"
        + rpcServer);
  } catch (IOException ioe) {
    LOG.error("Fail to communicate with BSPMaster for reporting.", ioe);
  }
}
/**
 * Produces the list of statuses to report. Succeeded and failed tasks are
 * moved from the running table to the finished table and reported; running
 * tasks are reported; statuses in any other state are left out.
 *
 * @param taskStatuses the statuses to inspect
 * @return clones of the statuses that should be reported
 */
public List<TaskStatus> updateTaskStatuses(List<TaskStatus> taskStatuses) {
  final List<TaskStatus> reported = new ArrayList<TaskStatus>();
  for (TaskStatus current : taskStatuses) {
    final TaskStatus.State state = current.getRunState();
    final boolean finished = state == TaskStatus.State.SUCCEEDED
        || state == TaskStatus.State.FAILED;
    if (finished) {
      synchronized (finishedTasks) {
        final TaskInProgress moved = runningTasks.remove(current.getTaskId());
        reported.add((TaskStatus) current.clone());
        finishedTasks.put(current.getTaskId(), moved);
      }
      continue;
    }
    if (state == TaskStatus.State.RUNNING) {
      reported.add((TaskStatus) current.clone());
    }
  }
  return reported;
}
/**
 * Prepares the local job files for a task and launches it. The first task of
 * a job copies job.xml (and job.jar, if the job defines one) to the local
 * file system and unpacks the jar into a "work" directory; later tasks of an
 * already localized job reuse the job file recorded in the RunningJob.
 *
 * @param tip the task to localize and launch
 * @throws IOException if a directory cannot be created or a copy fails
 */
private void localizeJob(TaskInProgress tip) throws IOException {
Task task = tip.getTask();
// Note: this adds the job file as a resource to the groom's own conf.
conf.addResource(task.getJobFile());
BSPJob defaultJobConf = new BSPJob((HamaConfiguration) conf);
Path localJobFile = defaultJobConf.getLocalPath(SUBDIR + "/"
+ task.getTaskID() + "/" + "job.xml");
RunningJob rjob = addTaskToJob(task.getJobID(), localJobFile, tip);
BSPJob jobConf = null;
// Localization happens at most once per job; the job entry is the lock.
synchronized (rjob) {
if (!rjob.localized) {
FileSystem dfs = FileSystem.get(conf);
FileSystem localFs = FileSystem.getLocal(conf);
Path jobDir = localJobFile.getParent();
// An existing job directory is wiped and recreated. A missing one is not
// created here - NOTE(review): presumably copyToLocalFile creates it; confirm.
if (localFs.exists(jobDir)) {
localFs.delete(jobDir, true);
boolean b = localFs.mkdirs(jobDir);
if (!b)
throw new IOException("Not able to create job directory "
+ jobDir.toString());
}
Path localJarFile = defaultJobConf.getLocalPath(SUBDIR + "/"
+ task.getTaskID() + "/" + "job.jar");
dfs.copyToLocalFile(new Path(task.getJobFile()), localJobFile);
// This local 'conf' shadows the groom's conf field for the rest of the branch.
HamaConfiguration conf = new HamaConfiguration();
conf.addResource(localJobFile);
jobConf = new BSPJob(conf, task.getJobID().toString());
Path jarFile = null;
if (jobConf.getJar() != null) {
jarFile = new Path(jobConf.getJar());
} else {
LOG.warn("No jar file for job " + task.getJobID()
+ " has been defined!");
}
// The job conf is pointed at the local jar path even when no jar exists.
jobConf.setJar(localJarFile.toString());
if (jarFile != null) {
dfs.copyToLocalFile(jarFile, localJarFile);
// also unjar the job.jar files in workdir
File workDir = new File(
new File(localJobFile.toString()).getParent(), "work");
// mkdirs() returns false when the directory already exists, so only fail
// if it is still missing afterwards.
if (!workDir.mkdirs()) {
if (!workDir.isDirectory()) {
throw new IOException("Mkdirs failed to create "
+ workDir.toString());
}
}
RunJar.unJar(new File(localJarFile.toString()), workDir);
}
rjob.localized = true;
} else {
// Job already localized: build the job conf from the recorded job file.
HamaConfiguration conf = new HamaConfiguration();
conf.addResource(rjob.getJobFile());
jobConf = new BSPJob(conf, rjob.getJobId().toString());
}
}
launchTaskForJob(tip, jobConf);
}
/**
 * Installs the job configuration on the task and launches it. Any failure
 * marks the task as FAILED and is logged; nothing is propagated.
 *
 * @param tip the task to launch
 * @param jobConf the job configuration to hand to the task
 */
private static void launchTaskForJob(TaskInProgress tip, BSPJob jobConf) {
  try {
    tip.setJobConf(jobConf);
    tip.launchTask();
  } catch (Throwable failure) {
    tip.taskStatus.setRunState(TaskStatus.State.FAILED);
    LOG.info(StringUtils.stringifyException(failure));
  }
}
/**
 * Attaches a task to the bookkeeping entry of its job, creating the entry
 * (not yet localized, with an empty task set) when the job is new.
 *
 * @param jobId id of the job the task belongs to
 * @param localJobFile local job file recorded for a newly created entry
 * @param tip the task to attach
 * @return the job entry the task was added to
 */
private RunningJob addTaskToJob(BSPJobID jobId, Path localJobFile,
    TaskInProgress tip) {
  synchronized (runningJobs) {
    final RunningJob job;
    if (runningJobs.containsKey(jobId)) {
      job = runningJobs.get(jobId);
    } else {
      // The constructor already starts with localized == false, an empty
      // task set and the given job file.
      job = new RunningJob(jobId, localJobFile);
      runningJobs.put(jobId, job);
    }
    job.tasks.add(tip);
    return job;
  }
}
/**
 * Appends to the given list every RUNNING task that is out of contact: a
 * task that never pinged and was started more than 10 monitor periods ago,
 * or a task whose last ping is more than 6 monitor periods old.
 *
 * @param outOfContactTasks list that receives the out-of-contact tasks
 */
private synchronized void getObliviousTasks(
    List<TaskInProgress> outOfContactTasks) {
  if (runningTasks == null) {
    LOG.debug("returning null");
    return;
  }
  final long currentTime = Calendar.getInstance().getTimeInMillis();
  final long monitorPeriod = conf.getLong(Constants.GROOM_PING_PERIOD,
      Constants.DEFAULT_GROOM_PING_PERIOD);
  for (TaskInProgress tip : runningTasks.values()) {
    final long lastPing = tip.lastPingedTimestamp;
    // Never pinged: allow 10 periods from the start time to get going.
    final boolean startupExpired = lastPing == 0
        && (currentTime - tip.startTime) > 10 * monitorPeriod;
    // Pinged before: allow 6 periods of silence.
    final boolean pingExpired = lastPing > 0
        && (currentTime - lastPing) > 6 * monitorPeriod;
    final boolean outOfContact = tip.taskStatus.getRunState().equals(
        TaskStatus.State.RUNNING)
        && (startupExpired || pingExpired);
    if (LOG.isDebugEnabled()) {
      LOG.debug("checking task: " + tip.getTask().getTaskID()
          + " starttime =" + tip.startTime + " lastping = " + lastPing
          + " run state = " + tip.taskStatus.getRunState().toString()
          + " monitorPeriod = " + monitorPeriod + " check = " + outOfContact);
    }
    if (outOfContact) {
      LOG.info("adding purge task: " + tip.getTask().getTaskID());
      outOfContactTasks.add(tip);
    }
  }
}
/**
 * Bookkeeping entry for a job that has tasks on this groom: the job id, its
 * local job file, the tasks attached to it and whether the job files have
 * been localized yet.
 */
static class RunningJob {
  private BSPJobID jobid;
  private Path jobFile;
  // keep this for later use
  Set<TaskInProgress> tasks = new HashSet<TaskInProgress>();
  boolean localized = false;
  boolean keepJobFiles = false;

  /** Creates a not-yet-localized entry with an empty task set. */
  RunningJob(BSPJobID jobid, Path jobFile) {
    this.jobid = jobid;
    this.jobFile = jobFile;
  }

  /** Returns the job file recorded for this job. */
  Path getJobFile() {
    return this.jobFile;
  }

  /** Returns the id of this job. */
  BSPJobID getJobId() {
    return this.jobid;
  }
}
/**
 * Returns clones of the statuses of all currently running tasks.
 *
 * @return a new list holding one cloned status per running task
 */
private synchronized List<TaskStatus> cloneAndResetRunningTaskStatuses() {
  final List<TaskStatus> clones = new ArrayList<TaskStatus>(
      runningTasks.size());
  for (TaskInProgress tip : runningTasks.values()) {
    clones.add((TaskStatus) tip.getStatus().clone());
  }
  return clones;
}
/**
 * Thread entry point: initializes the groom and then keeps calling
 * {@link #offerService()} until the groom stops running, is shut down, or is
 * denied by the master. When offerService() reports a stale state the local
 * state is reinitialized and service resumes.
 */
@Override
public void run() {
try {
initialize();
startCleanupThreads();
boolean denied = false;
// Outer loop: one pass per (re)initialization.
while (running && !shuttingDown && !denied) {
boolean staleState = false;
try {
// Inner loop: keep offering service until stale, denied or stopped.
while (running && !staleState && !shuttingDown && !denied) {
try {
State osState = offerService();
if (osState == State.STALE) {
staleState = true;
} else if (osState == State.DENIED) {
denied = true;
}
} catch (Exception e) {
// Any failure of offerService() is treated as a lost master connection
// and retried after five seconds, unless a shutdown is in progress.
if (!shuttingDown) {
LOG.info("Lost connection to BSP Master [" + bspMasterAddr
+ "]. Retrying...", e);
try {
Thread.sleep(5000);
} catch (InterruptedException ie) {
}
}
}
}
} finally {
// close();
}
if (shuttingDown) {
return;
}
// Reached when the inner loop ended without a shutdown.
LOG.warn("Reinitializing local state");
initialize();
}
} catch (IOException ioe) {
// Thrown by initialize(); the thread ends here.
LOG.error("Got fatal exception while reinitializing GroomServer: "
+ StringUtils.stringifyException(ioe));
return;
}
}
/**
 * Flags the groom as shutting down and then releases its resources through
 * {@link #close()}.
 *
 * @throws IOException if closing fails
 */
public synchronized void shutdown() throws IOException {
  this.shuttingDown = true;
  this.close();
}
/**
 * Releases the resources held by this groom: the ZooKeeper client, the
 * failure-detector sensor, the task monitor, the local storage, the worker
 * and task report RPC servers and the master proxy. Resources that were never
 * created are skipped, and the servers are stopped even if cleaning the local
 * storage fails.
 *
 * @throws IOException if cleaning up the local storage fails
 */
@Override
public synchronized void close() throws IOException {
  boolean interrupted = false;
  if (zk != null) {
    try {
      zk.close();
    } catch (InterruptedException e) {
      LOG.warn("Interrupted while closing the ZooKeeper client.", e);
      // Remember the interrupt; it is restored once the shutdown is done.
      interrupted = true;
    }
  }
  Sensor activeSensor = this.sensor.get();
  if (activeSensor != null) {
    activeSensor.stop();
  }
  if (taskMonitorService != null) {
    taskMonitorService.shutdownNow();
    taskMonitorService = null;
  }
  this.running = false;
  this.initialized = false;
  try {
    cleanupStorage();
  } finally {
    if (this.workerServer != null) {
      this.workerServer.stop();
    }
    if (masterClient != null) {
      RPC.stopProxy(masterClient);
    }
    if (taskReportServer != null) {
      taskReportServer.stop();
      taskReportServer = null;
    }
    if (interrupted) {
      Thread.currentThread().interrupt();
    }
  }
}
/**
 * Starts the groom in a new thread named "regionserver" followed by the
 * groom's server name.
 *
 * @param hrs the groom server to run
 * @return the started thread
 */
public static Thread startGroomServer(final GroomServer hrs) {
  final String threadName = "regionserver" + hrs.groomServerName;
  return startGroomServer(hrs, threadName);
}
/**
 * Starts the groom in a new thread with the given name.
 *
 * @param hrs the groom server to run
 * @param name the name for the new thread
 * @return the started thread
 */
public static Thread startGroomServer(final GroomServer hrs, final String name) {
  final Thread groomThread = new Thread(hrs);
  groomThread.setName(name);
  groomThread.start();
  return groomThread;
}
// /////////////////////////////////////////////////////
// TaskInProgress maintains all the info for a Task that
// lives at this GroomServer. It maintains the Task object,
// its TaskStatus, and the BSPTaskRunner.
// /////////////////////////////////////////////////////
class TaskInProgress {
  Task task;
  BSPJob jobConf;
  BSPJob localJobConf;
  BSPTaskRunner runner;
  volatile boolean done = false;
  volatile boolean wasKilled = false;
  private TaskStatus taskStatus;
  // Written by the launching thread, read by the ping monitor.
  private volatile long startTime = 0L;
  private volatile long lastPingedTimestamp = 0L;
  private long startSuperstepCount = -1;

  /**
   * Creates the bookkeeping entry for a task in state UNASSIGNED.
   *
   * @param task the task to track
   * @param jobConf the job the task belongs to; replaced later through
   *          {@link #setJobConf(BSPJob)}
   * @param groomServer name of the groom recorded in the task status
   */
  public TaskInProgress(Task task, BSPJob jobConf, String groomServer) {
    this.task = task;
    this.jobConf = jobConf;
    this.localJobConf = null;
    this.taskStatus = new TaskStatus(task.getJobID(), task.getTaskID(), 0,
        TaskStatus.State.UNASSIGNED, "init", groomServer,
        TaskStatus.Phase.STARTING, task.getCounters());
  }

  /**
   * Marks this task as a recovery task starting at the given superstep. The
   * status is switched to RECOVERING unless the task has already failed.
   */
  public void markAsRecoveryTask(long superstepNumber) {
    if (this.taskStatus.getRunState() != TaskStatus.State.FAILED) {
      this.taskStatus.setRunState(TaskStatus.State.RECOVERING);
      this.taskStatus.setPhase(TaskStatus.Phase.RECOVERING);
      this.taskStatus.setStateString("recovering");
    }
    this.startSuperstepCount = superstepNumber;
  }

  /**
   * Copies the job file (and jar, if any) into the task's local directory
   * and builds the local job configuration from them.
   */
  private void localizeTask(Task task) throws IOException {
    Path localJobFile = this.jobConf.getLocalPath(SUBDIR + "/"
        + task.getTaskID() + "/job.xml");
    Path localJarFile = this.jobConf.getLocalPath(SUBDIR + "/"
        + task.getTaskID() + "/job.jar");
    String jobFile = task.getJobFile();
    FileSystem.get(conf).copyToLocalFile(new Path(jobFile), localJobFile);
    task.setJobFile(localJobFile.toString());
    localJobConf = new BSPJob(task.getJobID(), localJobFile.toString());
    localJobConf.set("bsp.task.id", task.getTaskID().toString());
    String jarFile = localJobConf.getJar();
    if (jarFile != null) {
      FileSystem.get(conf).copyToLocalFile(new Path(jarFile), localJarFile);
      localJobConf.setJar(localJarFile.toString());
    }
    task.setConf(localJobConf);
  }

  public synchronized void setJobConf(BSPJob jobConf) {
    this.jobConf = jobConf;
  }

  /** Returns the local job configuration built while localizing the task. */
  public synchronized BSPJob getJobConf() {
    return localJobConf;
  }

  /**
   * Localizes the task, marks it RUNNING and starts its runner.
   *
   * @throws IOException if localizing the task fails
   */
  public void launchTask() throws IOException {
    localizeTask(task);
    // Stamp the start time before the task becomes visible as RUNNING;
    // otherwise the ping monitor can see RUNNING with startTime == 0 and
    // purge the task straight away.
    startTime = System.currentTimeMillis();
    taskStatus.setRunState(TaskStatus.State.RUNNING);
    this.runner = task.createRunner(GroomServer.this);
    this.runner.start();
    // Refresh so the start-up leeway is measured from the runner start.
    startTime = System.currentTimeMillis();
    LOG.info("Task '" + task.getTaskID().toString() + "' has started.");
  }

  /**
   * Something went wrong and the task must be killed.
   *
   * @param wasFailure true marks the task FAILED and counts a failure on
   *          the groom; false marks it KILLED
   */
  public synchronized void killAndCleanup(boolean wasFailure)
      throws IOException {
    if (wasFailure) {
      failures += 1;
      taskStatus.setRunState(TaskStatus.State.FAILED);
    } else {
      taskStatus.setRunState(TaskStatus.State.KILLED);
    }
    // runner could be null if task-cleanup attempt is not localized yet
    killRunner();
  }

  /** Kills the runner process, if there is one, and forgets it. */
  public synchronized void killRunner() throws IOException {
    if (runner != null) {
      if (LOG.isDebugEnabled()) {
        LOG.debug("Killing process for " + this.task.getTaskID());
      }
      runner.killBsp();
    }
    runner = null;
  }

  /** Returns the tracked task. */
  public Task getTask() {
    return task;
  }

  /** Returns the live status object of this task (not a copy). */
  public synchronized TaskStatus getStatus() {
    return taskStatus;
  }

  /** Returns the current run state of this task. */
  public TaskStatus.State getRunState() {
    return taskStatus.getRunState();
  }

  public boolean wasKilled() {
    return wasKilled;
  }

  /** Two entries are equal when they track the same task attempt id. */
  @Override
  public boolean equals(Object obj) {
    return (obj instanceof TaskInProgress)
        && task.getTaskID().equals(
            ((TaskInProgress) obj).getTask().getTaskID());
  }

  @Override
  public int hashCode() {
    return task.getTaskID().hashCode();
  }

  /** Applies a status update from the task, unless the task is done. */
  public void reportProgress(TaskStatus taskStatus) {
    if (this.done) {
      LOG.info(task.getTaskID() + " Ignoring status-update since "
          + "task is 'done'");
      return;
    }
    this.taskStatus.statusUpdate(taskStatus);
  }

  /**
   * Marks the task as done (SUCCEEDED unless it already failed), records the
   * finish time and kills the runner if one is still attached.
   */
  public void reportDone() {
    if (this.taskStatus.getRunState() != TaskStatus.State.FAILED) {
      this.taskStatus.setRunState(TaskStatus.State.SUCCEEDED);
    }
    this.taskStatus.setFinishTime(System.currentTimeMillis());
    this.done = true;
    // The runner is cleared by killAndCleanup()/killRunner(); a late "done"
    // report must not fail with a NullPointerException.
    BSPTaskRunner activeRunner = this.runner;
    if (activeRunner != null) {
      activeRunner.killBsp();
    }
    LOG.info("Task " + task.getTaskID() + " is done.");
  }

  /**
   * Kills the task if it is still RUNNING, UNASSIGNED or COMMIT_PENDING.
   */
  public void jobHasFinished(boolean wasFailure) throws IOException {
    // Kill the task if it is still running
    synchronized (this) {
      if (getRunState() == TaskStatus.State.RUNNING
          || getRunState() == TaskStatus.State.UNASSIGNED
          || getRunState() == TaskStatus.State.COMMIT_PENDING) {
        killAndCleanup(wasFailure);
      }
    }
  }

  /** Records the time of the latest ping received from the task. */
  public synchronized void ping(long timestamp) {
    this.lastPingedTimestamp = timestamp;
  }
}
/**
 * Tells whether the groom's running flag is set.
 *
 * @return the current value of the running flag
 */
public boolean isRunning() {
  return this.running;
}
/**
 * Instantiates a groom server class through its public
 * {@code (Configuration)} constructor.
 *
 * @param groomServerClass the groom server class to instantiate
 * @param conf2 the configuration passed to the constructor
 * @return the new groom server
 * @throws RuntimeException wrapping any failure to find or invoke the
 *           constructor
 */
public static GroomServer constructGroomServer(
    Class<? extends GroomServer> groomServerClass, final Configuration conf2) {
  try {
    Constructor<? extends GroomServer> c = groomServerClass
        .getConstructor(Configuration.class);
    return c.newInstance(conf2);
  } catch (Exception e) {
    // The message used to say "Master"; this method builds a GroomServer.
    throw new RuntimeException("Failed construction of GroomServer: "
        + groomServerClass.toString(), e);
  }
}
/**
 * Returns the RPC protocol version for the protocols served by the groom
 * (GroomProtocol and BSPPeerProtocol).
 *
 * @param protocol class name of the requested protocol
 * @param clientVersion version of the client (not inspected)
 * @return {@code HamaRPCProtocolVersion.versionID}
 * @throws IOException if the protocol is neither of the two served ones
 */
@Override
public long getProtocolVersion(String protocol, long clientVersion)
    throws IOException {
  final boolean supported = protocol.equals(GroomProtocol.class.getName())
      || protocol.equals(BSPPeerProtocol.class.getName());
  if (!supported) {
    throw new IOException("Unknown protocol to GroomServer: " + protocol);
  }
  return HamaRPCProtocolVersion.versionID;
}
/**
 * Detaches a task from its job's bookkeeping and then notifies the task
 * that its job has finished. A {@code null} task is ignored.
 *
 * @param tip {@link TaskInProgress} to be removed; may be {@code null}
 * @param wasFailure forwarded to {@link TaskInProgress#jobHasFinished}
 * @throws IOException if {@code jobHasFinished} fails
 */
private void purgeTask(TaskInProgress tip, boolean wasFailure)
    throws IOException {
  if (tip == null) {
    return;
  }
  LOG.info("About to purge task: " + tip.getTask().getTaskID());
  // Drop the task from the set kept for its running job before the task
  // itself is told that the job is over.
  removeTaskFromJob(tip.getTask().getJobID(), tip);
  tip.jobHasFinished(wasFailure);
}
/**
 * Removes a task from the task collection of the running job it belongs to.
 * If the job is not present in {@code runningJobs}, a warning is logged and
 * nothing else happens. The job entry itself is left in place.
 *
 * @param jobId ID of the job to look up in {@code runningJobs}
 * @param tip the task to remove from that job's tasks
 */
private void removeTaskFromJob(BSPJobID jobId, TaskInProgress tip) {
  synchronized (runningJobs) {
    final RunningJob owner = runningJobs.get(jobId);
    if (owner != null) {
      // Lock order: runningJobs first, then the individual job.
      synchronized (owner) {
        owner.tasks.remove(tip);
      }
      return;
    }
    LOG.warn("Unknown job " + jobId + " being deleted.");
  }
}
/**
 * Returns the name of this groom server.
 *
 * @return the {@code groomServerName} field
 */
public String getGroomServerName() {
  return groomServerName;
}
/**
 * Collects the status of every task currently in {@code runningTasks}, for
 * reporting in the next heartbeat cycle.
 *
 * @return a newly allocated list; its elements are the status objects
 *         returned by each task's {@code getStatus()}
 */
public synchronized List<TaskStatus> getRunningTaskStatuses() {
  final List<TaskStatus> statuses =
      new ArrayList<TaskStatus>(runningTasks.size());
  final Iterator<TaskInProgress> it = runningTasks.values().iterator();
  while (it.hasNext()) {
    statuses.add(it.next().getStatus());
  }
  return statuses;
}
/**
 * The main() for BSPPeer child processes.
 */
public static final class BSPPeerChild {

  /**
   * Connects back to the parent over the umbilical RPC proxy, fetches the
   * task for the given attempt ID, builds a {@code BSPPeerImpl} and runs the
   * task with it.
   *
   * <p>Arguments, as read by this method:
   * <ul>
   * <li>{@code args[0]} - host of the umbilical (report) address</li>
   * <li>{@code args[1]} - port of the umbilical address</li>
   * <li>{@code args[2]} - task attempt ID, parsed with
   * {@code TaskAttemptID.forName}</li>
   * <li>{@code args[3]} - value stored under {@code Constants.PEER_HOST}</li>
   * <li>{@code args[4]} - superstep, parsed as a {@code long}</li>
   * <li>{@code args[5]} - name of a {@code TaskStatus.State} constant</li>
   * </ul>
   *
   * @param args the command-line arguments described above
   * @throws Throwable anything raised before the task is started, or by the
   *         error reporting and shutdown calls themselves
   */
  public static void main(String[] args) throws Throwable {
    if (LOG.isDebugEnabled()) {
      LOG.debug("BSPPeerChild starting");
    }
    final HamaConfiguration defaultConf = new HamaConfiguration();

    // report address
    String host = args[0];
    int port = Integer.parseInt(args[1]);
    InetSocketAddress address = new InetSocketAddress(host, port);
    TaskAttemptID taskid = TaskAttemptID.forName(args[2]);

    BSPPeerProtocol umbilical = (BSPPeerProtocol) RPC.getProxy(
        BSPPeerProtocol.class, HamaRPCProtocolVersion.versionID, address,
        defaultConf);
    final BSPTask task = (BSPTask) umbilical.getTask(taskid);
    int peerPort = umbilical.getAssignedPortNum(taskid);

    defaultConf.addResource(new Path(task.getJobFile()));
    BSPJob job = new BSPJob(task.getJobID(), task.getJobFile());

    defaultConf.set(Constants.PEER_HOST, args[3]);
    // args has already been dereferenced above, so no null test is needed.
    // NOTE(review): args[5] is read unconditionally below, so a five-element
    // array fails right after this branch; it looks like a leftover from an
    // older argument layout - confirm before removing.
    if (5 == args.length) {
      defaultConf.setInt("bsp.checkpoint.port", Integer.parseInt(args[4]));
    }
    defaultConf.setInt(Constants.PEER_PORT, peerPort);

    long superstep = Long.parseLong(args[4]);
    TaskStatus.State state = TaskStatus.State.valueOf(args[5]);
    LOG.debug("Starting peer for sstep " + superstep + " state = " + state);

    try {
      // use job-specified working directory
      FileSystem.get(job.getConfiguration()).setWorkingDirectory(
          job.getWorkingDirectory());

      // instantiate and init our peer
      @SuppressWarnings("rawtypes")
      final BSPPeerImpl<?, ?, ?, ?, ?> bspPeer = new BSPPeerImpl(job,
          defaultConf, taskid, umbilical, task.partition, task.splitClass,
          task.split, task.getCounters(), superstep, state);

      task.run(job, bspPeer, umbilical); // run the task
    } catch (FSError e) {
      LOG.fatal("FSError from child", e);
      umbilical.fsError(taskid, e.getMessage());
      e.printStackTrace();
    } catch (SyncException e) {
      LOG.fatal("SyncError from child", e);
      umbilical.fatalError(taskid, e.toString());
      e.printStackTrace();
    } catch (Throwable throwable) {
      // Only logged and printed locally; nothing is sent over the umbilical
      // for this case.
      LOG.fatal("Error running child", throwable);
      throwable.printStackTrace();
    } finally {
      RPC.stopProxy(umbilical);
      // Shutting down log4j of the child-vm...
      // This assumes that on return from Task.run()
      // there is no more logging done.
      LogManager.shutdown();
    }
  }
}
/**
 * Looks up the task registered under the given attempt ID.
 *
 * @param taskid the attempt ID to look up in {@code tasks}
 * @return the matching {@link Task}, or {@code null} if the ID is unknown
 */
@Override
public Task getTask(TaskAttemptID taskid) throws IOException {
  final TaskInProgress found = tasks.get(taskid);
  return (found == null) ? null : found.getTask();
}
/**
 * Looks up the status of the task registered under the given attempt ID.
 *
 * @param taskid the attempt ID to look up in {@code tasks}
 * @return the task's {@link TaskStatus}, or {@code null} if the ID is
 *         unknown
 */
public TaskStatus getTaskStatus(TaskAttemptID taskid) {
  final TaskInProgress found = tasks.get(taskid);
  return (found == null) ? null : found.getStatus();
}
/**
 * Returns the {@code startSuperstepCount} of the task registered under the
 * given attempt ID.
 *
 * @param taskid the attempt ID to look up in {@code tasks}
 * @return the task's start superstep count, or {@code -1} if the ID is
 *         unknown
 */
public long getStartSuperstep(TaskAttemptID taskid) {
  final TaskInProgress found = tasks.get(taskid);
  if (found == null) {
    return -1L;
  }
  return found.startSuperstepCount;
}
/**
 * Records a ping for a running task, stamped with the current wall-clock
 * time in milliseconds.
 *
 * @param taskid the attempt ID to look up in {@code runningTasks}
 * @return {@code true} if the task was found and its ping time updated,
 *         {@code false} if no running task has that ID
 */
@Override
public boolean ping(TaskAttemptID taskid) throws IOException {
  TaskInProgress tip = runningTasks.get(taskid);
  if (tip != null) {
    // Same clock reading as a fresh Calendar would give, without
    // allocating one on every ping.
    tip.ping(System.currentTimeMillis());
    return true;
  }
  return false;
}
/**
 * A child task had a fatal error: log it and purge the task, flagging the
 * purge as a failure. If no running task has the given ID, only the log
 * entry is written.
 *
 * @param taskId the attempt ID to look up in {@code runningTasks}
 * @param message error description supplied by the child
 * @throws IOException if purging the task fails
 */
@Override
public void fatalError(TaskAttemptID taskId, String message)
    throws IOException {
  LOG.fatal("Task: " + taskId + " - Killed : " + message);
  // purgeTask tolerates a null task, so the lookup result is passed as is.
  purgeTask(runningTasks.get(taskId), true);
}
/**
 * Logs a file-system error reported by a child task. This method only
 * writes the log entry; it does not touch the task's state.
 *
 * @param taskId the attempt ID the error was reported for
 * @param message error description supplied by the child
 */
@Override
public void fsError(TaskAttemptID taskId, String message) throws IOException {
  final String logLine =
      "Task: " + taskId + " - Killed due to FSError: " + message;
  LOG.fatal(logLine);
}
/**
 * Forwards a status report from a child to the matching task.
 *
 * @param taskId the attempt ID to look up in {@code tasks}
 * @param taskStatus the reported status
 * @return {@code true} if the task is known and the report was passed to
 *         it, {@code false} (after a warning) if the ID is unknown
 */
@Override
public boolean statusUpdate(TaskAttemptID taskId, TaskStatus taskStatus)
    throws IOException, InterruptedException {
  final TaskInProgress target = tasks.get(taskId);
  if (target == null) {
    LOG.warn("Progress from unknown child task: " + taskId);
    return false;
  }
  target.reportProgress(taskStatus);
  return true;
}
/**
 * Notifies the matching task that its child has finished. An unknown ID is
 * logged as a warning and otherwise ignored.
 *
 * @param taskid the attempt ID to look up in {@code tasks}
 */
@Override
public void done(TaskAttemptID taskid) throws IOException {
  final TaskInProgress finished = tasks.get(taskid);
  if (finished == null) {
    LOG.warn("Unknown child task done: " + taskid + ". Ignored.");
    return;
  }
  finished.reportDone();
}
/**
 * Returns the port number recorded for the given task in
 * {@code assignedPeerNames}.
 *
 * @param taskid the attempt ID to look up
 * @return the port number assigned to that task
 * @throws IllegalArgumentException if no port is recorded for
 *         {@code taskid}
 */
@Override
public int getAssignedPortNum(TaskAttemptID taskid) {
  // Look up as a boxed value first: unboxing a missing entry directly would
  // fail with an uninformative NullPointerException.
  final Integer assigned = assignedPeerNames.get(taskid);
  if (assigned == null) {
    throw new IllegalArgumentException(
        "No peer port assigned for task: " + taskid);
  }
  return assigned.intValue();
}
/**
 * Watcher callback. This implementation currently ignores every event.
 *
 * @param event the event delivered to this watcher; unused
 */
@Override
public void process(WatchedEvent event) {
  // No action is taken for any event.
}
}