blob: 4f5adce47c86d7becb6e8a5866bcfa14462f1a69 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.twill.yarn;
import com.google.common.io.Files;
import com.google.common.util.concurrent.Uninterruptibles;
import org.apache.twill.api.AbstractTwillRunnable;
import org.apache.twill.api.EventHandler;
import org.apache.twill.api.TwillApplication;
import org.apache.twill.api.TwillController;
import org.apache.twill.api.TwillRunner;
import org.apache.twill.api.TwillSpecification;
import org.apache.twill.api.logging.LogEntry;
import org.apache.twill.api.logging.LogHandler;
import org.apache.twill.api.logging.PrinterLogHandler;
import org.apache.twill.internal.yarn.YarnUtils;
import org.junit.Assert;
import org.junit.Assume;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.File;
import java.io.PrintWriter;
import java.util.Collections;
import java.util.Map;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
/**
* Unit test for application master resilience.
*/
public class AppRecoveryTestRun extends BaseYarnTest {
/**
 * Class-level temporary folder rule. {@code testAMRestart} allocates its watch file from this folder.
 */
@ClassRule
public static final TemporaryFolder TEMP_FOLDER = new TemporaryFolder();
/**
 * Verifies that the application keeps working after its application master (AM) process is killed.
 *
 * <p>The test starts {@link TestApp}, waits until both the event handler and the runnable have logged
 * once, touches the watch file so that {@link TestEventHandler} halts its process, and then waits for the
 * same two log messages a second time. Seeing them again shows that a new attempt is running and that log
 * collection resumed.
 *
 * @throws Exception if preparing, running or terminating the application fails
 */
@Test
public void testAMRestart() throws Exception {
  // Only run it with Hadoop-2.1 or above
  Assume.assumeTrue(YarnUtils.getHadoopVersion().compareTo(YarnUtils.HadoopVersions.HADOOP_21) >= 0);

  // Don't run this test on Mac, as there would be leftover java processes (HADOOP-12317).
  // The test can be forced to run by turning on the "force-mac-tests" maven profile.
  // After the test has finished, run the `jps` command and delete all `TwillLauncher` processes.
  Assume.assumeTrue(Boolean.parseBoolean(System.getProperty("force.mac.tests")) ||
                      !System.getProperty("os.name").toLowerCase().contains("mac"));

  // The event handler halts its process as soon as this file exists, so it must be absent when the
  // application starts. Fail fast if it cannot be removed instead of silently running a broken scenario.
  File watchFile = TEMP_FOLDER.newFile();
  Assert.assertTrue("Failed to delete watch file " + watchFile, watchFile.delete());

  // Message logged by TestEventHandler.containerLaunched for the TestRunnable container
  final String launchedMessage = "Container for " + TestRunnable.class.getSimpleName() + " launched";

  // Start the testing app, and wait for 4 log lines (2 per attempt) that match the messages emitted by
  // the event handler (AM) and by the runnable
  final Semaphore semaphore = new Semaphore(0);
  TwillRunner runner = getTwillRunner();
  TwillController controller = runner.prepare(new TestApp(new TestEventHandler(watchFile)))
    .addLogHandler(new PrinterLogHandler(new PrintWriter(System.out, true)))
    // Use a log handler to match messages from the AM and the Runnable to make sure the log collection
    // gets resumed correctly after the AM restarted
    .addLogHandler(new LogHandler() {
      @Override
      public void onLog(LogEntry logEntry) {
        String message = logEntry.getMessage();
        if (message.equals(launchedMessage) || message.equals("Running 0")) {
          semaphore.release();
        }
      }
    })
    .start();

  try {
    // Wait for the first attempt running
    Assert.assertTrue(semaphore.tryAcquire(2, 2, TimeUnit.MINUTES));

    // Touch the watchFile so that the event handler will kill the AM
    Files.touch(watchFile);

    // Wait for the second attempt running
    Assert.assertTrue(semaphore.tryAcquire(2, 2, TimeUnit.MINUTES));
  } finally {
    // Always terminate, so that a failed assertion does not leave the application running
    controller.terminate().get();
  }
}
/**
 * An {@link EventHandler} used to kill the first attempt of the application.
 *
 * <p>The watch file path is published through {@link #getConfigs()} and read back from the handler
 * context when a container is launched. For a container whose id contains {@code _01_}, a daemon thread
 * polls for the watch file once per second and halts the JVM as soon as the file exists.
 */
public static final class TestEventHandler extends EventHandler {

  private static final Logger LOG = LoggerFactory.getLogger(TestEventHandler.class);

  private File watchFile;

  TestEventHandler(File watchFile) {
    this.watchFile = watchFile;
  }

  @Override
  protected Map<String, String> getConfigs() {
    // Expose the watch file location so that it is available from the specification configs
    return Collections.singletonMap("watchFile", watchFile.getAbsolutePath());
  }

  @Override
  public void containerLaunched(String runnableName, int instanceId, String containerId) {
    LOG.info("Container for {} launched", runnableName);

    // Only containers carrying the "_01_" marker in their id arm the kill switch
    if (!containerId.contains("_01_")) {
      return;
    }

    // The path is taken from the specification configs rather than from the field
    final File trigger = new File(context.getSpecification().getConfigs().get("watchFile"));
    Thread killer = new Thread(new Runnable() {
      @Override
      public void run() {
        // Poll until the watch file shows up, then kill the process abruptly
        while (!trigger.exists()) {
          Uninterruptibles.sleepUninterruptibly(1, TimeUnit.SECONDS);
        }
        Runtime.getRuntime().halt(-1);
      }
    });
    killer.setDaemon(true);
    killer.start();
  }
}
/**
 * Test application named {@code TestApp}. It consists of a single {@link TestRunnable} with no local
 * files and uses the {@link EventHandler} supplied at construction time.
 */
public static final class TestApp implements TwillApplication {

  private final EventHandler eventHandler;

  public TestApp(EventHandler eventHandler) {
    this.eventHandler = eventHandler;
  }

  @Override
  public TwillSpecification configure() {
    TestRunnable runnable = new TestRunnable();
    TwillSpecification specification = TwillSpecification.Builder.with()
      .setName("TestApp")
      .withRunnable()
        .add(runnable).noLocalFiles()
      .anyOrder()
      .withEventHandler(eventHandler)
      .build();
    return specification;
  }
}
/**
 * Runnable for testing. It logs {@code Running <count>} each time a two second wait on the stop latch
 * times out, starting with {@code Running 0}, and returns once {@link #stop()} has been called or the
 * running thread is interrupted.
 */
public static final class TestRunnable extends AbstractTwillRunnable {

  private static final Logger LOG = LoggerFactory.getLogger(TestRunnable.class);

  // Released by stop() to make run() return
  private final CountDownLatch stopLatch = new CountDownLatch(1);

  @Override
  public void run() {
    long count = 0;
    try {
      // await() returns false on timeout, i.e. while stop() has not been called yet
      while (!stopLatch.await(2, TimeUnit.SECONDS)) {
        LOG.info("Running {}", count++);
      }
    } catch (InterruptedException e) {
      LOG.info("Interrupted", e);
      // Restore the interrupt status so that the caller of run() can still observe the interruption
      Thread.currentThread().interrupt();
    }
  }

  @Override
  public void stop() {
    stopLatch.countDown();
  }
}
}