Merged the trunk changes to branch HAMA-505-branch
git-svn-id: https://svn.apache.org/repos/asf/hama/branches/HAMA-505-branch@1369566 13f79535-47bb-0310-9956-ffa450edef68
diff --git a/conf/hama-default.xml b/conf/hama-default.xml
index 2947745..ce36cf5 100644
--- a/conf/hama-default.xml
+++ b/conf/hama-default.xml
@@ -120,7 +120,12 @@
<description>The maximum number of BSP tasks that will be run simultaneously
by a groom server.</description>
</property>
- <property>
+ <property>
+ <name>bsp.ft.enabled</name>
+ <value>false</value>
+ <description>Enable Fault Tolerance in BSP Task execution.</description>
+ </property>
+ <property>
<name>bsp.checkpoint.enabled</name>
<value>false</value>
<description>Enable Hama to checkpoint the messages transferred among BSP tasks during the BSP synchronization period.</description>
diff --git a/core/src/main/java/org/apache/hama/Constants.java b/core/src/main/java/org/apache/hama/Constants.java
index bba8779..8b5cd36 100644
--- a/core/src/main/java/org/apache/hama/Constants.java
+++ b/core/src/main/java/org/apache/hama/Constants.java
@@ -60,6 +60,24 @@
public static final String UTF8_ENCODING = "UTF-8";
public static final String MAX_TASKS_PER_GROOM = "bsp.tasks.maximum";
+
+ public static final String MAX_TASK_ATTEMPTS = "bsp.tasks.max.attempts";
+
+ public static final int DEFAULT_MAX_TASK_ATTEMPTS = 2;
+
+ ////////////////////////////////////////
+ // Task scheduler related constants
+ // //////////////////////////////////////
+
+ public static final String TASK_ALLOCATOR_CLASS = "bsp.taskalloc.class";
+
+ // //////////////////////////////////////
+ // Fault tolerance related constants
+ // //////////////////////////////////////
+
+ public static final String FAULT_TOLERANCE_FLAG = "bsp.ft.enabled";
+
+ public static final String FAULT_TOLERANCE_CLASS = "bsp.ft.class";
// //////////////////////////////////////
// Checkpointing related constants
diff --git a/core/src/main/java/org/apache/hama/bsp/BSPJobClient.java b/core/src/main/java/org/apache/hama/bsp/BSPJobClient.java
index 548e653..e4902a9 100644
--- a/core/src/main/java/org/apache/hama/bsp/BSPJobClient.java
+++ b/core/src/main/java/org/apache/hama/bsp/BSPJobClient.java
@@ -1025,7 +1025,7 @@
return tokens;
}
- static class RawSplit implements Writable {
+ public static class RawSplit implements Writable {
private String splitClass;
private BytesWritable bytes = new BytesWritable();
private String[] locations;
diff --git a/core/src/main/java/org/apache/hama/bsp/BSPMaster.java b/core/src/main/java/org/apache/hama/bsp/BSPMaster.java
index 4babda6..33741ae 100644
--- a/core/src/main/java/org/apache/hama/bsp/BSPMaster.java
+++ b/core/src/main/java/org/apache/hama/bsp/BSPMaster.java
@@ -28,12 +28,12 @@
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
-import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.atomic.AtomicReference;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@@ -48,6 +48,8 @@
import org.apache.hadoop.util.ReflectionUtils;
import org.apache.hama.Constants;
import org.apache.hama.HamaConfiguration;
+import org.apache.hama.bsp.sync.MasterSyncClient;
+import org.apache.hama.bsp.sync.ZKSyncBSPMasterClient;
import org.apache.hama.http.HttpServer;
import org.apache.hama.ipc.GroomProtocol;
import org.apache.hama.ipc.HamaRPCProtocolVersion;
@@ -56,18 +58,23 @@
import org.apache.hama.monitor.fd.FDProvider;
import org.apache.hama.monitor.fd.Supervisor;
import org.apache.hama.monitor.fd.UDPSupervisor;
-import org.apache.hama.zookeeper.QuorumPeer;
-import org.apache.zookeeper.CreateMode;
-import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
-import org.apache.zookeeper.ZooDefs.Ids;
-import org.apache.zookeeper.ZooKeeper;
-import org.apache.zookeeper.data.Stat;
/**
* BSPMaster is responsible to control all the groom servers and to manage bsp
- * jobs.
+ * jobs. It has the following responsibilities:
+ * <ol>
+ * <li> <b>Job submission</b>. BSPMaster is responsible for accepting new job
+ * requests and handing each job to the scheduler, which schedules the BSP
+ * tasks defined for the job.
+ * <li> <b>GroomServer monitoring</b>. BSPMaster keeps track of all the groom
+ * servers in the cluster. It is responsible for adding new grooms to the
+ * cluster, keeping tabs on all the grooms, and could blacklist a groom if
+ * it fails the availability requirement.
+ * <li> <b>Task status tracking</b>. BSPMaster keeps track of the status of
+ * every task of each job and handles task failures as requested by the jobs.
+ * </ol>
*/
public class BSPMaster implements JobSubmissionProtocol, MasterProtocol,
GroomServerManager, Watcher, MonitorManager {
@@ -77,8 +84,7 @@
private static final int FS_ACCESS_RETRY_PERIOD = 10000;
private HamaConfiguration conf;
- ZooKeeper zk = null;
- private String bspRoot = null;
+ MasterSyncClient syncClient = null;
/**
* Constants for BSPMaster's status.
@@ -122,7 +128,7 @@
// Jobs' Meta Data
private Integer nextJobId = Integer.valueOf(1);
- // clients
+ private int totalSubmissions = 0; // how many jobs has been submitted by clients
private int totalTasks = 0; // currnetly running tasks
private int totalTaskCapacity; // max tasks that groom server can run
@@ -141,6 +147,13 @@
private final AtomicReference<Supervisor> supervisor = new AtomicReference<Supervisor>();
+ /**
+ * ReportGroomStatusHandler keeps track of the status reported by each
+ * GroomServer for the tasks it is currently executing. Based on the
+ * reported status, it is responsible for issuing task recovery requests,
+ * updating the job progress, and other bookkeeping on currently running
+ * jobs.
+ */
private class ReportGroomStatusHandler implements DirectiveHandler {
@Override
@@ -177,8 +190,13 @@
jip.getStatus().setProgress(ts.getSuperstepCount());
jip.getStatus().setSuperstepCount(ts.getSuperstepCount());
} else if (ts.getRunState() == TaskStatus.State.FAILED) {
- jip.status.setRunState(JobStatus.FAILED);
- jip.failedTask(tip, ts);
+ if(jip.handleFailure(tip)){
+ recoverTask(jip);
+ }
+ else {
+ jip.status.setRunState(JobStatus.FAILED);
+ jip.failedTask(tip, ts);
+ }
}
if (jip.getStatus().getRunState() == JobStatus.SUCCEEDED) {
for (JobInProgressListener listener : jobInProgressListeners) {
@@ -192,6 +210,7 @@
jip.getStatus().setProgress(ts.getSuperstepCount());
jip.getStatus().setSuperstepCount(ts.getSuperstepCount());
} else if (jip.getStatus().getRunState() == JobStatus.KILLED) {
+
GroomProtocol worker = findGroomServer(tmpStatus);
Directive d1 = new DispatchTasksDirective(
new GroomServerAction[] { new KillTaskAction(ts.getTaskId()) });
@@ -444,11 +463,26 @@
return conf.getLocalPath("bsp.local.dir", pathString);
}
+ /**
+ * Starts the BSP Master process.
+ * @param conf The Hama configuration.
+ * @return an instance of BSPMaster
+ * @throws IOException
+ * @throws InterruptedException
+ */
public static BSPMaster startMaster(HamaConfiguration conf)
throws IOException, InterruptedException {
return startMaster(conf, generateNewIdentifier());
}
+ /**
+ * Starts the BSP Master process.
+ * @param conf The Hama configuration.
+ * @param identifier Identifier for this BSPMaster instance.
+ * @return an instance of BSPMaster
+ * @throws IOException
+ * @throws InterruptedException
+ */
public static BSPMaster startMaster(HamaConfiguration conf, String identifier)
throws IOException, InterruptedException {
BSPMaster result = new BSPMaster(conf, identifier);
@@ -465,108 +499,20 @@
}
/**
- * When start the cluster, cleans all zk nodes up.
- *
- * @param conf
+ * Initialize the global synchronization client.
+ * @param conf Hama configuration.
*/
private void initZK(HamaConfiguration conf) {
- try {
- zk = new ZooKeeper(QuorumPeer.getZKQuorumServersString(conf),
- conf.getInt(Constants.ZOOKEEPER_SESSION_TIMEOUT, 1200000), this);
- } catch (IOException e) {
- LOG.error("Exception during reinitialization!", e);
- }
-
- bspRoot = conf.get(Constants.ZOOKEEPER_ROOT,
- Constants.DEFAULT_ZOOKEEPER_ROOT);
- Stat s = null;
- if (zk != null) {
- try {
- s = zk.exists(bspRoot, false);
- } catch (Exception e) {
- LOG.error(s, e);
- }
-
- if (s == null) {
- try {
- zk.create(bspRoot, new byte[0], Ids.OPEN_ACL_UNSAFE,
- CreateMode.PERSISTENT);
- } catch (KeeperException e) {
- LOG.error(e);
- } catch (InterruptedException e) {
- LOG.error(e);
- }
- } else {
- this.clearZKNodes(zk);
- }
- }
+ this.syncClient = new ZKSyncBSPMasterClient();
+ this.syncClient.init(conf);
}
/**
- * Clears all sub-children of node bspRoot
+ * Get a handle of the global synchronization client used by BSPMaster.
+ * @return The synchronization client.
*/
- public void clearZKNodes(ZooKeeper zk) {
- clearZKNodes(zk, bspRoot);
- }
-
- public static void clearZKNodes(ZooKeeper zk, String pPath) {
- String path = pPath;
- if (!path.startsWith("/")) {
- path = "/" + path;
- LOG.warn("Path did not start with /, adding it: " + path);
- }
- try {
- Stat s = zk.exists(path, false);
- if (s != null) {
- clearZKNodesInternal(zk, path);
- }
-
- } catch (Exception e) {
- LOG.warn("Could not clear zookeeper nodes.", e);
- }
- }
-
- /**
- * Clears all sub-children of node rooted at path.
- */
- private static void clearZKNodesInternal(ZooKeeper zk, String path)
- throws KeeperException, InterruptedException {
- ArrayList<String> list = (ArrayList<String>) zk.getChildren(path, false);
-
- if (list.size() == 0) {
- return;
-
- } else {
- for (String node : list) {
- clearZKNodes(zk, path + "/" + node);
- zk.delete(path + "/" + node, -1); // delete any version of this node.
- }
- }
- }
-
- public void createJobRoot(String string) {
- try {
- zk.create("/" + string, new byte[0], Ids.OPEN_ACL_UNSAFE,
- CreateMode.PERSISTENT);
- } catch (KeeperException e) {
- LOG.error(e);
- } catch (InterruptedException e) {
- LOG.error(e);
- }
- }
-
- public void deleteJobRoot(String string) {
- try {
- for (String node : zk.getChildren("/" + string, this)) {
- zk.delete("/" + string + "/" + node, 0);
- }
-
- zk.delete("/" + string, 0);
- } catch (InterruptedException e) {
- e.printStackTrace();
- } catch (KeeperException e) {
- e.printStackTrace();
- }
+ public MasterSyncClient getSyncClient(){
+ return this.syncClient;
}
/**
@@ -657,6 +603,11 @@
JobInProgress job = new JobInProgress(jobID, new Path(jobFile), this,
this.conf);
+ ++totalSubmissions;
+ if(LOG.isDebugEnabled()){
+ LOG.debug("Submitting job number = " + totalSubmissions +
+ " id = " + job.getJobID());
+ }
return addJob(jobID, job);
}
@@ -804,6 +755,21 @@
}
return job.getStatus();
}
+
+  /**
+   * Recovers failed tasks of a job by asking every registered
+   * JobInProgressListener to reschedule them. Recovery is not a new client
+   * submission, so it does not count towards totalSubmissions.
+   */
+  private synchronized void recoverTask(JobInProgress job) {
+    for (JobInProgressListener listener : jobInProgressListeners) {
+      try {
+        listener.recoverTaskInJob(job);
+      } catch (IOException ioe) {
+        LOG.error("Fail to notify scheduler of task recovery in job " + job.getJobID(), ioe);
+      }
+    }
+  }
@Override
public JobStatus[] jobsToComplete() throws IOException {
@@ -920,11 +886,14 @@
}
}
+ /**
+ * Shuts down the BSP Process and does the necessary clean up.
+ */
public void shutdown() {
try {
- this.zk.close();
- } catch (InterruptedException e) {
- e.printStackTrace();
+ this.syncClient.close();
+ } catch (IOException e) {
+ LOG.error("Error closing the sync client",e);
}
if (null != this.supervisor.get()) {
this.supervisor.get().stop();
diff --git a/core/src/main/java/org/apache/hama/bsp/BSPPeerImpl.java b/core/src/main/java/org/apache/hama/bsp/BSPPeerImpl.java
index 1c6745c..e7ba19e 100644
--- a/core/src/main/java/org/apache/hama/bsp/BSPPeerImpl.java
+++ b/core/src/main/java/org/apache/hama/bsp/BSPPeerImpl.java
@@ -19,14 +19,12 @@
import java.io.IOException;
import java.net.InetSocketAddress;
-import java.util.Arrays;
import java.util.Iterator;
import java.util.Map.Entry;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.BytesWritable;
@@ -35,10 +33,13 @@
import org.apache.hadoop.util.ReflectionUtils;
import org.apache.hama.Constants;
import org.apache.hama.bsp.Counters.Counter;
+import org.apache.hama.bsp.ft.AsyncRcvdMsgCheckpointImpl;
+import org.apache.hama.bsp.ft.BSPFaultTolerantService;
+import org.apache.hama.bsp.ft.FaultTolerantPeerService;
import org.apache.hama.bsp.message.MessageManager;
import org.apache.hama.bsp.message.MessageManagerFactory;
import org.apache.hama.bsp.message.MessageQueue;
-import org.apache.hama.bsp.sync.SyncClient;
+import org.apache.hama.bsp.sync.PeerSyncClient;
import org.apache.hama.bsp.sync.SyncException;
import org.apache.hama.bsp.sync.SyncServiceFactory;
import org.apache.hama.ipc.BSPPeerProtocol;
@@ -53,10 +54,7 @@
private static final Log LOG = LogFactory.getLog(BSPPeerImpl.class);
public static enum PeerCounter {
- SUPERSTEP_SUM, SUPERSTEPS, TASK_INPUT_RECORDS, TASK_OUTPUT_RECORDS,
- IO_BYTES_READ, MESSAGE_BYTES_TRANSFERED, MESSAGE_BYTES_RECEIVED,
- TOTAL_MESSAGES_SENT, TOTAL_MESSAGES_RECEIVED, COMPRESSED_BYTES_SENT,
- COMPRESSED_BYTES_RECEIVED, TIME_IN_SYNC_MS
+ SUPERSTEP_SUM, SUPERSTEPS, TASK_INPUT_RECORDS, TASK_OUTPUT_RECORDS, IO_BYTES_READ, MESSAGE_BYTES_TRANSFERED, MESSAGE_BYTES_RECEIVED, TOTAL_MESSAGES_SENT, TOTAL_MESSAGES_RECEIVED, COMPRESSED_BYTES_SENT, COMPRESSED_BYTES_RECEIVED, TIME_IN_SYNC_MS
}
private final Configuration conf;
@@ -71,13 +69,9 @@
private String[] allPeers;
// SYNC
- private SyncClient syncClient;
+ private PeerSyncClient syncClient;
private MessageManager<M> messenger;
- // A checkpoint is initiated at the <checkPointInterval>th interval.
- private int checkPointInterval;
- private long lastCheckPointStep;
-
// IO
private int partition;
private String splitClass;
@@ -92,6 +86,8 @@
private Counters counters;
private Combiner<M> combiner;
+ private FaultTolerantPeerService<M> faultToleranceService;
+
/**
* Protected default constructor for LocalBSPRunner.
*/
@@ -123,6 +119,13 @@
this.counters = counters;
}
+ public BSPPeerImpl(BSPJob job, Configuration conf, TaskAttemptID taskId,
+ BSPPeerProtocol umbilical, int partition, String splitClass,
+ BytesWritable split, Counters counters) throws Exception {
+ this(job, conf, taskId, umbilical, partition, splitClass, split, counters,
+ -1, TaskStatus.State.RUNNING);
+ }
+
/**
* BSPPeer Constructor.
*
@@ -136,7 +139,8 @@
@SuppressWarnings("unchecked")
public BSPPeerImpl(BSPJob job, Configuration conf, TaskAttemptID taskId,
BSPPeerProtocol umbilical, int partition, String splitClass,
- BytesWritable split, Counters counters) throws Exception {
+ BytesWritable split, Counters counters, long superstep,
+ TaskStatus.State state) throws Exception {
this.conf = conf;
this.taskId = taskId;
this.umbilical = umbilical;
@@ -149,28 +153,29 @@
this.fs = FileSystem.get(conf);
- this.checkPointInterval = conf.getInt(Constants.CHECKPOINT_INTERVAL,
- Constants.DEFAULT_CHECKPOINT_INTERVAL);
- this.lastCheckPointStep = 0;
-
String bindAddress = conf.get(Constants.PEER_HOST,
Constants.DEFAULT_PEER_HOST);
int bindPort = conf
.getInt(Constants.PEER_PORT, Constants.DEFAULT_PEER_PORT);
peerAddress = new InetSocketAddress(bindAddress, bindPort);
- initialize();
- syncClient.register(taskId.getJobID(), taskId, peerAddress.getHostName(),
- peerAddress.getPort());
- // initial barrier syncing to get all the hosts to the same point, to get
- // consistent peernames.
- syncClient.enterBarrier(taskId.getJobID(), taskId, -1);
- syncClient.leaveBarrier(taskId.getJobID(), taskId, -1);
- setCurrentTaskStatus(new TaskStatus(taskId.getJobID(), taskId, 1.0f,
- TaskStatus.State.RUNNING, "running", peerAddress.getHostName(),
- TaskStatus.Phase.STARTING, counters));
- messenger = MessageManagerFactory.getMessageManager(conf);
- messenger.init(taskId, this, conf, peerAddress);
+ initializeIO();
+ initializeSyncService(superstep, state);
+
+ TaskStatus.Phase phase = TaskStatus.Phase.STARTING;
+ String stateString = "running";
+ if (state == TaskStatus.State.RECOVERING) {
+ phase = TaskStatus.Phase.RECOVERING;
+ stateString = "recovering";
+ }
+
+ setCurrentTaskStatus(new TaskStatus(taskId.getJobID(), taskId, 1.0f, state,
+ stateString, peerAddress.getHostName(), phase, counters));
+
+ initilizeMessaging();
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Initialized Messaging service.");
+ }
final String combinerName = conf.get("bsp.combiner.class");
if (combinerName != null) {
@@ -178,12 +183,57 @@
conf.getClassByName(combinerName), conf);
}
+ if (conf.getBoolean(Constants.FAULT_TOLERANCE_FLAG, false)) {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Fault tolerance enabled.");
+ }
+ if (superstep > 0)
+ conf.setInt("attempt.superstep", (int) superstep);
+ Class<?> ftClass = conf.getClass(Constants.FAULT_TOLERANCE_CLASS,
+ AsyncRcvdMsgCheckpointImpl.class, BSPFaultTolerantService.class);
+ if (ftClass != null) {
+ if (superstep > 0) {
+ counters.incrCounter(PeerCounter.SUPERSTEP_SUM, superstep);
+ }
+
+ this.faultToleranceService = ((BSPFaultTolerantService<M>) ReflectionUtils
+ .newInstance(ftClass, null)).constructPeerFaultTolerance(job, this,
+ syncClient, peerAddress, this.taskId, superstep, conf, messenger);
+ TaskStatus.State newState = this.faultToleranceService
+ .onPeerInitialized(state);
+
+ if (state == TaskStatus.State.RECOVERING) {
+ if (newState == TaskStatus.State.RUNNING) {
+ phase = TaskStatus.Phase.STARTING;
+ stateString = "running";
+ state = newState;
+ }
+
+ setCurrentTaskStatus(new TaskStatus(taskId.getJobID(), taskId, 1.0f,
+ state, stateString, peerAddress.getHostName(), phase, counters));
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("State after FT service initialization - "
+ + newState.toString());
+ }
+
+ }
+
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Initialized fault tolerance service");
+ }
+ }
+ }
+ doFirstSync(superstep);
+
+    // Initialization trace; guarded and emitted at DEBUG level only.
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("BSP Peer successfully initialized for " + this.taskId + " "
+          + superstep);
+    }
}
@SuppressWarnings("unchecked")
public final void initialize() throws Exception {
- syncClient = SyncServiceFactory.getSyncClient(conf);
- syncClient.init(conf, taskId.getJobID(), taskId);
initInput();
@@ -235,6 +285,50 @@
}
}
+ public final void initilizeMessaging() throws ClassNotFoundException {
+ messenger = MessageManagerFactory.getMessageManager(conf);
+ messenger.init(taskId, this, conf, peerAddress);
+ }
+
+ public final void initializeSyncService(long superstep, TaskStatus.State state)
+ throws Exception {
+
+ syncClient = SyncServiceFactory.getPeerSyncClient(conf);
+ syncClient.init(conf, taskId.getJobID(), taskId);
+ syncClient.register(taskId.getJobID(), taskId, peerAddress.getHostName(),
+ peerAddress.getPort());
+ }
+
+ private void doFirstSync(long superstep) throws SyncException {
+ if (superstep > 0)
+ --superstep;
+ syncClient.enterBarrier(taskId.getJobID(), taskId, superstep);
+ syncClient.leaveBarrier(taskId.getJobID(), taskId, superstep);
+ }
+
+ @SuppressWarnings("unchecked")
+ public final void initializeIO() throws Exception {
+
+ initInput();
+
+ String outdir = null;
+ if (conf.get("bsp.output.dir") != null) {
+ Path outputDir = new Path(conf.get("bsp.output.dir",
+ "tmp-" + System.currentTimeMillis()), Task.getOutputName(partition));
+ outdir = outputDir.makeQualified(fs).toString();
+ }
+ outWriter = bspJob.getOutputFormat().getRecordWriter(fs, bspJob, outdir);
+ final RecordWriter<K2, V2> finalOut = outWriter;
+
+ collector = new OutputCollector<K2, V2>() {
+ @Override
+ public void collect(K2 key, V2 value) throws IOException {
+ finalOut.write(key, value);
+ }
+ };
+
+ }
+
@Override
public final M getCurrentMessage() throws IOException {
return messenger.getCurrentMessage();
@@ -247,85 +341,50 @@
}
/*
- * returns true if the peer would checkpoint in the next sync.
- */
- public final boolean isReadyToCheckpoint() {
-
- checkPointInterval = conf.getInt(Constants.CHECKPOINT_INTERVAL, 1);
- if (LOG.isDebugEnabled())
- LOG.debug(new StringBuffer(1000).append("Enabled = ")
- .append(conf.getBoolean(Constants.CHECKPOINT_ENABLED, false))
- .append(" checkPointInterval = ").append(checkPointInterval)
- .append(" lastCheckPointStep = ").append(lastCheckPointStep)
- .append(" getSuperstepCount() = ").append(getSuperstepCount())
- .toString());
-
- return (conf.getBoolean(Constants.CHECKPOINT_ENABLED, false)
- && (checkPointInterval != 0) && (((int) (getSuperstepCount() - lastCheckPointStep)) >= checkPointInterval));
-
- }
-
- private final String checkpointedPath() {
- String backup = conf.get("bsp.checkpoint.prefix_path", "/checkpoint/");
- String ckptPath = backup + bspJob.getJobID().toString() + "/"
- + getSuperstepCount() + "/" + this.taskId.toString();
- if (LOG.isDebugEnabled())
- LOG.debug("Messages are to be saved to " + ckptPath);
- return ckptPath;
- }
-
- final void checkpoint(String checkpointedPath, BSPMessageBundle<M> bundle) {
- FSDataOutputStream out = null;
- try {
- out = this.fs.create(new Path(checkpointedPath));
- bundle.write(out);
- } catch (IOException ioe) {
- LOG.warn("Fail checkpointing messages to " + checkpointedPath, ioe);
- } finally {
- try {
- if (null != out)
- out.close();
- } catch (IOException e) {
- LOG.warn("Fail to close dfs output stream while checkpointing.", e);
- }
- }
- }
-
- /*
* (non-Javadoc)
* @see org.apache.hama.bsp.BSPPeerInterface#sync()
*/
@Override
public final void sync() throws IOException, SyncException,
InterruptedException {
- long startBarrier = System.currentTimeMillis();
- enterBarrier();
+
// normally all messages should been send now, finalizing the send phase
messenger.finishSendPhase();
Iterator<Entry<InetSocketAddress, MessageQueue<M>>> it = messenger
.getMessageIterator();
- boolean shouldCheckPoint;
-
- if ((shouldCheckPoint = isReadyToCheckpoint())) {
- lastCheckPointStep = getSuperstepCount();
- }
-
while (it.hasNext()) {
Entry<InetSocketAddress, MessageQueue<M>> entry = it.next();
final InetSocketAddress addr = entry.getKey();
final Iterable<M> messages = entry.getValue();
final BSPMessageBundle<M> bundle = combineMessages(messages);
-
- if (shouldCheckPoint) {
- checkpoint(checkpointedPath(), bundle);
- }
-
// remove this message during runtime to save a bit of memory
it.remove();
+ try {
+ messenger.transfer(addr, bundle);
+ } catch (Exception e) {
+ LOG.error("Error while sending messages", e);
+ }
+ }
- messenger.transfer(addr, bundle);
+ if (this.faultToleranceService != null) {
+ try {
+ this.faultToleranceService.beforeBarrier();
+ } catch (Exception e) {
+ throw new IOException(e);
+ }
+ }
+
+ long startBarrier = System.currentTimeMillis();
+ enterBarrier();
+
+ if (this.faultToleranceService != null) {
+ try {
+ this.faultToleranceService.duringBarrier();
+ } catch (Exception e) {
+ throw new IOException(e);
+ }
}
leaveBarrier();
@@ -336,9 +395,38 @@
currentTaskStatus.setCounters(counters);
+ if (this.faultToleranceService != null) {
+ try {
+ this.faultToleranceService.afterBarrier();
+ } catch (Exception e) {
+ throw new IOException(e);
+ }
+ }
+
umbilical.statusUpdate(taskId, currentTaskStatus);
// Clear outgoing queues.
messenger.clearOutgoingQueues();
+
+ // int msgsCount = -1;
+ // if (shouldCheckPoint) {
+ // msgsCount = checkpointReceivedMessages(checkpointedReceivePath());
+ // }
+ //
+ // this.syncClient.storeInformation(this.syncClient.constructKey(
+ // this.bspJob.getJobID(), "checkpoint", String.valueOf(getPeerIndex())),
+ // new IntWritable(msgsCount), false, null);
+
+ // if (msgsCount >= 0) {
+ // ArrayWritable writableArray = new ArrayWritable(IntWritable.class);
+ // Writable[] writeArr = new Writable[2];
+ // writeArr[0] = new IntWritable((int) getSuperstepCount());
+ // writeArr[1] = new IntWritable(msgsCount);
+ // writableArray.set(writeArr);
+ // this.syncClient.storeInformation(
+ // this.syncClient.constructKey(this.bspJob.getJobID(), "checkpoint",
+ // String.valueOf(getPeerIndex())), writableArray, true, null);
+ // }
+
}
private final BSPMessageBundle<M> combineMessages(Iterable<M> messages) {
@@ -420,8 +508,7 @@
@Override
public int getPeerIndex() {
- initPeerNames();
- return Arrays.binarySearch(getAllPeerNames(), getPeerName());
+ return this.taskId.getTaskID().getId();
}
@Override
diff --git a/core/src/main/java/org/apache/hama/bsp/BSPTask.java b/core/src/main/java/org/apache/hama/bsp/BSPTask.java
index 9212317..679d053 100644
--- a/core/src/main/java/org/apache/hama/bsp/BSPTask.java
+++ b/core/src/main/java/org/apache/hama/bsp/BSPTask.java
@@ -69,7 +69,7 @@
boolean shouldKillSelf = false;
try {
if (LOG.isDebugEnabled())
- LOG.debug("Pinging at time " + Calendar.getInstance().toString());
+ LOG.debug("Pinging at time " + Calendar.getInstance().getTimeInMillis());
// if the RPC call returns false, it means that groomserver does not
// have knowledge of this task.
shouldKillSelf = !(pingRPC.ping(taskId) && bspThread.isAlive());
diff --git a/core/src/main/java/org/apache/hama/bsp/GroomServer.java b/core/src/main/java/org/apache/hama/bsp/GroomServer.java
index c63c8d6..5ba7b1e 100644
--- a/core/src/main/java/org/apache/hama/bsp/GroomServer.java
+++ b/core/src/main/java/org/apache/hama/bsp/GroomServer.java
@@ -163,19 +163,20 @@
}
if (actions != null) {
- assignedPeerNames = new HashMap<TaskAttemptID, Integer>();
+ // assignedPeerNames = new HashMap<TaskAttemptID, Integer>();
int prevPort = Constants.DEFAULT_PEER_PORT;
for (GroomServerAction action : actions) {
if (action instanceof LaunchTaskAction) {
Task t = ((LaunchTaskAction) action).getTask();
- prevPort = BSPNetUtils.getNextAvailable(prevPort);
- assignedPeerNames.put(t.getTaskID(), prevPort);
-
+ synchronized (assignedPeerNames) {
+ prevPort = BSPNetUtils.getNextAvailable(prevPort);
+ assignedPeerNames.put(t.getTaskID(), prevPort);
+ }
LOG.info("Launch " + actions.length + " tasks.");
startNewTask((LaunchTaskAction) action);
- } else {
+ } else if (action instanceof KillTaskAction) {
// TODO Use the cleanup thread
// tasksToCleanup.put(action);
@@ -187,11 +188,30 @@
tip.taskStatus.setRunState(TaskStatus.State.FAILED);
try {
tip.killAndCleanup(false);
+ tasks.remove(killAction.getTaskID());
+ runningTasks.remove(killAction.getTaskID());
} catch (IOException ioe) {
throw new DirectiveException("Error when killing a "
+ "TaskInProgress.", ioe);
}
}
+          } else if (action instanceof RecoverTaskAction) {
+            // Recover a failed task: reserve a peer port for the new attempt,
+            // then start it from the superstep carried by the action.
+            RecoverTaskAction recoverAction = (RecoverTaskAction) action;
+            Task t = recoverAction.getTask();
+            LOG.info("Recovery action task " + t.getTaskID());
+            synchronized (assignedPeerNames) {
+              prevPort = BSPNetUtils.getNextAvailable(prevPort);
+              assignedPeerNames.put(t.getTaskID(), prevPort);
+            }
+            try {
+              startRecoveryTask(recoverAction);
+            } catch (IOException e) {
+              throw new DirectiveException("Error starting the recovery task "
+                  + t.getTaskID(), e);
+            }
}
}
}
@@ -321,6 +341,8 @@
this.conf.set(Constants.PEER_HOST, localHostname);
this.conf.set(Constants.GROOM_RPC_HOST, localHostname);
this.maxCurrentTasks = conf.getInt(Constants.MAX_TASKS_PER_GROOM, 3);
+ this.assignedPeerNames = new HashMap<TaskAttemptID, Integer>(
+ 2 * this.maxCurrentTasks);
int rpcPort = -1;
String rpcAddr = null;
@@ -571,6 +593,61 @@
}
}
+  /**
+   * Starts a recovery attempt for a failed task on this groom.
+   * <p>
+   * If an attempt with exactly the same attempt id is still tracked, its child
+   * process is killed first. Every tracked attempt of the same logical task is
+   * then removed from {@code tasks} and {@code runningTasks}, the new attempt is
+   * registered in both maps, and the job is localized for it via localizeJob.
+   *
+   * @param action the recovery directive; supplies the task and the superstep
+   *          count recorded on the new attempt
+   * @throws IOException if the job file cannot be loaded, the previous process
+   *           cannot be killed, or localization fails
+   */
+  private void startRecoveryTask(RecoverTaskAction action) throws IOException {
+    Task t = action.getTask();
+    BSPJob jobConf = null;
+    try {
+      jobConf = new BSPJob(t.getJobID(), t.getJobFile());
+    } catch (IOException e1) {
+      LOG.error("Error loading job file " + t.getJobFile() + " for "
+          + t.getTaskID(), e1);
+      throw e1;
+    }
+
+    TaskInProgress tip = new TaskInProgress(t, jobConf, this.groomServerName);
+    tip.markAsRecoveryTask(action.getSuperstepCount());
+    synchronized (this) {
+      if (tasks.containsKey(t.getTaskID())) {
+        TaskInProgress oldTip = tasks.get(t.getTaskID());
+        try {
+          oldTip.killRunner();
+        } catch (IOException e) {
+          LOG.error("Error killing the current process for " + t.getTaskID(), e);
+          throw e;
+        }
+      }
+
+      // Forget every earlier attempt of the same logical task so that only the
+      // recovery attempt is tracked from now on.
+      Iterator<TaskAttemptID> taskIterator = tasks.keySet().iterator();
+      while (taskIterator.hasNext()) {
+        TaskAttemptID taskAttId = taskIterator.next();
+        if (taskAttId.getTaskID().equals(t.getTaskID().getTaskID())) {
+          if (LOG.isDebugEnabled()) {
+            LOG.debug("Removing tasks with id = " + t.getTaskID().getTaskID());
+          }
+          taskIterator.remove();
+          runningTasks.remove(taskAttId);
+        }
+      }
+
+      tasks.put(t.getTaskID(), tip);
+      runningTasks.put(t.getTaskID(), tip);
+    }
+    try {
+      localizeJob(tip);
+    } catch (Throwable e) {
+      String msg = "Error initializing " + tip.getTask().getTaskID() + ":\n"
+          + StringUtils.stringifyException(e);
+      LOG.warn(msg);
+
+      try {
+        tip.killAndCleanup(true);
+      } catch (IOException ie2) {
+        LOG.info("Error cleaning up " + tip.getTask().getTaskID() + ":\n"
+            + StringUtils.stringifyException(ie2));
+      }
+      throw new IOException("Error localizing the job for "
+          + tip.getTask().getTaskID(), e);
+    }
+  }
+
/**
* Update and report refresh status back to BSPMaster.
*/
@@ -730,13 +807,20 @@
+ " monitorPeriod = "
+ monitorPeriod
+ " check = "
- + (tip.taskStatus.getRunState().equals(TaskStatus.State.RUNNING) && (((tip.lastPingedTimestamp == 0 && ((currentTime - tip.startTime) > 10 * monitorPeriod)) || ((tip.lastPingedTimestamp > 0) && (currentTime - tip.lastPingedTimestamp) > monitorPeriod)))));
+ + (tip.taskStatus.getRunState().equals(TaskStatus.State.RUNNING) &&
+ (((tip.lastPingedTimestamp == 0 &&
+ ((currentTime - tip.startTime) > 10 * monitorPeriod)) ||
+ ((tip.lastPingedTimestamp > 0) &&
+ (currentTime - tip.lastPingedTimestamp) > monitorPeriod)))));
// Task is out of contact if it has not pinged since more than
// monitorPeriod. A task is given a leeway of 10 times monitorPeriod
// to get started.
if (tip.taskStatus.getRunState().equals(TaskStatus.State.RUNNING)
- && (((tip.lastPingedTimestamp == 0 && ((currentTime - tip.startTime) > 10 * monitorPeriod)) || ((tip.lastPingedTimestamp > 0) && (currentTime - tip.lastPingedTimestamp) > monitorPeriod)))) {
+ && (((tip.lastPingedTimestamp == 0
+ && ((currentTime - tip.startTime) > 10 * monitorPeriod))
+ || ((tip.lastPingedTimestamp > 0)
+ && (currentTime - tip.lastPingedTimestamp) > monitorPeriod)))) {
LOG.info("adding purge task: " + tip.getTask().getTaskID());
@@ -891,6 +975,7 @@
private long startTime = 0L;
private volatile long lastPingedTimestamp = 0L;
+ private long startSuperstepCount = -1;
public TaskInProgress(Task task, BSPJob jobConf, String groomServer) {
this.task = task;
@@ -901,6 +986,15 @@
TaskStatus.Phase.STARTING, task.getCounters());
}
+ public void markAsRecoveryTask(long superstepNumber) {
+ if (this.taskStatus.getRunState() != TaskStatus.State.FAILED) {
+ this.taskStatus.setRunState(TaskStatus.State.RECOVERING);
+ this.taskStatus.setPhase(TaskStatus.Phase.RECOVERING);
+ this.taskStatus.setStateString("recovering");
+ }
+ this.startSuperstepCount = superstepNumber;
+ }
+
private void localizeTask(Task task) throws IOException {
Path localJobFile = this.jobConf.getLocalPath(SUBDIR + "/"
+ task.getTaskID() + "/job.xml");
@@ -954,8 +1048,22 @@
// runner could be null if task-cleanup attempt is not localized yet
if (runner != null) {
+ if(LOG.isDebugEnabled()){
+ LOG.debug("Killing process for " + this.task.getTaskID());
+ }
runner.killBsp();
}
+ runner = null;
+ }
+
+ public synchronized void killRunner() throws IOException {
+ if (runner != null) {
+ if(LOG.isDebugEnabled()){
+ LOG.debug("Killing process for " + this.task.getTaskID());
+ }
+ runner.killBsp();
+ }
+ runner = null;
}
/**
@@ -1143,6 +1251,11 @@
defaultConf.setInt("bsp.checkpoint.port", Integer.parseInt(args[4]));
}
defaultConf.setInt(Constants.PEER_PORT, peerPort);
+
+ long superstep = Long.parseLong(args[4]);
+ TaskStatus.State state = TaskStatus.State.valueOf(args[5]);
+ LOG.debug("Starting peer for sstep " + superstep + " state = " + state);
+
try {
// use job-specified working directory
@@ -1153,7 +1266,7 @@
@SuppressWarnings("rawtypes")
final BSPPeerImpl<?, ?, ?, ?, ?> bspPeer = new BSPPeerImpl(job,
defaultConf, taskid, umbilical, task.partition, task.splitClass,
- task.split, task.getCounters());
+ task.split, task.getCounters(), superstep, state);
task.run(job, bspPeer, umbilical); // run the task
@@ -1195,6 +1308,24 @@
}
}
+  /**
+   * Looks up the status of a task attempt known to this groom.
+   *
+   * @param taskid id of the task attempt
+   * @return the attempt's status, or null when no task is tracked under taskid
+   */
+  public TaskStatus getTaskStatus(TaskAttemptID taskid) {
+    TaskInProgress tracked = tasks.get(taskid);
+    return (tracked == null) ? null : tracked.getStatus();
+  }
+
+  /**
+   * Looks up the start superstep recorded for a task attempt.
+   *
+   * @param taskid id of the task attempt
+   * @return the recorded start superstep, or -1 when no task is tracked under
+   *         taskid
+   */
+  public long getStartSuperstep(TaskAttemptID taskid) {
+    TaskInProgress tracked = tasks.get(taskid);
+    if (tracked == null) {
+      return -1L;
+    }
+    return tracked.startSuperstepCount;
+  }
+
@Override
public boolean ping(TaskAttemptID taskid) throws IOException {
TaskInProgress tip = runningTasks.get(taskid);
@@ -1220,8 +1351,6 @@
+ /**
+ * Handles a filesystem error reported for taskId. The body only logs the
+ * error at FATAL level; no further action is taken here.
+ */
 @Override
 public void fsError(TaskAttemptID taskId, String message) throws IOException {
 LOG.fatal("Task: " + taskId + " - Killed due to FSError: " + message);
- // TODO
-
 }
@Override
@@ -1254,8 +1383,6 @@
+ /** Watcher callback; the body is empty, so every event is ignored. */
 @Override
 public void process(WatchedEvent event) {
- // TODO Auto-generated method stub
-
 }
}
diff --git a/core/src/main/java/org/apache/hama/bsp/GroomServerAction.java b/core/src/main/java/org/apache/hama/bsp/GroomServerAction.java
index bdc9857..658a4a2 100644
--- a/core/src/main/java/org/apache/hama/bsp/GroomServerAction.java
+++ b/core/src/main/java/org/apache/hama/bsp/GroomServerAction.java
@@ -28,7 +28,7 @@
* A generic directive from the {@link org.apache.hama.bsp.BSPMaster} to the
* {@link org.apache.hama.bsp.GroomServer} to take some 'action'.
*/
-abstract class GroomServerAction implements Writable {
+public abstract class GroomServerAction implements Writable {
/**
* Ennumeration of various 'actions' that the {@link BSPMaster} directs the
@@ -49,7 +49,13 @@
REINIT_GROOM,
/** Ask a task to save its output. */
- COMMIT_TASK
+ COMMIT_TASK,
+
+ /** Recover a task from failure. */
+ RECOVER_TASK,
+
+ /** Update information on a peer. */
+ UPDATE_PEER
};
/**
@@ -73,7 +79,17 @@
case KILL_JOB: {
action = new KillJobAction();
}
- break;
+ break;
+ case RECOVER_TASK:
+ {
+ action = new RecoverTaskAction();
+ }
+ break;
+ case UPDATE_PEER:
+ {
+ action = new UpdatePeerAction();
+ }
+ break;
case REINIT_GROOM: {
action = new ReinitGroomAction();
}
diff --git a/core/src/main/java/org/apache/hama/bsp/JobInProgress.java b/core/src/main/java/org/apache/hama/bsp/JobInProgress.java
index 4da7d90..3630429 100644
--- a/core/src/main/java/org/apache/hama/bsp/JobInProgress.java
+++ b/core/src/main/java/org/apache/hama/bsp/JobInProgress.java
@@ -21,9 +21,11 @@
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
+import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
+import java.util.Set;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@@ -32,13 +34,21 @@
import org.apache.hadoop.fs.LocalFileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hama.Constants;
+import org.apache.hama.bsp.ft.AsyncRcvdMsgCheckpointImpl;
+import org.apache.hama.bsp.ft.BSPFaultTolerantService;
+import org.apache.hama.bsp.ft.FaultTolerantMasterService;
+import org.apache.hama.bsp.sync.MasterSyncClient;
+import org.apache.hama.bsp.taskallocation.BSPResource;
+import org.apache.hama.bsp.taskallocation.BestEffortDataLocalTaskAllocator;
+import org.apache.hama.bsp.taskallocation.TaskAllocationStrategy;
+import org.apache.hama.util.ReflectionUtils;
/**
* JobInProgress maintains all the info for keeping a Job on the straight and
* narrow. It keeps its JobProfile and its latest JobStatus, plus a set of
* tables for doing bookkeeping of its Tasks.ss
*/
-class JobInProgress {
+public class JobInProgress {
/**
* Used when the a kill is issued to a job which is initializing.
@@ -73,6 +83,8 @@
long launchTime;
long finishTime;
+ int maxTaskAttempts;
+
private String jobName;
// private LocalFileSystem localFs;
@@ -88,11 +100,38 @@
String jobSplit;
Map<Task, GroomServerStatus> taskToGroomMap;
+
// Used only for scheduling!
- Map<GroomServerStatus, Integer> tasksInGroomMap;
+ Map<GroomServerStatus, Integer> taskCountInGroomMap;
+
+  // A task that is not a key of this map has not failed before.
+  // For a task that is a key, the value is the attempt ID under which that
+  // task was re-attempted.
+  //
+ Map<Task, Integer> taskReattemptMap;
+
+ Set<TaskInProgress> recoveryTasks;
+
+ // This set keeps track of the tasks that have failed.
+ Set<Task> failedTasksTillNow;
private int taskCompletionEventTracker = 0;
+ private TaskAllocationStrategy taskAllocationStrategy;
+
+ private FaultTolerantMasterService faultToleranceService;
+
+  /**
+   * Creates a bare job record for unit tests. Only the job id and the
+   * configuration are set and the master is null; tasks and the bookkeeping
+   * maps are not created by this constructor.
+   *
+   * @param jobId id of the job
+   * @param conf configuration of the job
+   */
+  public JobInProgress(BSPJobID jobId, Configuration conf) {
+    this.master = null;
+    this.jobId = jobId;
+    this.conf = conf;
+  }
+
public JobInProgress(BSPJobID jobId, Path jobFile, BSPMaster master,
Configuration conf) throws IOException {
this.conf = conf;
@@ -101,10 +140,6 @@
this.jobFile = jobFile;
this.master = master;
- this.taskToGroomMap = new HashMap<Task, GroomServerStatus>(2 * tasks.length);
-
- this.tasksInGroomMap = new HashMap<GroomServerStatus, Integer>();
-
this.status = new JobStatus(jobId, null, 0L, 0L,
JobStatus.State.PREP.value(), counters);
this.startTime = System.currentTimeMillis();
@@ -126,6 +161,9 @@
this.taskCompletionEvents = new ArrayList<TaskCompletionEvent>(
numBSPTasks + 10);
+ this.maxTaskAttempts = job.getConf().getInt(Constants.MAX_TASK_ATTEMPTS,
+ Constants.DEFAULT_MAX_TASK_ATTEMPTS);
+
this.profile = new JobProfile(job.getUser(), jobId, jobFile.toString(),
job.getJobName());
@@ -139,6 +177,8 @@
fs.copyToLocalFile(new Path(jarFile), localJarFile);
}
+ failedTasksTillNow = new HashSet<Task>(2 * tasks.length);
+
}
public JobProfile getProfile() {
@@ -228,25 +268,55 @@
this.tasks = new TaskInProgress[numBSPTasks];
for (int i = 0; i < numBSPTasks; i++) {
tasks[i] = new TaskInProgress(getJobID(), this.jobFile.toString(),
- splits[i], this.master, this.conf, this, i);
+ splits[i], this.conf, this, i);
}
} else {
this.tasks = new TaskInProgress[numBSPTasks];
for (int i = 0; i < numBSPTasks; i++) {
tasks[i] = new TaskInProgress(getJobID(), this.jobFile.toString(),
- null, this.master, this.conf, this, i);
+ null, this.conf, this, i);
}
}
+ this.taskToGroomMap = new HashMap<Task, GroomServerStatus>(2 * tasks.length);
+
+ this.taskCountInGroomMap = new HashMap<GroomServerStatus, Integer>();
+
+ this.recoveryTasks = new HashSet<TaskInProgress>(2 * tasks.length);
// Update job status
this.status = new JobStatus(this.status.getJobID(), this.profile.getUser(),
0L, 0L, JobStatus.RUNNING, counters);
// delete all nodes belonging to that job before start
- BSPMaster.clearZKNodes(master.zk, this.getJobID().toString());
- master.createJobRoot(this.getJobID().toString());
+ MasterSyncClient syncClient = master.getSyncClient();
+ syncClient.registerJob(this.getJobID().toString());
tasksInited = true;
+
+ Class<?> taskAllocatorClass = conf.getClass(Constants.TASK_ALLOCATOR_CLASS,
+ BestEffortDataLocalTaskAllocator.class, TaskAllocationStrategy.class);
+ this.taskAllocationStrategy = (TaskAllocationStrategy) ReflectionUtils
+ .newInstance(taskAllocatorClass, new Object[0]);
+
+ if (conf.getBoolean(Constants.FAULT_TOLERANCE_FLAG, false)) {
+
+ Class<?> ftClass = conf.getClass(Constants.FAULT_TOLERANCE_CLASS,
+ AsyncRcvdMsgCheckpointImpl.class ,
+ BSPFaultTolerantService.class);
+ if (ftClass != null) {
+ try {
+ faultToleranceService = ((BSPFaultTolerantService<?>) ReflectionUtils
+ .newInstance(ftClass, new Object[0]))
+ .constructMasterFaultTolerance(jobId, maxTaskAttempts, tasks,
+ conf, master.getSyncClient(), taskAllocationStrategy);
+ LOG.info("Initialized fault tolerance service with "
+ + ftClass.getCanonicalName());
+ } catch (Exception e) {
+ throw new IOException(e);
+ }
+ }
+ }
+
LOG.info("Job is initialized.");
}
@@ -268,29 +338,54 @@
}
Task result = null;
+ BSPResource[] resources = new BSPResource[0];
- try {
- for (int i = 0; i < tasks.length; i++) {
- if (!tasks[i].isRunning() && !tasks[i].isComplete()) {
- result = tasks[i].getTaskToRun(groomStatuses, tasksInGroomMap);
- if (result != null)
- this.taskToGroomMap.put(result, tasks[i].getGroomServerStatus());
- int taskInGroom = 0;
- if (tasksInGroomMap.containsKey(tasks[i].getGroomServerStatus())) {
- taskInGroom = tasksInGroomMap.get(tasks[i].getGroomServerStatus());
- }
- tasksInGroomMap.put(tasks[i].getGroomServerStatus(), taskInGroom + 1);
- break;
+ for (int i = 0; i < tasks.length; i++) {
+ if (!tasks[i].isRunning() && !tasks[i].isComplete()) {
+
+ String[] selectedGrooms = taskAllocationStrategy.selectGrooms(
+ groomStatuses, taskCountInGroomMap, resources, tasks[i]);
+ GroomServerStatus groomStatus = taskAllocationStrategy
+ .getGroomToAllocate(groomStatuses, selectedGrooms,
+ taskCountInGroomMap, resources, tasks[i]);
+ if (groomStatus != null)
+ result = tasks[i].constructTask(groomStatus);
+ if (result != null) {
+ updateGroomTaskDetails(tasks[i].getGroomServerStatus(), result);
}
+ break;
}
-
- } catch (IOException e) {
- LOG.error("Exception while obtaining new task!", e);
}
+
counters.incrCounter(JobCounter.LAUNCHED_TASKS, 1L);
return result;
}
+  /**
+   * Hands the tasks queued for recovery (drained via
+   * fetchAndClearTasksToRecover) to the job's fault tolerance service, which
+   * adds the directives to send into actionMap. Does nothing when no fault
+   * tolerance service was created for this job.
+   *
+   * @param groomStatuses the known grooms, keyed by name
+   * @param actionMap per-groom directives to be filled in for the caller
+   * @throws IOException if the fault tolerance service fails to recover the
+   *           tasks
+   */
+  public void recoverTasks(Map<String, GroomServerStatus> groomStatuses,
+      Map<GroomServerStatus, List<GroomServerAction>> actionMap)
+      throws IOException {
+    if (this.faultToleranceService == null) {
+      return;
+    }
+    // IOException propagates unchanged to the caller; no wrapping needed.
+    this.faultToleranceService.recoverTasks(this, groomStatuses,
+        fetchAndClearTasksToRecover(), tasks, taskCountInGroomMap, actionMap);
+  }
+
+  /**
+   * Records that task is assigned to groomStatus and increments the number of
+   * tasks counted for that groom.
+   */
+  private void updateGroomTaskDetails(GroomServerStatus groomStatus, Task task) {
+    taskToGroomMap.put(task, groomStatus);
+    int assigned = taskCountInGroomMap.containsKey(groomStatus)
+        ? taskCountInGroomMap.get(groomStatus)
+        : 0;
+    taskCountInGroomMap.put(groomStatus, assigned + 1);
+  }
+
/**
* Hosts that tasks run on.
*
@@ -304,6 +399,14 @@
return list;
}
+ /**
+ * Mark the completed task status. If all the tasks are completed the status
+ * of the job is updated to notify the client on the completion of the whole
+ * job.
+ *
+ * @param tip <code>TaskInProgress</code> object representing task.
+ * @param status The completed task status
+ */
public synchronized void completedTask(TaskInProgress tip, TaskStatus status) {
TaskAttemptID taskid = status.getTaskId();
updateTaskStatus(tip, status);
@@ -332,12 +435,18 @@
LOG.info("Job successfully done.");
// delete job root
- master.deleteJobRoot(this.getJobID().toString());
+ master.getSyncClient().deregisterJob(this.getJobID().toString());
garbageCollect();
}
}
+ /**
+ * Mark failure of a task.
+ *
+ * @param tip <code>TaskInProgress</code> object representing task.
+ * @param status The failed task status
+ */
public void failedTask(TaskInProgress tip, TaskStatus status) {
TaskAttemptID taskid = status.getTaskId();
updateTaskStatus(tip, status);
@@ -353,8 +462,6 @@
}
}
- // TODO
-
if (!allDone) {
// Kill job
this.kill();
@@ -371,6 +478,12 @@
}
}
+ /**
+ * Updates the task status of the task.
+ *
+ * @param tip <code>TaskInProgress</code> representing task
+ * @param taskStatus The status of the task.
+ */
public synchronized void updateTaskStatus(TaskInProgress tip,
TaskStatus taskStatus) {
TaskAttemptID taskid = taskStatus.getTaskId();
@@ -414,6 +527,9 @@
}
}
+ /**
+ * Kill the job.
+ */
public synchronized void kill() {
if (status.getRunState() != JobStatus.KILLED) {
this.status = new JobStatus(status.getJobID(), this.profile.getUser(),
@@ -438,6 +554,12 @@
*/
synchronized void garbageCollect() {
try {
+
+ if(LOG.isDebugEnabled()){
+ LOG.debug("Removing " + localJobFile + " and " + localJarFile
+ + " getJobFile = " + profile.getJobFile());
+ }
+
// Definitely remove the local-disk copy of the job file
if (localJobFile != null) {
localFs.delete(localJobFile, true);
@@ -501,4 +623,62 @@
return events;
}
+ /**
+ * Returns the configured maximum number of times the task could be
+ * re-attempted.
+ */
+ int getMaximumReAttempts() {
+ return maxTaskAttempts;
+ }
+
+ /**
+ * Returns true if the task should be restarted on failure. It also causes
+ * JobInProgress object to maintain state of the restart request.
+ */
+ synchronized boolean handleFailure(TaskInProgress tip) {
+ if (this.faultToleranceService == null
+ || (!faultToleranceService.isRecoveryPossible(tip)))
+ return false;
+
+ if (!faultToleranceService.isAlreadyRecovered(tip)) {
+ if(LOG.isDebugEnabled()){
+ LOG.debug("Adding recovery task " + tip.getCurrentTaskAttemptId());
+ }
+ recoveryTasks.add(tip);
+ status.setRunState(JobStatus.RECOVERING);
+ return true;
+ }
+ else if(LOG.isDebugEnabled()){
+ LOG.debug("Avoiding recovery task " + tip.getCurrentTaskAttemptId());
+ }
+ return false;
+
+ }
+
+
+  /**
+   * Drains the set of tasks waiting for recovery.
+   *
+   * @return the tasks queued for recovery since the last call; an empty array
+   *         if there are none
+   */
+  synchronized TaskInProgress[] fetchAndClearTasksToRecover() {
+    TaskInProgress[] pending =
+        recoveryTasks.toArray(new TaskInProgress[recoveryTasks.size()]);
+    recoveryTasks.clear();
+    return pending;
+  }
+
+  /**
+   * Returns true if at least one failed task is queued for recovery and has
+   * not been fetched yet.
+   *
+   * Synchronized because handleFailure and fetchAndClearTasksToRecover mutate
+   * the same HashSet under this object's lock, while this method is polled by
+   * the task scheduler. The null check covers calls made before the tasks are
+   * initialized (and the unit-test constructor), where the set does not exist
+   * yet.
+   */
+  public synchronized boolean isRecoveryPending() {
+    return recoveryTasks != null && !recoveryTasks.isEmpty();
+  }
+
+  /**
+   * Returns the tasks that have been assigned to a groom. The result is the
+   * live key view of the internal task-to-groom map, so later assignments are
+   * visible through it.
+   *
+   * @return live view of the assigned tasks
+   */
+  public Set<Task> getTaskSet() {
+    Set<Task> assignedTasks = this.taskToGroomMap.keySet();
+    return assignedTasks;
+  }
+
+  /**
+   * @return the fault tolerance service of this job, or null when
+   *         {@link Constants#FAULT_TOLERANCE_FLAG} was not enabled at task
+   *         initialization
+   */
+  public FaultTolerantMasterService getFaultToleranceService() {
+    final FaultTolerantMasterService service = faultToleranceService;
+    return service;
+  }
+
}
diff --git a/core/src/main/java/org/apache/hama/bsp/JobInProgressListener.java b/core/src/main/java/org/apache/hama/bsp/JobInProgressListener.java
index f5db6e4..746aa89 100644
--- a/core/src/main/java/org/apache/hama/bsp/JobInProgressListener.java
+++ b/core/src/main/java/org/apache/hama/bsp/JobInProgressListener.java
@@ -40,5 +40,13 @@
* @throws IOException
*/
public abstract void jobRemoved(JobInProgress job) throws IOException;
+
+  /**
+   * Invoked when tasks of a job have to be recovered by {@link BSPMaster}.
+   *
+   * @param job the job whose failed tasks have to be recovered
+   * @throws IOException if the listener cannot handle the recovery request
+   */
+  public abstract void recoverTaskInJob(JobInProgress job) throws IOException;
}
diff --git a/core/src/main/java/org/apache/hama/bsp/JobStatus.java b/core/src/main/java/org/apache/hama/bsp/JobStatus.java
index 1382770..c0f7e8a 100644
--- a/core/src/main/java/org/apache/hama/bsp/JobStatus.java
+++ b/core/src/main/java/org/apache/hama/bsp/JobStatus.java
@@ -44,7 +44,7 @@
}
public static enum State {
- RUNNING(1), SUCCEEDED(2), FAILED(3), PREP(4), KILLED(5);
+ RUNNING(1), SUCCEEDED(2), FAILED(3), PREP(4), KILLED(5), RECOVERING(6);
int s;
State(int s) {
@@ -74,6 +74,9 @@
case KILLED:
name = "KILLED";
break;
+ case RECOVERING:
+ name = "RECOVERING";
+ break;
}
return name;
@@ -86,6 +89,7 @@
public static final int FAILED = 3;
public static final int PREP = 4;
public static final int KILLED = 5;
+ public static final int RECOVERING = 6;
private BSPJobID jobid;
private long progress;
diff --git a/core/src/main/java/org/apache/hama/bsp/LaunchTaskAction.java b/core/src/main/java/org/apache/hama/bsp/LaunchTaskAction.java
index 0f13efb..b99e6f4 100644
--- a/core/src/main/java/org/apache/hama/bsp/LaunchTaskAction.java
+++ b/core/src/main/java/org/apache/hama/bsp/LaunchTaskAction.java
@@ -23,7 +23,7 @@
/**
* Represents a directive from the {@link org.apache.hama.bsp.BSPMaster} to the
- * {@link org.apache.hama.bsp.GroomServer} to launch a new task.
+ * {@link org.apache.hama.bsp.GroomServer} to launch a new task.
*/
class LaunchTaskAction extends GroomServerAction {
private Task task;
diff --git a/core/src/main/java/org/apache/hama/bsp/LocalBSPRunner.java b/core/src/main/java/org/apache/hama/bsp/LocalBSPRunner.java
index 328a940..97113ec 100644
--- a/core/src/main/java/org/apache/hama/bsp/LocalBSPRunner.java
+++ b/core/src/main/java/org/apache/hama/bsp/LocalBSPRunner.java
@@ -44,7 +44,10 @@
import org.apache.hama.bsp.message.AbstractMessageManager;
import org.apache.hama.bsp.message.MessageManager;
import org.apache.hama.bsp.message.MessageManagerFactory;
+import org.apache.hama.bsp.sync.BSPPeerSyncClient;
import org.apache.hama.bsp.sync.SyncClient;
+import org.apache.hama.bsp.sync.SyncEvent;
+import org.apache.hama.bsp.sync.SyncEventListener;
import org.apache.hama.bsp.sync.SyncException;
import org.apache.hama.bsp.sync.SyncServiceFactory;
import org.apache.hama.ipc.BSPPeerProtocol;
@@ -118,7 +121,7 @@
conf.setClass(MessageManagerFactory.MESSAGE_MANAGER_CLASS,
LocalMessageManager.class, MessageManager.class);
- conf.setClass(SyncServiceFactory.SYNC_CLIENT_CLASS, LocalSyncClient.class,
+ conf.setClass(SyncServiceFactory.SYNC_PEER_CLASS, LocalSyncClient.class,
SyncClient.class);
BSPJob job = new BSPJob(new HamaConfiguration(conf), jobID);
@@ -399,7 +402,7 @@
}
- public static class LocalSyncClient implements SyncClient {
+ public static class LocalSyncClient extends BSPPeerSyncClient {
// note that this is static, because we will have multiple peers
private static CyclicBarrier barrier;
private int tasks;
@@ -461,9 +464,52 @@
}
+ /** Releases this client by clearing the static barrier shared by all local peers. */
 @Override
- public void close() throws InterruptedException {
+ public void close() {
 barrier = null;
 }
+
+ /** Key construction is not supported by the local runner; always returns null. */
+ @Override
+ public String constructKey(BSPJobID jobId, String... args) {
+ return null;
+ }
+
+ /** Not supported by the local runner; nothing is stored and false is returned. */
+ @Override
+ public boolean storeInformation(String key, Writable value,
+ boolean permanent, SyncEventListener listener) {
+ return false;
+ }
+
+ /** Not supported by the local runner; valueHolder is untouched and false is returned. */
+ @Override
+ public boolean getInformation(String key, Writable valueHolder) {
+ return false;
+ }
+
+ /** Not supported by the local runner; no key is added and false is returned. */
+ @Override
+ public boolean addKey(String key, boolean permanent,
+ SyncEventListener listener) {
+ return false;
+ }
+
+ /** The local runner stores no keys, so this always returns false. */
+ @Override
+ public boolean hasKey(String key) {
+ return false;
+ }
+
+ /** Not supported by the local runner; always returns null (not an empty array). */
+ @Override
+ public String[] getChildKeySet(String key, SyncEventListener listener) {
+ return null;
+ }
+
+ /** Not supported by the local runner; the listener is not registered and false is returned. */
+ @Override
+ public boolean registerListener(String key, SyncEvent event,
+ SyncEventListener listener) {
+ return false;
+ }
+
+ /** Not supported by the local runner; nothing is removed and false is returned. */
+ @Override
+ public boolean remove(String key, SyncEventListener listener) {
+ return false;
+ }
}
@Override
diff --git a/core/src/main/java/org/apache/hama/bsp/RecoverTaskAction.java b/core/src/main/java/org/apache/hama/bsp/RecoverTaskAction.java
new file mode 100644
index 0000000..bd914a3
--- /dev/null
+++ b/core/src/main/java/org/apache/hama/bsp/RecoverTaskAction.java
@@ -0,0 +1,65 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hama.bsp;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
+import org.apache.hadoop.io.LongWritable;
+
+/**
+ * Represents a directive from the {@link org.apache.hama.bsp.BSPMaster} to the
+ * {@link org.apache.hama.bsp.GroomServer} to recover a failed task. The
+ * directive carries the task together with the superstep number recorded for
+ * the recovery.
+ */
+public class RecoverTaskAction extends GroomServerAction {
+  private Task task;
+  private LongWritable superstepNumber;
+
+  /** Creates an empty action, to be filled in by {@link #readFields(DataInput)}. */
+  public RecoverTaskAction() {
+    super(ActionType.RECOVER_TASK);
+    superstepNumber = new LongWritable(-1L);
+  }
+
+  /**
+   * @param task the task to recover
+   * @param superstep the superstep number recorded for the recovery
+   */
+  public RecoverTaskAction(Task task, long superstep) {
+    super(ActionType.RECOVER_TASK);
+    this.task = task;
+    this.superstepNumber = new LongWritable(superstep);
+  }
+
+  /** @return the task to recover */
+  public Task getTask() {
+    return task;
+  }
+
+  /** @return the superstep number of this directive; -1 if it was never set */
+  public long getSuperstepCount() {
+    return superstepNumber.get();
+  }
+
+  /** Serializes the task followed by the superstep number. */
+  @Override
+  public void write(DataOutput out) throws IOException {
+    task.write(out);
+    superstepNumber.write(out);
+  }
+
+  /** Reads the task (always as a {@link BSPTask}) followed by the superstep number. */
+  @Override
+  public void readFields(DataInput in) throws IOException {
+    task = new BSPTask();
+    task.readFields(in);
+    superstepNumber.readFields(in);
+  }
+
+}
diff --git a/core/src/main/java/org/apache/hama/bsp/SimpleTaskScheduler.java b/core/src/main/java/org/apache/hama/bsp/SimpleTaskScheduler.java
index 6124969..0d978d7 100644
--- a/core/src/main/java/org/apache/hama/bsp/SimpleTaskScheduler.java
+++ b/core/src/main/java/org/apache/hama/bsp/SimpleTaskScheduler.java
@@ -43,6 +43,7 @@
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hama.HamaConfiguration;
+import org.apache.hama.bsp.sync.ZKSyncBSPMasterClient;
import org.apache.hama.ipc.GroomProtocol;
import org.apache.hama.monitor.Federator;
import org.apache.hama.monitor.Federator.Act;
@@ -121,6 +122,12 @@
public void jobRemoved(JobInProgress job) throws IOException {
queueManager.get().moveJob(PROCESSING_QUEUE, FINISHED_QUEUE, job);
}
+
+ /**
+ * Adds the job to WAIT_QUEUE when some of its tasks have to be recovered
+ * (presumably so the job processor picks it up again and schedules the
+ * recovery tasks).
+ */
+ @Override
+ public void recoverTaskInJob(JobInProgress job) throws IOException {
+ queueManager.get().addJob(WAIT_QUEUE, job);
+ }
+
}
private class JobProcessor extends Thread implements Schedulable {
@@ -221,11 +228,10 @@
throw new NullPointerException("No job is specified.");
}
- @Override
- public Boolean call() {
+ private Boolean scheduleNewTasks() {
// Action to be sent for each task to the respective groom server.
- Map<GroomServerStatus, List<LaunchTaskAction>> actionMap = new HashMap<GroomServerStatus, List<LaunchTaskAction>>(
+ Map<GroomServerStatus, List<GroomServerAction>> actionMap = new HashMap<GroomServerStatus, List<GroomServerAction>>(
2 * this.groomStatuses.size());
Set<Task> taskSet = new HashSet<Task>(2 * jip.tasks.length);
Task t = null;
@@ -240,6 +246,7 @@
// if all tasks could not be scheduled
if (cnt != this.jip.tasks.length) {
+ LOG.error("Could not schedule all tasks!");
return Boolean.FALSE;
}
@@ -248,21 +255,49 @@
while (taskIter.hasNext()) {
Task task = taskIter.next();
GroomServerStatus groomStatus = jip.getGroomStatusForTask(task);
- List<LaunchTaskAction> taskActions = actionMap.get(groomStatus);
+ List<GroomServerAction> taskActions = actionMap.get(groomStatus);
if (taskActions == null) {
- taskActions = new ArrayList<LaunchTaskAction>(
+ taskActions = new ArrayList<GroomServerAction>(
groomStatus.getMaxTasks());
}
taskActions.add(new LaunchTaskAction(task));
actionMap.put(groomStatus, taskActions);
}
+ sendDirectivesToGrooms(actionMap);
+
+ return Boolean.TRUE;
+ }
+
+    /**
+     * Schedules recovery tasks: the job's fault tolerance service fills in the
+     * per-groom directives, which are then dispatched to the grooms.
+     *
+     * @return TRUE if the recovery directives were computed and dispatched,
+     *         FALSE otherwise
+     */
+    private Boolean scheduleRecoveryTasks() {
+      // Action to be sent for each task to the respective groom server.
+      Map<GroomServerStatus, List<GroomServerAction>> actionMap = new HashMap<GroomServerStatus, List<GroomServerAction>>(
+          2 * this.groomStatuses.size());
+      try {
+        jip.recoverTasks(groomStatuses, actionMap);
+      } catch (IOException e) {
+        // Do not lose the cause: without this log a failed recovery is silent.
+        LOG.error("Could not recover tasks of job " + jip.getJobID(), e);
+        return Boolean.FALSE;
+      }
+      return sendDirectivesToGrooms(actionMap);
+    }
+
+ /**
+ * Dispatches the directives in actionMap to their grooms while the job is
+ * RUNNING or RECOVERING.
+ *
+ * @return TRUE if every groom was served; FALSE if dispatching to a groom
+ * failed or the job left the RUNNING/RECOVERING state before all grooms
+ * were served
+ */
+ private Boolean sendDirectivesToGrooms(
+ Map<GroomServerStatus, List<GroomServerAction>> actionMap) {
 Iterator<GroomServerStatus> groomIter = actionMap.keySet().iterator();
- while (jip.getStatus().getRunState() == JobStatus.RUNNING
+ while ((jip.getStatus().getRunState() == JobStatus.RUNNING || jip
+ .getStatus().getRunState() == JobStatus.RECOVERING)
 && groomIter.hasNext()) {
 GroomServerStatus groomStatus = groomIter.next();
- List<LaunchTaskAction> actionList = actionMap.get(groomStatus);
+ List<GroomServerAction> actionList = actionMap.get(groomStatus);
 GroomProtocol worker = groomServerManager.get().findGroomServer(
 groomStatus);
@@ -276,18 +311,29 @@
 LOG.error(
 "Fail to dispatch tasks to GroomServer "
 + groomStatus.getGroomName(), ioe);
+ return Boolean.FALSE;
 }
 }
 if (groomIter.hasNext()
- && jip.getStatus().getRunState() != JobStatus.RUNNING) {
+ // Job is in neither schedulable state ('||' here was always true).
+ && jip.getStatus().getRunState() != JobStatus.RUNNING
+ && jip.getStatus().getRunState() != JobStatus.RECOVERING) {
 LOG.warn("Currently master only shcedules job in running state. "
 + "This may be refined in the future. JobId:" + jip.getJobID());
+ return Boolean.FALSE;
 }
 return Boolean.TRUE;
 }
+
+    /**
+     * Schedules the job: its pending recovery tasks if there are any,
+     * otherwise its tasks for their initial launch.
+     *
+     * @return TRUE if scheduling succeeded, FALSE otherwise
+     */
+    @Override
+    public Boolean call() {
+      if (jip.isRecoveryPending()) {
+        return scheduleRecoveryTasks();
+      }
+      return scheduleNewTasks();
+    }
}
/**
@@ -365,8 +411,10 @@
this.jobProcessor.start();
if (null != getConf()
&& getConf().getBoolean("bsp.federator.enabled", false)) {
- this.scheduler.scheduleAtFixedRate(new JvmCollector(federator.get(),
- ((BSPMaster) groomServerManager.get()).zk), 5, 5, SECONDS);
+ this.scheduler.scheduleAtFixedRate(
+ new JvmCollector(federator.get(),
+ ((ZKSyncBSPMasterClient) ((BSPMaster) groomServerManager.get())
+ .getSyncClient()).getZK()), 5, 5, SECONDS);
}
if (null != monitorManager.get()) {
diff --git a/core/src/main/java/org/apache/hama/bsp/TaskInProgress.java b/core/src/main/java/org/apache/hama/bsp/TaskInProgress.java
index d9e5211..87293db 100644
--- a/core/src/main/java/org/apache/hama/bsp/TaskInProgress.java
+++ b/core/src/main/java/org/apache/hama/bsp/TaskInProgress.java
@@ -34,7 +34,7 @@
* TaskInProgress maintains all the info needed for a Task in the lifetime of
* its owning Job.
*/
-class TaskInProgress {
+public class TaskInProgress {
public static final Log LOG = LogFactory.getLog(TaskInProgress.class);
private Configuration conf;
@@ -48,7 +48,6 @@
// Job Meta
private String jobFile = null;
private int partition;
- private BSPMaster bspMaster;
private TaskID id;
private JobInProgress job;
private int completes = 0;
@@ -69,6 +68,8 @@
// The first taskid of this tip
private TaskAttemptID firstTaskId;
+
+ private TaskAttemptID currentTaskId;
// Map from task Id -> GroomServer Id, contains tasks that are
// currently runnings
@@ -84,6 +85,8 @@
private RawSplit rawSplit;
+ private int mySuperstep = -1;
+
/**
* Constructor for new nexus between BSPMaster and GroomServer.
*
@@ -99,12 +102,20 @@
init(jobId);
}
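+  /**
+   * Creates the master-side record of one task of a job.
+   *
+   * @param jobId id of the owning job; combined with partition to form the task id
+   * @param jobFile path of the job file, as a string
+   * @param rawSplit input split assigned to this task, or null if the job has no splits
+   * @param conf configuration of the job
+   * @param job the owning job
+   * @param partition index of this task within the job
+   */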
public TaskInProgress(BSPJobID jobId, String jobFile, RawSplit rawSplit,
- BSPMaster master, Configuration conf, JobInProgress job, int partition) {
+ Configuration conf, JobInProgress job, int partition) {
this.jobId = jobId;
this.jobFile = jobFile;
this.rawSplit = rawSplit;
- this.setBspMaster(master);
this.job = job;
this.setConf(conf);
this.partition = partition;
@@ -112,18 +123,178 @@
init(jobId);
}
+  /**
+   * Derives this task's id from jobId and this task's partition, and records
+   * the current time as the start time.
+   *
+   * @param jobId id of the owning job
+   */
 private void init(BSPJobID jobId) {
 this.id = new TaskID(jobId, partition);
 this.startTime = System.currentTimeMillis();
 }
/**
- * Return a Task that can be sent to a GroomServer for execution.
+ *
+ * @param taskid
+ * @param grooms
+ * @param tasksInGroomMap
+ * @param possibleLocations
+ * @return
*/
- public Task getTaskToRun(Map<String, GroomServerStatus> grooms,
- Map<GroomServerStatus, Integer> tasksInGroomMap) throws IOException {
- Task t = null;
+ private String getGroomToSchedule(TaskAttemptID taskid,
+ Map<String, GroomServerStatus> grooms,
+ Map<GroomServerStatus, Integer> tasksInGroomMap,
+ String[] possibleLocations) {
+ for (int i = 0; i < possibleLocations.length; ++i) {
+ String location = possibleLocations[i];
+ GroomServerStatus groom = grooms.get(location);
+ if (groom == null)
+ continue;
+ Integer taskInGroom = tasksInGroomMap.get(groom);
+ taskInGroom = (taskInGroom == null) ? 0 : taskInGroom;
+ if (taskInGroom < groom.getMaxTasks()
+ && location.equals(groom.getGroomHostName())) {
+ return groom.getGroomHostName();
+ }
+ }
+ return null;
+ }
+
+  /**
+   * Picks any known groom that still has a free task slot, ignoring locality.
+   *
+   * @param grooms known grooms
+   * @param tasksInGroomMap number of tasks already assigned per groom
+   * @return host name of the first groom with spare capacity, or null if all
+   *         grooms are full
+   */
+  private String getAnyGroomToSchedule(Map<String, GroomServerStatus> grooms,
+      Map<GroomServerStatus, Integer> tasksInGroomMap) {
+    for (GroomServerStatus candidate : grooms.values()) {
+      if (candidate == null) {
+        continue;
+      }
+      Integer assigned = tasksInGroomMap.get(candidate);
+      int used = (assigned != null) ? assigned : 0;
+      if (used < candidate.getMaxTasks()) {
+        return candidate.getGroomHostName();
+      }
+    }
+    return null;
+  }
+
+  /**
+   * Creates the next attempt of this task and assigns it to the given groom.
+   *
+   * @param groomStatus the groom chosen to run the task; may be null
+   * @return the new task, or null if groomStatus is null or no further attempt
+   *         id is available
+   */
+  public Task constructTask(GroomServerStatus groomStatus) {
+    if (groomStatus == null) {
+      return null;
+    }
+    TaskAttemptID taskId = computeTaskId();
+    if (taskId == null) {
+      return null;
+    }
+    // Pass the input split along, as the replaced getTaskToRun did; otherwise
+    // tasks of a job with input splits are built without their split.
+    String splitClass = null;
+    BytesWritable split = null;
+    if (rawSplit != null) {
+      splitClass = rawSplit.getClassName();
+      split = rawSplit.getBytes();
+    }
+    currentTaskId = taskId;
+    Task t = new BSPTask(jobId, jobFile, taskId, partition, splitClass, split);
+    activeTasks.put(taskId, groomStatus.getGroomHostName());
+    myGroomStatus = groomStatus;
+    return t;
+  }
+
+ // /* Remove */
+ // private Task getGroomForTask(TaskAttemptID taskid,
+ // Map<String, GroomServerStatus> grooms,
+ // Map<GroomServerStatus, Integer> tasksInGroomMap) {
+ // String splitClass = null;
+ // BytesWritable split = null;
+ // Task t = null;
+ // if (rawSplit != null) {
+ // splitClass = rawSplit.getClassName();
+ // split = rawSplit.getBytes();
+ // String[] possibleLocations = rawSplit.getLocations();
+ // String groomName = getGroomToSchedule(taskid, grooms, tasksInGroomMap,
+ // possibleLocations);
+ // if (groomName != null) {
+ // t = new BSPTask(jobId, jobFile, taskid, partition, splitClass, split);
+ // activeTasks.put(taskid, groomName);
+ // myGroomStatus = grooms.get(groomName);
+ // }
+ // }
+ //
+ // if (t == null) {
+ // String groomName = getAnyGroomToSchedule(grooms, tasksInGroomMap);
+ // if (groomName != null) {
+ // t = new BSPTask(jobId, jobFile, taskid, partition, splitClass, split);
+ // activeTasks.put(taskid, groomName);
+ // myGroomStatus = grooms.get(groomName);
+ // }
+ // }
+ //
+ // return t;
+ // }
+
+  /**
+   * Builds the recovery attempt taskid on a groom from possibleLocations,
+   * falling back to any groom with a free slot.
+   *
+   * @param taskid attempt id of the recovery attempt
+   * @param grooms known grooms
+   * @param tasksInGroomMap number of tasks already assigned per groom
+   * @param possibleLocations preferred hosts for the recovery attempt
+   * @return the recovery task, or null if no groom has a free slot
+   */
+  private Task getGroomForRecoverTaskInHosts(TaskAttemptID taskid,
+      Map<String, GroomServerStatus> grooms,
+      Map<GroomServerStatus, Integer> tasksInGroomMap,
+      String[] possibleLocations) {
+    String groomName = getGroomToSchedule(taskid, grooms, tasksInGroomMap,
+        possibleLocations);
+    if (groomName == null) {
+      groomName = getAnyGroomToSchedule(grooms, tasksInGroomMap);
+    }
+    if (groomName == null) {
+      return null;
+    }
+    // A recovered attempt needs the same input split as the original one.
+    String splitClass = null;
+    BytesWritable split = null;
+    if (rawSplit != null) {
+      splitClass = rawSplit.getClassName();
+      split = rawSplit.getBytes();
+    }
+    Task t = new BSPTask(jobId, jobFile, taskid, partition, splitClass, split);
+    activeTasks.put(taskid, groomName);
+    myGroomStatus = grooms.get(groomName);
+    return t;
+  }
+
+  /**
+   * Creates a recovery attempt of this task. The task count of the groom this
+   * task was last assigned to is decremented first (presumably releasing the
+   * slot of the failed attempt). The new groom is chosen among hostNames or,
+   * failing that, any groom with spare capacity.
+   *
+   * @param grooms known grooms
+   * @param tasksInGroomMap number of tasks assigned per groom; updated in place
+   * @param hostNames preferred hosts for the recovery attempt
+   * @return the recovery task, or null if no attempt id or groom is available
+   * @throws IOException as declared by this method's signature
+   */
+  public Task getRecoveryTask(Map<String, GroomServerStatus> grooms,
+      Map<GroomServerStatus, Integer> tasksInGroomMap, String[] hostNames)
+      throws IOException {
+    Integer previousCount = tasksInGroomMap.get(myGroomStatus);
+    if (previousCount != null) {
+      tasksInGroomMap.put(myGroomStatus, previousCount - 1);
+    }
+    TaskAttemptID recoveryId = computeTaskId();
+    LOG.debug("Recovering task = " + String.valueOf(recoveryId));
+    return (recoveryId == null) ? null : getGroomForRecoverTaskInHosts(
+        recoveryId, grooms, tasksInGroomMap, hostNames);
+  }
+
+  /**
+   * Returns whether another attempt of this task may still be started.
+   *
+   * @return true while nextTaskId is below MAX_TASK_EXECS + maxTaskAttempts,
+   *         the same bound computeTaskId checks before handing out an id
+   */
+  public boolean canStartTask() {
+    if (nextTaskId >= MAX_TASK_EXECS + maxTaskAttempts) {
+      return false;
+    }
+    return true;
+  }
+
+ private TaskAttemptID computeTaskId() {
TaskAttemptID taskid = null;
if (nextTaskId < (MAX_TASK_EXECS + maxTaskAttempts)) {
int attemptId = job.getNumRestarts() * NUM_ATTEMPTS_PER_RESTART
@@ -135,54 +306,20 @@
+ " attempts for the tip '" + getTIPId() + "'");
return null;
}
-
- String splitClass = null;
- BytesWritable split = null;
- GroomServerStatus selectedGroom = null;
- if (rawSplit != null) {
- splitClass = rawSplit.getClassName();
- split = rawSplit.getBytes();
- String[] possibleLocations = rawSplit.getLocations();
- for (int i = 0; i < possibleLocations.length; ++i) {
- String location = possibleLocations[i];
- GroomServerStatus groom = grooms.get(location);
- if (groom == null) {
- LOG.error("Could not find groom for location: " + location
- + " ; active grooms: " + grooms.keySet());
- continue;
- }
- Integer taskInGroom = tasksInGroomMap.get(groom);
- taskInGroom = (taskInGroom == null) ? 0 : taskInGroom;
- if (taskInGroom < groom.getMaxTasks()
- && location.equals(groom.getGroomHostName())) {
- selectedGroom = groom;
- t = new BSPTask(jobId, jobFile, taskid, partition, splitClass, split);
- activeTasks.put(taskid, groom.getGroomName());
-
- break;
- }
- }
- }
- // Failed in attempt to get data locality or there was no input split.
- if (selectedGroom == null) {
- Iterator<String> groomIter = grooms.keySet().iterator();
- while (groomIter.hasNext()) {
- GroomServerStatus groom = grooms.get(groomIter.next());
- Integer taskInGroom = tasksInGroomMap.get(groom);
- taskInGroom = (taskInGroom == null) ? 0 : taskInGroom;
- if (taskInGroom < groom.getMaxTasks()) {
- selectedGroom = groom;
- t = new BSPTask(jobId, jobFile, taskid, partition, splitClass, split);
- activeTasks.put(taskid, groom.getGroomName());
- }
- }
- }
-
- myGroomStatus = selectedGroom;
-
- return t;
+ return taskid;
}
+ // /** Remove */
+ // public Task getTaskToRun(Map<String, GroomServerStatus> grooms,
+ // Map<GroomServerStatus, Integer> tasksInGroomMap) throws IOException {
+ // TaskAttemptID taskId = computeTaskId();
+ // if (taskId == null) {
+ // return null;
+ // } else {
+ // return getGroomForTask(taskId, grooms, tasksInGroomMap);
+ // }
+ // }
+
// //////////////////////////////////
// Accessors
// //////////////////////////////////
@@ -344,20 +481,6 @@
}
/**
- * @param bspMaster the bspMaster to set
- */
- public void setBspMaster(BSPMaster bspMaster) {
- this.bspMaster = bspMaster;
- }
-
- /**
- * @return the bspMaster
- */
- public BSPMaster getBspMaster() {
- return bspMaster;
- }
-
- /**
* Set the event number that was raised for this tip
*/
public void setSuccessEventNumber(int eventNumber) {
@@ -382,4 +505,22 @@
return taskStatuses.get(taskid).getGroomServer();
}
+  public int getSuperstep() {
+    return this.mySuperstep; // as stored by setSuperstep(int)
+  }
+
+  public void setSuperstep(int mySuperstep) {
+    this.mySuperstep = mySuperstep; // read back via getSuperstep()
+  }
+
+  // TODO: extend this to the list of resources the task requires; for now
+  // only the raw input split is exposed, which may be null (no input).
+  public RawSplit getFileSplit() {
+    return rawSplit;
+  }
+
+  public TaskAttemptID getCurrentTaskAttemptId() {
+    return currentTaskId; // NOTE(review): callers dereference this; confirm non-null
+  }
+
}
diff --git a/core/src/main/java/org/apache/hama/bsp/TaskRunner.java b/core/src/main/java/org/apache/hama/bsp/TaskRunner.java
index 5a50c58..a059ffc 100644
--- a/core/src/main/java/org/apache/hama/bsp/TaskRunner.java
+++ b/core/src/main/java/org/apache/hama/bsp/TaskRunner.java
@@ -122,8 +122,9 @@
int exit_code = bspProcess.waitFor();
if (!bspKilled && exit_code != 0) {
+
throw new IOException("BSP task process exit with nonzero status of "
- + exit_code + ".");
+ + exit_code + ". command = " + commands);
}
} catch (InterruptedException e) {
LOG.warn("Thread is interrupted when execeuting BSP process.", e);
@@ -223,6 +224,17 @@
vargs.add(Integer.toString(addr.getPort()));
vargs.add(task.getTaskID().toString());
vargs.add(groomServer.groomHostName);
+ vargs.add(Long.toString(groomServer.getStartSuperstep(task.getTaskID())));
+ TaskStatus status = groomServer.getTaskStatus(task.getTaskID());
+
+ if(status != null &&
+ TaskStatus.State.RECOVERING.equals(status.getRunState())){
+ vargs.add(TaskStatus.State.RECOVERING.name());
+ }
+ else{
+ vargs.add(TaskStatus.State.RUNNING.name());
+ }
+
}
return vargs;
}
@@ -285,6 +297,7 @@
if (bspProcess != null) {
bspProcess.destroy();
}
+
}
/**
diff --git a/core/src/main/java/org/apache/hama/bsp/TaskStatus.java b/core/src/main/java/org/apache/hama/bsp/TaskStatus.java
index 8665b24..131e174 100644
--- a/core/src/main/java/org/apache/hama/bsp/TaskStatus.java
+++ b/core/src/main/java/org/apache/hama/bsp/TaskStatus.java
@@ -37,13 +37,14 @@
// enumeration for reporting current phase of a task.
 public static enum Phase {
- STARTING, COMPUTE, BARRIER_SYNC, CLEANUP
+ STARTING, COMPUTE, BARRIER_SYNC, CLEANUP, RECOVERING // RECOVERING added with fault tolerance (bsp.ft.enabled); TODO document when it is reported
 }
// what state is the task in?
 public static enum State {
 RUNNING, SUCCEEDED, FAILED, UNASSIGNED, KILLED, COMMIT_PENDING,
- FAILED_UNCLEAN, KILLED_UNCLEAN
+ FAILED_UNCLEAN, KILLED_UNCLEAN, FAULT_NOTIFIED, RECOVERY_SCHEDULING, // new fault-tolerance states
+ RECOVERY_SCHEDULED, RECOVERING // RECOVERING is passed to the child process so its FT service restores checkpointed state
 }
private BSPJobID jobId;
diff --git a/core/src/main/java/org/apache/hama/bsp/UpdatePeerAction.java b/core/src/main/java/org/apache/hama/bsp/UpdatePeerAction.java
new file mode 100644
index 0000000..f2b63bf
--- /dev/null
+++ b/core/src/main/java/org/apache/hama/bsp/UpdatePeerAction.java
@@ -0,0 +1,93 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hama.bsp;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
+import org.apache.hadoop.io.Text;
+
+/**
+ * Represents an {@code UPDATE_PEER} directive from the
+ * {@link org.apache.hama.bsp.BSPMaster} to a
+ * {@link org.apache.hama.bsp.GroomServer}. It carries the id of a task, the id
+ * of a peer task attempt and the name of a groom server.
+ */
+class UpdatePeerAction extends GroomServerAction {
+  TaskAttemptID taskId;
+  TaskAttemptID peerTaskId;
+  Text groomName;
+
+  /**
+   * Creates an empty action for deserialization. Every field is allocated
+   * here because {@link #readFields(DataInput)} reads into the existing
+   * objects; leaving {@code peerTaskId} null made it throw
+   * NullPointerException.
+   */
+  public UpdatePeerAction() {
+    super(ActionType.UPDATE_PEER);
+    taskId = new TaskAttemptID();
+    peerTaskId = new TaskAttemptID();
+    groomName = new Text("");
+  }
+
+  /**
+   * @param taskId id of the task carried by the directive.
+   * @param peerTaskId id of the peer task attempt carried by the directive.
+   * @param groom name of the groom server carried by the directive.
+   */
+  public UpdatePeerAction(TaskAttemptID taskId, TaskAttemptID peerTaskId,
+      String groom) {
+    super(ActionType.UPDATE_PEER);
+    this.taskId = taskId;
+    this.peerTaskId = peerTaskId;
+    this.groomName = new Text(groom);
+  }
+
+  /** @return the task id carried by this directive. */
+  public TaskAttemptID getTaskID() {
+    return taskId;
+  }
+
+  /** @return the peer task attempt id carried by this directive. */
+  public TaskAttemptID getPeerTaskID() {
+    return peerTaskId;
+  }
+
+  /** @return the groom server name carried by this directive. */
+  public String getGroomName() {
+    return groomName.toString();
+  }
+
+  /** Writes the task id, the peer task id and the groom name, in that order. */
+  @Override
+  public void write(DataOutput out) throws IOException {
+    taskId.write(out);
+    peerTaskId.write(out);
+    groomName.write(out);
+  }
+
+  /** Reads the fields in the order produced by {@link #write(DataOutput)}. */
+  @Override
+  public void readFields(DataInput in) throws IOException {
+    taskId.readFields(in);
+    peerTaskId.readFields(in);
+    groomName.readFields(in);
+  }
+}
diff --git a/core/src/main/java/org/apache/hama/bsp/ft/AsyncRcvdMsgCheckpointImpl.java b/core/src/main/java/org/apache/hama/bsp/ft/AsyncRcvdMsgCheckpointImpl.java
new file mode 100644
index 0000000..dd79d72
--- /dev/null
+++ b/core/src/main/java/org/apache/hama/bsp/ft/AsyncRcvdMsgCheckpointImpl.java
@@ -0,0 +1,598 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hama.bsp.ft;
+
+import java.io.EOFException;
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.BlockLocation;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.ArrayWritable;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.util.ReflectionUtils;
+import org.apache.hama.Constants;
+import org.apache.hama.bsp.BSPJob;
+import org.apache.hama.bsp.BSPJobID;
+import org.apache.hama.bsp.BSPMessageBundle;
+import org.apache.hama.bsp.BSPPeer;
+import org.apache.hama.bsp.GroomServerAction;
+import org.apache.hama.bsp.GroomServerStatus;
+import org.apache.hama.bsp.JobInProgress;
+import org.apache.hama.bsp.RecoverTaskAction;
+import org.apache.hama.bsp.Task;
+import org.apache.hama.bsp.TaskAttemptID;
+import org.apache.hama.bsp.TaskID;
+import org.apache.hama.bsp.TaskInProgress;
+import org.apache.hama.bsp.TaskStatus;
+import org.apache.hama.bsp.message.MessageEventListener;
+import org.apache.hama.bsp.message.MessageManager;
+import org.apache.hama.bsp.sync.MasterSyncClient;
+import org.apache.hama.bsp.sync.PeerSyncClient;
+import org.apache.hama.bsp.taskallocation.BSPResource;
+import org.apache.hama.bsp.taskallocation.TaskAllocationStrategy;
+
+/**
+ * <code>AsyncRcvdMsgCheckpointImpl</code> Checkpoint service defines the fault
+ * tolerance strategy by checkpointing the messages each peer receives. On
+ * failure, all the tasks are restarted from the last superstep for which all
+ * the peers successfully checkpointed the messages.
+ */
+public class AsyncRcvdMsgCheckpointImpl<M extends Writable> implements
+    BSPFaultTolerantService<M> {
+
+  private static final Log LOG = LogFactory
+      .getLog(AsyncRcvdMsgCheckpointImpl.class);
+
+  /** Configuration key naming the directory checkpoints are written under. */
+  private static final String CHECKPOINT_PREFIX_KEY = "bsp.checkpoint.prefix_path";
+
+  /**
+   * Default checkpoint directory. The master looks for checkpoint files at the
+   * same path the peers write them to, so both sides must use this one default.
+   */
+  private static final String DEFAULT_CHECKPOINT_PREFIX = "/checkpoint/";
+
+  /**
+   * Builds the path {@code <prefix><jobId>/<superstep>/<peerIndex>} of the file
+   * holding the messages one peer checkpointed for one superstep.
+   */
+  private static String buildCheckpointPath(Configuration conf,
+      BSPJobID jobId, long superstep, int peerIndex) {
+    return conf.get(CHECKPOINT_PREFIX_KEY, DEFAULT_CHECKPOINT_PREFIX)
+        + jobId.toString() + "/" + superstep + "/" + peerIndex;
+  }
+
+  /**
+   * It is responsible to find the smallest superstep for which the
+   * checkpointing is done and then restart all the peers from that superstep.
+   */
+  private static class CheckpointMasterService implements
+      FaultTolerantMasterService {
+
+    private Configuration conf;
+    private TaskInProgress tasks[];
+    private BSPJobID jobId;
+    private int maxTaskAttempts;
+    private int currentAttemptId;
+    private MasterSyncClient masterSyncClient;
+    private TaskAllocationStrategy allocationStrategy;
+
+    /**
+     * Initializes the fault tolerance service at BSPMasters
+     *
+     * @param jobId The identifier of the job.
+     * @param maxTaskAttempts Number of attempts allowed for recovering from
+     *          failure.
+     * @param tasks The list of tasks in the job.
+     * @param conf The job configuration object.
+     * @param masterClient The synchronization client used by BSPMaster.
+     * @param allocationStrategy The task allocation strategy of the job.
+     */
+    public void initialize(BSPJobID jobId, int maxTaskAttempts,
+        TaskInProgress[] tasks, Configuration conf,
+        MasterSyncClient masterClient, TaskAllocationStrategy allocationStrategy) {
+      this.tasks = tasks;
+      this.jobId = jobId;
+      this.conf = conf;
+      this.maxTaskAttempts = maxTaskAttempts;
+      this.currentAttemptId = 0;
+      this.masterSyncClient = masterClient;
+      this.allocationStrategy = allocationStrategy;
+    }
+
+    /** Recovery is possible while fewer than maxTaskAttempts recoveries ran. */
+    @Override
+    public boolean isRecoveryPossible(TaskInProgress tip) {
+      return currentAttemptId < maxTaskAttempts;
+    }
+
+    /**
+     * True when the task's current attempt id is already greater than the
+     * number of recoveries this service has performed.
+     */
+    @Override
+    public boolean isAlreadyRecovered(TaskInProgress tip) {
+      return currentAttemptId < tip.getCurrentTaskAttemptId().getId();
+    }
+
+    /**
+     * Reads every peer's checkpoint progress from the sync service, takes the
+     * lowest checkpointed superstep and restarts all tasks from it. If not all
+     * tasks have published progress, or an entry cannot be read, the job is
+     * restarted from the beginning (superstep -1).
+     */
+    @Override
+    public void recoverTasks(JobInProgress jip,
+        Map<String, GroomServerStatus> groomStatuses,
+        TaskInProgress[] failedTasksInProgress,
+        TaskInProgress[] allTasksInProgress,
+        Map<GroomServerStatus, Integer> taskCountInGroomMap,
+        Map<GroomServerStatus, List<GroomServerAction>> actionMap)
+        throws IOException {
+
+      Map<TaskID, TaskInProgress> recoverySet = new HashMap<TaskID, TaskInProgress>(
+          2 * failedTasksInProgress.length);
+      for (int i = 0; i < failedTasksInProgress.length; ++i) {
+        recoverySet.put(failedTasksInProgress[i].getTaskId(),
+            failedTasksInProgress[i]);
+      }
+
+      long lowestSuperstepNumber = Long.MAX_VALUE;
+
+      String[] taskProgress = this.masterSyncClient.getChildKeySet(
+          this.masterSyncClient.constructKey(jobId, "checkpoint"), null);
+
+      if (LOG.isDebugEnabled()) {
+        StringBuilder list = new StringBuilder(25 * taskProgress.length);
+        list.append("got child key set").append(taskProgress.length)
+            .append("/").append(tasks.length).append(" ");
+        for (String entry : taskProgress) {
+          list.append(entry).append(",");
+        }
+        LOG.debug(list);
+      }
+
+      if (taskProgress.length == this.tasks.length) {
+        for (int i = 0; i < taskProgress.length; ++i) {
+          ArrayWritable progressInformation = new ArrayWritable(
+              LongWritable.class);
+          boolean result = this.masterSyncClient.getInformation(
+              this.masterSyncClient.constructKey(jobId, "checkpoint",
+                  taskProgress[i]), progressInformation);
+
+          if (!result) {
+            // Missing progress for some peer: fall back to a full restart.
+            lowestSuperstepNumber = -1L;
+            break;
+          }
+
+          Writable[] progressArr = progressInformation.get();
+          LongWritable superstepProgress = (LongWritable) progressArr[0];
+
+          if (superstepProgress != null
+              && superstepProgress.get() < lowestSuperstepNumber) {
+            lowestSuperstepNumber = superstepProgress.get();
+            if (LOG.isDebugEnabled()) {
+              LOG.debug("Got superstep number " + lowestSuperstepNumber
+                  + " from " + taskProgress[i]);
+            }
+          }
+        }
+        clearClientForSuperstep(lowestSuperstepNumber);
+        restartJob(lowestSuperstepNumber, groomStatuses, recoverySet,
+            allTasksInProgress, taskCountInGroomMap, actionMap);
+      } else {
+        restartJob(-1, groomStatuses, recoverySet, allTasksInProgress,
+            taskCountInGroomMap, actionMap);
+      }
+
+      ++currentAttemptId;
+    }
+
+    /**
+     * Removes the job's "sync" entry from the master sync client.
+     * NOTE(review): {@code superstep} is currently unused.
+     */
+    private void clearClientForSuperstep(long superstep) {
+      this.masterSyncClient.remove(
+          masterSyncClient.constructKey(jobId, "sync"), null);
+    }
+
+    /** Queues a RecoverTaskAction for {@code task} on {@code groomStatus}. */
+    private void populateAction(Task task, long superstep,
+        GroomServerStatus groomStatus,
+        Map<GroomServerStatus, List<GroomServerAction>> actionMap) {
+      List<GroomServerAction> list = actionMap.get(groomStatus);
+      if (list == null) {
+        list = new ArrayList<GroomServerAction>();
+        actionMap.put(groomStatus, list);
+      }
+      list.add(new RecoverTaskAction(task, superstep));
+    }
+
+    /** Restarts a task that did not fail on the groom it is assigned to. */
+    private void restartTask(TaskInProgress tip, long superstep,
+        Map<String, GroomServerStatus> groomStatuses,
+        Map<GroomServerStatus, List<GroomServerAction>> actionMap) {
+      GroomServerStatus serverStatus = tip.getGroomServerStatus();
+      Task task = tip.constructTask(serverStatus);
+      populateAction(task, superstep, serverStatus, actionMap);
+    }
+
+    /**
+     * Queues a recovery action for every task of the job. Tasks that did not
+     * fail restart on their current groom; failed tasks release their old
+     * slot and are re-placed by the allocation strategy, preferring the hosts
+     * that store their checkpoint when {@code superstep >= 0}.
+     */
+    private void restartJob(long superstep,
+        Map<String, GroomServerStatus> groomStatuses,
+        Map<TaskID, TaskInProgress> recoveryMap, TaskInProgress[] allTasks,
+        Map<GroomServerStatus, Integer> taskCountInGroomMap,
+        Map<GroomServerStatus, List<GroomServerAction>> actionMap)
+        throws IOException {
+      FileSystem fileSystem = (superstep >= 0) ? FileSystem.get(conf) : null;
+      for (int i = 0; i < allTasks.length; ++i) {
+        TaskInProgress tip = allTasks[i];
+        if (!recoveryMap.containsKey(tip.getTaskId())) {
+          restartTask(tip, superstep, groomStatuses, actionMap);
+          continue;
+        }
+
+        releaseGroomSlot(tip, taskCountInGroomMap);
+        String[] hosts;
+        if (superstep >= 0) {
+          hosts = checkpointHosts(fileSystem, superstep, tip, groomStatuses);
+        } else {
+          // No usable checkpoint: the task is started from the beginning.
+          hosts = this.allocationStrategy.selectGrooms(groomStatuses,
+              taskCountInGroomMap, new BSPResource[0], tip);
+        }
+        GroomServerStatus serverStatus = this.allocationStrategy
+            .getGroomToAllocate(groomStatuses, hosts, taskCountInGroomMap,
+                new BSPResource[0], tip);
+        // An action must be queued on both paths, otherwise the failed task
+        // is never relaunched.
+        Task task = tip.constructTask(serverStatus);
+        populateAction(task, superstep, serverStatus, actionMap);
+      }
+    }
+
+    /**
+     * Decrements the task count of the groom {@code tip} is assigned to,
+     * releasing the slot held by its failed attempt.
+     * TODO: This should be a responsibility of GroomServerStatus
+     */
+    private static void releaseGroomSlot(TaskInProgress tip,
+        Map<GroomServerStatus, Integer> taskCountInGroomMap) {
+      GroomServerStatus groom = tip.getGroomServerStatus();
+      Integer count = taskCountInGroomMap.get(groom);
+      if (count != null) {
+        taskCountInGroomMap.put(groom, count.intValue() - 1);
+      }
+    }
+
+    /**
+     * Returns the hosts of the first block of {@code tip}'s checkpoint file for
+     * {@code superstep}, or all keys of {@code groomStatuses} when that file
+     * does not exist or reports no block locations.
+     */
+    private String[] checkpointHosts(FileSystem fileSystem, long superstep,
+        TaskInProgress tip, Map<String, GroomServerStatus> groomStatuses)
+        throws IOException {
+      Path checkpointPath = new Path(buildCheckpointPath(conf, jobId,
+          superstep, tip.getTaskId().getId()));
+      if (fileSystem.exists(checkpointPath)) {
+        FileStatus fileStatus = fileSystem.getFileStatus(checkpointPath);
+        BlockLocation[] blocks = fileSystem.getFileBlockLocations(fileStatus,
+            0, fileStatus.getLen());
+        if (blocks != null && blocks.length > 0) {
+          return blocks[0].getHosts();
+        }
+      }
+      String[] hosts = new String[groomStatuses.keySet().size()];
+      return groomStatuses.keySet().toArray(hosts);
+    }
+
+  }// end of CheckpointMasterService
+
+  @Override
+  public FaultTolerantPeerService<M> constructPeerFaultTolerance(BSPJob job,
+      @SuppressWarnings("rawtypes")
+      BSPPeer bspPeer, PeerSyncClient syncClient,
+      InetSocketAddress peerAddress, TaskAttemptID taskAttemptId,
+      long superstep, Configuration conf, MessageManager<M> messenger)
+      throws Exception {
+    CheckpointPeerService<M> service = new CheckpointPeerService<M>();
+    service.initialize(job, bspPeer, syncClient, peerAddress, taskAttemptId,
+        superstep, conf, messenger);
+    return service;
+  }
+
+  @Override
+  public FaultTolerantMasterService constructMasterFaultTolerance(
+      BSPJobID jobId, int maxTaskAttempts, TaskInProgress[] tasks,
+      Configuration conf, MasterSyncClient masterClient,
+      TaskAllocationStrategy allocationStrategy) throws Exception {
+    CheckpointMasterService service = new CheckpointMasterService();
+    service.initialize(jobId, maxTaskAttempts, tasks, conf, masterClient,
+        allocationStrategy);
+    return service;
+  }
+
+  /**
+   * Initializes the peer fault tolerance by checkpointing service. For
+   * recovery, on peer initialization, it reads all the checkpointed messages to
+   * recover the state of the peer. During normal working, it checkpoints all
+   * the messages it received in the previous superstep. It also stores the
+   * superstep progress in the global synchronization area.
+   */
+  public static class CheckpointPeerService<M extends Writable> implements
+      FaultTolerantPeerService<M>, MessageEventListener<M> {
+
+    private BSPJob job;
+    @SuppressWarnings("rawtypes")
+    private BSPPeer peer;
+    private PeerSyncClient syncClient;
+    private long superstep;
+    private Configuration conf;
+    private MessageManager<M> messenger;
+    private FileSystem fs;
+    private int checkPointInterval;
+    private volatile long lastCheckPointStep;
+    private volatile boolean checkpointState;
+    private volatile FSDataOutputStream checkpointStream;
+    // Path of the file behind checkpointStream, kept for error messages.
+    private volatile String checkpointFile;
+    private volatile long checkpointMessageCount;
+
+    public void initialize(BSPJob job, @SuppressWarnings("rawtypes")
+    BSPPeer bspPeer, PeerSyncClient syncClient, InetSocketAddress peerAddress,
+        TaskAttemptID taskAttemptId, long superstep, Configuration conf,
+        MessageManager<M> messenger) throws IOException {
+
+      this.job = job;
+      this.peer = bspPeer;
+      this.syncClient = syncClient;
+      this.superstep = superstep;
+      this.conf = conf;
+      this.messenger = messenger;
+      this.fs = FileSystem.get(conf);
+      this.checkPointInterval = conf.getInt(Constants.CHECKPOINT_INTERVAL,
+          Constants.DEFAULT_CHECKPOINT_INTERVAL);
+      this.checkpointState = conf.getBoolean(Constants.CHECKPOINT_ENABLED,
+          false);
+
+      if (superstep > 0) {
+        this.lastCheckPointStep = this.superstep;
+      } else {
+        this.lastCheckPointStep = 1;
+      }
+      this.checkpointMessageCount = 0L;
+    }
+
+    /** Returns the path of this peer's checkpoint file for {@code step}. */
+    private String checkpointPath(long step) {
+      String ckptPath = buildCheckpointPath(conf, job.getJobID(), step,
+          peer.getPeerIndex());
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("Received Messages are to be saved to " + ckptPath);
+      }
+      return ckptPath;
+    }
+
+    @Override
+    public TaskStatus.State onPeerInitialized(TaskStatus.State state)
+        throws Exception {
+      if (this.superstep >= 0 && TaskStatus.State.RECOVERING == state) {
+        ArrayWritable progressArr = new ArrayWritable(LongWritable.class);
+        boolean result = this.syncClient.getInformation(
+            this.syncClient.constructKey(job.getJobID(), "checkpoint",
+                String.valueOf(peer.getPeerIndex())), progressArr);
+
+        if (!result) {
+          throw new IOException("No data found to restore peer state.");
+        }
+
+        Writable[] progressInfo = progressArr.get();
+        long superstepProgress = ((LongWritable) progressInfo[0]).get();
+        long numMessages = ((LongWritable) progressInfo[1]).get();
+
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("Got sstep =" + superstepProgress + " numMessages = "
+              + numMessages + " this.superstep = " + this.superstep);
+        }
+
+        if (numMessages > 0) {
+          restoreMessages(superstepProgress, numMessages);
+        }
+      }
+      this.messenger.registerListener(this);
+      return TaskStatus.State.RUNNING;
+    }
+
+    /**
+     * Reads {@code numMessages} checkpointed messages of superstep
+     * {@code step} and hands them to the messenger via loopBackMessages.
+     */
+    private void restoreMessages(long step, long numMessages) throws Exception {
+      Path path = new Path(checkpointPath(step));
+      FSDataInputStream in = this.fs.open(path);
+      BSPMessageBundle<M> bundle = new BSPMessageBundle<M>();
+      try {
+        for (long i = 0; i < numMessages; ++i) {
+          String className = in.readUTF();
+          @SuppressWarnings("unchecked")
+          M message = (M) ReflectionUtils.newInstance(
+              Class.forName(className), conf);
+          message.readFields(in);
+          bundle.addMessage(message);
+        }
+        messenger.loopBackMessages(bundle);
+      } catch (EOFException e) {
+        LOG.error("Error recovering from checkpointing", e);
+        throw new IOException(e);
+      } finally {
+        // Close only the stream: this.fs is still needed by
+        // onMessageReceived() to write later checkpoints.
+        in.close();
+      }
+    }
+
+    /**
+     * Re-reads the checkpoint interval and decides whether received messages
+     * should be checkpointed from now on.
+     */
+    public final boolean isReadyToCheckpoint() {
+      checkPointInterval = conf.getInt(Constants.CHECKPOINT_INTERVAL,
+          Constants.DEFAULT_CHECKPOINT_INTERVAL);
+      boolean enabled = conf.getBoolean(Constants.CHECKPOINT_ENABLED, false);
+      LOG.info(new StringBuilder(1000).append("Enabled = ").append(enabled)
+          .append(" checkPointInterval = ").append(checkPointInterval)
+          .append(" lastCheckPointStep = ").append(lastCheckPointStep)
+          .append(" getSuperstepCount() = ").append(peer.getSuperstepCount())
+          .toString());
+      return enabled && checkPointInterval != 0
+          && (peer.getSuperstepCount() + 1) - lastCheckPointStep >= checkPointInterval;
+    }
+
+    @Override
+    public void beforeBarrier() throws Exception {
+    }
+
+    @Override
+    public void duringBarrier() throws Exception {
+    }
+
+    /**
+     * Closes the current checkpoint file (if checkpointing was active), stores
+     * the checkpointed superstep and message count in the sync service, and
+     * decides whether to keep checkpointing.
+     */
+    @Override
+    public void afterBarrier() throws Exception {
+      synchronized (this) {
+        if (checkpointState) {
+          if (checkpointStream != null) {
+            this.checkpointStream.close();
+            this.checkpointStream = null;
+            this.checkpointFile = null;
+          }
+
+          lastCheckPointStep = peer.getSuperstepCount();
+
+          ArrayWritable writableArray = new ArrayWritable(LongWritable.class);
+          Writable[] writeArr = new Writable[2];
+          writeArr[0] = new LongWritable(lastCheckPointStep);
+          writeArr[1] = new LongWritable(checkpointMessageCount);
+          writableArray.set(writeArr);
+
+          if (LOG.isDebugEnabled()) {
+            LOG.debug("Storing lastCheckPointStep = " + lastCheckPointStep
+                + " checkpointMessageCount = " + checkpointMessageCount
+                + " for peer = " + String.valueOf(peer.getPeerIndex()));
+          }
+
+          this.syncClient.storeInformation(this.syncClient.constructKey(
+              this.job.getJobID(), "checkpoint",
+              String.valueOf(peer.getPeerIndex())), writableArray, true, null);
+        }
+        checkpointState = isReadyToCheckpoint();
+        checkpointMessageCount = 0;
+      }
+
+      LOG.info("checkpoingNext = " + checkpointState
+          + " checkpointMessageCount = " + checkpointMessageCount);
+    }
+
+    @Override
+    public void onInitialized() {
+    }
+
+    @Override
+    public void onMessageSent(String peerName, M message) {
+    }
+
+    /**
+     * Appends a received message to the current checkpoint file, opening the
+     * file on the first message, while checkpointing is active.
+     */
+    @Override
+    public void onMessageReceived(M message) {
+      if (message == null) {
+        // A null message cannot be serialized; skip it instead of failing.
+        LOG.error("Message M is found to be null");
+        return;
+      }
+
+      synchronized (this) {
+        if (!checkpointState) {
+          return;
+        }
+        if (this.checkpointStream == null) {
+          String checkpointedPath = checkpointPath(peer.getSuperstepCount() + 1);
+          try {
+            LOG.info("Creating path " + checkpointedPath);
+            checkpointStream = this.fs.create(new Path(checkpointedPath));
+            checkpointFile = checkpointedPath;
+          } catch (IOException ioe) {
+            LOG.error("Fail checkpointing messages to " + checkpointedPath,
+                ioe);
+            throw new RuntimeException("Failed opening HDFS file "
+                + checkpointedPath, ioe);
+          }
+        }
+        try {
+          ++checkpointMessageCount;
+          // getName() is the binary name Class.forName() expects on restore;
+          // getCanonicalName() breaks nested classes and is null for
+          // anonymous ones.
+          checkpointStream.writeUTF(message.getClass().getName());
+          message.write(checkpointStream);
+        } catch (IOException ioe) {
+          LOG.error("Fail checkpointing messages to " + checkpointFile, ioe);
+          throw new RuntimeException("Failed writing to HDFS file "
+              + checkpointFile, ioe);
+        }
+
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("message count = " + checkpointMessageCount);
+        }
+      }
+    }
+
+    @Override
+    public void onClose() {
+    }
+
+  }
+
+}
diff --git a/core/src/main/java/org/apache/hama/bsp/ft/BSPFaultTolerantService.java b/core/src/main/java/org/apache/hama/bsp/ft/BSPFaultTolerantService.java
new file mode 100644
index 0000000..967dcd3
--- /dev/null
+++ b/core/src/main/java/org/apache/hama/bsp/ft/BSPFaultTolerantService.java
@@ -0,0 +1,99 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hama.bsp.ft;
+
+import java.net.InetSocketAddress;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.Writable;
+import org.apache.hama.bsp.BSPJob;
+import org.apache.hama.bsp.BSPJobID;
+import org.apache.hama.bsp.BSPPeer;
+import org.apache.hama.bsp.TaskAttemptID;
+import org.apache.hama.bsp.TaskInProgress;
+import org.apache.hama.bsp.message.MessageManager;
+import org.apache.hama.bsp.sync.MasterSyncClient;
+import org.apache.hama.bsp.sync.PeerSyncClient;
+import org.apache.hama.bsp.taskallocation.TaskAllocationStrategy;
+
+/**
+ * <code>BSPFaultTolerantService</code> defines the fault tolerance service
+ * behavior. The fault tolerance service is a feature of a running job and not
+ * the system. A class implementing this interface is responsible for creating
+ * two objects. The first, <code>FaultTolerantMasterService</code>, is used by
+ * the job at BSPMaster to handle fault tolerance related steps at the master.
+ * The second, <code>FaultTolerantPeerService</code>, implements the fault
+ * tolerance related steps for recovery inside <code>BSPPeer</code> (in each of
+ * the BSP peers doing computations).
+ *
+ * @param <M> the type of message exchanged between peers.
+ */
+public interface BSPFaultTolerantService<M extends Writable> {
+
+  /**
+   * The key (token) under which a job can register its fault-tolerance
+   * service class. NOTE(review): {@code Constants.FAULT_TOLERANCE_CLASS}
+   * ("bsp.ft.class") appears to serve the same purpose; confirm which key is
+   * actually read.
+   */
+  public static final String FT_SERVICE_CONF = "hama.ft.conf.class";
+
+  /**
+   * Creates the instance of <code>FaultTolerantMasterService</code> that would
+   * handle fault-tolerance related steps at BSPMaster task scheduler.
+   *
+   * @param jobId The identifier of the job.
+   * @param maxTaskAttempts Number of attempts allowed for recovering from
+   *          failure.
+   * @param tasks The list of tasks in the job.
+   * @param conf The job configuration object.
+   * @param masterClient The synchronization client used by BSPMaster.
+   * @param allocationStrategy The task allocation strategy of the job.
+   * @return An instance of class inheriting
+   *         <code>FaultTolerantMasterService</code>
+   * @throws Exception if the service cannot be created.
+   */
+  public FaultTolerantMasterService constructMasterFaultTolerance(
+      BSPJobID jobId, int maxTaskAttempts, TaskInProgress[] tasks,
+      Configuration conf, MasterSyncClient masterClient,
+      TaskAllocationStrategy allocationStrategy) throws Exception;
+
+  /**
+   * Creates an instance of <code>FaultTolerantPeerService</code> which defines
+   * the steps that have to be taken inside a peer for fault-tolerance.
+   *
+   * @param job The job the peer belongs to.
+   * @param bspPeer The peer
+   * @param syncClient The synchronization client used by peer.
+   * @param peerAddress The socket address of the peer.
+   * @param taskAttemptId The attempt id of the peer's task.
+   * @param superstep The superstep from which the peer is initialized.
+   * @param conf job configuration object
+   * @param messenger The messaging system between the peers
+   * @return An instance of class inheriting
+   *         <code>FaultTolerantPeerService</code>
+   * @throws Exception if the service cannot be created.
+   */
+  public FaultTolerantPeerService<M> constructPeerFaultTolerance(BSPJob job,
+      @SuppressWarnings("rawtypes")
+      BSPPeer bspPeer, PeerSyncClient syncClient,
+      InetSocketAddress peerAddress, TaskAttemptID taskAttemptId,
+      long superstep, Configuration conf, MessageManager<M> messenger)
+      throws Exception;
+
+}
diff --git a/core/src/main/java/org/apache/hama/bsp/ft/FaultTolerantMasterService.java b/core/src/main/java/org/apache/hama/bsp/ft/FaultTolerantMasterService.java
new file mode 100644
index 0000000..43bfd60
--- /dev/null
+++ b/core/src/main/java/org/apache/hama/bsp/ft/FaultTolerantMasterService.java
@@ -0,0 +1,71 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hama.bsp.ft;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.hama.bsp.GroomServerAction;
+import org.apache.hama.bsp.GroomServerStatus;
+import org.apache.hama.bsp.JobInProgress;
+import org.apache.hama.bsp.TaskInProgress;
+
+/**
+ * <code>FaultTolerantMasterService</code> defines the behavior of the object
+ * responsible for doing fault-tolerance related work in the BSPMaster task
+ * scheduler. This is defined per job.
+ */
+public interface FaultTolerantMasterService {
+
+ /**
+ * Returns true if recovery of the task in question is possible.
+ *
+ * @param tip <code>TaskInProgress</code> object that represents the task.
+ * @return true if recovery is possible.
+ */
+ public boolean isRecoveryPossible(TaskInProgress tip);
+
+ /**
+ * Returns true if the task is already slated to be recovered for failure.
+ *
+ * @param tip <code>TaskInProgress</code> object that represents the task.
+ * @return true if the task/job is already in process of recovery.
+ */
+ public boolean isAlreadyRecovered(TaskInProgress tip);
+
+ /**
+ * From the list of failed tasks, provide the task scheduler with the actions
+ * and the grooms to which they must be sent for fault recovery.
+ *
+ * @param jip The job in question which has to be recovered.
+ * @param groomStatuses The map of grooms to their statuses.
+ * @param failedTasksInProgress The list of failed tasks.
+ * @param allTasksInProgress The list of all tasks in the job.
+ * @param taskCountInGroomMap The map of groom to its count of tasks.
+ * @param actionMap The map of groom to the actions to be taken on it.
+ * @throws IOException
+ */
+ public void recoverTasks(JobInProgress jip,
+ Map<String, GroomServerStatus> groomStatuses,
+ TaskInProgress[] failedTasksInProgress,
+ TaskInProgress[] allTasksInProgress,
+ Map<GroomServerStatus, Integer> taskCountInGroomMap,
+ Map<GroomServerStatus, List<GroomServerAction>> actionMap)
+ throws IOException;
+}
diff --git a/core/src/main/java/org/apache/hama/bsp/ft/FaultTolerantPeerService.java b/core/src/main/java/org/apache/hama/bsp/ft/FaultTolerantPeerService.java
new file mode 100644
index 0000000..3a34d73
--- /dev/null
+++ b/core/src/main/java/org/apache/hama/bsp/ft/FaultTolerantPeerService.java
@@ -0,0 +1,62 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hama.bsp.ft;
+
+import org.apache.hadoop.io.Writable;
+import org.apache.hama.bsp.TaskStatus;
+
+/**
+ * <code>FaultTolerantPeerService</code> defines the steps required to be
+ * performed by peers for fault-tolerance. At different stages of peer
+ * execution, the service can take necessary measures to ensure that the peer
+ * computations could be recovered if any of them failed.
+ */
+public interface FaultTolerantPeerService<M extends Writable> {
+
+ /** This is called once the peer is initialized.
+ * @param state The state of the task.
+ * @return The (possibly changed) task state.
+ * @throws Exception
+ */
+ public TaskStatus.State onPeerInitialized(TaskStatus.State state)
+ throws Exception;
+
+ /**
+ * This function is called before all the peers go into global sync.
+ *
+ * @throws Exception
+ */
+ public void beforeBarrier() throws Exception;
+
+ /**
+ * This function is called after the peers enter the barrier but before they
+ * initiate leaving the barrier.
+ *
+ * @throws Exception
+ */
+ public void duringBarrier() throws Exception;
+
+ /**
+ * This function is called every time the peer completes the global
+ * synchronization.
+ *
+ * @throws Exception
+ */
+ public void afterBarrier() throws Exception;
+
+}
diff --git a/core/src/main/java/org/apache/hama/bsp/message/AbstractMessageManager.java b/core/src/main/java/org/apache/hama/bsp/message/AbstractMessageManager.java
index bb96efe..b0894cb 100644
--- a/core/src/main/java/org/apache/hama/bsp/message/AbstractMessageManager.java
+++ b/core/src/main/java/org/apache/hama/bsp/message/AbstractMessageManager.java
@@ -22,7 +22,9 @@
import java.util.Collection;
import java.util.HashMap;
import java.util.Iterator;
+import java.util.LinkedList;
import java.util.Map.Entry;
+import java.util.Queue;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@@ -31,6 +33,7 @@
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.util.ReflectionUtils;
+import org.apache.hama.bsp.BSPMessageBundle;
import org.apache.hama.bsp.BSPPeer;
import org.apache.hama.bsp.BSPPeerImpl;
import org.apache.hama.bsp.TaskAttemptID;
@@ -61,6 +64,9 @@
// the task attempt id
protected TaskAttemptID attemptId;
+ // List of listeners for all the sent messages
+ protected Queue<MessageEventListener<M>> messageListenerQueue;
+
/*
* (non-Javadoc)
* @see org.apache.hama.bsp.message.MessageManager#init(org.apache.hama.bsp.
@@ -70,12 +76,14 @@
@Override
public void init(TaskAttemptID attemptId, BSPPeer<?, ?, ?, ?, M> peer,
Configuration conf, InetSocketAddress peerAddress) {
+ this.messageListenerQueue = new LinkedList<MessageEventListener<M>>(); // created only here: registerListener() must follow init()
this.attemptId = attemptId;
this.peer = peer;
this.conf = conf;
this.peerAddress = peerAddress;
localQueue = getQueue();
localQueueForNextIteration = getSynchronizedQueue();
+
}
/*
@@ -84,18 +92,22 @@
*/
@Override
public void close() {
- Collection<MessageQueue<M>> values = outgoingQueues.values();
- for (MessageQueue<M> msgQueue : values) {
- msgQueue.close();
- }
- localQueue.close();
- // remove possible disk queues from the path
try {
- FileSystem.get(conf).delete(
- DiskQueue.getQueueDir(conf, attemptId,
- conf.get(DiskQueue.DISK_QUEUE_PATH_KEY)), true);
- } catch (IOException e) {
- LOG.warn("Queue dir couldn't be deleted");
+ Collection<MessageQueue<M>> values = outgoingQueues.values();
+ for (MessageQueue<M> msgQueue : values) {
+ msgQueue.close();
+ }
+ localQueue.close();
+ // remove possible disk queues from the path
+ try {
+ FileSystem.get(conf).delete(
+ DiskQueue.getQueueDir(conf, attemptId,
+ conf.get(DiskQueue.DISK_QUEUE_PATH_KEY)), true);
+ } catch (IOException e) {
+ LOG.warn("Queue dir couldn't be deleted"); // NOTE(review): the caught exception e is not logged
+ }
+ } finally { // listeners are notified even if closing the queues throws
+ notifyClose();
}
}
@@ -139,6 +151,7 @@
localQueue = localQueueForNextIteration.getMessageQueue();
localQueue.prepareRead();
localQueueForNextIteration = getSynchronizedQueue();
+ notifyInit();
}
/*
@@ -163,6 +176,7 @@
queue.add(msg);
peer.incrementCounter(BSPPeerImpl.PeerCounter.TOTAL_MESSAGES_SENT, 1L);
outgoingQueues.put(targetPeerAddress, queue);
+ notifySentMessage(peerName, msg);
}
/*
@@ -206,4 +220,68 @@
this.conf = conf;
}
+ // Listener notification helpers: each walks messageListenerQueue in registration order.
+ /** Calls {@code onMessageSent} on every registered listener. */
+ private void notifySentMessage(String peerName, M message) {
+ Iterator<MessageEventListener<M>> iterator = this.messageListenerQueue
+ .iterator();
+ while (iterator.hasNext()) {
+ iterator.next().onMessageSent(peerName, message);
+ }
+ }
+
+ /** Calls {@code onMessageReceived} on every registered listener. */
+ private void notifyReceivedMessage(M message) throws IOException {
+ Iterator<MessageEventListener<M>> iterator = this.messageListenerQueue
+ .iterator();
+ while (iterator.hasNext()) {
+ iterator.next().onMessageReceived(message);
+ }
+ }
+
+ /** Calls {@code onInitialized} on every registered listener. */
+ private void notifyInit() {
+ Iterator<MessageEventListener<M>> iterator = this.messageListenerQueue
+ .iterator();
+ while (iterator.hasNext()) {
+ iterator.next().onInitialized();
+ }
+ }
+
+ /** Calls {@code onClose} on every registered listener. */
+ private void notifyClose() {
+ Iterator<MessageEventListener<M>> iterator = this.messageListenerQueue
+ .iterator();
+ while (iterator.hasNext()) {
+ iterator.next().onClose();
+ }
+ }
+
+ @Override
+ public void registerListener(MessageEventListener<M> listener)
+ throws IOException {
+ // A null listener is silently ignored rather than rejected.
+ if(listener != null)
+ this.messageListenerQueue.add(listener);
+ }
+
+ @SuppressWarnings("unchecked")
+ @Override
+ public void loopBackMessages(BSPMessageBundle<? extends Writable> bundle) throws IOException{
+ // Each bundle element is assumed to be an M; the cast is unchecked.
+ for (Writable message : bundle.getMessages()) {
+ loopBackMessage((M)message);
+ }
+ }
+
+ @SuppressWarnings("unchecked")
+ @Override
+ public void loopBackMessage(Writable message) throws IOException{
+ // Queue for the next superstep, count it as received, then notify listeners.
+ this.localQueueForNextIteration.add((M)message);
+ peer.incrementCounter(BSPPeerImpl.PeerCounter.TOTAL_MESSAGES_RECEIVED, 1L);
+ notifyReceivedMessage((M)message);
+ }
+
+
}
diff --git a/core/src/main/java/org/apache/hama/bsp/message/AvroMessageManagerImpl.java b/core/src/main/java/org/apache/hama/bsp/message/AvroMessageManagerImpl.java
index 43103d7..08d4ddb 100644
--- a/core/src/main/java/org/apache/hama/bsp/message/AvroMessageManagerImpl.java
+++ b/core/src/main/java/org/apache/hama/bsp/message/AvroMessageManagerImpl.java
@@ -25,7 +25,6 @@
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.util.HashMap;
-import java.util.Iterator;
import org.apache.avro.AvroRemoteException;
import org.apache.avro.ipc.NettyServer;
@@ -61,14 +60,8 @@
server.close();
}
- public void put(BSPMessageBundle<M> messages) {
- peer.incrementCounter(BSPPeerImpl.PeerCounter.TOTAL_MESSAGES_RECEIVED,
- messages.getMessages().size());
- Iterator<M> iterator = messages.getMessages().iterator();
- while (iterator.hasNext()) {
- this.localQueueForNextIteration.add(iterator.next());
- iterator.remove();
- }
+ public void put(BSPMessageBundle<M> messages) throws IOException {
+ this.loopBackMessages(messages); // NOTE(review): unlike the removed loop, the bundle is no longer emptied as messages are queued
}
@SuppressWarnings("unchecked")
@@ -139,5 +132,4 @@
return ByteBuffer.wrap(data);
}
}
-
}
diff --git a/core/src/main/java/org/apache/hama/bsp/message/HadoopMessageManager.java b/core/src/main/java/org/apache/hama/bsp/message/HadoopMessageManager.java
index 48ff60d..8adc605 100644
--- a/core/src/main/java/org/apache/hama/bsp/message/HadoopMessageManager.java
+++ b/core/src/main/java/org/apache/hama/bsp/message/HadoopMessageManager.java
@@ -17,6 +17,8 @@
*/
package org.apache.hama.bsp.message;
+import java.io.IOException;
+
import org.apache.hadoop.io.Writable;
import org.apache.hama.bsp.BSPMessageBundle;
import org.apache.hama.bsp.message.compress.BSPCompressedBundle;
@@ -35,7 +37,7 @@
*
* @param msg
*/
- public void put(M msg);
+ public void put(M msg) throws IOException;
/**
* This method puts a messagebundle for the next iteration. Accessed
@@ -43,7 +45,7 @@
*
* @param messages
*/
- public void put(BSPMessageBundle<M> messages);
+ public void put(BSPMessageBundle<M> messages) throws IOException;
/**
* This method puts a compressed message bundle for the next iteration.
@@ -51,6 +53,6 @@
*
* @param compMsgBundle
*/
- public void put(BSPCompressedBundle compMsgBundle);
+ public void put(BSPCompressedBundle compMsgBundle) throws IOException;
}
diff --git a/core/src/main/java/org/apache/hama/bsp/message/HadoopMessageManagerImpl.java b/core/src/main/java/org/apache/hama/bsp/message/HadoopMessageManagerImpl.java
index 20aa2e4..259a51d 100644
--- a/core/src/main/java/org/apache/hama/bsp/message/HadoopMessageManagerImpl.java
+++ b/core/src/main/java/org/apache/hama/bsp/message/HadoopMessageManagerImpl.java
@@ -29,7 +29,6 @@
import org.apache.hadoop.ipc.RPC.Server;
import org.apache.hama.bsp.BSPMessageBundle;
import org.apache.hama.bsp.BSPPeer;
-import org.apache.hama.bsp.BSPPeerImpl;
import org.apache.hama.bsp.TaskAttemptID;
import org.apache.hama.bsp.message.compress.BSPCompressedBundle;
import org.apache.hama.ipc.HamaRPCProtocolVersion;
@@ -115,24 +114,19 @@
}
@Override
- public final void put(M msg) {
- this.localQueueForNextIteration.add(msg);
- peer.incrementCounter(BSPPeerImpl.PeerCounter.TOTAL_MESSAGES_RECEIVED, 1L);
+ public final void put(M msg) throws IOException {
+ loopBackMessage(msg); // assumed inherited from AbstractMessageManager: queues, counts and notifies listeners
}
@Override
- public final void put(BSPMessageBundle<M> messages) {
- for (M message : messages.getMessages()) {
- this.localQueueForNextIteration.add(message);
- }
+ public final void put(BSPMessageBundle<M> messages) throws IOException {
+ loopBackMessages(messages); // unlike the removed loop, also counts received messages (if AbstractMessageManager's)
}
@Override
- public final void put(BSPCompressedBundle compMsgBundle) {
+ public final void put(BSPCompressedBundle compMsgBundle) throws IOException {
BSPMessageBundle<M> bundle = compressor.decompressBundle(compMsgBundle);
- for (M message : bundle.getMessages()) {
- this.localQueueForNextIteration.add(message);
- }
+ loopBackMessages(bundle); // same delivery path as the uncompressed put(BSPMessageBundle)
}
@Override
@@ -141,4 +135,5 @@
return versionID;
}
+
}
diff --git a/core/src/main/java/org/apache/hama/bsp/message/MessageEventListener.java b/core/src/main/java/org/apache/hama/bsp/message/MessageEventListener.java
new file mode 100644
index 0000000..204b6ab
--- /dev/null
+++ b/core/src/main/java/org/apache/hama/bsp/message/MessageEventListener.java
@@ -0,0 +1,57 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hama.bsp.message;
+/** Listener for the events of a message manager; see MessageManager#registerListener. */
+public interface MessageEventListener<M> {
+
+ /**
+ * Kinds of events a message manager can report to its listeners.
+ * NOTE(review): not referenced by the callback methods declared below.
+ */
+ public static enum MessageManagerEvent {
+ INITIALIZED, MESSAGE_SENT, MESSAGE_RECEIVED, CLOSE
+ }
+
+ /**
+ * The function to handle the event when the queue is initialized.
+ */
+ void onInitialized();
+
+ /**
+ * The function to handle the event when a message is sent.
+ * <code>message</code> should not be modified.
+ *
+ * @param peerName Name of the peer the message is sent to.
+ * @param message The message sent.
+ */
+ void onMessageSent(String peerName, final M message);
+
+ /**
+ * The function to handle the event when a message is received.
+ * <code>message</code> should not be modified.
+ *
+ * @param message The message received.
+ */
+ void onMessageReceived(final M message);
+
+ /**
+ * The function to handle the event when the queue is closed.
+ */
+ void onClose();
+
+}
diff --git a/core/src/main/java/org/apache/hama/bsp/message/MessageManager.java b/core/src/main/java/org/apache/hama/bsp/message/MessageManager.java
index 6662a52..c15b36d 100644
--- a/core/src/main/java/org/apache/hama/bsp/message/MessageManager.java
+++ b/core/src/main/java/org/apache/hama/bsp/message/MessageManager.java
@@ -34,7 +34,7 @@
*
*/
public interface MessageManager<M extends Writable> {
-
+
public static final String QUEUE_TYPE_CLASS = "hama.messenger.queue.class";
/**
@@ -96,4 +96,25 @@
*/
public int getNumCurrentMessages();
+ /**
+ * Send the messages to self to receive in the next superstep.
+ * @param bundle The bundle whose messages are delivered to this peer.
+ * @throws IOException
+ */
+ public void loopBackMessages(BSPMessageBundle<? extends Writable> bundle) throws IOException;
+
+ /**
+ * Send the message to self to receive in the next superstep.
+ * @param message The message to deliver to this peer.
+ * @throws IOException
+ */
+ public void loopBackMessage(Writable message) throws IOException;
+
+ /**
+ * Register a listener for the events in message manager.
+ * @param listener Listener notified of the message manager events.
+ * @throws IOException
+ */
+ public void registerListener(MessageEventListener<M> listener)
+ throws IOException;
}
diff --git a/core/src/main/java/org/apache/hama/bsp/sync/BSPMasterSyncClient.java b/core/src/main/java/org/apache/hama/bsp/sync/BSPMasterSyncClient.java
new file mode 100644
index 0000000..28b4c3d
--- /dev/null
+++ b/core/src/main/java/org/apache/hama/bsp/sync/BSPMasterSyncClient.java
@@ -0,0 +1,53 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hama.bsp.sync;
+
+import org.apache.hama.HamaConfiguration;
+/** Abstract base for {@link MasterSyncClient} implementations (the BSPMaster side). */
+public abstract class BSPMasterSyncClient implements MasterSyncClient{
+
+ /**
+ * Initialize the Synchronization client.
+ *
+ * @param conf The configuration parameters to initialize the client.
+ */
+ public abstract void init(HamaConfiguration conf);
+
+ /**
+ * Clears all information stored.
+ */
+ public abstract void clear();
+
+ /**
+ * Register a newly added job.
+ * @param string String identifying the job (presumably its job id; confirm).
+ */
+ public abstract void registerJob(String string);
+
+ /**
+ * Deregister the job from the system.
+ * @param string String identifying the job (presumably its job id; confirm).
+ */
+ public abstract void deregisterJob(String string);
+
+ /**
+ * Closes the client. Declares no IOException, unlike SyncClient#close().
+ */
+ public abstract void close();
+
+}
diff --git a/core/src/main/java/org/apache/hama/bsp/sync/BSPPeerSyncClient.java b/core/src/main/java/org/apache/hama/bsp/sync/BSPPeerSyncClient.java
new file mode 100644
index 0000000..e391e11
--- /dev/null
+++ b/core/src/main/java/org/apache/hama/bsp/sync/BSPPeerSyncClient.java
@@ -0,0 +1,108 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hama.bsp.sync;
+
+import java.io.IOException;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hama.bsp.BSPJobID;
+import org.apache.hama.bsp.TaskAttemptID;
+/** Abstract base for {@link PeerSyncClient} implementations (the BSP peer side). */
+public abstract class BSPPeerSyncClient implements PeerSyncClient{
+
+ /**
+ * Init will be called within a spawned task, it should be used to initialize
+ * the inner structure and fields, e.g. a zookeeper client or an rpc
+ * connection to the real sync daemon.
+ *
+ * @throws Exception
+ */
+ public abstract void init(Configuration conf, BSPJobID jobId, TaskAttemptID taskId)
+ throws Exception;
+
+ /**
+ * Enters the barrier before the message sending in each superstep.
+ *
+ * @param jobId the jobs ID
+ * @param taskId the tasks ID
+ * @param superstep the superstep of the task
+ * @throws SyncException
+ */
+ public abstract void enterBarrier(BSPJobID jobId, TaskAttemptID taskId, long superstep)
+ throws SyncException;
+
+ /**
+ * Leaves the barrier after all communication has been done, this is usually
+ * the end of a superstep.
+ *
+ * @param jobId the jobs ID
+ * @param taskId the tasks ID
+ * @param superstep the superstep of the task
+ * @throws SyncException
+ */
+ public abstract void leaveBarrier(BSPJobID jobId, TaskAttemptID taskId, long superstep)
+ throws SyncException;
+
+ /**
+ * Registers a specific task with its host and port to the sync daemon.
+ *
+ * @param jobId the jobs ID
+ * @param taskId the tasks ID
+ * @param hostAddress the host where the sync server resides
+ * @param port the port where the sync server is up
+ */
+ public abstract void register(BSPJobID jobId, TaskAttemptID taskId,
+ String hostAddress, long port);
+
+ /**
+ * Returns all registered tasks within the sync daemon. They have to be
+ * ordered ascending by their task id.
+ *
+ * @param taskId the tasks ID
+ * @return an <b>ordered</b> string array of host:port pairs of all tasks
+ * connected to the daemon.
+ */
+ public abstract String[] getAllPeerNames(TaskAttemptID taskId);
+
+ /**
+ * TODO this has currently no use. Could later be used to deregister tasks
+ * from the barrier during runtime if they are finished. Something equal to
+ * voteToHalt() in Pregel.
+ *
+ * @param jobId
+ * @param taskId
+ * @param hostAddress
+ * @param port
+ */
+ public abstract void deregisterFromBarrier(BSPJobID jobId, TaskAttemptID taskId,
+ String hostAddress, long port);
+
+ /**
+ * This stops the sync daemon. Only used in YARN.
+ */
+ public abstract void stopServer();
+
+ /**
+ * This method should close all used resources, e.g. a ZooKeeper instance.
+ *
+ * @throws IOException
+ */
+ public abstract void close() throws IOException;
+
+
+}
diff --git a/core/src/main/java/org/apache/hama/bsp/sync/MasterSyncClient.java b/core/src/main/java/org/apache/hama/bsp/sync/MasterSyncClient.java
new file mode 100644
index 0000000..2d2a61d
--- /dev/null
+++ b/core/src/main/java/org/apache/hama/bsp/sync/MasterSyncClient.java
@@ -0,0 +1,53 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hama.bsp.sync;
+
+import org.apache.hama.HamaConfiguration;
+
+/**
+ * MasterSyncClient defines the behavior that BSPMaster should follow
+ * to perform different required globally synchronized state changes.
+ * It adds job registration on top of the generic {@link SyncClient} operations.
+ */
+public interface MasterSyncClient extends SyncClient{
+
+ /**
+ * Initialize the Synchronization client.
+ *
+ * @param conf The configuration parameters to initialize the client.
+ */
+ public void init(HamaConfiguration conf);
+
+ /**
+ * Clears all information stored.
+ */
+ public void clear();
+
+ /**
+ * Register a newly added job.
+ * @param string String identifying the job (presumably its job id; confirm).
+ */
+ public void registerJob(String string);
+
+ /**
+ * Deregister the job from the system.
+ * @param string String identifying the job (presumably its job id; confirm).
+ */
+ public void deregisterJob(String string);
+
+}
diff --git a/core/src/main/java/org/apache/hama/bsp/sync/PeerSyncClient.java b/core/src/main/java/org/apache/hama/bsp/sync/PeerSyncClient.java
new file mode 100644
index 0000000..19857e5
--- /dev/null
+++ b/core/src/main/java/org/apache/hama/bsp/sync/PeerSyncClient.java
@@ -0,0 +1,103 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hama.bsp.sync;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hama.bsp.BSPJobID;
+import org.apache.hama.bsp.TaskAttemptID;
+
+/**
+ * PeerSyncClient defines the behavior that a BSPPeer performs to maintain
+ * synchronized global state as it progresses: barrier entry/exit and task
+ * registration on top of the generic {@link SyncClient} operations.
+ */
+public interface PeerSyncClient extends SyncClient{
+
+ /**
+ * Init will be called within a spawned task, it should be used to initialize
+ * the inner structure and fields, e.g. a zookeeper client or an rpc
+ * connection to the real sync daemon.
+ *
+ * @throws Exception
+ */
+ public void init(Configuration conf, BSPJobID jobId, TaskAttemptID taskId)
+ throws Exception;
+
+ /**
+ * Enters the barrier before the message sending in each superstep.
+ *
+ * @param jobId the jobs ID
+ * @param taskId the tasks ID
+ * @param superstep the superstep of the task
+ * @throws SyncException
+ */
+ public void enterBarrier(BSPJobID jobId, TaskAttemptID taskId, long superstep)
+ throws SyncException;
+
+ /**
+ * Leaves the barrier after all communication has been done, this is usually
+ * the end of a superstep.
+ *
+ * @param jobId the jobs ID
+ * @param taskId the tasks ID
+ * @param superstep the superstep of the task
+ * @throws SyncException
+ */
+ public void leaveBarrier(BSPJobID jobId, TaskAttemptID taskId, long superstep)
+ throws SyncException;
+
+ /**
+ * Registers a specific task with its host and port to the sync daemon.
+ *
+ * @param jobId the jobs ID
+ * @param taskId the tasks ID
+ * @param hostAddress the host where the sync server resides
+ * @param port the port where the sync server is up
+ */
+ public void register(BSPJobID jobId, TaskAttemptID taskId,
+ String hostAddress, long port);
+
+ /**
+ * Returns all registered tasks within the sync daemon. They have to be
+ * ordered ascending by their task id.
+ *
+ * @param taskId the tasks ID
+ * @return an <b>ordered</b> string array of host:port pairs of all tasks
+ * connected to the daemon.
+ */
+ public String[] getAllPeerNames(TaskAttemptID taskId);
+
+ /**
+ * TODO this has currently no use. Could later be used to deregister tasks
+ * from the barrier during runtime if they are finished. Something equal to
+ * voteToHalt() in Pregel.
+ *
+ * @param jobId
+ * @param taskId
+ * @param hostAddress
+ * @param port
+ */
+ public void deregisterFromBarrier(BSPJobID jobId, TaskAttemptID taskId,
+ String hostAddress, long port);
+
+ /**
+ * This stops the sync daemon. Only used in YARN.
+ */
+ public void stopServer();
+
+}
diff --git a/core/src/main/java/org/apache/hama/bsp/sync/SyncClient.java b/core/src/main/java/org/apache/hama/bsp/sync/SyncClient.java
index 9564951..e75da95 100644
--- a/core/src/main/java/org/apache/hama/bsp/sync/SyncClient.java
+++ b/core/src/main/java/org/apache/hama/bsp/sync/SyncClient.java
@@ -17,9 +17,10 @@
*/
package org.apache.hama.bsp.sync;
-import org.apache.hadoop.conf.Configuration;
+import java.io.IOException;
+
+import org.apache.hadoop.io.Writable;
import org.apache.hama.bsp.BSPJobID;
-import org.apache.hama.bsp.TaskAttemptID;
/**
* Basic interface for a client that connects to a sync server.
@@ -28,82 +29,85 @@
public interface SyncClient {
/**
- * Init will be called within a spawned task, it should be used to initialize
- * the inner structure and fields, e.G. a zookeeper client or an rpc
- * connection to the real sync daemon.
- *
- * @throws Exception
+ * Construct key in the format required by the SyncClient for storing and
+ * retrieving information. Callers are recommended to use this function to
+ * construct the keys passed to the other methods of this interface.
+ * @param jobId The BSP Job Id.
+ * @param args The String parts from which the key is constructed.
+ * @return The key consisting of entities provided in the required format.
*/
- public void init(Configuration conf, BSPJobID jobId, TaskAttemptID taskId)
- throws Exception;
+ public String constructKey(BSPJobID jobId, String ... args);
/**
- * Enters the barrier before the message sending in each superstep.
- *
- * @param jobId the jobs ID
- * @param taskId the tasks ID
- * @param superstep the superstep of the task
- * @throws SyncException
+ * Stores value for the specified key.
+ * @param key The key for which value should be stored. It is recommended to use
+ * <code>constructKey</code> to create key object.
+ * @param value The value to be stored.
+ * @param permanent true if the value should be persisted after end of session.
+ * @param listener Listener object that provides asynchronous updates on the state
+ * of information stored under the key.
+ * @return true if the operation was successful.
*/
- public void enterBarrier(BSPJobID jobId, TaskAttemptID taskId, long superstep)
- throws SyncException;
+ public boolean storeInformation(String key, Writable value,
+ boolean permanent, SyncEventListener listener);
/**
- * Leaves the barrier after all communication has been done, this is usually
- * the end of a superstep.
- *
- * @param jobId the jobs ID
- * @param taskId the tasks ID
- * @param superstep the superstep of the task
- * @throws SyncException
+ * Retrieve value previously stored for the key.
+ * @param key The key for which value was stored.
+ * @param valueHolder Writable that presumably receives the stored value (confirm).
+ * @return presumably true if the value was found; false if there was any
+ * error or if there was no value stored for the key (confirm).
*/
- public void leaveBarrier(BSPJobID jobId, TaskAttemptID taskId, long superstep)
- throws SyncException;
+ public boolean getInformation(String key, Writable valueHolder);
/**
- * Registers a specific task with a its host and port to the sync daemon.
- *
- * @param jobId the jobs ID
- * @param taskId the tasks ID
- * @param hostAddress the host where the sync server resides
- * @param port the port where the sync server is up
+ * Store new key in key set.
+ * @param key The key to be saved in key set. It is recommended to use
+ * <code>constructKey</code> to create key object.
+ * @param permanent true if the key should be persisted after end of session.
+ * @param listener Listener object that asynchronously notifies the events
+ * related to the key.
+ * @return true if operation was successful.
*/
- public void register(BSPJobID jobId, TaskAttemptID taskId,
- String hostAddress, long port);
+ public boolean addKey(String key, boolean permanent, SyncEventListener listener);
/**
- * Returns all registered tasks within the sync daemon. They have to be
- * ordered ascending by their task id.
- *
- * @param taskId the tasks ID
- * @return an <b>ordered</b> string array of host:port pairs of all tasks
- * connected to the daemon.
+ * Check if key was previously stored.
+ * @param key The key to look up.
+ * @return true if the key exists.
*/
- public String[] getAllPeerNames(TaskAttemptID taskId);
+ public boolean hasKey(String key);
+
+ /**
+ * Get list of child keys stored under the key provided.
+ * @param key The key whose child keys are to be found.
+ * @param listener Listener object that asynchronously notifies the changes
+ * under the provided key.
+ * @return Array of child keys.
+ */
+ public String[] getChildKeySet(String key, SyncEventListener listener);
/**
- * TODO this has currently no use. Could later be used to deregister tasks
- * from the barrier during runtime if they are finished. Something equal to
- * voteToHalt() in Pregel.
- *
- * @param jobId
- * @param taskId
- * @param hostAddress
- * @param port
+ * Register a listener for events on the key.
+ * @param key The key on which an event listener should be registered.
+ * @param event The event for which the listener is registered.
+ * @param listener The event listener that defines how to process the event.
+ * @return true if the operation is successful.
*/
- public void deregisterFromBarrier(BSPJobID jobId, TaskAttemptID taskId,
- String hostAddress, long port);
+ public boolean registerListener(String key, SyncEvent event,
+ SyncEventListener listener);
/**
- * This stops the sync daemon. Only used in YARN.
+ * Delete the key and the information stored under it.
+ * @param key The key to be deleted.
+ * @param listener Listener object for events related to the key.
+ * @return presumably true if the operation was successful (confirm).
*/
- public void stopServer();
-
+ public boolean remove(String key, SyncEventListener listener);
+
/**
- * This method should close all used resources, e.G. a ZooKeeper instance.
*
- * @throws InterruptedException
*/
- public void close() throws InterruptedException;
-
+ public void close() throws IOException;
+
}
diff --git a/core/src/main/java/org/apache/hama/bsp/sync/SyncEvent.java b/core/src/main/java/org/apache/hama/bsp/sync/SyncEvent.java
new file mode 100644
index 0000000..bf459d6
--- /dev/null
+++ b/core/src/main/java/org/apache/hama/bsp/sync/SyncEvent.java
@@ -0,0 +1,37 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hama.bsp.sync;
+
+/**
+ * An event of the distributed global synchronization service.
+ * <p>
+ * Each event is identified by an integer code whose meaning is defined by the
+ * synchronization implementation that issues it (for ZooKeeper, see
+ * {@link ZKSyncEventFactory.ZKEvent}).
+ */
+public interface SyncEvent {
+
+  /**
+   * Returns the code identifying this event within the event scheme of the
+   * global synchronization service.
+   *
+   * @return the event identifier
+   */
+  int getEventId();
+
+}
diff --git a/core/src/main/java/org/apache/hama/bsp/sync/SyncEventListener.java b/core/src/main/java/org/apache/hama/bsp/sync/SyncEventListener.java
new file mode 100644
index 0000000..e53c9e9
--- /dev/null
+++ b/core/src/main/java/org/apache/hama/bsp/sync/SyncEventListener.java
@@ -0,0 +1,34 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hama.bsp.sync;
+
+/**
+ * Base class for callbacks that react to events raised by the distributed
+ * global synchronization service.
+ */
+public abstract class SyncEventListener {
+
+  /**
+   * Handles one synchronization event.
+   *
+   * @param eventId code identifying the event; see the {@link SyncEvent}
+   *          implementations of the synchronization service in use
+   */
+  public abstract void handleEvent(int eventId);
+
+}
diff --git a/core/src/main/java/org/apache/hama/bsp/sync/SyncServiceFactory.java b/core/src/main/java/org/apache/hama/bsp/sync/SyncServiceFactory.java
index 869402d..436f36f 100644
--- a/core/src/main/java/org/apache/hama/bsp/sync/SyncServiceFactory.java
+++ b/core/src/main/java/org/apache/hama/bsp/sync/SyncServiceFactory.java
@@ -22,18 +22,37 @@
public class SyncServiceFactory {
public static final String SYNC_SERVER_CLASS = "hama.sync.server.class";
- public static final String SYNC_CLIENT_CLASS = "hama.sync.client.class";
+ public static final String SYNC_PEER_CLASS = "hama.sync.peer.class";
+ public static final String SYNC_MASTER_CLASS = "hama.sync.master.class";
/**
* Returns a sync client via reflection based on what was configured.
*/
- public static SyncClient getSyncClient(Configuration conf)
+ public static PeerSyncClient getPeerSyncClient(Configuration conf)
throws ClassNotFoundException {
- return (SyncClient) ReflectionUtils
- .newInstance(conf.getClassByName(conf.get(SYNC_CLIENT_CLASS,
+ return (PeerSyncClient) ReflectionUtils
+ .newInstance(conf.getClassByName(conf.get(SYNC_PEER_CLASS,
ZooKeeperSyncClientImpl.class.getName())), conf);
}
+
+ /**
+ * Returns the master-side sync client via reflection, instantiating the
+ * class named by {@link #SYNC_MASTER_CLASS} (default:
+ * {@link ZKSyncBSPMasterClient}).
+ *
+ * @param conf configuration the class name is read from; it is also passed
+ * to ReflectionUtils.newInstance
+ * @return a new instance of the configured class
+ * @throws ClassNotFoundException if the configured class cannot be loaded
+ */
+ public static SyncClient getMasterSyncClient(Configuration conf)
+ throws ClassNotFoundException {
+ return (SyncClient) ReflectionUtils
+ .newInstance(conf.getClassByName(conf.get(SYNC_MASTER_CLASS,
+ ZKSyncBSPMasterClient.class.getName())), conf);
+ }
+
/**
* Returns a sync server via reflection based on what was configured.
*/
diff --git a/core/src/main/java/org/apache/hama/bsp/sync/ZKSyncBSPMasterClient.java b/core/src/main/java/org/apache/hama/bsp/sync/ZKSyncBSPMasterClient.java
new file mode 100644
index 0000000..7cfb122
--- /dev/null
+++ b/core/src/main/java/org/apache/hama/bsp/sync/ZKSyncBSPMasterClient.java
@@ -0,0 +1,201 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hama.bsp.sync;
+
+import java.io.IOException;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hama.Constants;
+import org.apache.hama.HamaConfiguration;
+import org.apache.hama.zookeeper.QuorumPeer;
+import org.apache.zookeeper.CreateMode;
+import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.WatchedEvent;
+import org.apache.zookeeper.ZooDefs.Ids;
+import org.apache.zookeeper.ZooKeeper;
+import org.apache.zookeeper.data.Stat;
+
+/**
+ * ZooKeeper synchronization client that is used by the BSPMaster to maintain
+ * the global state of the cluster.
+ * <p>
+ * If the ZooKeeper handle cannot be created in
+ * {@link #init(HamaConfiguration)}, job (de)registration and {@link #close()}
+ * log an error or do nothing instead of failing with a
+ * NullPointerException.
+ */
+public class ZKSyncBSPMasterClient extends ZKSyncClient implements
+    MasterSyncClient {
+
+  private static final Log LOG = LogFactory
+      .getLog(ZKSyncBSPMasterClient.class);
+
+  private ZooKeeper zk = null;
+  private String bspRoot = null;
+
+  /**
+   * Opens the ZooKeeper session and prepares the root znode.
+   * <p>
+   * The root znode ({@link Constants#ZOOKEEPER_ROOT}) is created as a
+   * persistent node if it does not exist; otherwise all of its descendants
+   * are deleted.
+   *
+   * @param conf configuration providing the quorum, session timeout and root
+   */
+  @Override
+  public void init(HamaConfiguration conf) {
+    try {
+      zk = new ZooKeeper(QuorumPeer.getZKQuorumServersString(conf),
+          conf.getInt(Constants.ZOOKEEPER_SESSION_TIMEOUT, 1200000), this);
+    } catch (IOException e) {
+      LOG.error("Exception during reinitialization!", e);
+    }
+
+    bspRoot = conf.get(Constants.ZOOKEEPER_ROOT,
+        Constants.DEFAULT_ZOOKEEPER_ROOT);
+    // Report whether the handle was created (previously logged the inverse).
+    LOG.info("Initialized ZK " + (zk != null));
+    if (zk == null) {
+      return;
+    }
+
+    initialize(zk, bspRoot);
+    Stat s = null;
+    try {
+      s = zk.exists(bspRoot, false);
+    } catch (InterruptedException e) {
+      LOG.error("Interrupted while checking " + bspRoot, e);
+      Thread.currentThread().interrupt();
+    } catch (Exception e) {
+      LOG.error("Error checking " + bspRoot, e);
+    }
+
+    if (s == null) {
+      try {
+        zk.create(bspRoot, new byte[0], Ids.OPEN_ACL_UNSAFE,
+            CreateMode.PERSISTENT);
+      } catch (KeeperException e) {
+        LOG.error("Error creating " + bspRoot, e);
+      } catch (InterruptedException e) {
+        LOG.error("Interrupted while creating " + bspRoot, e);
+        Thread.currentThread().interrupt();
+      }
+    } else {
+      this.clearZKNodes();
+    }
+  }
+
+  /**
+   * Creates the persistent znode for a job.
+   *
+   * @param jobKey the key passed to {@link #registerJob(String)}
+   */
+  private void createJobRoot(String jobKey) {
+    writeNode(jobKey, null, true, null);
+  }
+
+  /**
+   * Deletes every znode below the configured root.
+   */
+  @Override
+  public void clear() {
+    clearZKNodes();
+  }
+
+  /**
+   * Registers a job by creating its persistent znode.
+   * <p>
+   * NOTE(review): the key is used as given (made absolute with a leading "/"),
+   * i.e. NOT under the configured root, whereas
+   * {@link #deregisterJob(String)} deletes {@code <root>/<key>}. Confirm what
+   * callers pass so that both operations address the same znode.
+   *
+   * @param jobKey identifier of the job to register
+   */
+  @Override
+  public void registerJob(String jobKey) {
+    if (zk == null) {
+      LOG.error("Cannot register job " + jobKey
+          + ": ZooKeeper client is not initialized.");
+      return;
+    }
+    // TODO: delete job root if key is present.
+    createJobRoot(jobKey);
+  }
+
+  /**
+   * Deletes the job's znode below the configured root, together with all of
+   * its descendants. Failures are logged, not thrown.
+   *
+   * @param jobKey identifier of the job to deregister
+   */
+  @Override
+  public void deregisterJob(String jobKey) {
+    if (zk == null) {
+      LOG.error("Cannot deregister job " + jobKey
+          + ": ZooKeeper client is not initialized.");
+      return;
+    }
+    final String jobPath = bspRoot + "/" + jobKey;
+    try {
+      clearZKNodes(jobPath);
+      this.zk.delete(jobPath, -1);
+    } catch (KeeperException e) {
+      LOG.error("Error deleting job " + jobKey, e);
+    } catch (InterruptedException e) {
+      LOG.error("Error deleting job " + jobKey, e);
+      Thread.currentThread().interrupt();
+    }
+  }
+
+  /**
+   * Closes the ZooKeeper session, if one was opened.
+   */
+  @Override
+  public void close() {
+    if (zk == null) {
+      return;
+    }
+    try {
+      this.zk.close();
+    } catch (InterruptedException e) {
+      LOG.error("Error closing sync client", e);
+      Thread.currentThread().interrupt();
+    }
+  }
+
+  /**
+   * Watcher for the master's ZooKeeper session; events are only logged at
+   * debug level.
+   */
+  @Override
+  public void process(WatchedEvent event) {
+    LOG.debug("Processing event " + event.getPath());
+    LOG.debug("Processing event type " + event.getType().toString());
+  }
+
+  /**
+   * @return the ZooKeeper handle created by {@link #init(HamaConfiguration)},
+   *         or null if init has not run or could not create it
+   */
+  public ZooKeeper getZK() {
+    return this.zk;
+  }
+
+}
diff --git a/core/src/main/java/org/apache/hama/bsp/sync/ZKSyncClient.java b/core/src/main/java/org/apache/hama/bsp/sync/ZKSyncClient.java
new file mode 100644
index 0000000..4f9c96a
--- /dev/null
+++ b/core/src/main/java/org/apache/hama/bsp/sync/ZKSyncClient.java
@@ -0,0 +1,535 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hama.bsp.sync;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.io.Writable;
+import org.apache.hama.bsp.BSPJobID;
+import org.apache.hama.bsp.TaskAttemptID;
+import org.apache.zookeeper.CreateMode;
+import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.WatchedEvent;
+import org.apache.zookeeper.Watcher;
+import org.apache.zookeeper.ZooDefs.Ids;
+import org.apache.zookeeper.ZooKeeper;
+import org.apache.zookeeper.data.Stat;
+
+/**
+ * A ZooKeeper based BSP distributed synchronization client that provides
+ * primitive synchronization APIs.
+ * <p>
+ * Subclasses must call {@link #initialize(ZooKeeper, String)} before any
+ * operation is used. Keys passed to the public operations are made absolute
+ * by prefixing "/" when they do not already start with one.
+ */
+public abstract class ZKSyncClient implements SyncClient, Watcher {
+
+  Log LOG = LogFactory.getLog(ZKSyncClient.class);
+
+  private ZooKeeper zk;
+  private String bspRoot;
+
+  protected Map<String, List<ZKSyncEventListener>> eventListenerMap;
+
+  public ZKSyncClient() {
+    eventListenerMap = new HashMap<String, List<ZKSyncEventListener>>(10);
+  }
+
+  /**
+   * Initializes the zookeeper client-base.
+   *
+   * @param zookeeper the ZooKeeper handle every operation is issued on
+   * @param root the root path under which {@link #constructKey} builds keys
+   */
+  protected void initialize(ZooKeeper zookeeper, String root) {
+    LOG.info("Initializing ZK Sync Client");
+    zk = zookeeper;
+    bspRoot = root;
+    eventListenerMap = new HashMap<String, List<ZKSyncEventListener>>(10);
+  }
+
+  /**
+   * Returns the node name used to address the superstep progress of a task,
+   * i.e. {@code <root>/<jobId>/sync/<superstep>/<taskId>}.
+   *
+   * @param taskId The Task ID
+   * @param superstep The superstep number
+   * @return String that represents the Zookeeper node path that could be
+   *         created.
+   */
+  protected String getNodeName(TaskAttemptID taskId, long superstep) {
+    return constructKey(taskId.getJobID(), "sync", "" + superstep,
+        taskId.toString());
+  }
+
+  /**
+   * Makes a key absolute by prefixing "/" when it does not start with one.
+   */
+  private String correctKey(String key) {
+    if (!key.startsWith("/")) {
+      key = "/" + key;
+    }
+    return key;
+  }
+
+  /**
+   * Check if the zookeeper node exists.
+   * <p>
+   * NOTE(review): {@code watcher} is currently ignored and no watch is left
+   * on the node; listeners are attached through
+   * {@link #registerListener(String, SyncEvent, SyncEventListener)}.
+   *
+   * @param path The Zookeeper node path to check.
+   * @param watcher ignored (kept for compatibility with existing callers).
+   * @return true if the node exists.
+   * @throws KeeperException
+   * @throws InterruptedException
+   */
+  protected boolean isExists(final String path, Watcher watcher)
+      throws KeeperException, InterruptedException {
+    synchronized (zk) {
+      return zk.exists(path, false) != null;
+    }
+  }
+
+  /**
+   * Returns the zookeeper stat object.
+   *
+   * @param path The path of the expected Zookeeper node.
+   * @return Stat object for the Zookeeper path, or null if the node does not
+   *         exist.
+   * @throws KeeperException
+   * @throws InterruptedException
+   */
+  protected Stat getStat(final String path) throws KeeperException,
+      InterruptedException {
+    synchronized (zk) {
+      return zk.exists(path, false);
+    }
+  }
+
+  /**
+   * Creates a single znode with the given mode and data unless it already
+   * exists. A concurrent creation of the same node is tolerated.
+   */
+  private void createZnode(final String path, final CreateMode mode,
+      byte[] data) throws KeeperException, InterruptedException {
+    synchronized (zk) {
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("Checking node " + path);
+      }
+      if (zk.exists(path, false) != null) {
+        return;
+      }
+      try {
+        zk.create(path, data, Ids.OPEN_ACL_UNSAFE, mode);
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("Created node " + path);
+        }
+      } catch (KeeperException.NodeExistsException nee) {
+        LOG.debug("Ignore because znode may be already created at " + path,
+            nee);
+      }
+    }
+  }
+
+  /**
+   * Utility function to get byte array out of Writable
+   *
+   * @param value The Writable object to be converted to byte array.
+   * @return byte array from the Writable object. Returns null on given null
+   *         value or on error.
+   */
+  protected byte[] getBytesForData(Writable value) {
+    if (value == null) {
+      return null;
+    }
+    ByteArrayOutputStream byteStream = new ByteArrayOutputStream();
+    DataOutputStream outputStream = new DataOutputStream(byteStream);
+    byte[] data = null;
+    try {
+      value.write(outputStream);
+      outputStream.flush();
+      data = byteStream.toByteArray();
+    } catch (IOException e) {
+      LOG.error("Error writing data to write buffer.", e);
+    } finally {
+      try {
+        // Closing the DataOutputStream also closes the wrapped byte stream.
+        outputStream.close();
+      } catch (IOException e) {
+        LOG.error("Error closing byte stream.", e);
+      }
+    }
+    return data;
+  }
+
+  /**
+   * Utility function to read a Writable object's value from a byte array.
+   *
+   * @param data The byte array; may be null.
+   * @param valueHolder The Writable instance whose fields are read from
+   *          {@code data}.
+   * @return true if {@code data} was read into {@code valueHolder}, false if
+   *         {@code data} is null.
+   * @throws IOException if reading the fields fails
+   */
+  protected boolean getValueFromBytes(byte[] data,
+      Writable valueHolder) throws IOException {
+    if (data == null) {
+      return false;
+    }
+    DataInputStream diStream = new DataInputStream(new ByteArrayInputStream(
+        data));
+    try {
+      valueHolder.readFields(diStream);
+    } finally {
+      diStream.close();
+    }
+    return true;
+  }
+
+  /**
+   * Read value stored in the Zookeeper node.
+   * <p>
+   * Note: true is also returned when the node exists but holds no data; in
+   * that case {@code valueHolder} is left untouched.
+   *
+   * @param path The path of the Zookeeper node.
+   * @param valueHolder The Writable instance the node's data is read into.
+   * @return true if the node exists and its data was read, false if it does
+   *         not exist or an error occurred.
+   */
+  protected boolean extractData(String path, Writable valueHolder) {
+    try {
+      Stat stat = getStat(path);
+      if (stat == null) {
+        return false;
+      }
+      byte[] data = this.zk.getData(path, false, stat);
+      try {
+        getValueFromBytes(data, valueHolder);
+      } catch (IOException e) {
+        LOG.error("Error getting data from path " + path, e);
+        return false;
+      }
+      return true;
+    } catch (KeeperException e) {
+      LOG.error("Error checking zk path " + path, e);
+    } catch (InterruptedException e) {
+      LOG.error("Error checking zk path " + path, e);
+      Thread.currentThread().interrupt();
+    }
+    return false;
+  }
+
+  /**
+   * Writes data into the Zookeeper node. If the path does not exist the
+   * zookeeper node is created recursively and the value is stored in the node.
+   * Missing ancestors are always created as persistent nodes.
+   *
+   * @param path The path of the Zookeeper node.
+   * @param value The value to be stored in the Zookeeper node; may be null.
+   * @param persistent true to create the node as persistent, false to create
+   *          it as ephemeral. Only relevant when the node is created.
+   * @param watcher currently unused.
+   * @return true if the node was created or its value updated; false on error
+   *         or when the node already exists and {@code value} is null.
+   */
+  protected boolean writeNode(String path, Writable value, boolean persistent,
+      Watcher watcher) {
+    if (path == null || "".equals(path.trim())) {
+      return false;
+    }
+    path = correctKey(path);
+    boolean pathExists = false;
+    try {
+      pathExists = isExists(path, watcher);
+    } catch (KeeperException e) {
+      LOG.error("Error checking zk path " + path, e);
+    } catch (InterruptedException e) {
+      LOG.error("Error checking zk path " + path, e);
+      Thread.currentThread().interrupt();
+      return false;
+    }
+
+    byte[] data = getBytesForData(value);
+
+    if (!pathExists) {
+      try {
+        String[] pathComponents = path.split("/");
+        StringBuilder pathBuffer = new StringBuilder(path.length()
+            + pathComponents.length);
+        // Create every missing ancestor as a persistent node first.
+        for (int i = 0; i < pathComponents.length - 1; ++i) {
+          if (pathComponents[i].equals("")) {
+            continue;
+          }
+          pathBuffer.append("/").append(pathComponents[i]);
+          createZnode(pathBuffer.toString(), CreateMode.PERSISTENT, null);
+        }
+        pathBuffer.append("/")
+            .append(pathComponents[pathComponents.length - 1]);
+        CreateMode mode = persistent ? CreateMode.PERSISTENT
+            : CreateMode.EPHEMERAL;
+        createZnode(pathBuffer.toString(), mode, data);
+        return true;
+      } catch (InterruptedException e) {
+        LOG.error("Error creating zk path " + path, e);
+        Thread.currentThread().interrupt();
+      } catch (KeeperException e) {
+        LOG.error("Error creating zk path " + path, e);
+      }
+    } else if (value != null) {
+      try {
+        this.zk.setData(path, data, -1);
+        return true;
+      } catch (InterruptedException e) {
+        LOG.error("Error modifying zk path " + path, e);
+        Thread.currentThread().interrupt();
+      } catch (KeeperException e) {
+        LOG.error("Error modifying zk path " + path, e);
+      }
+    }
+    return false;
+  }
+
+  /**
+   * Builds a key of the form {@code <root>[/<jobId>]/<arg>/<arg>/...}.
+   */
+  @Override
+  public String constructKey(BSPJobID jobId, String... args) {
+    StringBuilder keyBuffer = new StringBuilder(100);
+    keyBuffer.append(bspRoot);
+    if (jobId != null) {
+      keyBuffer.append("/").append(jobId.toString());
+    }
+    for (String arg : args) {
+      keyBuffer.append("/").append(arg);
+    }
+    return keyBuffer.toString();
+  }
+
+  /**
+   * Stores {@code value} under {@code key}, creating the node and any missing
+   * ancestors when needed.
+   */
+  @Override
+  public boolean storeInformation(String key, Writable value,
+      boolean permanent, SyncEventListener listener) {
+    ZKSyncEventListener zkListener = (ZKSyncEventListener) listener;
+    final String path = correctKey(key);
+    LOG.info("Writing data " + path);
+    return writeNode(path, value, permanent, zkListener);
+  }
+
+  /**
+   * Reads the value stored under {@code key} into {@code valueHolder}.
+   */
+  @Override
+  public boolean getInformation(String key, Writable valueHolder) {
+    return extractData(correctKey(key), valueHolder);
+  }
+
+  /**
+   * Creates {@code key} and any missing ancestors, without data.
+   */
+  @Override
+  public boolean addKey(String key, boolean permanent,
+      SyncEventListener listener) {
+    ZKSyncEventListener zkListener = (ZKSyncEventListener) listener;
+    return writeNode(key, null, permanent, zkListener);
+  }
+
+  /**
+   * Checks whether a node exists for {@code key}. Like the other operations,
+   * the key is made absolute first.
+   */
+  @Override
+  public boolean hasKey(String key) {
+    key = correctKey(key);
+    try {
+      return isExists(key, null);
+    } catch (KeeperException e) {
+      LOG.error("Error checking zk path " + key, e);
+    } catch (InterruptedException e) {
+      LOG.error("Error checking zk path " + key, e);
+      Thread.currentThread().interrupt();
+    }
+    return false;
+  }
+
+  /**
+   * Attaches {@code listener} to {@code key}, creating the key as a persistent
+   * node if it does not exist yet. The listener is left as a watch on the
+   * node's existence, data and children.
+   */
+  @Override
+  public boolean registerListener(String key, SyncEvent event,
+      SyncEventListener listener) {
+    key = correctKey(key);
+
+    LOG.debug("Registering listener for " + key);
+    ZKSyncEventListener zkListener = (ZKSyncEventListener) listener;
+    zkListener.setSyncEvent(event);
+    zkListener.setZKSyncClient(this);
+    synchronized (this.zk) {
+      try {
+        Stat stat = this.zk.exists(key, zkListener);
+        if (stat == null) {
+          writeNode(key, null, true, zkListener);
+        }
+        this.zk.getData(key, zkListener, stat);
+        this.zk.getChildren(key, zkListener);
+        // TODO: track registered listeners per key in eventListenerMap.
+        return true;
+      } catch (KeeperException e) {
+        LOG.error("Error getting stat and data.", e);
+      } catch (InterruptedException e) {
+        LOG.error("Interrupted getting stat and data.", e);
+        Thread.currentThread().interrupt();
+      }
+    }
+    return false;
+  }
+
+  /**
+   * Lists the names of the child nodes of {@code key}.
+   *
+   * @param key the parent key
+   * @param listener optional listener left as a watch on the children of
+   *          {@code key}; may be null
+   * @return the child names, or an empty array if {@code key} does not exist
+   *         or an error occurred
+   */
+  @Override
+  public String[] getChildKeySet(String key, SyncEventListener listener) {
+    key = correctKey(key);
+    ZKSyncEventListener zkListener = null;
+    if (listener != null) {
+      zkListener = (ZKSyncEventListener) listener;
+    }
+    String[] children = new String[0];
+    try {
+      if (this.zk.exists(key, null) == null) {
+        return children;
+      }
+      List<String> childList = this.zk.getChildren(key, zkListener);
+      children = childList.toArray(new String[childList.size()]);
+    } catch (KeeperException e) {
+      LOG.error("Error getting stat and data.", e);
+    } catch (InterruptedException e) {
+      LOG.error("Interrupted getting stat and data.", e);
+      Thread.currentThread().interrupt();
+    }
+    return children;
+  }
+
+  /**
+   * Clears all sub-children of node bspRoot. Errors are logged, not thrown.
+   */
+  protected void clearZKNodes() {
+    try {
+      Stat s = zk.exists(bspRoot, false);
+      if (s != null) {
+        clearZKNodes(bspRoot);
+      }
+    } catch (InterruptedException e) {
+      LOG.warn("Could not clear zookeeper nodes.", e);
+      Thread.currentThread().interrupt();
+    } catch (Exception e) {
+      LOG.warn("Could not clear zookeeper nodes.", e);
+    }
+  }
+
+  /**
+   * Recursively deletes every descendant of the node at {@code path}; the
+   * node itself is kept.
+   *
+   * @param path the node whose descendants are deleted
+   * @throws InterruptedException
+   * @throws KeeperException
+   */
+  protected void clearZKNodes(String path) throws KeeperException,
+      InterruptedException {
+    // ZooKeeper declares a List return type; do not rely on an ArrayList.
+    List<String> children = zk.getChildren(path, false);
+    for (String node : children) {
+      final String childPath = path + "/" + node;
+      clearZKNodes(childPath);
+      LOG.info("Deleting " + childPath);
+      zk.delete(childPath, -1); // delete any version of this node.
+    }
+  }
+
+  /**
+   * Default watcher callback; events are ignored here.
+   */
+  @Override
+  public void process(WatchedEvent arg0) {
+  }
+
+  /**
+   * Deletes {@code key} together with all of its descendants.
+   *
+   * @param key the key to delete
+   * @param listener currently unused
+   * @return true if the key was deleted, false on error
+   */
+  @Override
+  public boolean remove(String key, SyncEventListener listener) {
+    key = correctKey(key);
+    try {
+      clearZKNodes(key);
+      this.zk.delete(key, -1);
+      return true;
+    } catch (KeeperException e) {
+      LOG.error("Error deleting key " + key, e);
+    } catch (InterruptedException e) {
+      LOG.error("Error deleting key " + key, e);
+      Thread.currentThread().interrupt();
+    }
+    return false;
+  }
+
+  /**
+   * Closes the ZooKeeper session.
+   *
+   * @throws IOException if interrupted while closing
+   */
+  @Override
+  public void close() throws IOException {
+    try {
+      this.zk.close();
+    } catch (InterruptedException e) {
+      Thread.currentThread().interrupt();
+      throw new IOException(e);
+    }
+  }
+
+}
diff --git a/core/src/main/java/org/apache/hama/bsp/sync/ZKSyncEventFactory.java b/core/src/main/java/org/apache/hama/bsp/sync/ZKSyncEventFactory.java
new file mode 100644
index 0000000..7ce8ce3
--- /dev/null
+++ b/core/src/main/java/org/apache/hama/bsp/sync/ZKSyncEventFactory.java
@@ -0,0 +1,143 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hama.bsp.sync;
+
+/**
+ * ZooKeeper synchronization event factory. It provides three event
+ * definitions:
+ * <ul>
+ * <li>the value stored in a ZooKeeper node is changed,
+ * <li>a new child node is added to a ZooKeeper node,
+ * <li>a ZooKeeper node is deleted.
+ * </ul>
+ */
+public class ZKSyncEventFactory {
+
+  /**
+   * The ZooKeeper events, each with a stable numeric identifier.
+   */
+  public static enum ZKEvent {
+    VALUE_CHANGE_EVENT(0),
+    CHILD_ADD_EVENT(1),
+    DELETE_EVENT(2);
+
+    private final int id;
+
+    ZKEvent(int num) {
+      this.id = num;
+    }
+
+    /**
+     * @return the numeric identifier of this event
+     */
+    public int getValue() {
+      return this.id;
+    }
+
+    /**
+     * @return the number of defined events
+     */
+    public static int getEventCount() {
+      return ZKEvent.values().length;
+    }
+
+    /**
+     * Returns the name of the event whose identifier is {@code num}. The
+     * lookup is by identifier rather than by declaration order, so it stays
+     * correct if identifiers and ordinals ever diverge.
+     *
+     * @param num the event identifier
+     * @return the name of the matching event
+     * @throws IllegalArgumentException if no event has that identifier
+     */
+    public String getName(int num) {
+      for (ZKEvent event : ZKEvent.values()) {
+        if (event.id == num) {
+          return event.name();
+        }
+      }
+      throw new IllegalArgumentException(new StringBuilder(100)
+          .append("The value ").append(num)
+          .append(" is not a valid ZKEvent type. ")
+          .append("Expected range is 0-").append(getEventCount() - 1)
+          .toString());
+    }
+  }
+
+  /**
+   * @return the number of events supported by this factory
+   */
+  public static int getSupportedEventCount() {
+    return ZKEvent.getEventCount();
+  }
+
+  private static class ValueChangeEvent implements SyncEvent {
+
+    @Override
+    public int getEventId() {
+      return ZKEvent.VALUE_CHANGE_EVENT.getValue();
+    }
+
+  }
+
+  private static class ChildAddEvent implements SyncEvent {
+
+    @Override
+    public int getEventId() {
+      return ZKEvent.CHILD_ADD_EVENT.getValue();
+    }
+
+  }
+
+  private static class DeleteEvent implements SyncEvent {
+
+    @Override
+    public int getEventId() {
+      return ZKEvent.DELETE_EVENT.getValue();
+    }
+
+  }
+
+  /**
+   * Provides the Zookeeper node value change event definition.
+   *
+   * @return the Zookeeper value changed event.
+   */
+  public static SyncEvent getValueChangeEvent() {
+    return new ValueChangeEvent();
+  }
+
+  /**
+   * Provides the Zookeeper deletion event definition.
+   *
+   * @return the Zookeeper node is deleted event
+   */
+  public static SyncEvent getDeletionEvent() {
+    return new DeleteEvent();
+  }
+
+  /**
+   * Provides the Zookeeper child addition event definition.
+   *
+   * @return the Zookeeper child node is added event
+   */
+  public static SyncEvent getChildAddEvent() {
+    return new ChildAddEvent();
+  }
+
+}
diff --git a/core/src/main/java/org/apache/hama/bsp/sync/ZKSyncEventListener.java b/core/src/main/java/org/apache/hama/bsp/sync/ZKSyncEventListener.java
new file mode 100644
index 0000000..fcc83fa
--- /dev/null
+++ b/core/src/main/java/org/apache/hama/bsp/sync/ZKSyncEventListener.java
@@ -0,0 +1,127 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hama.bsp.sync;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.zookeeper.WatchedEvent;
+import org.apache.zookeeper.Watcher;
+import org.apache.zookeeper.Watcher.Event.EventType;
+
+/**
+ * A {@link SyncEventListener} that receives ZooKeeper watch notifications and
+ * dispatches them to {@link #onChange()}, {@link #onChildKeySetChange()} and
+ * {@link #onDelete()}.
+ * <p>
+ * ZooKeeper watches fire only once, so after every notification other than a
+ * node deletion the listener re-registers itself through the client set by
+ * {@link #setZKSyncClient(ZKSyncClient)}, if one has been set.
+ */
+public abstract class ZKSyncEventListener extends SyncEventListener
+    implements Watcher {
+  Log LOG = LogFactory.getLog(ZKSyncEventListener.class);
+
+  private ZKSyncClient client;
+  private SyncEvent event;
+
+  /**
+   * Handles a ZooKeeper watch notification.
+   * <p>
+   * Notifications without a node path (session state changes) are ignored.
+   * Otherwise the listener is re-registered on the node, unless the node was
+   * deleted (re-registering would re-create it) or no client has been set,
+   * and the callback matching the event type is invoked.
+   *
+   * @param watchedEvent the notification delivered by ZooKeeper
+   */
+  @Override
+  public void process(WatchedEvent watchedEvent) {
+    LOG.debug(watchedEvent.toString());
+    final String path = watchedEvent.getPath();
+    if (path == null) {
+      // Connection state change (EventType.None): not tied to any key.
+      return;
+    }
+    final EventType type = watchedEvent.getType();
+
+    if (client != null && !EventType.NodeDeleted.equals(type)) {
+      // Watches are one-shot: re-arm them, keeping the registered event.
+      client.registerListener(path,
+          event != null ? event : ZKSyncEventFactory.getValueChangeEvent(),
+          this);
+    }
+
+    if (EventType.NodeChildrenChanged.equals(type)) {
+      LOG.debug("Node children changed - " + path);
+      onChildKeySetChange();
+    } else if (EventType.NodeDeleted.equals(type)) {
+      LOG.debug("Node deleted - " + path);
+      onDelete();
+    } else if (EventType.NodeDataChanged.equals(type)) {
+      LOG.debug("Node data changed - " + path);
+      onChange();
+    }
+  }
+
+  /**
+   * Sets the client used to re-register this listener after a notification.
+   *
+   * @param zkClient the client this listener is registered with
+   */
+  public void setZKSyncClient(ZKSyncClient zkClient) {
+    client = zkClient;
+  }
+
+  /**
+   * @param event the event this listener is registered for
+   */
+  public void setSyncEvent(SyncEvent event) {
+    this.event = event;
+  }
+
+  /**
+   * @return the event this listener is registered for, or null if none was set
+   */
+  public SyncEvent getEvent() {
+    return this.event;
+  }
+
+  /**
+   * Called when the watched node is deleted.
+   */
+  public abstract void onDelete();
+
+  /**
+   * Called when the data stored in the watched node changes.
+   */
+  public abstract void onChange();
+
+  /**
+   * Called when the set of children of the watched node changes.
+   */
+  public abstract void onChildKeySetChange();
+
+  /**
+   * Intentionally a no-op: ZooKeeper notifications are dispatched through
+   * {@link #process(WatchedEvent)} instead.
+   */
+  @Override
+  public void handleEvent(int eventId) {
+  }
+
+}
diff --git a/core/src/main/java/org/apache/hama/bsp/sync/ZooKeeperSyncClientImpl.java b/core/src/main/java/org/apache/hama/bsp/sync/ZooKeeperSyncClientImpl.java
index bd1c28e..ad7dc07 100644
--- a/core/src/main/java/org/apache/hama/bsp/sync/ZooKeeperSyncClientImpl.java
+++ b/core/src/main/java/org/apache/hama/bsp/sync/ZooKeeperSyncClientImpl.java
@@ -17,10 +17,6 @@
*/
package org.apache.hama.bsp.sync;
-import java.io.ByteArrayInputStream;
-import java.io.ByteArrayOutputStream;
-import java.io.DataInputStream;
-import java.io.DataOutputStream;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.util.Collections;
@@ -47,7 +43,8 @@
* This client class abstracts the use of our zookeeper sync code.
*
*/
-public class ZooKeeperSyncClientImpl implements SyncClient, Watcher {
+public class ZooKeeperSyncClientImpl extends ZKSyncClient implements
+ PeerSyncClient {
/*
* TODO maybe extract an abstract class and let the subclasses implement
@@ -81,6 +78,8 @@
int bindPort = conf
.getInt(Constants.PEER_PORT, Constants.DEFAULT_PEER_PORT);
+ initialize(this.zk, bspRoot);
+
peerAddress = new InetSocketAddress(bindAddress, bindPort);
LOG.info("Start connecting to Zookeeper! At " + peerAddress);
numBSPTasks = conf.getInt("bsp.peers.num", 1);
@@ -93,14 +92,14 @@
try {
synchronized (zk) {
- createZnode(bspRoot);
- final String pathToJobIdZnode = bspRoot + "/"
- + taskId.getJobID().toString();
- createZnode(pathToJobIdZnode);
- final String pathToSuperstepZnode = pathToJobIdZnode + "/" + superstep;
- createZnode(pathToSuperstepZnode);
+
+ final String pathToSuperstepZnode =
+ constructKey(taskId.getJobID(), "sync", ""+superstep);
+
+ writeNode(pathToSuperstepZnode, null, true, null);
BarrierWatcher barrierWatcher = new BarrierWatcher();
- // this is really needed to register the barrier watcher, don't remove this line!
+ // this is really needed to register the barrier watcher, don't remove
+ // this line!
zk.exists(pathToSuperstepZnode + "/ready", barrierWatcher);
zk.create(getNodeName(taskId, superstep), null, Ids.OPEN_ACL_UNSAFE,
CreateMode.EPHEMERAL);
@@ -131,7 +130,7 @@
} else {
LOG.debug("---> at superstep: " + superstep
+ " task that is creating /ready znode:" + taskId.toString());
- createEphemeralZnode(pathToSuperstepZnode + "/ready");
+ writeNode(pathToSuperstepZnode + "/ready", null, false, null);
}
}
} catch (Exception e) {
@@ -143,8 +142,8 @@
public void leaveBarrier(final BSPJobID jobId, final TaskAttemptID taskId,
final long superstep) throws SyncException {
try {
- final String pathToSuperstepZnode = bspRoot + "/"
- + taskId.getJobID().toString() + "/" + superstep;
+ final String pathToSuperstepZnode =
+ constructKey(taskId.getJobID(), "sync", ""+superstep);
while (true) {
List<String> znodes = zk.getChildren(pathToSuperstepZnode, false);
LOG.debug("leaveBarrier() !!! checking znodes contnains /ready node or not: at superstep:"
@@ -236,8 +235,9 @@
public void register(BSPJobID jobId, TaskAttemptID taskId,
String hostAddress, long port) {
try {
- if (zk.exists("/" + jobId.toString(), false) == null) {
- zk.create("/" + jobId.toString(), new byte[0], Ids.OPEN_ACL_UNSAFE,
+ String jobRegisterKey = constructKey(jobId, "peers");
+ if (zk.exists(jobRegisterKey, false) == null) {
+ zk.create(jobRegisterKey, new byte[0], Ids.OPEN_ACL_UNSAFE,
CreateMode.PERSISTENT);
}
} catch (KeeperException e) {
@@ -245,7 +245,7 @@
} catch (InterruptedException e) {
LOG.error(e);
}
- registerTask(zk, jobId, hostAddress, port, taskId);
+ registerTask(jobId, hostAddress, port, taskId);
}
/**
@@ -259,54 +259,13 @@
* @param port
* @param taskId
*/
- public static void registerTask(ZooKeeper zk, BSPJobID jobId,
- String hostAddress, long port, TaskAttemptID taskId) {
+ public void registerTask(BSPJobID jobId, String hostAddress, long port,
+ TaskAttemptID taskId) {
- byte[] taskIdBytes = serializeTaskId(taskId);
+ String taskRegisterKey = constructKey(jobId, "peers", hostAddress + ":"
+ + port);
+ writeNode(taskRegisterKey, taskId, false, null);
- try {
- zk.create("/" + jobId.toString() + "/" + hostAddress + ":" + port,
- taskIdBytes, Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);
- } catch (KeeperException e) {
- LOG.error(e);
- } catch (InterruptedException e) {
- LOG.error(e);
- }
- }
-
- private static byte[] serializeTaskId(TaskAttemptID taskId) {
- ByteArrayOutputStream bos = new ByteArrayOutputStream();
- DataOutputStream out = new DataOutputStream(bos);
- try {
- taskId.write(out);
- } catch (IOException e) {
- LOG.error(e);
- } finally {
- try {
- out.close();
- } catch (IOException e) {
- LOG.error(e);
- }
- }
- return bos.toByteArray();
- }
-
- public static TaskAttemptID deserializeTaskId(byte[] arr) {
- ByteArrayInputStream bis = new ByteArrayInputStream(arr);
- DataInputStream in = new DataInputStream(bis);
- TaskAttemptID id = new TaskAttemptID();
- try {
- id.readFields(in);
- } catch (IOException e) {
- LOG.error(e);
- } finally {
- try {
- in.close();
- } catch (IOException e) {
- LOG.error(e);
- }
- }
- return id;
}
@Override
@@ -314,16 +273,20 @@
if (allPeers == null) {
TreeMap<Integer, String> sortedMap = new TreeMap<Integer, String>();
try {
- allPeers = zk.getChildren("/" + taskId.getJobID().toString(), this)
- .toArray(new String[0]);
+ allPeers = zk.getChildren(constructKey(taskId.getJobID(), "peers"),
+ this).toArray(new String[0]);
for (String s : allPeers) {
- byte[] data = zk.getData(
- "/" + taskId.getJobID().toString() + "/" + s, this, null);
- TaskAttemptID thatTask = deserializeTaskId(data);
- LOG.debug("TASK mapping from zookeeper: " + thatTask + " ID:"
- + thatTask.getTaskID().getId() + " : " + s);
- sortedMap.put(thatTask.getTaskID().getId(), s);
+ byte[] data = zk.getData(constructKey(taskId.getJobID(), "peers", s),
+ this, null);
+ TaskAttemptID thatTask = new TaskAttemptID();
+ if (getValueFromBytes(data, thatTask)) {
+ LOG.debug("TASK mapping from zookeeper: " + thatTask + " ID:"
+ + thatTask.getTaskID().getId() + " : " + s);
+ sortedMap.put(thatTask.getTaskID().getId(), s);
+ } else {
+ LOG.warn("Could not read the task attempt ID of peer " + s + " from zookeeper.");
+ }
}
} catch (Exception e) {
@@ -344,8 +307,13 @@
}
@Override
- public void close() throws InterruptedException {
- zk.close();
+ public void close() throws IOException {
+ try {
+ zk.close();
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ throw new IOException(e);
+ }
}
@Override
@@ -379,36 +347,6 @@
return peerAddress.getHostName() + ":" + peerAddress.getPort();
}
- private String getNodeName(TaskAttemptID taskId, long superstep) {
- return bspRoot + "/" + taskId.getJobID().toString() + "/" + superstep + "/"
- + taskId.toString();
- }
-
- private void createZnode(final String path) throws KeeperException,
- InterruptedException {
- createZnode(path, CreateMode.PERSISTENT);
- }
-
- private void createEphemeralZnode(final String path) throws KeeperException,
- InterruptedException {
- createZnode(path, CreateMode.EPHEMERAL);
- }
-
- private void createZnode(final String path, final CreateMode mode)
- throws KeeperException, InterruptedException {
- synchronized (zk) {
- Stat s = zk.exists(path, false);
- if (null == s) {
- try {
- zk.create(path, null, Ids.OPEN_ACL_UNSAFE, mode);
- } catch (KeeperException.NodeExistsException nee) {
- LOG.debug("Ignore because znode may be already created at " + path,
- nee);
- }
- }
- }
- }
-
/*
* INNER CLASSES
*/
diff --git a/core/src/main/java/org/apache/hama/bsp/taskallocation/BSPResource.java b/core/src/main/java/org/apache/hama/bsp/taskallocation/BSPResource.java
new file mode 100644
index 0000000..28e914a
--- /dev/null
+++ b/core/src/main/java/org/apache/hama/bsp/taskallocation/BSPResource.java
@@ -0,0 +1,38 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hama.bsp.taskallocation;
+
+import org.apache.hama.bsp.TaskInProgress;
+
+/**
+ * <code>BSPResource</code> defines a resource entity that is used as a factor
+ * when allocating tasks on groom servers.
+ */
+public abstract class BSPResource {
+
+  /**
+   * Returns the host names of the grooms on which this resource is available
+   * or local, i.e. the grooms best suited to run the given task.
+   *
+   * @param tip The <code>TaskInProgress</code> representing the task to
+   *          schedule.
+   * @return The groom-server host names.
+   */
+  public abstract String[] getGrooms(TaskInProgress tip);
+
+}
diff --git a/core/src/main/java/org/apache/hama/bsp/taskallocation/BestEffortDataLocalTaskAllocator.java b/core/src/main/java/org/apache/hama/bsp/taskallocation/BestEffortDataLocalTaskAllocator.java
new file mode 100644
index 0000000..a132cb6
--- /dev/null
+++ b/core/src/main/java/org/apache/hama/bsp/taskallocation/BestEffortDataLocalTaskAllocator.java
@@ -0,0 +1,179 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hama.bsp.taskallocation;
+
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hama.bsp.BSPJobClient.RawSplit;
+import org.apache.hama.bsp.GroomServerStatus;
+import org.apache.hama.bsp.TaskInProgress;
+
+/**
+ * <code>BestEffortDataLocalTaskAllocator</code> is a simple task allocator that
+ * takes only data locality into account as a constraint for allocating tasks.
+ * It makes a best-effort attempt to schedule a task on a groom server listed
+ * as a location of the task's input split. If that is not possible, it selects
+ * any other groom that still has a free task slot.
+ */
+public class BestEffortDataLocalTaskAllocator implements TaskAllocationStrategy {
+
+  static final Log LOG = LogFactory
+      .getLog(BestEffortDataLocalTaskAllocator.class);
+
+  /**
+   * This allocator needs no configuration, so this method does nothing.
+   */
+  @Override
+  public void initialize(Configuration conf) {
+  }
+
+  /**
+   * Checks whether another task can be placed on the given groom.
+   *
+   * @param groom The groom to check; may be null.
+   * @param tasksInGroomMap Number of tasks already allocated per groom.
+   * @return true if the groom is non-null and its allocated task count is
+   *         below <code>getMaxTasks()</code>.
+   */
+  private static boolean hasFreeSlot(GroomServerStatus groom,
+      Map<GroomServerStatus, Integer> tasksInGroomMap) {
+    if (groom == null) {
+      return false;
+    }
+    Integer tasksInGroom = tasksInGroomMap.get(groom);
+    int allocated = (tasksInGroom == null) ? 0 : tasksInGroom;
+    return allocated < groom.getMaxTasks();
+  }
+
+  /**
+   * Returns the first groom, in the iteration order of <code>grooms</code>,
+   * that has a free slot to schedule a task on.
+   *
+   * @param grooms Map of groom name to <code>GroomServerStatus</code>.
+   * @param tasksInGroomMap Number of tasks already allocated per groom.
+   * @return The selected groom, or null if no groom has a free slot.
+   */
+  private static GroomServerStatus getAnyGroomToSchedule(
+      Map<String, GroomServerStatus> grooms,
+      Map<GroomServerStatus, Integer> tasksInGroomMap) {
+    for (GroomServerStatus groom : grooms.values()) {
+      if (hasFreeSlot(groom, tasksInGroomMap)) {
+        return groom;
+      }
+    }
+    return null;
+  }
+
+  /**
+   * Returns the first of the candidate locations that names a known groom
+   * with a free slot to schedule a task on.
+   *
+   * @param grooms Map of groom name to <code>GroomServerStatus</code>.
+   * @param tasksInGroomMap Number of tasks already allocated per groom.
+   * @param possibleLocations Candidate groom host names, in preference order.
+   * @return The selected groom, or null if no candidate qualifies.
+   */
+  private static GroomServerStatus getGroomToSchedule(
+      Map<String, GroomServerStatus> grooms,
+      Map<GroomServerStatus, Integer> tasksInGroomMap,
+      String[] possibleLocations) {
+    for (String location : possibleLocations) {
+      GroomServerStatus groom = grooms.get(location);
+      // Accept only grooms whose host name matches the candidate location.
+      if (hasFreeSlot(groom, tasksInGroomMap)
+          && location.equals(groom.getGroomHostName())) {
+        return groom;
+      }
+    }
+    return null;
+  }
+
+  /**
+   * Returns the groom to run <code>taskInProgress</code> on. A groom from
+   * <code>selectedGrooms</code> that has a free slot is preferred; otherwise
+   * any groom with a free slot is chosen.
+   *
+   * @return The selected groom, or null if the task cannot be started or no
+   *         groom has a free slot.
+   */
+  @Override
+  public GroomServerStatus getGroomToAllocate(
+      Map<String, GroomServerStatus> groomStatuses, String[] selectedGrooms,
+      Map<GroomServerStatus, Integer> taskCountInGroomMap,
+      BSPResource[] resources, TaskInProgress taskInProgress) {
+    if (!taskInProgress.canStartTask()) {
+      return null;
+    }
+
+    GroomServerStatus groom = null;
+    if (selectedGrooms != null) {
+      groom = getGroomToSchedule(groomStatuses, taskCountInGroomMap,
+          selectedGrooms);
+    }
+    if (groom == null) {
+      // Fall back to any groom with a free slot, ignoring data locality.
+      groom = getAnyGroomToSchedule(groomStatuses, taskCountInGroomMap);
+    }
+    // The helpers return the status object itself, so no second lookup by
+    // host name (which assumes the map is keyed by host name) is needed.
+    return groom;
+  }
+
+  /**
+   * Selects the grooms listed as locations of the task's input split.
+   *
+   * @return The split's locations; an empty array if the task cannot be
+   *         started; null if the task has no input split.
+   */
+  @Override
+  public String[] selectGrooms(Map<String, GroomServerStatus> groomStatuses,
+      Map<GroomServerStatus, Integer> taskCountInGroomMap,
+      BSPResource[] resources, TaskInProgress taskInProgress) {
+    if (!taskInProgress.canStartTask()) {
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("Cannot start task based on id");
+      }
+      return new String[0];
+    }
+
+    RawSplit rawSplit = taskInProgress.getFileSplit();
+    if (rawSplit != null) {
+      return rawSplit.getLocations();
+    }
+    return null;
+  }
+
+  /**
+   * This operation is not supported.
+   *
+   * @throws UnsupportedOperationException always.
+   */
+  @Override
+  public Set<GroomServerStatus> getGroomsToAllocate(
+      Map<String, GroomServerStatus> groomStatuses, String[] selectedGrooms,
+      Map<GroomServerStatus, Integer> taskCountInGroomMap,
+      BSPResource[] resources, TaskInProgress taskInProgress) {
+    throw new UnsupportedOperationException(
+        "This API is not supported for the called API function call.");
+  }
+
+}
diff --git a/core/src/main/java/org/apache/hama/bsp/taskallocation/RawSplitResource.java b/core/src/main/java/org/apache/hama/bsp/taskallocation/RawSplitResource.java
new file mode 100644
index 0000000..3006a94
--- /dev/null
+++ b/core/src/main/java/org/apache/hama/bsp/taskallocation/RawSplitResource.java
@@ -0,0 +1,64 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hama.bsp.taskallocation;
+
+import org.apache.hama.bsp.BSPJobClient.RawSplit;
+import org.apache.hama.bsp.TaskInProgress;
+
+/**
+ * <code>RawSplitResource</code> defines the data block resource that could be
+ * used to find which groom to schedule for data-locality.
+ */
+public class RawSplitResource extends BSPResource {
+
+  private final RawSplit split;
+
+  /**
+   * Creates a resource without split information; {@link #getGrooms} then
+   * returns an empty array.
+   */
+  public RawSplitResource() {
+    this(null);
+  }
+
+  /**
+   * Initialize the resource with data block split information.
+   *
+   * @param split The data-split provided by <code>BSPJobClient</code>
+   */
+  public RawSplitResource(RawSplit split) {
+    this.split = split;
+  }
+
+  /**
+   * Returns the host names on which the data split is located.
+   *
+   * @param tip Not used; the result depends only on the split.
+   * @return The split's locations, or an empty array if this resource has no
+   *         split.
+   */
+  @Override
+  public String[] getGrooms(TaskInProgress tip) {
+    if (split == null) {
+      // Avoids an NPE for instances created with the no-arg constructor.
+      return new String[0];
+    }
+    return split.getLocations();
+  }
+
+}
diff --git a/core/src/main/java/org/apache/hama/bsp/taskallocation/TaskAllocationStrategy.java b/core/src/main/java/org/apache/hama/bsp/taskallocation/TaskAllocationStrategy.java
new file mode 100644
index 0000000..1f15607
--- /dev/null
+++ b/core/src/main/java/org/apache/hama/bsp/taskallocation/TaskAllocationStrategy.java
@@ -0,0 +1,114 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hama.bsp.taskallocation;
+
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.hadoop.classification.InterfaceStability.Unstable;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hama.bsp.GroomServerStatus;
+import org.apache.hama.bsp.TaskInProgress;
+
+/**
+ * <code>TaskAllocationStrategy</code> defines the behavior of the task
+ * allocation strategy that a Hama job employs to schedule tasks on the
+ * GroomServers of a Hama cluster. The function <code>selectGrooms</code> is
+ * responsible for defining how grooms are selected. The resulting list of
+ * grooms can be passed to <code>getGroomToAllocate</code> or
+ * <code>getGroomsToAllocate</code> as the <code>selectedGrooms</code>
+ * parameter. Those two functions are not given the responsibility of
+ * selecting grooms themselves, because a caller may also use them to allocate
+ * tasks on any other restricted set of grooms.
+ */
+@Unstable
+public interface TaskAllocationStrategy {
+
+  /**
+   * Initializes the <code>TaskAllocationStrategy</code> instance.
+   *
+   * @param conf Hama configuration
+   */
+  void initialize(Configuration conf);
+
+  /**
+   * Defines the task-allocation strategy to select grooms based on the
+   * resource constraints and the task-related restrictions posed. The result
+   * can be used to populate the <code>selectedGrooms</code> argument of
+   * <code>getGroomToAllocate</code> and <code>getGroomsToAllocate</code>.
+   *
+   * @param groomStatuses The map of groom-name to
+   *          <code>GroomServerStatus</code> object for all known grooms.
+   * @param taskCountInGroomMap Map of count of tasks in groom (To be deprecated
+   *          soon)
+   * @param resources The <code>BSPResource</code>s that may be considered
+   *          when selecting grooms; implementations may ignore them.
+   * @param taskInProgress The <code>TaskInProgress</code> object for the task.
+   * @return An array of host names on which the task could be allocated.
+   */
+  @Unstable
+  String[] selectGrooms(Map<String, GroomServerStatus> groomStatuses,
+      Map<GroomServerStatus, Integer> taskCountInGroomMap,
+      BSPResource[] resources, TaskInProgress taskInProgress);
+
+  /**
+   * Returns the best groom to run the task on, chosen from the given set of
+   * grooms. The caller can populate <code>selectedGrooms</code> with the
+   * result of <code>selectGrooms</code>.
+   *
+   * @param groomStatuses The map of groom-name to
+   *          <code>GroomServerStatus</code> object for all known grooms.
+   * @param selectedGrooms An array of selected groom host names to select from.
+   * @param taskCountInGroomMap Map of count of tasks in groom (To be deprecated
+   *          soon)
+   * @param resources The <code>BSPResource</code>s that may be considered
+   *          when allocating the task; implementations may ignore them.
+   * @param taskInProgress The <code>TaskInProgress</code> object for the task.
+   * @return The <code>GroomServerStatus</code> of the selected groom, or null
+   *         if no groom could be found.
+   */
+  @Unstable
+  GroomServerStatus getGroomToAllocate(
+      Map<String, GroomServerStatus> groomStatuses, String[] selectedGrooms,
+      Map<GroomServerStatus, Integer> taskCountInGroomMap,
+      BSPResource[] resources, TaskInProgress taskInProgress);
+
+  /**
+   * Returns the best grooms to run the task on, chosen from the given set of
+   * grooms. The caller can populate <code>selectedGrooms</code> with the
+   * result of <code>selectGrooms</code>.
+   *
+   * @param groomStatuses The map of groom-name to
+   *          <code>GroomServerStatus</code> object for all known grooms.
+   * @param selectedGrooms An array of selected groom host names to select from.
+   * @param taskCountInGroomMap Map of count of tasks in groom (To be deprecated
+   *          soon)
+   * @param resources The <code>BSPResource</code>s that may be considered
+   *          when allocating the task; implementations may ignore them.
+   * @param taskInProgress The <code>TaskInProgress</code> object for the task.
+   * @return The grooms on which the task could be allocated, or null if no
+   *         groom could be found.
+   * @throws UnsupportedOperationException if the implementation does not
+   *           support this operation.
+   */
+  @Unstable
+  Set<GroomServerStatus> getGroomsToAllocate(
+      Map<String, GroomServerStatus> groomStatuses, String[] selectedGrooms,
+      Map<GroomServerStatus, Integer> taskCountInGroomMap,
+      BSPResource[] resources, TaskInProgress taskInProgress);
+}
diff --git a/core/src/test/java/org/apache/hama/bsp/TestBSPTaskFaults.java b/core/src/test/java/org/apache/hama/bsp/TestBSPTaskFaults.java
index 80e3578..3cc1b4a 100644
--- a/core/src/test/java/org/apache/hama/bsp/TestBSPTaskFaults.java
+++ b/core/src/test/java/org/apache/hama/bsp/TestBSPTaskFaults.java
@@ -316,7 +316,7 @@
HamaConfiguration hamaConf = new HamaConfiguration();
hamaConf.setInt(Constants.GROOM_PING_PERIOD, 200);
hamaConf.setClass("bsp.work.class", FaulTestBSP.class, BSP.class);
- hamaConf.setClass(SyncServiceFactory.SYNC_CLIENT_CLASS,
+ hamaConf.setClass(SyncServiceFactory.SYNC_PEER_CLASS,
LocalBSPRunner.LocalSyncClient.class, SyncClient.class);
- hamaConf.setInt("bsp.master.port", 610002);
+ hamaConf.setInt("bsp.master.port", 61002);
@@ -421,7 +421,7 @@
conf.setInt(Constants.GROOM_PING_PERIOD, 200);
conf.setClass("bsp.work.class", FaulTestBSP.class, BSP.class);
- conf.setClass(SyncServiceFactory.SYNC_CLIENT_CLASS,
+ conf.setClass(SyncServiceFactory.SYNC_PEER_CLASS,
LocalBSPRunner.LocalSyncClient.class, SyncClient.class);
int testNumber = incrementTestNumber();
diff --git a/core/src/test/java/org/apache/hama/bsp/TestCheckpoint.java b/core/src/test/java/org/apache/hama/bsp/TestCheckpoint.java
index e756638..e34b05b 100644
--- a/core/src/test/java/org/apache/hama/bsp/TestCheckpoint.java
+++ b/core/src/test/java/org/apache/hama/bsp/TestCheckpoint.java
@@ -17,7 +17,18 @@
*/
package org.apache.hama.bsp;
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
import java.net.InetSocketAddress;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
import junit.framework.TestCase;
@@ -25,19 +36,30 @@
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.ipc.RPC;
-import org.apache.hadoop.ipc.Server;
+import org.apache.hadoop.io.ArrayWritable;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.util.ReflectionUtils;
import org.apache.hama.Constants;
-import org.apache.hama.HamaConfiguration;
-import org.apache.hama.bsp.TestBSPTaskFaults.MinimalGroomServer;
-import org.apache.hama.bsp.message.type.ByteMessage;
-import org.apache.hama.bsp.sync.SyncClient;
+import org.apache.hama.bsp.Counters.Counter;
+import org.apache.hama.bsp.ft.AsyncRcvdMsgCheckpointImpl;
+import org.apache.hama.bsp.ft.FaultTolerantPeerService;
+import org.apache.hama.bsp.message.MessageEventListener;
+import org.apache.hama.bsp.message.MessageManager;
+import org.apache.hama.bsp.message.MessageQueue;
+import org.apache.hama.bsp.sync.BSPPeerSyncClient;
+import org.apache.hama.bsp.sync.PeerSyncClient;
+import org.apache.hama.bsp.sync.SyncEvent;
+import org.apache.hama.bsp.sync.SyncEventListener;
+import org.apache.hama.bsp.sync.SyncException;
import org.apache.hama.bsp.sync.SyncServiceFactory;
-import org.apache.hama.ipc.BSPPeerProtocol;
-import org.apache.hama.ipc.HamaRPCProtocolVersion;
import org.apache.hama.util.BSPNetUtils;
+import org.apache.hama.util.KeyValuePair;
public class TestCheckpoint extends TestCase {
@@ -45,130 +67,578 @@
static final String checkpointedDir = "checkpoint/job_201110302255_0001/0/";
- @SuppressWarnings({ "unchecked", "rawtypes" })
- public void testCheckpoint() throws Exception {
- Configuration config = new Configuration();
- config.set(SyncServiceFactory.SYNC_CLIENT_CLASS,
- LocalBSPRunner.LocalSyncClient.class.getName());
- config.set("bsp.output.dir", "/tmp/hama-test_out");
- FileSystem dfs = FileSystem.get(config);
+ public static class TestMessageManager implements MessageManager<Text> {
- BSPPeerImpl bspTask = new BSPPeerImpl(config, dfs);
- bspTask.setCurrentTaskStatus(new TaskStatus(new BSPJobID(),
- new TaskAttemptID(), 1.0f, TaskStatus.State.RUNNING, "running",
- "127.0.0.1", TaskStatus.Phase.STARTING, new Counters()));
- assertNotNull("BSPPeerImpl should not be null.", bspTask);
- if (dfs.mkdirs(new Path("checkpoint"))) {
- if (dfs.mkdirs(new Path("checkpoint/job_201110302255_0001"))) {
- if (dfs.mkdirs(new Path("checkpoint/job_201110302255_0001/0")))
- ;
+ List<Text> messageQueue = new ArrayList<Text>();
+ BSPMessageBundle<Text> loopbackBundle = new BSPMessageBundle<Text>();
+ Iterator<Text> iter = null;
+ MessageEventListener<Text> listener;
+
+ @Override
+ public void init(TaskAttemptID attemptId, BSPPeer<?, ?, ?, ?, Text> peer,
+ Configuration conf, InetSocketAddress peerAddress) {
+ // No-op: this stub keeps all of its state in memory.
+
+ }
+
+ @Override
+ public void close() {
+ // No-op: this stub holds no external resources to release.
+
+ }
+ // Iterates over messageQueue; the iterator is created on the first call only.
+ @Override
+ public Text getCurrentMessage() throws IOException {
+ if (iter == null)
+ iter = this.messageQueue.iterator();
+ if (iter.hasNext())
+ return iter.next();
+ return null;
+ }
+
+ @Override
+ public void send(String peerName, Text msg) throws IOException {
+ }
+
+ @Override
+ public void finishSendPhase() throws IOException {
+ }
+
+ @Override
+ public Iterator<Entry<InetSocketAddress, MessageQueue<Text>>> getMessageIterator() {
+ return null;
+ }
+
+ @Override
+ public void transfer(InetSocketAddress addr, BSPMessageBundle<Text> bundle)
+ throws IOException {
+ // No-op: this stub does not transfer bundles anywhere.
+
+ }
+
+ @Override
+ public void clearOutgoingQueues() {
+ }
+
+ @Override
+ public int getNumCurrentMessages() {
+ return this.messageQueue.size();
+ }
+
+ public BSPMessageBundle<Text> getLoopbackBundle() {
+ return this.loopbackBundle;
+ }
+ // Queues the message, then notifies the listener; registerListener() must be called first.
+ public void addMessage(Text message) throws IOException {
+ this.messageQueue.add(message);
+ listener.onMessageReceived(message);
+ }
+
+ @SuppressWarnings("unchecked")
+ @Override
+ public void loopBackMessages(BSPMessageBundle<? extends Writable> bundle) {
+ this.loopbackBundle = (BSPMessageBundle<Text>) bundle;
+ }
+
+ @Override
+ public void loopBackMessage(Writable message) {
+ }
+
+ @Override
+ public void registerListener(MessageEventListener<Text> listener)
+ throws IOException {
+ this.listener = listener;
+ }
+
+ }
+
+ public static class TestBSPPeer implements
+ BSPPeer<NullWritable, NullWritable, NullWritable, NullWritable, Text> {
+
+ Configuration conf;
+ long superstepCount;
+ FaultTolerantPeerService<Text> fService;
+
+ public TestBSPPeer(BSPJob job, Configuration conf, TaskAttemptID taskId,
+ Counters counters, long superstep, BSPPeerSyncClient syncClient,
+ MessageManager<Text> messenger, TaskStatus.State state) {
+ this.conf = conf;
+ if (superstep > 0)
+ superstepCount = superstep;
+ else
+ superstepCount = 0L;
+ // Wire the fault tolerance service under test to this stub peer.
+ try {
+ fService = (new AsyncRcvdMsgCheckpointImpl<Text>()).constructPeerFaultTolerance(
+ job, this,
+ syncClient, null, taskId, superstep, conf,
+ messenger);
+ this.fService.onPeerInitialized(state);
+ } catch (Exception e) {
+ LOG.error("Error initializing the fault tolerant peer service.", e);
}
}
- assertTrue("Make sure directory is created.",
- dfs.exists(new Path(checkpointedDir)));
- byte[] tmpData = "data".getBytes();
- BSPMessageBundle bundle = new BSPMessageBundle();
- bundle.addMessage(new ByteMessage("abc".getBytes(), tmpData));
- assertNotNull("Message bundle can not be null.", bundle);
- assertNotNull("Configuration should not be null.", config);
- bspTask.checkpoint(checkpointedDir + "/attempt_201110302255_0001_000000_0",
- bundle);
- FSDataInputStream in = dfs.open(new Path(checkpointedDir
- + "/attempt_201110302255_0001_000000_0"));
- BSPMessageBundle bundleRead = new BSPMessageBundle();
- bundleRead.readFields(in);
- in.close();
- ByteMessage byteMsg = (ByteMessage) (bundleRead.getMessages()).get(0);
- String content = new String(byteMsg.getData());
- LOG.info("Saved checkpointed content is " + content);
- assertTrue("Message content should be the same.", "data".equals(content));
- dfs.delete(new Path("checkpoint"), true);
+
+ @Override
+ public void send(String peerName, Text msg) throws IOException {
+ }
+
+ @Override
+ public Text getCurrentMessage() throws IOException {
+ return new Text("data");
+ }
+
+ @Override
+ public int getNumCurrentMessages() {
+ return 1;
+ }
+
+ @Override
+ public void sync() throws IOException, SyncException, InterruptedException {
+ ++superstepCount;
+ try {
+ this.fService.afterBarrier();
+ } catch (Exception e) {
+ LOG.error("Error in afterBarrier() at superstep " + superstepCount, e);
+ }
+ LOG.info("After barrier " + superstepCount);
+ }
+
+ @Override
+ public long getSuperstepCount() {
+ return superstepCount;
+ }
+
+ @Override
+ public String getPeerName() {
+ return null;
+ }
+
+ @Override
+ public String getPeerName(int index) {
+ return null;
+ }
+
+ @Override
+ public int getPeerIndex() {
+ return 1;
+ }
+
+ @Override
+ public String[] getAllPeerNames() {
+ return null;
+ }
+
+ @Override
+ public int getNumPeers() {
+ return 0;
+ }
+
+ @Override
+ public void clear() {
+
+ }
+
+ @Override
+ public void write(NullWritable key, NullWritable value) throws IOException {
+
+ }
+
+ @Override
+ public boolean readNext(NullWritable key, NullWritable value)
+ throws IOException {
+ return false;
+ }
+
+ @Override
+ public KeyValuePair<NullWritable, NullWritable> readNext()
+ throws IOException {
+ return null;
+ }
+
+ @Override
+ public void reopenInput() throws IOException {
+
+ }
+
+ @Override
+ public Configuration getConfiguration() {
+ return conf;
+ }
+
+ @Override
+ public Counter getCounter(Enum<?> name) {
+ return null;
+ }
+
+ @Override
+ public Counter getCounter(String group, String name) {
+ return null;
+ }
+
+ @Override
+ public void incrementCounter(Enum<?> key, long amount) {
+
+ }
+
+ @Override
+ public void incrementCounter(String group, String counter, long amount) {
+
+ }
+
+ }
+
+ public static class TempSyncClient extends BSPPeerSyncClient {
+ // In-memory key/value store backing every operation of this stub client.
+ Map<String, Writable> valueMap = new HashMap<String, Writable>();
+
+ @Override
+ public String constructKey(BSPJobID jobId, String... args) {
+ StringBuilder buffer = new StringBuilder(100);
+ buffer.append(jobId.toString()).append("/");
+ for (String arg : args) {
+ buffer.append(arg).append("/");
+ }
+ return buffer.toString();
+ }
+
+ @Override
+ public boolean storeInformation(String key, Writable value,
+ boolean permanent, SyncEventListener listener) {
+ ArrayWritable writables = (ArrayWritable) value;
+ long step = ((LongWritable) writables.get()[0]).get();
+ long count = ((LongWritable) writables.get()[1]).get();
+
+ LOG.info("SyncClient Storing value step = " + step + " count = " + count
+ + " for key " + key);
+ valueMap.put(key, value);
+ return true;
+ }
+
+ @Override
+ public boolean getInformation(String key,
+ Writable valueHolder) {
+ LOG.info("Getting value for key " + key);
+ if (!valueMap.containsKey(key)) {
+ return false;
+ }
+ Writable value = valueMap.get(key);
+ ByteArrayOutputStream byteStream = new ByteArrayOutputStream();
+ DataOutputStream outputStream = new DataOutputStream(byteStream);
+ byte[] data = null;
+ try {
+ value.write(outputStream);
+ outputStream.flush();
+ data = byteStream.toByteArray();
+ ByteArrayInputStream istream = new ByteArrayInputStream(data);
+ DataInputStream diStream = new DataInputStream(istream);
+ valueHolder.readFields(diStream);
+ return true;
+ } catch (IOException e) {
+ LOG.error("Error writing data to write buffer.", e);
+ } finally {
+ try {
+ byteStream.close();
+ outputStream.close();
+ } catch (IOException e) {
+ LOG.error("Error closing byte stream.", e);
+ }
+ }
+ return false;
+ }
+
+ @Override
+ public boolean addKey(String key, boolean permanent,
+ SyncEventListener listener) {
+ valueMap.put(key, NullWritable.get());
+ return true;
+ }
+
+ @Override
+ public boolean hasKey(String key) {
+ return valueMap.containsKey(key);
+ }
+
+ @Override
+ public String[] getChildKeySet(String key, SyncEventListener listener) {
+ List<String> list = new ArrayList<String>();
+ Iterator<String> keyIter = valueMap.keySet().iterator();
+ while (keyIter.hasNext()) {
+ String keyVal = keyIter.next();
+ if (keyVal.startsWith(key + "/")) {
+ list.add(keyVal);
+ }
+ }
+ String[] arr = new String[list.size()];
+ list.toArray(arr);
+ return arr;
+ }
+
+ @Override
+ public boolean registerListener(String key, SyncEvent event,
+ SyncEventListener listener) {
+ return false;
+ }
+
+ @Override
+ public boolean remove(String key, SyncEventListener listener) {
+ valueMap.remove(key);
+ return false;
+ }
+
+ @Override
+ public void init(Configuration conf, BSPJobID jobId, TaskAttemptID taskId)
+ throws Exception {
+ }
+
+ @Override
+ public void enterBarrier(BSPJobID jobId, TaskAttemptID taskId,
+ long superstep) throws SyncException {
+ LOG.info("Enter barrier called - " + superstep);
+ }
+
+ @Override
+ public void leaveBarrier(BSPJobID jobId, TaskAttemptID taskId,
+ long superstep) throws SyncException {
+ LOG.info("Exit barrier called - " + superstep);
+ }
+
+ @Override
+ public void register(BSPJobID jobId, TaskAttemptID taskId,
+ String hostAddress, long port) {
+ }
+
+ @Override
+ public String[] getAllPeerNames(TaskAttemptID taskId) {
+ return null;
+ }
+
+ @Override
+ public void deregisterFromBarrier(BSPJobID jobId, TaskAttemptID taskId,
+ String hostAddress, long port) {
+ }
+
+ @Override
+ public void stopServer() {
+ }
+
+ @Override
+ public void close() throws IOException {
+ }
+
+ }
+
+ private void checkSuperstepMsgCount(PeerSyncClient syncClient,
+ @SuppressWarnings("rawtypes")
+ BSPPeer bspTask, BSPJob job, long step, long count) {
+ // Read back the [superstep, message count] pair stored under "checkpoint".
+ ArrayWritable checkpointInfo = new ArrayWritable(LongWritable.class);
+ String checkpointKey = syncClient.constructKey(job.getJobID(),
+ "checkpoint", "" + bspTask.getPeerIndex());
+ boolean found = syncClient.getInformation(checkpointKey, checkpointInfo);
+ assertTrue(found);
+
+ // Both entries are expected to be LongWritables.
+ Writable[] values = checkpointInfo.get();
+ LongWritable superstepNo = (LongWritable) values[0];
+ LongWritable msgCount = (LongWritable) values[1];
+
+ assertEquals(step, superstepNo.get());
+ assertEquals(count, msgCount.get());
}
public void testCheckpointInterval() throws Exception {
+ Configuration config = new Configuration(); // checks that checkpoint records are only refreshed every 2nd superstep
+ System.setProperty("user.dir", "/tmp"); // presumably so relative paths such as "checkpoint" resolve under /tmp - TODO confirm
+ config.set(SyncServiceFactory.SYNC_PEER_CLASS,
+ TempSyncClient.class.getName());
+ config.set(Constants.FAULT_TOLERANCE_CLASS,
+ AsyncRcvdMsgCheckpointImpl.class.getName());
+ config.setBoolean(Constants.FAULT_TOLERANCE_FLAG, true);
+ config.setBoolean(Constants.CHECKPOINT_ENABLED, true);
+ config.setInt(Constants.CHECKPOINT_INTERVAL, 2);
+ config.set("bsp.output.dir", "/tmp/hama-test_out");
+ config.set("bsp.local.dir", "/tmp/hama-test");
- Configuration conf = new Configuration();
- conf.set("bsp.output.dir", "/tmp/hama-test_out");
- conf.setClass(SyncServiceFactory.SYNC_CLIENT_CLASS,
- LocalBSPRunner.LocalSyncClient.class, SyncClient.class);
+ FileSystem dfs = FileSystem.get(config);
+ BSPJob job = new BSPJob(new BSPJobID("checkpttest", 1), "/tmp");
+ TaskAttemptID taskId = new TaskAttemptID(new TaskID(job.getJobID(), 1), 1);
- conf.setBoolean(Constants.CHECKPOINT_ENABLED, false);
+ TestMessageManager messenger = new TestMessageManager();
+ PeerSyncClient syncClient = (TempSyncClient) SyncServiceFactory
+ .getPeerSyncClient(config);
+ @SuppressWarnings("rawtypes")
+ BSPPeer bspTask = new TestBSPPeer(job, config, taskId, new Counters(), -1L,
+ (BSPPeerSyncClient) syncClient, messenger, TaskStatus.State.RUNNING);
- int port = BSPNetUtils.getFreePort(5000);
- InetSocketAddress inetAddress = new InetSocketAddress(port);
- MinimalGroomServer groom = new MinimalGroomServer(conf);
- Server workerServer = RPC.getServer(groom, inetAddress.getHostName(),
- inetAddress.getPort(), conf);
- workerServer.start();
+ assertNotNull("BSPPeerImpl should not be null.", bspTask);
- LOG.info("Started RPC server");
- conf.setInt("bsp.groom.rpc.port", inetAddress.getPort());
- conf.setInt("bsp.peers.num", 1);
+ LOG.info("Created bsp peer and other parameters");
+ int port = BSPNetUtils.getFreePort(12502); // NOTE(review): port is only logged, never used by this test
+ LOG.info("Got port = " + port);
- BSPPeerProtocol umbilical = (BSPPeerProtocol) RPC.getProxy(
- BSPPeerProtocol.class, HamaRPCProtocolVersion.versionID, inetAddress,
- conf);
- LOG.info("Started the proxy connections");
+ boolean result = syncClient.getInformation(
+ syncClient.constructKey(job.getJobID(), "checkpoint",
+ "" + bspTask.getPeerIndex()), new ArrayWritable(LongWritable.class));
- TaskAttemptID tid = new TaskAttemptID(new TaskID(new BSPJobID(
- "job_201110102255", 1), 1), 1);
+ assertFalse(result); // no checkpoint record may exist before the first sync
- try {
- BSPJob job = new BSPJob(new HamaConfiguration(conf));
- job.setOutputPath(TestBSPMasterGroomServer.OUTPUT_PATH);
- job.setOutputFormat(TextOutputFormat.class);
- final BSPPeerProtocol proto = (BSPPeerProtocol) RPC.getProxy(
- BSPPeerProtocol.class, HamaRPCProtocolVersion.versionID,
- new InetSocketAddress("127.0.0.1", port), conf);
+ bspTask.sync();
+ // Superstep 1
+ // Interval 2: supersteps 1, 3 and 5 write a new record; 2, 4 and 6 keep the previous one.
+ checkSuperstepMsgCount(syncClient, bspTask, job, 1L, 0L);
- BSPTask task = new BSPTask();
- task.setConf(job);
+ Text txtMessage = new Text("data");
+ messenger.addMessage(txtMessage);
- @SuppressWarnings("rawtypes")
- BSPPeerImpl<?, ?, ?, ?, ?> bspPeer = new BSPPeerImpl(job, conf, tid,
- proto, 0, null, null, new Counters());
+ bspTask.sync();
+ // Superstep 2
- bspPeer.setCurrentTaskStatus(new TaskStatus(new BSPJobID(), tid, 1.0f,
- TaskStatus.State.RUNNING, "running", "127.0.0.1",
- TaskStatus.Phase.STARTING, new Counters()));
+ checkSuperstepMsgCount(syncClient, bspTask, job, 1L, 0L);
- assertEquals(bspPeer.isReadyToCheckpoint(), false);
+ messenger.addMessage(txtMessage);
- conf.setBoolean(Constants.CHECKPOINT_ENABLED, true);
- conf.setInt(Constants.CHECKPOINT_INTERVAL, 3);
+ bspTask.sync();
+ // Superstep 3
- bspPeer.sync();
+ checkSuperstepMsgCount(syncClient, bspTask, job, 3L, 1L);
- LOG.info("Is Ready = " + bspPeer.isReadyToCheckpoint() + " at step "
- + bspPeer.getSuperstepCount());
- assertEquals(bspPeer.isReadyToCheckpoint(), false);
- bspPeer.sync();
- LOG.info("Is Ready = " + bspPeer.isReadyToCheckpoint() + " at step "
- + bspPeer.getSuperstepCount());
- assertEquals(bspPeer.isReadyToCheckpoint(), false);
- bspPeer.sync();
- LOG.info("Is Ready = " + bspPeer.isReadyToCheckpoint() + " at step "
- + bspPeer.getSuperstepCount());
- assertEquals(bspPeer.isReadyToCheckpoint(), true);
+ bspTask.sync();
+ // Superstep 4
- job.setCheckPointInterval(5);
- bspPeer.sync();
- LOG.info("Is Ready = " + bspPeer.isReadyToCheckpoint() + " at step "
- + bspPeer.getSuperstepCount());
- assertEquals(bspPeer.isReadyToCheckpoint(), false);
- bspPeer.sync();
- LOG.info("Is Ready = " + bspPeer.isReadyToCheckpoint() + " at step "
- + bspPeer.getSuperstepCount());
- assertEquals(bspPeer.isReadyToCheckpoint(), false);
+ checkSuperstepMsgCount(syncClient, bspTask, job, 3L, 1L);
- } catch (Exception e) {
- LOG.error("Error testing BSPPeer.", e);
- } finally {
- umbilical.close();
- Thread.sleep(2000);
- workerServer.stop();
- Thread.sleep(2000);
- }
+ messenger.addMessage(txtMessage);
+ messenger.addMessage(txtMessage);
+ bspTask.sync();
+ // Superstep 5
+ // Two messages were sent before this sync, so the new record counts 2.
+ checkSuperstepMsgCount(syncClient, bspTask, job, 5L, 2L);
+
+ bspTask.sync();
+ // Superstep 6
+
+ checkSuperstepMsgCount(syncClient, bspTask, job, 5L, 2L);
+ // NOTE(review): this cleanup is skipped when an assertion above fails; consider try/finally.
+ dfs.delete(new Path("checkpoint"), true);
}
+
+  @SuppressWarnings("rawtypes")
+  public void testCheckpoint() throws Exception {
+    // Fault tolerance and checkpointing enabled, backed by the in-memory
+    // TempSyncClient stub.
+    Configuration config = new Configuration();
+    config.set(SyncServiceFactory.SYNC_PEER_CLASS,
+        TempSyncClient.class.getName());
+    config.setBoolean(Constants.FAULT_TOLERANCE_FLAG, true);
+    config.set(Constants.FAULT_TOLERANCE_CLASS,
+        AsyncRcvdMsgCheckpointImpl.class.getName());
+    config.setBoolean(Constants.CHECKPOINT_ENABLED, true);
+    int port = BSPNetUtils.getFreePort(12502);
+    LOG.info("Got port = " + port);
+    config.set(Constants.PEER_HOST, Constants.DEFAULT_PEER_HOST);
+    config.setInt(Constants.PEER_PORT, port);
+    config.set("bsp.output.dir", "/tmp/hama-test_out");
+    config.set("bsp.local.dir", "/tmp/hama-test");
+
+    FileSystem dfs = FileSystem.get(config);
+    BSPJob job = new BSPJob(new BSPJobID("checkpttest", 1), "/tmp");
+    TaskAttemptID taskId = new TaskAttemptID(new TaskID(job.getJobID(), 1), 1);
+
+    TestMessageManager messenger = new TestMessageManager();
+    PeerSyncClient syncClient = (TempSyncClient) SyncServiceFactory
+        .getPeerSyncClient(config);
+    BSPPeer bspTask = new TestBSPPeer(job, config, taskId, new Counters(), -1L,
+        (BSPPeerSyncClient) syncClient, messenger, TaskStatus.State.RUNNING);
+    assertNotNull("BSPPeerImpl should not be null.", bspTask);
+    LOG.info("Created bsp peer and other parameters");
+
+    try {
+      // Superstep 1: nothing sent yet, so the record holds 0 messages.
+      bspTask.sync();
+      LOG.info("Completed first sync.");
+      checkSuperstepMsgCount(syncClient, bspTask, job, 1L, 0L);
+
+      // Superstep 2: one message sent, so the record holds 1 message.
+      Text txtMessage = new Text("data");
+      messenger.addMessage(txtMessage);
+      bspTask.sync();
+      LOG.info("Completed second sync.");
+      checkSuperstepMsgCount(syncClient, bspTask, job, 2L, 1L);
+
+      // Checking the messages for superstep 2 and peer id 1
+      String expectedPath = "checkpoint/job_checkpttest_0001/2/1";
+      FSDataInputStream in = dfs.open(new Path(expectedPath));
+      Text message;
+      try {
+        String className = in.readUTF();
+        message = (Text) ReflectionUtils.newInstance(
+            Class.forName(className), config);
+        message.readFields(in);
+      } finally {
+        // Release the file handle even if deserialization fails.
+        in.close();
+      }
+      assertEquals("data", message.toString());
+    } finally {
+      // Remove the checkpoint files even if an assertion above failed.
+      dfs.delete(new Path("checkpoint"), true);
+    }
+  }
+
+  public void testPeerRecovery() throws Exception {
+    Configuration config = new Configuration();
+    config.set(SyncServiceFactory.SYNC_PEER_CLASS,
+        TempSyncClient.class.getName());
+    // Enable fault tolerance, as the other checkpoint tests do.
+    config.setBoolean(Constants.FAULT_TOLERANCE_FLAG, true);
+    config.set(Constants.FAULT_TOLERANCE_CLASS,
+        AsyncRcvdMsgCheckpointImpl.class.getName());
+    config.setBoolean(Constants.CHECKPOINT_ENABLED, true);
+    int port = BSPNetUtils.getFreePort(12502);
+    LOG.info("Got port = " + port);
+    config.set(Constants.PEER_HOST, Constants.DEFAULT_PEER_HOST);
+    config.setInt(Constants.PEER_PORT, port);
+    config.set("bsp.output.dir", "/tmp/hama-test_out");
+    config.set("bsp.local.dir", "/tmp/hama-test");
+    FileSystem dfs = FileSystem.get(config);
+    BSPJob job = new BSPJob(new BSPJobID("checkpttest", 1), "/tmp");
+    TaskAttemptID taskId = new TaskAttemptID(new TaskID(job.getJobID(), 1), 1);
+    TestMessageManager messenger = new TestMessageManager();
+    PeerSyncClient syncClient = (TempSyncClient) SyncServiceFactory
+        .getPeerSyncClient(config);
+    try {
+      // Fake a checkpoint record for peer 1: superstep 3 with 5 messages.
+      ArrayWritable arrWritable = new ArrayWritable(LongWritable.class);
+      arrWritable.set(new Writable[] { new LongWritable(3L),
+          new LongWritable(5L) });
+      syncClient.storeInformation("job_checkpttest_0001/checkpoint/1/",
+          arrWritable, true, null);
+      // Write the matching 5 messages as (class name, payload) pairs.
+      Text txtMessage = new Text("data");
+      FSDataOutputStream out = dfs.create(new Path(
+          "checkpoint/job_checkpttest_0001/3/1"));
+      try {
+        for (int i = 0; i < 5; ++i) {
+          // getName() yields the binary name that Class.forName() expects.
+          out.writeUTF(txtMessage.getClass().getName());
+          txtMessage.write(out);
+        }
+      } finally {
+        out.close();
+      }
+      // A peer built in RECOVERING state should replay the checkpoint.
+      @SuppressWarnings("unused")
+      BSPPeer<?, ?, ?, ?, Text> bspTask = new TestBSPPeer(job, config, taskId,
+          new Counters(), 3L, (BSPPeerSyncClient) syncClient, messenger,
+          TaskStatus.State.RECOVERING);
+      BSPMessageBundle<Text> bundleRead = messenger.getLoopbackBundle();
+      assertEquals(5, bundleRead.getMessages().size());
+      assertEquals("data", bundleRead.getMessages().get(0).toString());
+    } finally {
+      dfs.delete(new Path("checkpoint"), true);
+    }
+  }
+
}
diff --git a/core/src/test/java/org/apache/hama/bsp/TestTaskAllocation.java b/core/src/test/java/org/apache/hama/bsp/TestTaskAllocation.java
new file mode 100644
index 0000000..4a20dba
--- /dev/null
+++ b/core/src/test/java/org/apache/hama/bsp/TestTaskAllocation.java
@@ -0,0 +1,103 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hama.bsp;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import junit.framework.TestCase;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.util.ReflectionUtils;
+import org.apache.hama.bsp.BSPJobClient.RawSplit;
+import org.apache.hama.bsp.taskallocation.BSPResource;
+import org.apache.hama.bsp.taskallocation.BestEffortDataLocalTaskAllocator;
+import org.apache.hama.bsp.taskallocation.TaskAllocationStrategy;
+
+/**
+ * Tests for the task allocation strategies that pick grooms for a task.
+ */
+public class TestTaskAllocation extends TestCase {
+
+  public static final Log LOG = LogFactory.getLog(TestTaskAllocation.class);
+
+  /**
+   * With ten grooms (host0..host9) available and the task's split stored on
+   * host6, host4 and host3, the best-effort data-local allocator must include
+   * all three data-local hosts in its selection.
+   */
+  public void testBestEffortDataLocality() throws Exception {
+    Configuration conf = new Configuration();
+
+    // A split whose data is located on three of the grooms.
+    String[] locations = new String[] { "host6", "host4", "host3" };
+    byte[] data = "data".getBytes("UTF-8");
+    RawSplit split = new RawSplit();
+    split.setLocations(locations);
+    split.setBytes(data, 0, data.length);
+    split.setDataLength(data.length);
+    assertEquals(data.length, (int) split.getDataLength());
+
+    // The task whose placement is being decided.
+    BSPJob job = new BSPJob(new BSPJobID("checkpttest", 1), "/tmp");
+    JobInProgress jobProgress = new JobInProgress(job.getJobID(), conf);
+    TaskInProgress taskInProgress = new TaskInProgress(job.getJobID(),
+        "job.xml", split, conf, jobProgress, 1);
+
+    // Ten grooms, each with an empty task list and no tasks assigned.
+    Map<String, GroomServerStatus> groomStatuses =
+        new HashMap<String, GroomServerStatus>(20);
+    Map<GroomServerStatus, Integer> taskCountInGroomMap =
+        new HashMap<GroomServerStatus, Integer>(20);
+    for (int i = 0; i < 10; ++i) {
+      String name = "host" + i;
+      GroomServerStatus status = new GroomServerStatus(name,
+          new ArrayList<TaskStatus>(), 0, 3);
+      groomStatuses.put(name, status);
+      taskCountInGroomMap.put(status, 0);
+    }
+
+    // Instantiate the allocator under test directly rather than looking it
+    // up in the configuration.
+    TaskAllocationStrategy strategy = ReflectionUtils.newInstance(
+        BestEffortDataLocalTaskAllocator.class, conf);
+
+    // No BSP resources are passed to the allocator.
+    BSPResource[] resources = new BSPResource[0];
+    String[] hosts = strategy.selectGrooms(groomStatuses, taskCountInGroomMap,
+        resources, taskInProgress);
+    assertNotNull("selectGrooms returned null", hosts);
+
+    List<String> selected = new ArrayList<String>(hosts.length);
+    for (String host : hosts) {
+      selected.add(host);
+    }
+    LOG.info("Selected grooms: " + selected);
+
+    // Every data-local host must be part of the selection.
+    for (String expected : locations) {
+      assertTrue("Expected data-local groom " + expected + " in " + selected,
+          selected.contains(expected));
+    }
+  }
+
+}
diff --git a/core/src/test/java/org/apache/hama/bsp/TestZooKeeper.java b/core/src/test/java/org/apache/hama/bsp/TestZooKeeper.java
index 3e3afd5..9b68671 100644
--- a/core/src/test/java/org/apache/hama/bsp/TestZooKeeper.java
+++ b/core/src/test/java/org/apache/hama/bsp/TestZooKeeper.java
@@ -20,20 +20,22 @@
package org.apache.hama.bsp;
import java.io.IOException;
-import java.util.ArrayList;
import junit.framework.TestCase;
+import org.apache.hadoop.io.ArrayWritable;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.Writable;
import org.apache.hama.Constants;
import org.apache.hama.HamaConfiguration;
+import org.apache.hama.bsp.sync.SyncServiceFactory;
+import org.apache.hama.bsp.sync.ZKSyncBSPMasterClient;
+import org.apache.hama.bsp.sync.ZooKeeperSyncClientImpl;
import org.apache.hama.bsp.sync.ZooKeeperSyncServerImpl;
import org.apache.hama.util.BSPNetUtils;
-import org.apache.hama.zookeeper.QuorumPeer;
-import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
-import org.apache.zookeeper.ZooDefs.Ids;
-import org.apache.zookeeper.ZooKeeper;
-import org.apache.zookeeper.data.Stat;
+import org.mortbay.log.Log;
public class TestZooKeeper extends TestCase {
@@ -41,6 +43,7 @@
public TestZooKeeper() {
configuration = new HamaConfiguration();
+ System.setProperty("user.dir", "/tmp");
configuration.set("bsp.master.address", "localhost");
assertEquals("Make sure master addr is set to localhost:", "localhost",
configuration.get("bsp.master.address"));
@@ -57,6 +60,7 @@
public void testClearZKNodes() throws IOException, KeeperException,
InterruptedException {
final ZooKeeperSyncServerImpl server = new ZooKeeperSyncServerImpl();
+ boolean done = false;
try {
server.init(configuration);
new Thread(new Runnable() {
@@ -71,55 +75,113 @@
}
}).start();
- int timeout = configuration.getInt(Constants.ZOOKEEPER_SESSION_TIMEOUT,
- 6000);
- String connectStr = QuorumPeer.getZKQuorumServersString(configuration);
- String bspRoot = "/";
- // Establishing a zk session.
- ZooKeeper zk = new ZooKeeper(connectStr, timeout, null);
+ Thread.sleep(1000);
- // Creating dummy bspRoot if it doesn't already exist.
- Stat s = zk.exists(bspRoot, false);
- if (s == null) {
- zk.create(bspRoot, new byte[0], Ids.OPEN_ACL_UNSAFE,
- CreateMode.PERSISTENT);
- }
+ String bspRoot = "/bsp";
- // Creating dummy child nodes at depth 1.
- String node1 = bspRoot + "task1";
- String node2 = bspRoot + "task2";
- zk.create(node1, new byte[0], Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
- zk.create(node2, new byte[0], Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
+ ZooKeeperSyncClientImpl peerClient = (ZooKeeperSyncClientImpl) SyncServiceFactory
+ .getPeerSyncClient(configuration);
- // Creating dummy child node at depth 2.
- String node11 = node1 + "superstep1";
- zk.create(node11, new byte[0], Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
+ ZKSyncBSPMasterClient masterClient = (ZKSyncBSPMasterClient) SyncServiceFactory
+ .getMasterSyncClient(configuration);
- ArrayList<String> list = (ArrayList<String>) zk.getChildren(bspRoot,
- false);
- assertEquals(2, list.size());
- System.out.println(list.size());
+ masterClient.init(configuration);
- // clear it
- BSPMaster.clearZKNodes(zk, "/");
+ Thread.sleep(100);
- list = (ArrayList<String>) zk.getChildren(bspRoot, false);
- System.out.println(list.size());
- assertEquals(0, list.size());
+ Log.info("Created master and client sync clients");
- try {
- zk.getData(node11, false, null);
- fail();
- } catch (KeeperException.NoNodeException e) {
- System.out.println("Node has been removed correctly!");
- } finally {
- zk.close();
- }
+ assertTrue(masterClient.hasKey(bspRoot));
+
+ Log.info("BSP root exists");
+
+ BSPJobID jobID = new BSPJobID("test1", 1);
+ masterClient.registerJob(jobID.toString());
+ TaskID taskId1 = new TaskID(jobID, 1);
+ TaskID taskId2 = new TaskID(jobID, 2);
+
+ TaskAttemptID task1 = new TaskAttemptID(taskId1, 1);
+ TaskAttemptID task2 = new TaskAttemptID(taskId2, 1);
+
+ int zkPort = BSPNetUtils.getFreePort(21815);
+ configuration.setInt(Constants.PEER_PORT, zkPort);
+ peerClient.init(configuration, jobID, task1);
+
+ peerClient.registerTask(jobID, "hamanode1", 5000L, task1);
+ peerClient.registerTask(jobID, "hamanode2", 5000L, task2);
+
+ peerClient.storeInformation(
+ peerClient.constructKey(jobID, "info", "level2"), new IntWritable(5),
+ true, null);
+
+ String[] names = peerClient.getAllPeerNames(task1);
+
+ Log.info("Found child count = " + names.length);
+
+ assertEquals(2, names.length);
+
+ Log.info("Passed the child count test");
+
+ masterClient.addKey(masterClient.constructKey(jobID, "peer", "1"),
+ true, null);
+ masterClient.addKey(masterClient.constructKey(jobID, "peer", "2"),
+ true, null);
+
+ String[] peerChild = masterClient.getChildKeySet(
+ masterClient.constructKey(jobID, "peer"), null);
+ Log.info("Found child count = " + peerChild.length);
+
+ assertEquals(2, peerChild.length);
+
+ Log.info(" Peer name " + peerChild[0]);
+ Log.info(" Peer name " + peerChild[1]);
+
+ Log.info("Passed the child key set test");
+
+ masterClient.deregisterJob(jobID.toString());
+ Log.info(masterClient.constructKey(jobID));
+
+ Thread.sleep(200);
+
+ assertEquals(false, masterClient.hasKey(masterClient.constructKey(jobID)));
+
+ Log.info("Passed the key presence test");
+
+ boolean result = masterClient
+ .getInformation(masterClient.constructKey(jobID, "info", "level3"),
+ new IntWritable());
+
+ assertEquals(false, result);
+
+ Writable[] writableArr = new Writable[2];
+ writableArr[0] = new LongWritable(3L);
+ writableArr[1] = new LongWritable(5L);
+ ArrayWritable arrWritable = new ArrayWritable(LongWritable.class);
+ arrWritable.set(writableArr);
+ masterClient.storeInformation(
+ masterClient.constructKey(jobID, "info", "level3"),
+ arrWritable, true, null);
+
+ ArrayWritable valueHolder = new ArrayWritable(LongWritable.class);
+
+ boolean getResult = masterClient.getInformation(
+ masterClient.constructKey(jobID, "info", "level3"), valueHolder);
+
+ assertTrue(getResult);
+
+ assertEquals(arrWritable.get()[0], valueHolder.get()[0]);
+ assertEquals(arrWritable.get()[1], valueHolder.get()[1]);
+
+ Log.info("Passed array writable test");
+ done = true;
+
} catch (Exception e) {
e.printStackTrace();
+
} finally {
server.stopServer();
}
+ assertEquals(true, done);
}
}
diff --git a/core/src/test/java/org/apache/hama/bsp/sync/TestSyncServiceFactory.java b/core/src/test/java/org/apache/hama/bsp/sync/TestSyncServiceFactory.java
index b6ce89e..22ed266 100644
--- a/core/src/test/java/org/apache/hama/bsp/sync/TestSyncServiceFactory.java
+++ b/core/src/test/java/org/apache/hama/bsp/sync/TestSyncServiceFactory.java
@@ -17,20 +17,64 @@
*/
package org.apache.hama.bsp.sync;
+import java.util.concurrent.Executors;
+
import junit.framework.TestCase;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hama.Constants;
+import org.apache.hama.bsp.BSPJobID;
+import org.apache.hama.bsp.TaskAttemptID;
+import org.apache.hama.bsp.TaskID;
+import org.apache.hama.util.BSPNetUtils;
public class TestSyncServiceFactory extends TestCase {
+  public static final Log LOG = LogFactory.getLog(TestSyncServiceFactory.class);
+
+  /** Reports "init" until a value-change event arrives, then "Changed". */
+  public static class ListenerTest extends ZKSyncEventListener {
+
+    // Written by whichever thread delivers the event and read by the test
+    // thread, so it is volatile to make the update visible.
+    private volatile String value;
+
+    public ListenerTest() {
+      value = "init";
+    }
+
+    public String getValue() {
+      return value;
+    }
+
+    @Override
+    public void onDelete() {
+    }
+
+    @Override
+    public void onChange() {
+      LOG.info("ZK value changed event triggered.");
+      value = "Changed";
+    }
+
+    @Override
+    public void onChildKeySetChange() {
+    }
+
+  }
+
public void testClientInstantiation() throws Exception {
Configuration conf = new Configuration();
// given null, should return zookeeper
- SyncClient syncClient = SyncServiceFactory.getSyncClient(conf);
+ PeerSyncClient syncClient = SyncServiceFactory.getPeerSyncClient(conf); // no sync peer class configured, so the ZooKeeper client is expected
assertTrue(syncClient instanceof ZooKeeperSyncClientImpl);
}
-
+
public void testServerInstantiation() throws Exception {
Configuration conf = new Configuration();
@@ -39,4 +83,104 @@
assertTrue(syncServer instanceof ZooKeeperSyncServerImpl);
}
+  /** Runs {@link SyncServer#start()} and logs any failure it reports. */
+  private static class ZKServerThread implements Runnable {
+
+    private final SyncServer server;
+
+    ZKServerThread(SyncServer syncServer) {
+      this.server = syncServer;
+    }
+
+    @Override
+    public void run() {
+      try {
+        server.start();
+      } catch (Exception e) {
+        LOG.error("Error running server.", e);
+      }
+    }
+  }
+
+  /**
+   * Starts an embedded ZooKeeper sync server on a free port (searched from
+   * 21811), stores and then overwrites a value through the peer sync client,
+   * and checks that a listener registered for value changes is notified.
+   *
+   * The server runs on a single-thread pool; both are shut down in a finally
+   * block so a failing assertion cannot leave either running.
+   *
+   * @throws Exception if the sync server or client cannot be used
+   */
+  public void testZKSyncStore() throws Exception {
+    Configuration conf = new Configuration();
+    int zkPort = BSPNetUtils.getFreePort(21811);
+    conf.set("bsp.local.dir", "/tmp/hama-test");
+    conf.set("bsp.output.dir", "/tmp/hama-test_out");
+    conf.setInt(Constants.PEER_PORT, zkPort);
+    conf.set(Constants.ZOOKEEPER_QUORUM, "localhost");
+    conf.setInt(Constants.ZOOKEEPER_CLIENT_PORT, zkPort);
+    conf.set(Constants.ZOOKEEPER_SESSION_TIMEOUT, "12000");
+    System.setProperty("user.dir", "/tmp");
+
+    // given null, should return zookeeper
+    final SyncServer syncServer = SyncServiceFactory.getSyncServer(conf);
+    syncServer.init(conf);
+    assertTrue(syncServer instanceof ZooKeeperSyncServerImpl);
+
+    java.util.concurrent.ExecutorService serverPool = Executors
+        .newFixedThreadPool(1);
+    try {
+      serverPool.submit(new ZKServerThread(syncServer));
+      // Give the server time to come up before connecting a client.
+      Thread.sleep(1000);
+
+      // Connect a peer client on behalf of a dummy job and task.
+      PeerSyncClient syncClient = SyncServiceFactory
+          .getPeerSyncClient(conf);
+      assertTrue(syncClient instanceof ZooKeeperSyncClientImpl);
+      BSPJobID jobId = new BSPJobID("abc", 1);
+      TaskAttemptID taskId = new TaskAttemptID(new TaskID(jobId, 1), 1);
+      syncClient.init(conf, jobId, taskId);
+
+      // Every read and write below targets the same key.
+      String key = syncClient.constructKey(jobId, String.valueOf(1L), "test");
+
+      // Store an initial value, then register for changes to it.
+      IntWritable data = new IntWritable(5);
+      syncClient.storeInformation(key, data, true, null);
+
+      ListenerTest listenerTest = new ListenerTest();
+      syncClient.registerListener(key,
+          ZKSyncEventFactory.getValueChangeEvent(), listenerTest);
+
+      // The stored value must read back unchanged.
+      IntWritable valueHolder = new IntWritable();
+      assertTrue(syncClient.getInformation(key, valueHolder));
+      assertEquals(data.get(), valueHolder.get());
+
+      // Overwrite it; the new value must be readable and should trigger
+      // the listener.
+      data.set(6);
+      syncClient.storeInformation(key, data, true, null);
+
+      valueHolder = new IntWritable();
+      assertTrue(syncClient.getInformation(key, valueHolder));
+      assertEquals(data.get(), valueHolder.get());
+
+      // The change notification may arrive asynchronously: poll for up to
+      // five seconds instead of always sleeping for the full interval.
+      long deadline = System.currentTimeMillis() + 5000;
+      while (!"Changed".equals(listenerTest.getValue())
+          && System.currentTimeMillis() < deadline) {
+        Thread.sleep(100);
+      }
+      assertEquals("Changed", listenerTest.getValue());
+    } finally {
+      // Runs even when an assertion above fails.
+      syncServer.stopServer();
+      serverPool.shutdownNow();
+    }
+  }
+
}