| /** |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| package org.apache.hama.bsp; |
| |
| import java.io.ByteArrayInputStream; |
| import java.io.ByteArrayOutputStream; |
| import java.io.DataInputStream; |
| import java.io.DataOutputStream; |
| import java.io.IOException; |
| import java.net.InetSocketAddress; |
| import java.util.ArrayList; |
| import java.util.HashMap; |
| import java.util.Iterator; |
| import java.util.List; |
| import java.util.Map; |
| import java.util.Map.Entry; |
| |
| import junit.framework.TestCase; |
| |
| import org.apache.commons.logging.Log; |
| import org.apache.commons.logging.LogFactory; |
| import org.apache.hadoop.conf.Configuration; |
| import org.apache.hadoop.fs.FSDataInputStream; |
| import org.apache.hadoop.fs.FSDataOutputStream; |
| import org.apache.hadoop.fs.FileSystem; |
| import org.apache.hadoop.fs.Path; |
| import org.apache.hadoop.io.ArrayWritable; |
| import org.apache.hadoop.io.LongWritable; |
| import org.apache.hadoop.io.NullWritable; |
| import org.apache.hadoop.io.Text; |
| import org.apache.hadoop.io.Writable; |
| import org.apache.hadoop.util.ReflectionUtils; |
| import org.apache.hama.Constants; |
| import org.apache.hama.bsp.Counters.Counter; |
| import org.apache.hama.bsp.ft.AsyncRcvdMsgCheckpointImpl; |
| import org.apache.hama.bsp.ft.FaultTolerantPeerService; |
| import org.apache.hama.bsp.message.MessageEventListener; |
| import org.apache.hama.bsp.message.MessageManager; |
| import org.apache.hama.bsp.message.queue.MessageQueue; |
| import org.apache.hama.bsp.sync.BSPPeerSyncClient; |
| import org.apache.hama.bsp.sync.PeerSyncClient; |
| import org.apache.hama.bsp.sync.SyncEvent; |
| import org.apache.hama.bsp.sync.SyncEventListener; |
| import org.apache.hama.bsp.sync.SyncException; |
| import org.apache.hama.bsp.sync.SyncServiceFactory; |
| import org.apache.hama.util.BSPNetUtils; |
| import org.apache.hama.util.KeyValuePair; |
| |
| public class TestCheckpoint extends TestCase { |
| |
| public static final Log LOG = LogFactory.getLog(TestCheckpoint.class); |
| |
| static final String checkpointedDir = "checkpoint/job_201110302255_0001/0/"; |
| |
| public static class TestMessageManager implements MessageManager<Text> { |
| |
| List<Text> messageQueue = new ArrayList<Text>(); |
| BSPMessageBundle<Text> loopbackBundle = new BSPMessageBundle<Text>(); |
| Iterator<Text> iter = null; |
| MessageEventListener<Text> listener; |
| |
| @Override |
| public void init(TaskAttemptID attemptId, BSPPeer<?, ?, ?, ?, Text> peer, |
| Configuration conf, InetSocketAddress peerAddress) { |
| // TODO Auto-generated method stub |
| |
| } |
| |
| @Override |
| public void close() { |
| // TODO Auto-generated method stub |
| |
| } |
| |
| @Override |
| public Text getCurrentMessage() throws IOException { |
| if (iter == null) |
| iter = this.messageQueue.iterator(); |
| if (iter.hasNext()) |
| return iter.next(); |
| return null; |
| } |
| |
| @Override |
| public void send(String peerName, Text msg) throws IOException { |
| } |
| |
| @Override |
| public void finishSendPhase() throws IOException { |
| } |
| |
| @Override |
| public Iterator<Entry<InetSocketAddress, MessageQueue<Text>>> getMessageIterator() { |
| return null; |
| } |
| |
| @Override |
| public void transfer(InetSocketAddress addr, BSPMessageBundle<Text> bundle) |
| throws IOException { |
| // TODO Auto-generated method stub |
| |
| } |
| |
| @Override |
| public void clearOutgoingQueues() { |
| } |
| |
| @Override |
| public int getNumCurrentMessages() { |
| return this.messageQueue.size(); |
| } |
| |
| public BSPMessageBundle<Text> getLoopbackBundle() { |
| return this.loopbackBundle; |
| } |
| |
| public void addMessage(Text message) throws IOException { |
| this.messageQueue.add(message); |
| listener.onMessageReceived(message); |
| } |
| |
| @SuppressWarnings("unchecked") |
| @Override |
| public void loopBackMessages(BSPMessageBundle<? extends Writable> bundle) { |
| this.loopbackBundle = (BSPMessageBundle<Text>) bundle; |
| } |
| |
| @Override |
| public void loopBackMessage(Writable message) { |
| } |
| |
| @Override |
| public void registerListener(MessageEventListener<Text> listener) |
| throws IOException { |
| this.listener = listener; |
| } |
| |
| @Override |
| public InetSocketAddress getListenerAddress() { |
| // TODO Auto-generated method stub |
| return null; |
| } |
| |
| } |
| |
| public static class TestBSPPeer implements |
| BSPPeer<NullWritable, NullWritable, NullWritable, NullWritable, Text> { |
| |
| Configuration conf; |
| long superstepCount; |
| FaultTolerantPeerService<Text> fService; |
| |
| public TestBSPPeer(BSPJob job, Configuration conf, TaskAttemptID taskId, |
| Counters counters, long superstep, BSPPeerSyncClient syncClient, |
| MessageManager<Text> messenger, TaskStatus.State state) { |
| this.conf = conf; |
| if (superstep > 0) |
| superstepCount = superstep; |
| else |
| superstepCount = 0L; |
| |
| try { |
| fService = (new AsyncRcvdMsgCheckpointImpl<Text>()) |
| .constructPeerFaultTolerance(job, this, syncClient, null, taskId, |
| superstep, conf, messenger); |
| this.fService.onPeerInitialized(state); |
| } catch (Exception e) { |
| e.printStackTrace(); |
| } |
| } |
| |
| @Override |
| public void send(String peerName, Text msg) throws IOException { |
| } |
| |
| @Override |
| public Text getCurrentMessage() throws IOException { |
| return new Text("data"); |
| } |
| |
| @Override |
| public int getNumCurrentMessages() { |
| return 1; |
| } |
| |
| @Override |
| public void sync() throws IOException, SyncException, InterruptedException { |
| ++superstepCount; |
| try { |
| this.fService.afterBarrier(); |
| } catch (Exception e) { |
| e.printStackTrace(); |
| } |
| LOG.info("After barrier " + superstepCount); |
| } |
| |
| @Override |
| public long getSuperstepCount() { |
| return superstepCount; |
| } |
| |
| @Override |
| public String getPeerName() { |
| return null; |
| } |
| |
| @Override |
| public String getPeerName(int index) { |
| return null; |
| } |
| |
| @Override |
| public int getPeerIndex() { |
| return 1; |
| } |
| |
| @Override |
| public String[] getAllPeerNames() { |
| return null; |
| } |
| |
| @Override |
| public int getNumPeers() { |
| return 0; |
| } |
| |
| @Override |
| public void clear() { |
| |
| } |
| |
| @Override |
| public void write(NullWritable key, NullWritable value) throws IOException { |
| |
| } |
| |
| @Override |
| public boolean readNext(NullWritable key, NullWritable value) |
| throws IOException { |
| return false; |
| } |
| |
| @Override |
| public KeyValuePair<NullWritable, NullWritable> readNext() |
| throws IOException { |
| return null; |
| } |
| |
| @Override |
| public void reopenInput() throws IOException { |
| |
| } |
| |
| @Override |
| public Configuration getConfiguration() { |
| return null; |
| } |
| |
| @Override |
| public Counter getCounter(Enum<?> name) { |
| return null; |
| } |
| |
| @Override |
| public Counter getCounter(String group, String name) { |
| return null; |
| } |
| |
| @Override |
| public void incrementCounter(Enum<?> key, long amount) { |
| |
| } |
| |
| @Override |
| public void incrementCounter(String group, String counter, long amount) { |
| |
| } |
| |
| @Override |
| public long getSplitSize() { |
| return 0; |
| } |
| |
| @Override |
| public long getPos() throws IOException { |
| return 0; |
| } |
| |
| @Override |
| public TaskAttemptID getTaskId() { |
| return null; |
| } |
| |
| } |
| |
| public static class TempSyncClient extends BSPPeerSyncClient { |
| |
| Map<String, Writable> valueMap = new HashMap<String, Writable>(); |
| |
| @Override |
| public String constructKey(BSPJobID jobId, String... args) { |
| StringBuffer buffer = new StringBuffer(100); |
| buffer.append(jobId.toString()).append("/"); |
| for (String arg : args) { |
| buffer.append(arg).append("/"); |
| } |
| return buffer.toString(); |
| } |
| |
| @Override |
| public boolean storeInformation(String key, Writable value, |
| boolean permanent, SyncEventListener listener) { |
| ArrayWritable writables = (ArrayWritable) value; |
| long step = ((LongWritable) writables.get()[0]).get(); |
| long count = ((LongWritable) writables.get()[1]).get(); |
| |
| LOG.info("SyncClient Storing value step = " + step + " count = " + count |
| + " for key " + key); |
| valueMap.put(key, value); |
| return true; |
| } |
| |
| @Override |
| public boolean getInformation(String key, Writable valueHolder) { |
| LOG.info("Getting value for key " + key); |
| if (!valueMap.containsKey(key)) { |
| return false; |
| } |
| Writable value = valueMap.get(key); |
| ByteArrayOutputStream byteStream = new ByteArrayOutputStream(); |
| DataOutputStream outputStream = new DataOutputStream(byteStream); |
| byte[] data = null; |
| try { |
| value.write(outputStream); |
| outputStream.flush(); |
| data = byteStream.toByteArray(); |
| ByteArrayInputStream istream = new ByteArrayInputStream(data); |
| DataInputStream diStream = new DataInputStream(istream); |
| valueHolder.readFields(diStream); |
| return true; |
| } catch (IOException e) { |
| LOG.error("Error writing data to write buffer.", e); |
| } finally { |
| try { |
| byteStream.close(); |
| outputStream.close(); |
| } catch (IOException e) { |
| LOG.error("Error closing byte stream.", e); |
| } |
| } |
| return false; |
| } |
| |
| @Override |
| public boolean addKey(String key, boolean permanent, |
| SyncEventListener listener) { |
| valueMap.put(key, NullWritable.get()); |
| return true; |
| } |
| |
| @Override |
| public boolean hasKey(String key) { |
| return valueMap.containsKey(key); |
| } |
| |
| @Override |
| public String[] getChildKeySet(String key, SyncEventListener listener) { |
| List<String> list = new ArrayList<String>(); |
| Iterator<String> keyIter = valueMap.keySet().iterator(); |
| while (keyIter.hasNext()) { |
| String keyVal = keyIter.next(); |
| if (keyVal.startsWith(key + "/")) { |
| list.add(keyVal); |
| } |
| } |
| String[] arr = new String[list.size()]; |
| list.toArray(arr); |
| return arr; |
| } |
| |
| @Override |
| public boolean registerListener(String key, SyncEvent event, |
| SyncEventListener listener) { |
| return false; |
| } |
| |
| @Override |
| public boolean remove(String key, SyncEventListener listener) { |
| valueMap.remove(key); |
| return false; |
| } |
| |
| @Override |
| public void init(Configuration conf, BSPJobID jobId, TaskAttemptID taskId) |
| throws Exception { |
| } |
| |
| @Override |
| public void enterBarrier(BSPJobID jobId, TaskAttemptID taskId, |
| long superstep) throws SyncException { |
| LOG.info("Enter barrier called - " + superstep); |
| } |
| |
| @Override |
| public void leaveBarrier(BSPJobID jobId, TaskAttemptID taskId, |
| long superstep) throws SyncException { |
| LOG.info("Exit barrier called - " + superstep); |
| } |
| |
| @Override |
| public void register(BSPJobID jobId, TaskAttemptID taskId, |
| String hostAddress, long port) { |
| } |
| |
| @Override |
| public String[] getAllPeerNames(TaskAttemptID taskId) { |
| return null; |
| } |
| |
| @Override |
| public void deregisterFromBarrier(BSPJobID jobId, TaskAttemptID taskId, |
| String hostAddress, long port) { |
| } |
| |
| @Override |
| public void stopServer() { |
| } |
| |
| @Override |
| public void close() throws IOException { |
| } |
| |
| } |
| |
| private static void checkSuperstepMsgCount(PeerSyncClient syncClient, |
| @SuppressWarnings("rawtypes") BSPPeer bspTask, BSPJob job, long step, |
| long count) { |
| |
| ArrayWritable writableVal = new ArrayWritable(LongWritable.class); |
| |
| boolean result = syncClient.getInformation( |
| syncClient.constructKey(job.getJobID(), "checkpoint", |
| "" + bspTask.getPeerIndex()), writableVal); |
| |
| assertTrue(result); |
| |
| LongWritable superstepNo = (LongWritable) writableVal.get()[0]; |
| LongWritable msgCount = (LongWritable) writableVal.get()[1]; |
| |
| assertEquals(step, superstepNo.get()); |
| assertEquals(count, msgCount.get()); |
| } |
| |
| public void testCheckpointInterval() throws Exception { |
| Configuration config = new Configuration(); |
| System.setProperty("user.dir", "/tmp"); |
| config.set(SyncServiceFactory.SYNC_PEER_CLASS, |
| TempSyncClient.class.getName()); |
| config.set(Constants.FAULT_TOLERANCE_CLASS, |
| AsyncRcvdMsgCheckpointImpl.class.getName()); |
| config.setBoolean(Constants.FAULT_TOLERANCE_FLAG, true); |
| config.setBoolean(Constants.CHECKPOINT_ENABLED, true); |
| config.setInt(Constants.CHECKPOINT_INTERVAL, 2); |
| config.set("bsp.output.dir", "/tmp/hama-test_out"); |
| config.set("bsp.local.dir", "/tmp/hama-test"); |
| |
| FileSystem dfs = FileSystem.get(config); |
| BSPJob job = new BSPJob(new BSPJobID("checkpttest", 1), "/tmp"); |
| TaskAttemptID taskId = new TaskAttemptID(new TaskID(job.getJobID(), 1), 1); |
| |
| TestMessageManager messenger = new TestMessageManager(); |
| PeerSyncClient syncClient = SyncServiceFactory.getPeerSyncClient(config); |
| @SuppressWarnings("rawtypes") |
| BSPPeer bspTask = new TestBSPPeer(job, config, taskId, new Counters(), -1L, |
| (BSPPeerSyncClient) syncClient, messenger, TaskStatus.State.RUNNING); |
| |
| assertNotNull("BSPPeerImpl should not be null.", bspTask); |
| |
| LOG.info("Created bsp peer and other parameters"); |
| int port = BSPNetUtils.getFreePort(12502); |
| LOG.info("Got port = " + port); |
| |
| boolean result = syncClient |
| .getInformation(syncClient.constructKey(job.getJobID(), "checkpoint", |
| "" + bspTask.getPeerIndex()), new ArrayWritable(LongWritable.class)); |
| |
| assertFalse(result); |
| |
| bspTask.sync(); |
| // Superstep 1 |
| |
| checkSuperstepMsgCount(syncClient, bspTask, job, 1L, 0L); |
| |
| Text txtMessage = new Text("data"); |
| messenger.addMessage(txtMessage); |
| |
| bspTask.sync(); |
| // Superstep 2 |
| |
| checkSuperstepMsgCount(syncClient, bspTask, job, 1L, 0L); |
| |
| messenger.addMessage(txtMessage); |
| |
| bspTask.sync(); |
| // Superstep 3 |
| |
| checkSuperstepMsgCount(syncClient, bspTask, job, 3L, 1L); |
| |
| bspTask.sync(); |
| // Superstep 4 |
| |
| checkSuperstepMsgCount(syncClient, bspTask, job, 3L, 1L); |
| |
| messenger.addMessage(txtMessage); |
| messenger.addMessage(txtMessage); |
| |
| bspTask.sync(); |
| // Superstep 5 |
| |
| checkSuperstepMsgCount(syncClient, bspTask, job, 5L, 2L); |
| |
| bspTask.sync(); |
| // Superstep 6 |
| |
| checkSuperstepMsgCount(syncClient, bspTask, job, 5L, 2L); |
| |
| dfs.delete(new Path("checkpoint"), true); |
| } |
| |
| @SuppressWarnings("rawtypes") |
| public void testCheckpoint() throws Exception { |
| Configuration config = new Configuration(); |
| config.set(SyncServiceFactory.SYNC_PEER_CLASS, |
| TempSyncClient.class.getName()); |
| config.setBoolean(Constants.FAULT_TOLERANCE_FLAG, true); |
| config.set(Constants.FAULT_TOLERANCE_CLASS, |
| AsyncRcvdMsgCheckpointImpl.class.getName()); |
| config.setBoolean(Constants.CHECKPOINT_ENABLED, true); |
| int port = BSPNetUtils.getFreePort(12502); |
| LOG.info("Got port = " + port); |
| |
| config.set(Constants.PEER_HOST, Constants.DEFAULT_PEER_HOST); |
| config.setInt(Constants.PEER_PORT, port); |
| |
| config.set("bsp.output.dir", "/tmp/hama-test_out"); |
| config.set("bsp.local.dir", "/tmp/hama-test"); |
| |
| FileSystem dfs = FileSystem.get(config); |
| BSPJob job = new BSPJob(new BSPJobID("checkpttest", 1), "/tmp"); |
| TaskAttemptID taskId = new TaskAttemptID(new TaskID(job.getJobID(), 1), 1); |
| |
| TestMessageManager messenger = new TestMessageManager(); |
| PeerSyncClient syncClient = SyncServiceFactory.getPeerSyncClient(config); |
| BSPPeer bspTask = new TestBSPPeer(job, config, taskId, new Counters(), -1L, |
| (BSPPeerSyncClient) syncClient, messenger, TaskStatus.State.RUNNING); |
| |
| assertNotNull("BSPPeerImpl should not be null.", bspTask); |
| |
| LOG.info("Created bsp peer and other parameters"); |
| |
| @SuppressWarnings("unused") |
| FaultTolerantPeerService<Text> service = null; |
| |
| bspTask.sync(); |
| LOG.info("Completed first sync."); |
| |
| checkSuperstepMsgCount(syncClient, bspTask, job, 1L, 0L); |
| |
| Text txtMessage = new Text("data"); |
| messenger.addMessage(txtMessage); |
| |
| bspTask.sync(); |
| |
| LOG.info("Completed second sync."); |
| |
| checkSuperstepMsgCount(syncClient, bspTask, job, 2L, 1L); |
| |
| // Checking the messages for superstep 2 and peer id 1 |
| String expectedPath = "checkpoint/job_checkpttest_0001/2/1"; |
| FSDataInputStream in = dfs.open(new Path(expectedPath)); |
| |
| String className = in.readUTF(); |
| Text message = (Text) ReflectionUtils.newInstance(Class.forName(className), |
| config); |
| message.readFields(in); |
| |
| assertEquals("data", message.toString()); |
| |
| dfs.delete(new Path("checkpoint"), true); |
| } |
| |
| public void testPeerRecovery() throws Exception { |
| Configuration config = new Configuration(); |
| config.set(SyncServiceFactory.SYNC_PEER_CLASS, |
| TempSyncClient.class.getName()); |
| config.set(Constants.FAULT_TOLERANCE_CLASS, |
| AsyncRcvdMsgCheckpointImpl.class.getName()); |
| config.setBoolean(Constants.CHECKPOINT_ENABLED, true); |
| int port = BSPNetUtils.getFreePort(12502); |
| LOG.info("Got port = " + port); |
| |
| config.set(Constants.PEER_HOST, Constants.DEFAULT_PEER_HOST); |
| config.setInt(Constants.PEER_PORT, port); |
| |
| config.set("bsp.output.dir", "/tmp/hama-test_out"); |
| config.set("bsp.local.dir", "/tmp/hama-test"); |
| |
| FileSystem dfs = FileSystem.get(config); |
| BSPJob job = new BSPJob(new BSPJobID("checkpttest", 1), "/tmp"); |
| TaskAttemptID taskId = new TaskAttemptID(new TaskID(job.getJobID(), 1), 1); |
| |
| TestMessageManager messenger = new TestMessageManager(); |
| PeerSyncClient syncClient = SyncServiceFactory.getPeerSyncClient(config); |
| |
| Text txtMessage = new Text("data"); |
| String writeKey = "job_checkpttest_0001/checkpoint/1/"; |
| |
| Writable[] writableArr = new Writable[2]; |
| writableArr[0] = new LongWritable(3L); |
| writableArr[1] = new LongWritable(5L); |
| ArrayWritable arrWritable = new ArrayWritable(LongWritable.class); |
| arrWritable.set(writableArr); |
| syncClient.storeInformation(writeKey, arrWritable, true, null); |
| |
| String writePath = "checkpoint/job_checkpttest_0001/3/1"; |
| FSDataOutputStream out = dfs.create(new Path(writePath)); |
| for (int i = 0; i < 5; ++i) { |
| out.writeUTF(txtMessage.getClass().getCanonicalName()); |
| txtMessage.write(out); |
| } |
| out.close(); |
| |
| @SuppressWarnings("unused") |
| BSPPeer<?, ?, ?, ?, Text> bspTask = new TestBSPPeer(job, config, taskId, |
| new Counters(), 3L, (BSPPeerSyncClient) syncClient, messenger, |
| TaskStatus.State.RECOVERING); |
| |
| BSPMessageBundle<Text> bundleRead = messenger.getLoopbackBundle(); |
| assertEquals(5, bundleRead.getMessages().size()); |
| String recoveredMsg = bundleRead.getMessages().get(0).toString(); |
| assertEquals(recoveredMsg, "data"); |
| dfs.delete(new Path("checkpoint"), true); |
| } |
| |
| } |