blob: 085114db92c76c775a8892c88f4a71fbe8bc02b7 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hama.bsp;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import junit.framework.TestCase;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.ArrayWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.util.ReflectionUtils;
import org.apache.hama.Constants;
import org.apache.hama.bsp.Counters.Counter;
import org.apache.hama.bsp.ft.AsyncRcvdMsgCheckpointImpl;
import org.apache.hama.bsp.ft.FaultTolerantPeerService;
import org.apache.hama.bsp.message.MessageEventListener;
import org.apache.hama.bsp.message.MessageManager;
import org.apache.hama.bsp.message.queue.MessageQueue;
import org.apache.hama.bsp.sync.BSPPeerSyncClient;
import org.apache.hama.bsp.sync.PeerSyncClient;
import org.apache.hama.bsp.sync.SyncEvent;
import org.apache.hama.bsp.sync.SyncEventListener;
import org.apache.hama.bsp.sync.SyncException;
import org.apache.hama.bsp.sync.SyncServiceFactory;
import org.apache.hama.util.BSPNetUtils;
import org.apache.hama.util.KeyValuePair;
/**
 * JUnit 3 tests that drive {@link AsyncRcvdMsgCheckpointImpl} through stub
 * implementations of the message manager, BSP peer and sync client declared
 * as nested classes below.
 */
public class TestCheckpoint extends TestCase {
/** Logger shared by the test methods and the nested stub classes. */
public static final Log LOG = LogFactory.getLog(TestCheckpoint.class);
/** Sample relative checkpoint directory; not referenced by the code in this class. */
static final String checkpointedDir = "checkpoint/job_201110302255_0001/0/";
/**
 * In-memory {@link MessageManager} stub for the checkpoint tests.
 * <p>
 * Received messages are kept in a plain list and forwarded to the registered
 * {@link MessageEventListener}; the last bundle handed to
 * {@link #loopBackMessages(BSPMessageBundle)} is retained so tests can inspect
 * it. All network-facing operations are no-ops.
 */
public static class TestMessageManager implements MessageManager<Text> {

  /** Messages added through {@link #addMessage(Text)}, in arrival order. */
  List<Text> messageQueue = new ArrayList<Text>();
  /** Most recent bundle passed to {@link #loopBackMessages(BSPMessageBundle)}. */
  BSPMessageBundle<Text> loopbackBundle = new BSPMessageBundle<Text>();
  /** Lazily created cursor over {@link #messageQueue}. */
  Iterator<Text> iter = null;
  /** Listener notified on every {@link #addMessage(Text)} call. */
  MessageEventListener<Text> listener;

  /** No-op: this stub needs no initialization. */
  @Override
  public void init(TaskAttemptID attemptId, BSPPeer<?, ?, ?, ?, Text> peer,
      Configuration conf, InetSocketAddress peerAddress) {
    // Intentionally empty.
  }

  /** No-op: nothing to release. */
  @Override
  public void close() {
    // Intentionally empty.
  }

  /**
   * Returns the next queued message, or {@code null} once the cursor is
   * exhausted. The cursor is created on the first call.
   */
  @Override
  public Text getCurrentMessage() throws IOException {
    if (iter == null) {
      iter = messageQueue.iterator();
    }
    return iter.hasNext() ? iter.next() : null;
  }

  /** No-op: outgoing messages are discarded. */
  @Override
  public void send(String peerName, Text msg) throws IOException {
    // Intentionally empty.
  }

  /** No-op: there is no send phase to finish. */
  @Override
  public void finishSendPhase() throws IOException {
    // Intentionally empty.
  }

  /** Always returns {@code null}; the stub keeps no outgoing queues. */
  @Override
  public Iterator<Entry<InetSocketAddress, MessageQueue<Text>>> getMessageIterator() {
    return null;
  }

  /** No-op: bundles are never transferred anywhere. */
  @Override
  public void transfer(InetSocketAddress addr, BSPMessageBundle<Text> bundle)
      throws IOException {
    // Intentionally empty.
  }

  /** No-op: there are no outgoing queues to clear. */
  @Override
  public void clearOutgoingQueues() {
    // Intentionally empty.
  }

  /** Returns the total number of messages added so far. */
  @Override
  public int getNumCurrentMessages() {
    return messageQueue.size();
  }

  /** Returns the bundle most recently supplied to {@code loopBackMessages}. */
  public BSPMessageBundle<Text> getLoopbackBundle() {
    return loopbackBundle;
  }

  /**
   * Queues a message and notifies the registered listener. A listener must
   * have been registered beforehand, since it is invoked unconditionally.
   */
  public void addMessage(Text message) throws IOException {
    messageQueue.add(message);
    listener.onMessageReceived(message);
  }

  /** Remembers the given bundle so that tests can read it back. */
  @Override
  @SuppressWarnings("unchecked")
  public void loopBackMessages(BSPMessageBundle<? extends Writable> bundle) {
    BSPMessageBundle<Text> typedBundle = (BSPMessageBundle<Text>) bundle;
    this.loopbackBundle = typedBundle;
  }

  /** No-op: single looped-back messages are ignored. */
  @Override
  public void loopBackMessage(Writable message) {
    // Intentionally empty.
  }

  /** Stores the listener that {@link #addMessage(Text)} will notify. */
  @Override
  public void registerListener(MessageEventListener<Text> listener)
      throws IOException {
    this.listener = listener;
  }

  /** Always returns {@code null}; the stub does not listen on any address. */
  @Override
  public InetSocketAddress getListenerAddress() {
    return null;
  }
}
public static class TestBSPPeer implements
BSPPeer<NullWritable, NullWritable, NullWritable, NullWritable, Text> {
Configuration conf;
long superstepCount;
FaultTolerantPeerService<Text> fService;
public TestBSPPeer(BSPJob job, Configuration conf, TaskAttemptID taskId,
Counters counters, long superstep, BSPPeerSyncClient syncClient,
MessageManager<Text> messenger, TaskStatus.State state) {
this.conf = conf;
if (superstep > 0)
superstepCount = superstep;
else
superstepCount = 0L;
try {
fService = (new AsyncRcvdMsgCheckpointImpl<Text>())
.constructPeerFaultTolerance(job, this, syncClient, null, taskId,
superstep, conf, messenger);
this.fService.onPeerInitialized(state);
} catch (Exception e) {
e.printStackTrace();
}
}
@Override
public void send(String peerName, Text msg) throws IOException {
}
@Override
public Text getCurrentMessage() throws IOException {
return new Text("data");
}
@Override
public int getNumCurrentMessages() {
return 1;
}
@Override
public void sync() throws IOException, SyncException, InterruptedException {
++superstepCount;
try {
this.fService.afterBarrier();
} catch (Exception e) {
e.printStackTrace();
}
LOG.info("After barrier " + superstepCount);
}
@Override
public long getSuperstepCount() {
return superstepCount;
}
@Override
public String getPeerName() {
return null;
}
@Override
public String getPeerName(int index) {
return null;
}
@Override
public int getPeerIndex() {
return 1;
}
@Override
public String[] getAllPeerNames() {
return null;
}
@Override
public int getNumPeers() {
return 0;
}
@Override
public void clear() {
}
@Override
public void write(NullWritable key, NullWritable value) throws IOException {
}
@Override
public boolean readNext(NullWritable key, NullWritable value)
throws IOException {
return false;
}
@Override
public KeyValuePair<NullWritable, NullWritable> readNext()
throws IOException {
return null;
}
@Override
public void reopenInput() throws IOException {
}
@Override
public Configuration getConfiguration() {
return null;
}
@Override
public Counter getCounter(Enum<?> name) {
return null;
}
@Override
public Counter getCounter(String group, String name) {
return null;
}
@Override
public void incrementCounter(Enum<?> key, long amount) {
}
@Override
public void incrementCounter(String group, String counter, long amount) {
}
@Override
public long getSplitSize() {
return 0;
}
@Override
public long getPos() throws IOException {
return 0;
}
@Override
public TaskAttemptID getTaskId() {
return null;
}
}
public static class TempSyncClient extends BSPPeerSyncClient {
Map<String, Writable> valueMap = new HashMap<String, Writable>();
@Override
public String constructKey(BSPJobID jobId, String... args) {
StringBuffer buffer = new StringBuffer(100);
buffer.append(jobId.toString()).append("/");
for (String arg : args) {
buffer.append(arg).append("/");
}
return buffer.toString();
}
@Override
public boolean storeInformation(String key, Writable value,
boolean permanent, SyncEventListener listener) {
ArrayWritable writables = (ArrayWritable) value;
long step = ((LongWritable) writables.get()[0]).get();
long count = ((LongWritable) writables.get()[1]).get();
LOG.info("SyncClient Storing value step = " + step + " count = " + count
+ " for key " + key);
valueMap.put(key, value);
return true;
}
@Override
public boolean getInformation(String key, Writable valueHolder) {
LOG.info("Getting value for key " + key);
if (!valueMap.containsKey(key)) {
return false;
}
Writable value = valueMap.get(key);
ByteArrayOutputStream byteStream = new ByteArrayOutputStream();
DataOutputStream outputStream = new DataOutputStream(byteStream);
byte[] data = null;
try {
value.write(outputStream);
outputStream.flush();
data = byteStream.toByteArray();
ByteArrayInputStream istream = new ByteArrayInputStream(data);
DataInputStream diStream = new DataInputStream(istream);
valueHolder.readFields(diStream);
return true;
} catch (IOException e) {
LOG.error("Error writing data to write buffer.", e);
} finally {
try {
byteStream.close();
outputStream.close();
} catch (IOException e) {
LOG.error("Error closing byte stream.", e);
}
}
return false;
}
@Override
public boolean addKey(String key, boolean permanent,
SyncEventListener listener) {
valueMap.put(key, NullWritable.get());
return true;
}
@Override
public boolean hasKey(String key) {
return valueMap.containsKey(key);
}
@Override
public String[] getChildKeySet(String key, SyncEventListener listener) {
List<String> list = new ArrayList<String>();
Iterator<String> keyIter = valueMap.keySet().iterator();
while (keyIter.hasNext()) {
String keyVal = keyIter.next();
if (keyVal.startsWith(key + "/")) {
list.add(keyVal);
}
}
String[] arr = new String[list.size()];
list.toArray(arr);
return arr;
}
@Override
public boolean registerListener(String key, SyncEvent event,
SyncEventListener listener) {
return false;
}
@Override
public boolean remove(String key, SyncEventListener listener) {
valueMap.remove(key);
return false;
}
@Override
public void init(Configuration conf, BSPJobID jobId, TaskAttemptID taskId)
throws Exception {
}
@Override
public void enterBarrier(BSPJobID jobId, TaskAttemptID taskId,
long superstep) throws SyncException {
LOG.info("Enter barrier called - " + superstep);
}
@Override
public void leaveBarrier(BSPJobID jobId, TaskAttemptID taskId,
long superstep) throws SyncException {
LOG.info("Exit barrier called - " + superstep);
}
@Override
public void register(BSPJobID jobId, TaskAttemptID taskId,
String hostAddress, long port) {
}
@Override
public String[] getAllPeerNames(TaskAttemptID taskId) {
return null;
}
@Override
public void deregisterFromBarrier(BSPJobID jobId, TaskAttemptID taskId,
String hostAddress, long port) {
}
@Override
public void stopServer() {
}
@Override
public void close() throws IOException {
}
}
/**
 * Asserts that the checkpoint record stored for the peer holds the expected
 * superstep number and message count.
 * <p>
 * The record is looked up under the key built from the job id, the literal
 * {@code "checkpoint"} and the peer index; its first element is compared to
 * {@code step} and its second to {@code count}.
 *
 * @param syncClient client the record is read from
 * @param bspTask peer whose index identifies the record
 * @param job job whose id prefixes the key
 * @param step expected superstep number
 * @param count expected message count
 */
private static void checkSuperstepMsgCount(PeerSyncClient syncClient,
    @SuppressWarnings("rawtypes") BSPPeer bspTask, BSPJob job, long step,
    long count) {
  ArrayWritable record = new ArrayWritable(LongWritable.class);
  String checkpointKey = syncClient.constructKey(job.getJobID(), "checkpoint",
      String.valueOf(bspTask.getPeerIndex()));
  boolean found = syncClient.getInformation(checkpointKey, record);
  assertTrue(found);

  Writable[] fields = record.get();
  LongWritable storedStep = (LongWritable) fields[0];
  LongWritable storedCount = (LongWritable) fields[1];
  assertEquals(step, storedStep.get());
  assertEquals(count, storedCount.get());
}
/**
 * Verifies that with a checkpoint interval of 2 the stored checkpoint record
 * is only refreshed on every other superstep.
 * <p>
 * The assertions expect the record to read (step 1, 0 messages) after syncs
 * 1 and 2, (3, 1) after syncs 3 and 4, and (5, 2) after syncs 5 and 6.
 * Checkpoint files are removed in a {@code finally} block so that a failed
 * assertion does not leave files behind for the other tests.
 *
 * @throws Exception if the peer, sync client or file system fails
 */
public void testCheckpointInterval() throws Exception {
  Configuration config = new Configuration();
  System.setProperty("user.dir", "/tmp");
  config.set(SyncServiceFactory.SYNC_PEER_CLASS,
      TempSyncClient.class.getName());
  config.set(Constants.FAULT_TOLERANCE_CLASS,
      AsyncRcvdMsgCheckpointImpl.class.getName());
  config.setBoolean(Constants.FAULT_TOLERANCE_FLAG, true);
  config.setBoolean(Constants.CHECKPOINT_ENABLED, true);
  config.setInt(Constants.CHECKPOINT_INTERVAL, 2);
  config.set("bsp.output.dir", "/tmp/hama-test_out");
  config.set("bsp.local.dir", "/tmp/hama-test");

  FileSystem dfs = FileSystem.get(config);
  try {
    BSPJob job = new BSPJob(new BSPJobID("checkpttest", 1), "/tmp");
    TaskAttemptID taskId = new TaskAttemptID(new TaskID(job.getJobID(), 1), 1);

    TestMessageManager messenger = new TestMessageManager();
    PeerSyncClient syncClient = SyncServiceFactory.getPeerSyncClient(config);

    BSPPeer<NullWritable, NullWritable, NullWritable, NullWritable, Text> bspTask =
        new TestBSPPeer(job, config, taskId, new Counters(), -1L,
            (BSPPeerSyncClient) syncClient, messenger, TaskStatus.State.RUNNING);
    assertNotNull("BSPPeerImpl should not be null.", bspTask);
    LOG.info("Created bsp peer and other parameters");

    int port = BSPNetUtils.getFreePort(12502);
    LOG.info("Got port = " + port);

    // Before the first sync no checkpoint record may exist.
    boolean result = syncClient
        .getInformation(syncClient.constructKey(job.getJobID(), "checkpoint",
            "" + bspTask.getPeerIndex()), new ArrayWritable(LongWritable.class));
    assertFalse(result);

    bspTask.sync();
    // Superstep 1
    checkSuperstepMsgCount(syncClient, bspTask, job, 1L, 0L);

    Text txtMessage = new Text("data");
    messenger.addMessage(txtMessage);

    bspTask.sync();
    // Superstep 2: record is expected to be unchanged.
    checkSuperstepMsgCount(syncClient, bspTask, job, 1L, 0L);

    messenger.addMessage(txtMessage);

    bspTask.sync();
    // Superstep 3
    checkSuperstepMsgCount(syncClient, bspTask, job, 3L, 1L);

    bspTask.sync();
    // Superstep 4: record is expected to be unchanged.
    checkSuperstepMsgCount(syncClient, bspTask, job, 3L, 1L);

    messenger.addMessage(txtMessage);
    messenger.addMessage(txtMessage);

    bspTask.sync();
    // Superstep 5
    checkSuperstepMsgCount(syncClient, bspTask, job, 5L, 2L);

    bspTask.sync();
    // Superstep 6: record is expected to be unchanged.
    checkSuperstepMsgCount(syncClient, bspTask, job, 5L, 2L);
  } finally {
    // Clean up even when an assertion fails so later tests start clean.
    dfs.delete(new Path("checkpoint"), true);
  }
}
/**
 * Verifies that a received message is written to the checkpoint file for the
 * superstep in which it was checkpointed.
 * <p>
 * After two syncs with one message added in between, the test expects the
 * record (step 2, 1 message) in the sync client and a file at
 * {@code checkpoint/job_checkpttest_0001/2/1} holding the message's class
 * name followed by the serialized message. The input stream is always closed
 * and the checkpoint directory is always removed.
 *
 * @throws Exception if the peer, sync client or file system fails
 */
public void testCheckpoint() throws Exception {
  Configuration config = new Configuration();
  config.set(SyncServiceFactory.SYNC_PEER_CLASS,
      TempSyncClient.class.getName());
  config.setBoolean(Constants.FAULT_TOLERANCE_FLAG, true);
  config.set(Constants.FAULT_TOLERANCE_CLASS,
      AsyncRcvdMsgCheckpointImpl.class.getName());
  config.setBoolean(Constants.CHECKPOINT_ENABLED, true);
  int port = BSPNetUtils.getFreePort(12502);
  LOG.info("Got port = " + port);

  config.set(Constants.PEER_HOST, Constants.DEFAULT_PEER_HOST);
  config.setInt(Constants.PEER_PORT, port);

  config.set("bsp.output.dir", "/tmp/hama-test_out");
  config.set("bsp.local.dir", "/tmp/hama-test");

  FileSystem dfs = FileSystem.get(config);
  try {
    BSPJob job = new BSPJob(new BSPJobID("checkpttest", 1), "/tmp");
    TaskAttemptID taskId = new TaskAttemptID(new TaskID(job.getJobID(), 1), 1);

    TestMessageManager messenger = new TestMessageManager();
    PeerSyncClient syncClient = SyncServiceFactory.getPeerSyncClient(config);

    BSPPeer<NullWritable, NullWritable, NullWritable, NullWritable, Text> bspTask =
        new TestBSPPeer(job, config, taskId, new Counters(), -1L,
            (BSPPeerSyncClient) syncClient, messenger, TaskStatus.State.RUNNING);
    assertNotNull("BSPPeerImpl should not be null.", bspTask);
    LOG.info("Created bsp peer and other parameters");

    bspTask.sync();
    LOG.info("Completed first sync.");
    checkSuperstepMsgCount(syncClient, bspTask, job, 1L, 0L);

    Text txtMessage = new Text("data");
    messenger.addMessage(txtMessage);

    bspTask.sync();
    LOG.info("Completed second sync.");
    checkSuperstepMsgCount(syncClient, bspTask, job, 2L, 1L);

    // Checking the messages for superstep 2 and peer id 1
    String expectedPath = "checkpoint/job_checkpttest_0001/2/1";
    FSDataInputStream in = dfs.open(new Path(expectedPath));
    try {
      String className = in.readUTF();
      Text message = (Text) ReflectionUtils.newInstance(
          Class.forName(className), config);
      message.readFields(in);
      assertEquals("data", message.toString());
    } finally {
      // Release the file handle before the directory is deleted.
      in.close();
    }
  } finally {
    // Clean up even when an assertion fails so later tests start clean.
    dfs.delete(new Path("checkpoint"), true);
  }
}
/**
 * Verifies that a peer started in the {@code RECOVERING} state replays the
 * checkpointed messages into the message manager's loopback bundle.
 * <p>
 * The test seeds the sync client with a checkpoint record (step 3, 5
 * messages) and writes five serialized {@code Text("data")} messages to
 * {@code checkpoint/job_checkpttest_0001/3/1}. It then constructs the peer at
 * superstep 3 and expects five recovered messages, the first being "data".
 * The output stream is always closed and the checkpoint directory is always
 * removed.
 *
 * @throws Exception if the peer, sync client or file system fails
 */
public void testPeerRecovery() throws Exception {
  Configuration config = new Configuration();
  config.set(SyncServiceFactory.SYNC_PEER_CLASS,
      TempSyncClient.class.getName());
  config.set(Constants.FAULT_TOLERANCE_CLASS,
      AsyncRcvdMsgCheckpointImpl.class.getName());
  config.setBoolean(Constants.CHECKPOINT_ENABLED, true);
  int port = BSPNetUtils.getFreePort(12502);
  LOG.info("Got port = " + port);

  config.set(Constants.PEER_HOST, Constants.DEFAULT_PEER_HOST);
  config.setInt(Constants.PEER_PORT, port);

  config.set("bsp.output.dir", "/tmp/hama-test_out");
  config.set("bsp.local.dir", "/tmp/hama-test");

  FileSystem dfs = FileSystem.get(config);
  try {
    BSPJob job = new BSPJob(new BSPJobID("checkpttest", 1), "/tmp");
    TaskAttemptID taskId = new TaskAttemptID(new TaskID(job.getJobID(), 1), 1);

    TestMessageManager messenger = new TestMessageManager();
    PeerSyncClient syncClient = SyncServiceFactory.getPeerSyncClient(config);

    Text txtMessage = new Text("data");

    // Seed the checkpoint record: superstep 3 with 5 messages for peer 1.
    String writeKey = "job_checkpttest_0001/checkpoint/1/";
    Writable[] writableArr = new Writable[2];
    writableArr[0] = new LongWritable(3L);
    writableArr[1] = new LongWritable(5L);
    ArrayWritable arrWritable = new ArrayWritable(LongWritable.class);
    arrWritable.set(writableArr);
    syncClient.storeInformation(writeKey, arrWritable, true, null);

    // Write the checkpoint file: class name followed by the serialized message.
    String writePath = "checkpoint/job_checkpttest_0001/3/1";
    FSDataOutputStream out = dfs.create(new Path(writePath));
    try {
      for (int i = 0; i < 5; ++i) {
        out.writeUTF(txtMessage.getClass().getCanonicalName());
        txtMessage.write(out);
      }
    } finally {
      out.close();
    }

    // Constructing the peer notifies the fault tolerance service with the
    // RECOVERING state; the peer instance itself is not needed afterwards.
    new TestBSPPeer(job, config, taskId, new Counters(), 3L,
        (BSPPeerSyncClient) syncClient, messenger,
        TaskStatus.State.RECOVERING);

    BSPMessageBundle<Text> bundleRead = messenger.getLoopbackBundle();
    assertEquals(5, bundleRead.getMessages().size());
    String recoveredMsg = bundleRead.getMessages().get(0).toString();
    // Expected value goes first so a failure message reads correctly.
    assertEquals("data", recoveredMsg);
  } finally {
    // Clean up even when an assertion fails so later tests start clean.
    dfs.delete(new Path("checkpoint"), true);
  }
}
}