// blob: 2e79d16a96408ef11a44bb9f4a1b39d1c684502a [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hama.bsp;
import static java.util.concurrent.TimeUnit.SECONDS;
import java.io.BufferedReader;
import java.io.File;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.CompletionService;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicReference;
import junit.framework.TestCase;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.NullWritable;
import org.apache.hama.ipc.RPC;
import org.apache.hama.ipc.Server;
import org.apache.hama.Constants;
import org.apache.hama.HamaConfiguration;
import org.apache.hama.HamaTestCase;
import org.apache.hama.bsp.sync.SyncClient;
import org.apache.hama.bsp.sync.SyncException;
import org.apache.hama.bsp.sync.SyncServiceFactory;
import org.apache.hama.ipc.BSPPeerProtocol;
import org.apache.hama.ipc.HamaRPCProtocolVersion;
import org.apache.hama.util.BSPNetUtils;
public class TestBSPTaskFaults extends TestCase {
/** Logger used by this test and its nested helper classes. */
public static final Log LOG = LogFactory.getLog(TestBSPTaskFaults.class);
/**
 * Configuration key holding the fault-injection point. FaulTestBSP throws in
 * setup for 1, in bsp for 2 and in cleanup for 3; any other value injects
 * nothing.
 */
public static final String TEST_POINT = "bsp.ft.test.point";
/** Configuration key for the groom port (not referenced in the visible part of this file). */
public static final String TEST_GROOM_PORT = "bsp.ft.test.groomport";
/** Counter advanced once per setUp; used there to vary the RPC port between tests. */
private static int TEST_NUMBER = 0;
/** In-process groom stand-in that receives the pings of the child process. */
private volatile MinimalGroomServer groom;
/** RPC proxy onto {@link #groom}; set to null once a test has stopped the server itself. */
private volatile BSPPeerProtocol umbilical;
/** RPC server exposing {@link #groom}. */
private Server workerServer;
/** Task attempt id used when a task is run inside the test JVM. */
private TaskAttemptID taskid = new TaskAttemptID(new TaskID(new BSPJobID(
    "job_201110302255", 1), 1), 1);
/** Configuration rebuilt for every test in setUp. */
public volatile HamaConfiguration conf;
/** Executor on which the child-process runner is submitted. */
private ScheduledExecutorService testBSPTaskService;
/**
 * Advances the shared test counter by one.
 *
 * @return the counter value after the increment
 */
private static synchronized int incrementTestNumber() {
  TEST_NUMBER += 1;
  return TEST_NUMBER;
}
/**
 * Minimal {@link BSPPeerProtocol} implementation used as the groom side of
 * the umbilical connection. It counts pings, remembers the time of the first
 * and the most recent ping, and records whether close, done or an error
 * notification was received.
 *
 * All updates of the ping statistics are made under the instance monitor so
 * that the counter and both timestamps always change together; the fields
 * stay volatile because the enclosing test reads them directly.
 */
@SuppressWarnings("unused")
public static class MinimalGroomServer implements BSPPeerProtocol {
  private volatile int pingCount;
  private volatile long firstPingTime;
  private volatile long lastPingTime;
  // Written by RPC handler threads, hence volatile.
  private volatile boolean isShutDown = false;
  private volatile boolean taskComplete = false;
  private volatile boolean errorCondition = false;
  private final Configuration conf;

  /**
   * @param config configuration kept by this server
   * @throws IOException declared for callers; not thrown by this constructor
   */
  public MinimalGroomServer(Configuration config) throws IOException {
    conf = config;
  }

  /** Always reports {@link HamaRPCProtocolVersion#versionID}. */
  @Override
  public long getProtocolVersion(String protocol, long clientVersion)
      throws IOException {
    return HamaRPCProtocolVersion.versionID;
  }

  /** Marks this server as shut down. */
  @Override
  public void close() throws IOException {
    isShutDown = true;
  }

  /** Hands out a fresh, unconfigured {@link BSPTask}. */
  @Override
  public Task getTask(TaskAttemptID taskid) throws IOException {
    return new BSPTask();
  }

  /**
   * Records one ping: bumps the counter, stamps the first ping time on the
   * first call and the last ping time on every call.
   *
   * @return always true
   */
  @Override
  public synchronized boolean ping(TaskAttemptID taskid) throws IOException {
    // NOTE(review): a ping is a normal event but is logged at ERROR level.
    LOG.error("Pinged");
    long now = System.currentTimeMillis();
    int updatedCount = pingCount + 1;
    pingCount = updatedCount;
    if (updatedCount == 1) {
      firstPingTime = now;
    }
    lastPingTime = now;
    return true;
  }

  /** Records that the task reported completion. */
  @Override
  public void done(TaskAttemptID taskid) throws IOException {
    taskComplete = true;
  }

  /** Records that the task reported a file system error. */
  @Override
  public void fsError(TaskAttemptID taskId, String message)
      throws IOException {
    errorCondition = true;
  }

  /** Records that the task reported a fatal error. */
  @Override
  public void fatalError(TaskAttemptID taskId, String message)
      throws IOException {
    errorCondition = true;
  }

  /** Accepts every status update. */
  @Override
  public boolean statusUpdate(TaskAttemptID taskId, TaskStatus taskStatus)
      throws IOException, InterruptedException {
    return true;
  }

  /** @return the number of pings recorded since the last reset */
  public synchronized int getPingCount() {
    return pingCount;
  }

  /**
   * Overwrites the ping counter; a value of 0 also clears both ping
   * timestamps.
   *
   * @param pingCount the new counter value
   */
  public synchronized void setPingCount(int pingCount) {
    this.pingCount = pingCount;
    if (pingCount == 0) {
      firstPingTime = 0L;
      lastPingTime = 0L;
    }
  }
}
@SuppressWarnings("unused")
private class TestBSPTaskThreadRunner extends Thread {
BSPJob job;
TestBSPTaskThreadRunner(BSPJob jobConf) {
job = jobConf;
}
@Override
@SuppressWarnings("rawtypes")
public void run() {
BSPTask task = new BSPTask();
task.setConf(job);
try {
BSPPeerImpl<?, ?, ?, ?, ?> bspPeer = new BSPPeerImpl(job, conf, taskid,
umbilical, 0, null, null, new Counters());
task.run(job, bspPeer, umbilical); // run the task
} catch (Exception e) {
LOG.error("Error in BSPTask execution.", e);
}
}
}
/*
 * Test BSP exiting its own process. Hence we need a minimal process runner.
 *
 * As a Callable, this class forks a child JVM whose entry point is its own
 * main method, forwards the child's stdout and stderr to LOG, and yields the
 * child's exit code. The child is given the fault-injection point and the
 * groom RPC port as its two command line arguments.
 */
public static class TestBSPProcessRunner implements Callable<Integer> {
  private final ScheduledExecutorService sched;
  private final AtomicReference<ScheduledFuture<Integer>> future;
  // volatile: assigned by the thread executing call(), read by whichever
  // thread invokes destroyProcess().
  private volatile Process bspTaskProcess;
  private Thread errorLog;
  private Thread infoLog;
  private final int testPoint;
  private final int testPort;

  /**
   * @param point fault-injection point handed to the child process
   * @param port groom RPC port handed to the child process
   */
  TestBSPProcessRunner(int point, int port) {
    sched = Executors.newScheduledThreadPool(1);
    future = new AtomicReference<ScheduledFuture<Integer>>();
    bspTaskProcess = null;
    testPoint = point;
    testPort = port;
  }

  /**
   * Copies the stream line by line to LOG at INFO level until end of stream,
   * then closes it.
   */
  private static void readStream(InputStream input) throws IOException {
    BufferedReader reader = new BufferedReader(new InputStreamReader(input));
    try {
      String line;
      while ((line = reader.readLine()) != null) {
        LOG.info(line);
      }
    } finally {
      reader.close();
    }
  }

  /** Starts a thread that drains the given child stream into LOG. */
  private static Thread startStreamLogger(final InputStream stream) {
    Thread drainer = new Thread() {
      @Override
      public void run() {
        try {
          readStream(stream);
        } catch (Exception e) {
          // Expected once the child is destroyed and its pipes are closed.
          LOG.debug("Stopped reading output of child process", e);
        }
      }
    };
    drainer.start();
    return drainer;
  }

  /** Assembles the command line that launches the child JVM. */
  private List<String> buildCommand() {
    String pathSeparator = System.getProperty("path.separator");
    List<String> commands = new ArrayList<String>();
    String workDir = new File(".").getAbsolutePath();
    File jvm = // use same jvm as parent
    new File(new File(System.getProperty("java.home"), "bin"), "java");
    commands.add(jvm.toString());
    StringBuilder classPath = new StringBuilder();
    // start with same classpath as parent process
    classPath.append(System.getProperty("java.class.path"));
    classPath.append(pathSeparator);
    classPath.append(new File(workDir, "core/target/test-classes"));
    classPath.append(pathSeparator);
    classPath.append(workDir);
    commands.add("-classpath");
    commands.add(classPath.toString());
    commands.add(TestBSPProcessRunner.class.getName());
    LOG.info("starting process for failure case - " + testPoint);
    commands.add("" + testPoint);
    commands.add("" + testPort);
    return commands;
  }

  /** Schedules this runner for immediate execution on its own scheduler. */
  public void startBSPProcess() {
    this.future.set(this.sched.schedule(this, 0, SECONDS));
    LOG.debug("Start building BSPPeer process.");
  }

  /**
   * Waits for the run scheduled by {@link #startBSPProcess()}.
   *
   * @return the child's exit code, or -1 if it could not be obtained
   */
  public int getBSPExitCode() {
    try {
      return this.future.get().get();
    } catch (InterruptedException e) {
      // Keep the interrupt visible to the caller.
      Thread.currentThread().interrupt();
      LOG.error("Error while fetching exit status from BSPTask", e);
    } catch (Exception e) {
      LOG.error("Error while fetching exit status from BSPTask", e);
    }
    return -1;
  }

  /**
   * Kills the child process if one was started and shuts down the internal
   * scheduler. Safe to call when the child was never launched.
   */
  public void destroyProcess() {
    try {
      Process process = bspTaskProcess;
      if (process != null) {
        process.destroy();
      } else {
        LOG.warn("No child process was started; nothing to destroy.");
      }
    } finally {
      sched.shutdown();
    }
  }

  /**
   * Launches the child JVM and blocks until it terminates.
   *
   * @return the child's exit code, or -1 if it could not be started or the
   *         wait was interrupted
   */
  @Override
  public Integer call() throws Exception {
    List<String> commands = buildCommand();
    LOG.info(commands.toString());
    ProcessBuilder builder = new ProcessBuilder(commands);
    try {
      Process process = builder.start();
      bspTaskProcess = process;
      // We have errorLog and infoLog to prevent block on pipe between
      // child and parent process.
      errorLog = startStreamLogger(process.getErrorStream());
      infoLog = startStreamLogger(process.getInputStream());
      return process.waitFor();
    } catch (InterruptedException e) {
      // Restore the flag so the executing thread still observes the interrupt.
      Thread.currentThread().interrupt();
      LOG.error("Error getting exit code of child process", e);
    } catch (Exception e) {
      LOG.error("Error getting exit code of child process", e);
    }
    return -1;
  }

  /**
   * Entry point of the child JVM: connects to the groom RPC server on
   * 127.0.0.1 and runs a BSPTask with FaulTestBSP as the work class.
   *
   * @param args args[0] is the fault-injection point, args[1] the groom RPC
   *          port
   */
  public static void main(String[] args) {
    if (args.length < 2) {
      throw new IllegalArgumentException("Expected arguments: <testPoint> <groomPort>");
    }
    HamaConfiguration hamaConf = new HamaConfiguration();
    hamaConf.setInt(Constants.GROOM_PING_PERIOD, 200);
    hamaConf.setClass("bsp.work.class", FaulTestBSP.class, BSP.class);
    hamaConf.setClass(SyncServiceFactory.SYNC_PEER_CLASS,
        LocalBSPRunner.LocalSyncClient.class, SyncClient.class);
    // NOTE(review): 610002 is outside the valid TCP port range (0-65535);
    // presumably the master port is never opened on this path - confirm.
    hamaConf.setInt("bsp.master.port", 610002);
    TaskAttemptID tid = new TaskAttemptID(new TaskID(new BSPJobID(
        "job_201110102255", 1), 1), 1);
    hamaConf.setInt(TEST_POINT, Integer.parseInt(args[0]));
    int port = Integer.parseInt(args[1]);
    try {
      BSPJob job = new BSPJob(hamaConf);
      job.setInputFormat(NullInputFormat.class);
      job.setOutputFormat(NullOutputFormat.class);
      final BSPPeerProtocol proto = (BSPPeerProtocol) RPC.getProxy(
          BSPPeerProtocol.class, HamaRPCProtocolVersion.versionID,
          new InetSocketAddress("127.0.0.1", port), hamaConf);
      BSPTask task = new BSPTask();
      task.setConf(job);
      LOG.info("Testing failure case in process - "
          + hamaConf.getInt(TEST_POINT, 0));
      // Close the umbilical when the JVM goes down, however it exits.
      Runtime.getRuntime().addShutdownHook(new Thread() {
        @Override
        public void run() {
          try {
            proto.close();
          } catch (Exception e) {
            // too late to log!
          }
        }
      });
      @SuppressWarnings("rawtypes")
      BSPPeerImpl<?, ?, ?, ?, ?> bspPeer = new BSPPeerImpl(job, hamaConf,
          tid, proto, 0, null, null, new Counters());
      task.run(job, bspPeer, proto); // run the task
    } catch (Exception e) {
      LOG.error("Error in bsp child process.", e);
    }
  }
}
/*
 * Test BSP class that has faults injected in each phase.
 *
 * The phase that fails is selected by the TEST_POINT configuration value:
 * 1 throws in setup, 2 throws in bsp, 3 throws in cleanup. A phase that is
 * not selected sleeps for 500 ms and logs its success.
 */
private static class FaulTestBSP extends
    BSP<NullWritable, NullWritable, NullWritable, NullWritable, NullWritable> {

  /** Throws when TEST_POINT is 1; otherwise sleeps, delegates to super and logs. */
  @Override
  public void setup(
      BSPPeer<NullWritable, NullWritable, NullWritable, NullWritable, NullWritable> peer)
      throws IOException, SyncException, InterruptedException {
    if (peer.getConfiguration().getInt(TEST_POINT, 0) == 1) {
      throw new RuntimeException("Error injected in setup");
    }
    Thread.sleep(500);
    super.setup(peer);
    LOG.info("Succesfully completed setup for bsp.");
  }

  /** Throws when TEST_POINT is 3; otherwise sleeps, delegates to super and logs. */
  @Override
  public void cleanup(
      BSPPeer<NullWritable, NullWritable, NullWritable, NullWritable, NullWritable> peer)
      throws IOException {
    if (peer.getConfiguration().getInt(TEST_POINT, 0) == 3) {
      throw new RuntimeException("Error injected in cleanup");
    }
    try {
      Thread.sleep(500);
    } catch (InterruptedException e) {
      LOG.error("Interrupted BSP thread.", e);
      // cleanup cannot rethrow InterruptedException, so keep the flag set
      // for whoever runs this thread.
      Thread.currentThread().interrupt();
    }
    super.cleanup(peer);
    LOG.info("Succesfully cleaned up after bsp.");
  }

  /** Throws when TEST_POINT is 2; otherwise sleeps and logs. */
  @Override
  public void bsp(
      BSPPeer<NullWritable, NullWritable, NullWritable, NullWritable, NullWritable> peer)
      throws IOException, SyncException, InterruptedException {
    if (peer.getConfiguration().getInt(TEST_POINT, 0) == 2) {
      throw new RuntimeException("Error injected in bsp function");
    }
    Thread.sleep(500);
    LOG.info("Succesfully completed bsp.");
  }
}
/**
 * Builds a fresh configuration, starts an RPC server backed by a new
 * MinimalGroomServer, opens the umbilical proxy to it and creates the
 * executor on which the child-process runner is submitted.
 */
@Override
protected void setUp() throws Exception {
  super.setUp();
  conf = new HamaConfiguration();
  conf.setInt(Constants.GROOM_PING_PERIOD, 200);
  conf.setClass("bsp.work.class", FaulTestBSP.class, BSP.class);
  conf.setClass(SyncServiceFactory.SYNC_PEER_CLASS,
      LocalBSPRunner.LocalSyncClient.class, SyncClient.class);
  int testNumber = incrementTestNumber();
  // Probe starting at the per-test offset so that the port actually used is
  // the one that was checked; adding the offset after the probe would yield
  // a port nobody tested. Assumes getFreePort searches upward from its
  // argument - TODO confirm against BSPNetUtils.
  InetSocketAddress inetAddress = new InetSocketAddress(
      BSPNetUtils.getFreePort(34321 + testNumber));
  groom = new MinimalGroomServer(conf);
  workerServer = RPC.getServer(groom, inetAddress.getHostName(),
      inetAddress.getPort(), conf);
  workerServer.start();
  LOG.info("Started RPC server");
  conf.setInt("bsp.groom.rpc.port", inetAddress.getPort());
  umbilical = (BSPPeerProtocol) RPC.getProxy(BSPPeerProtocol.class,
      HamaRPCProtocolVersion.versionID, inetAddress, conf);
  LOG.info("Started the proxy connections");
  this.testBSPTaskService = Executors.newScheduledThreadPool(1);
}
/**
 * Derives the ping count used as the lower bound by the ping tests: twice
 * the time between the first and the last recorded ping, divided by the
 * configured ping period (5000 if unset).
 *
 * @return the expected number of pings, truncated to an int
 */
private int getExpectedPingCounts() {
  final long elapsedMillis = groom.lastPingTime - groom.firstPingTime;
  final int pingPeriod = conf.getInt(Constants.GROOM_PING_PERIOD, 5000);
  final long expected = 2 * elapsedMillis / pingPeriod;
  return (int) expected;
}
/**
 * Asserts that the groom recorded at least as many pings as
 * getExpectedPingCounts() calls for. The counter is sampled once so that the
 * logged value and the asserted value are the same number.
 */
private void checkIfPingTestPassed() {
  int expectedPingCounts = getExpectedPingCounts();
  int countedPings = groom.getPingCount();
  String summary = "Counted " + countedPings + " pings and expected "
      + expectedPingCounts + " pings.";
  LOG.info(summary);
  assertTrue(summary, countedPings >= expectedPingCounts);
}
/*
 * Test if we get the expected counts of ping.
 */
public void testPing() {
  runPingTest(0);
}

/*
 * Inject failure at different points and sense if the pings are coming or
 * not.
 */
public void testPingOnTaskSetupFailure() {
  LOG.info("Testing ping failure case - 1");
  runPingTest(1);
}

public void testPingOnTaskExecFailure() {
  LOG.info("Testing ping failure case - 2");
  runPingTest(2);
}

public void testPingOnTaskCleanupFailure() {
  LOG.info("Testing ping failure case - 3");
  runPingTest(3);
}

/**
 * Shared body of the ping tests: runs the child process with the given
 * fault-injection point, waits up to 20 seconds for it to finish and then
 * checks the number of pings the groom received. The ping counter, the
 * executor and the child process are cleaned up even when the check fails.
 *
 * @param testPoint fault-injection point (0 = none, 1 = setup, 2 = bsp,
 *          3 = cleanup)
 */
private void runPingTest(int testPoint) {
  conf.setInt(TEST_POINT, testPoint);
  CompletionService<Integer> completionService = new ExecutorCompletionService<Integer>(
      this.testBSPTaskService);
  TestBSPProcessRunner runner = new TestBSPProcessRunner(testPoint,
      workerServer.getListenerAddress().getPort());
  try {
    Future<Integer> future = completionService.submit(runner);
    try {
      future.get(20000, TimeUnit.MILLISECONDS);
    } catch (InterruptedException e1) {
      // Keep the interrupt visible to the test framework.
      Thread.currentThread().interrupt();
      LOG.error("Interrupted Exception.", e1);
    } catch (ExecutionException e1) {
      LOG.error("ExecutionException Exception.", e1);
    } catch (TimeoutException e) {
      LOG.error("TimeoutException Exception.", e);
    }
    checkIfPingTestPassed();
  } finally {
    groom.setPingCount(0);
    this.testBSPTaskService.shutdownNow();
    runner.destroyProcess();
  }
}
/**
 * Starts the child process, waits for its first ping, then stops the RPC
 * server and expects the child to terminate with exit code 69.
 * NOTE(review): 69 is presumably the status the task exits with when it
 * loses contact with the groom - confirm against BSPTask.
 */
public void testBSPTaskSelfDestroy() {
  LOG.info("Testing self kill on lost contact.");
  CompletionService<Integer> completionService = new ExecutorCompletionService<Integer>(
      this.testBSPTaskService);
  TestBSPProcessRunner runner = new TestBSPProcessRunner(0, workerServer
      .getListenerAddress().getPort());
  try {
    Future<Integer> future = completionService.submit(runner);
    // Bounded wait: a child that never pings must fail the test, not hang it.
    long deadline = System.currentTimeMillis() + 20000L;
    try {
      while (groom.pingCount == 0 && System.currentTimeMillis() < deadline) {
        Thread.sleep(100);
      }
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      LOG.error("Interrupted the timer for 1 sec.", e);
    }
    assertTrue("Child process never pinged the groom server.",
        groom.pingCount > 0);
    workerServer.stop();
    // Cleared so that tearDown does not touch the stopped server again.
    umbilical = null;
    workerServer = null;
    Integer exitValue = -1;
    try {
      exitValue = future.get(20000, TimeUnit.MILLISECONDS);
    } catch (InterruptedException e1) {
      Thread.currentThread().interrupt();
      LOG.error("Interrupted Exception.", e1);
    } catch (ExecutionException e1) {
      LOG.error("ExecutionException Exception.", e1);
    } catch (TimeoutException e) {
      LOG.error("TimeoutException Exception.", e);
    }
    assertEquals(69, exitValue.intValue());
  } finally {
    // Never leave the child JVM behind, even when an assertion fails.
    runner.destroyProcess();
  }
}
/**
 * Resets the ping counter, closes the umbilical, stops the RPC server and
 * shuts down the executor. The server and the executor are stopped even
 * when an earlier step throws.
 */
@Override
protected void tearDown() throws Exception {
  super.tearDown();
  try {
    if (groom != null) {
      groom.setPingCount(0);
    }
    if (umbilical != null) {
      umbilical.close();
      // NOTE(review): presumably gives the RPC layer time to settle - confirm.
      Thread.sleep(2000);
    }
  } finally {
    try {
      if (workerServer != null) {
        workerServer.stop();
      }
    } finally {
      if (testBSPTaskService != null) {
        testBSPTaskService.shutdownNow();
      }
    }
  }
  Thread.sleep(2000);
}
}