blob: 6dcec8f6a309303a12cfa21f82676fd57251904a [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.twill.yarn;
import com.google.common.base.Stopwatch;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.JdkFutureAdapters;
import com.google.common.util.concurrent.ListenableFuture;
import org.apache.twill.api.AbstractTwillRunnable;
import org.apache.twill.api.Command;
import org.apache.twill.api.ResourceReport;
import org.apache.twill.api.TwillApplication;
import org.apache.twill.api.TwillController;
import org.apache.twill.api.TwillRunResources;
import org.apache.twill.api.TwillSpecification;
import org.apache.twill.api.logging.PrinterLogHandler;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.PrintWriter;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicInteger;
/**
 * Tests restarting of Twill runnables: restarting all or selected instances, changing instance
 * counts, and restarting instances whose {@code stop()} blocks for a long time.
 */
public class RestartRunnableTestRun extends BaseYarnTest {

  private static final Logger LOG = LoggerFactory.getLogger(RestartRunnableTestRun.class);

  private static final String HANGING_RUNNABLE = HangingRunnable.class.getSimpleName();
  private static final String STOPPING_RUNNABLE = StoppingRunnable.class.getSimpleName();

  /** Used both as the command name and as the option key of {@link SleepCommand}. */
  private static final String HANGING_RUNNABLE_STOP_SECS = "hanging.runnable.stop.secs";

  /**
   * Command that can be sent to HangingRunnable to make it sleep while stopping.
   */
  private static class SleepCommand implements Command {

    private final int sleepTime;

    /**
     * @param sleepTime number of seconds a {@link HangingRunnable} receiving this command will
     *                  sleep inside {@code stop()}
     */
    public SleepCommand(int sleepTime) {
      this.sleepTime = sleepTime;
    }

    @Override
    public String getCommand() {
      return HANGING_RUNNABLE_STOP_SECS;
    }

    @Override
    public Map<String, String> getOptions() {
      return ImmutableMap.of(HANGING_RUNNABLE_STOP_SECS, Integer.toString(sleepTime));
    }
  }

  /**
   * This runnable hangs when it gets a stop message: {@code stop()} sleeps for a configurable
   * number of seconds (default 1, changed via {@link SleepCommand}) before interrupting the run
   * thread.
   */
  public static final class HangingRunnable extends AbstractTwillRunnable {

    private volatile Thread runThread;

    // Seconds to block in stop(); updated by SleepCommand to simulate a hanging runnable.
    private final AtomicInteger sleepTime = new AtomicInteger(1);

    @Override
    public void run() {
      runThread = Thread.currentThread();
      LOG.info("Starting Runnable {}", HANGING_RUNNABLE);
      while (!Thread.interrupted()) {
        try {
          TimeUnit.MILLISECONDS.sleep(200);
        } catch (InterruptedException e) {
          break;
        }
      }
      LOG.info("Stopping Runnable {}", HANGING_RUNNABLE);
    }

    @Override
    public void stop() {
      // Do not stop the run thread until sleepTime has elapsed, to simulate a hanging runnable.
      int secs = sleepTime.get();
      LOG.info("Using sleep time = {} secs", secs);
      try {
        TimeUnit.SECONDS.sleep(secs);
      } catch (InterruptedException e) {
        LOG.error("Got exception: ", e);
        // Restore the interrupt status so the caller of stop() can observe it.
        Thread.currentThread().interrupt();
      }
      Thread thread = runThread;
      if (thread != null) {
        thread.interrupt();
      }
    }

    @Override
    public void handleCommand(Command command) throws Exception {
      super.handleCommand(command);
      if (HANGING_RUNNABLE_STOP_SECS.equals(command.getCommand())) {
        int time = Integer.parseInt(command.getOptions().get(HANGING_RUNNABLE_STOP_SECS));
        LOG.info("Got sleep time from message = {} secs", time);
        sleepTime.set(time);
      }
    }
  }

  /**
   * This runnable stops immediately when it gets a stop message.
   */
  public static final class StoppingRunnable extends AbstractTwillRunnable {

    private volatile Thread runThread;

    @Override
    public void run() {
      runThread = Thread.currentThread();
      LOG.info("Starting Runnable {}", STOPPING_RUNNABLE);
      while (!Thread.interrupted()) {
        try {
          TimeUnit.MILLISECONDS.sleep(200);
        } catch (InterruptedException e) {
          break;
        }
      }
      LOG.info("Stopping Runnable {}", STOPPING_RUNNABLE);
    }

    @Override
    public void stop() {
      // Interrupt the run thread so its loop exits right away.
      Thread thread = runThread;
      if (thread != null) {
        thread.interrupt();
      }
    }
  }

  /**
   * A test TwillApplication to test restarting runnables. HANGING_RUNNABLE is started first and
   * STOPPING_RUNNABLE follows once it has started.
   */
  public static final class RestartTestApplication implements TwillApplication {
    @Override
    public TwillSpecification configure() {
      return TwillSpecification.Builder.with()
        .setName(RestartTestApplication.class.getSimpleName())
        .withRunnable()
        .add(HANGING_RUNNABLE, new HangingRunnable()).noLocalFiles()
        .add(STOPPING_RUNNABLE, new StoppingRunnable()).noLocalFiles()
        .withOrder()
        .begin(HANGING_RUNNABLE)
        .nextWhenStarted(STOPPING_RUNNABLE)
        .build();
    }
  }

  /**
   * A test TwillApplication with a single runnable.
   */
  public static final class SingleRunnableApp implements TwillApplication {
    @Override
    public TwillSpecification configure() {
      return TwillSpecification.Builder.with()
        // Register under this class's own name (previously copy-pasted from RestartTestApplication).
        .setName(SingleRunnableApp.class.getSimpleName())
        .withRunnable()
        .add(HANGING_RUNNABLE, new HangingRunnable()).noLocalFiles()
        .anyOrder()
        .build();
    }
  }

  @Test
  public void testRestartSingleRunnable() throws Exception {
    YarnTwillRunnerService runner = getTwillRunner();
    runner.start();

    LOG.info("Starting application {}", SingleRunnableApp.class.getSimpleName());
    TwillController controller = runner.prepare(new SingleRunnableApp())
      .addLogHandler(new PrinterLogHandler(new PrintWriter(System.out)))
      .start();

    // Lets wait until all runnables have started (1 AM container + 1 runnable container)
    waitForInstance(controller, HANGING_RUNNABLE, "002", 120, TimeUnit.SECONDS);
    waitForContainers(controller, 2, 60, TimeUnit.SECONDS);

    // Now restart runnable
    LOG.info("Restarting runnable {}", HANGING_RUNNABLE);
    controller.restartAllInstances(HANGING_RUNNABLE);
    waitForInstance(controller, HANGING_RUNNABLE, "003", 120, TimeUnit.SECONDS);
    waitForContainers(controller, 2, 60, TimeUnit.SECONDS);

    // Send command to HANGING_RUNNABLE to hang when stopped
    controller.sendCommand(HANGING_RUNNABLE, new SleepCommand(1000)).get();
    LOG.info("Restarting runnable {}", HANGING_RUNNABLE);
    controller.restartAllInstances(HANGING_RUNNABLE);
    // NOTE(review): presumably relies on Twill forcibly killing the container whose stop() hangs.
    waitForInstance(controller, HANGING_RUNNABLE, "004", 120, TimeUnit.SECONDS);
    waitForContainers(controller, 2, 60, TimeUnit.SECONDS);

    // Terminate the application so it does not keep holding cluster containers after the test.
    LOG.info("Stopping application {}", SingleRunnableApp.class.getSimpleName());
    controller.terminate().get(120, TimeUnit.SECONDS);

    // Sleep a bit for full cleanup
    TimeUnit.SECONDS.sleep(2);
  }

  @SuppressWarnings("unchecked")
  @Test
  public void testRestartRunnable() throws Exception {
    YarnTwillRunnerService runner = getTwillRunner();
    runner.start();

    LOG.info("Starting application {}", RestartTestApplication.class.getSimpleName());
    TwillController controller = runner.prepare(new RestartTestApplication())
      .addLogHandler(new PrinterLogHandler(new PrintWriter(System.out)))
      .start();

    // Lets wait until all runnables have started
    waitForInstance(controller, HANGING_RUNNABLE, "002", 120, TimeUnit.SECONDS);
    waitForInstance(controller, STOPPING_RUNNABLE, "003", 120, TimeUnit.SECONDS);
    waitForContainers(controller, 3, 60, TimeUnit.SECONDS);

    // Send command to first instance of HANGING_RUNNABLE to hang when stopped
    controller.sendCommand(HANGING_RUNNABLE, new SleepCommand(1000)).get();

    // Increase instances of both runnables
    LOG.info("Increasing instances of both runnables");
    allAsList(controller.changeInstances(HANGING_RUNNABLE, 3),
              controller.changeInstances(STOPPING_RUNNABLE, 2)
    ).get(120, TimeUnit.SECONDS);
    waitForInstance(controller, HANGING_RUNNABLE, "005", 120, TimeUnit.SECONDS); // +2 containers
    waitForInstance(controller, STOPPING_RUNNABLE, "006", 120, TimeUnit.SECONDS); // +1 container
    waitForContainers(controller, 6, 60, TimeUnit.SECONDS);

    // Now restart both runnables (002 instance of HANGING_RUNNABLE will be killed)
    LOG.info("Restarting all instances of runnables {} and {}", HANGING_RUNNABLE, STOPPING_RUNNABLE);
    allAsList(
      controller.restartAllInstances(HANGING_RUNNABLE),
      controller.restartAllInstances(STOPPING_RUNNABLE)
    ).get(120, TimeUnit.SECONDS);
    waitForInstance(controller, HANGING_RUNNABLE, "009", 120, TimeUnit.SECONDS);
    waitForInstance(controller, STOPPING_RUNNABLE, "011", 120, TimeUnit.SECONDS);
    waitForContainers(controller, 6, 60, TimeUnit.SECONDS);

    // Restart a single runnable from both
    LOG.info("Restarting a single runnable of both");
    allAsList(
      controller.restartInstances(HANGING_RUNNABLE, 1),
      controller.restartInstances(ImmutableMap.of(STOPPING_RUNNABLE, Collections.singleton(0)))
    ).get(120, TimeUnit.SECONDS);
    waitForInstance(controller, HANGING_RUNNABLE, "012", 120, TimeUnit.SECONDS);
    waitForInstance(controller, STOPPING_RUNNABLE, "013", 120, TimeUnit.SECONDS);
    waitForContainers(controller, 6, 60, TimeUnit.SECONDS);

    // Send command to all instances of HANGING_RUNNABLE to wait for 10 seconds when stopped
    controller.sendCommand(HANGING_RUNNABLE, new SleepCommand(10)).get();

    // Reduce instances of both runnables to 1
    LOG.info("Decreasing instances of both runnables");
    allAsList(controller.changeInstances(HANGING_RUNNABLE, 1),
              controller.changeInstances(STOPPING_RUNNABLE, 1)
    ).get(120, TimeUnit.SECONDS);
    waitForInstance(controller, HANGING_RUNNABLE, "007", 120, TimeUnit.SECONDS);
    waitForInstance(controller, STOPPING_RUNNABLE, "013", 120, TimeUnit.SECONDS); // instance 0 is 013 due to restart
    waitForContainers(controller, 3, 60, TimeUnit.SECONDS);

    LOG.info("Stopping application {}", RestartTestApplication.class.getSimpleName());
    controller.terminate().get(120, TimeUnit.SECONDS);

    // Sleep a bit for full cleanup
    TimeUnit.SECONDS.sleep(2);
  }

  /**
   * Polls once per second until both YARN and Twill report exactly {@code count} containers for
   * the application (the Twill count includes one container for the application master).
   *
   * @throws TimeoutException if the counts do not both match {@code count} within the timeout
   */
  private void waitForContainers(TwillController controller, int count, long timeout, TimeUnit timeoutUnit)
    throws Exception {
    Stopwatch stopwatch = new Stopwatch();
    stopwatch.start();
    int yarnContainers = 0;
    int twillContainers = 0;
    do {
      // Take one snapshot per poll so the YARN and Twill counts describe the same report.
      ResourceReport report = controller.getResourceReport();
      if (report != null) {
        yarnContainers = getApplicationResourceReport(report.getApplicationId()).getNumUsedContainers();
        twillContainers = getTwillContainersUsed(report);
        if (yarnContainers == count && twillContainers == count) {
          return;
        }
      }
      TimeUnit.SECONDS.sleep(1);
    } while (stopwatch.elapsedTime(timeoutUnit) < timeout);

    throw new TimeoutException("Timeout reached while waiting for num containers to be " + count +
                                 ". Yarn containers = " + yarnContainers + ", Twill containers = " + twillContainers);
  }

  /**
   * Polls once per second until some container of {@code runnable} has a container id ending with
   * {@code yarnInstanceId}.
   *
   * @throws TimeoutException if no such container appears within the timeout
   */
  private void waitForInstance(TwillController controller, String runnable, String yarnInstanceId,
                               long timeout, TimeUnit timeoutUnit) throws InterruptedException, TimeoutException {
    Stopwatch stopwatch = new Stopwatch();
    stopwatch.start();
    do {
      ResourceReport report = controller.getResourceReport();
      Collection<TwillRunResources> runnableResources =
        report == null ? null : report.getRunnableResources(runnable);
      if (runnableResources != null) {
        for (TwillRunResources resources : runnableResources) {
          if (resources.getContainerId().endsWith(yarnInstanceId)) {
            return;
          }
        }
      }
      TimeUnit.SECONDS.sleep(1);
    } while (stopwatch.elapsedTime(timeoutUnit) < timeout);

    throw new TimeoutException("Timeout reached while waiting for runnable " +
                                 runnable + " instance " + yarnInstanceId);
  }

  /**
   * Counts the containers described by the given report: one for the application master plus one
   * per runnable instance.
   */
  private int getTwillContainersUsed(ResourceReport resourceReport) {
    int count = 1; // 1 for app master container
    for (Collection<TwillRunResources> resources : resourceReport.getResources().values()) {
      count += resources.size();
    }
    return count;
  }

  /**
   * Adapts plain JDK futures to ListenableFutures and combines them into one future that succeeds
   * with all results, or fails if any input fails.
   */
  @SafeVarargs
  private final <V> ListenableFuture<List<V>> allAsList(Future<? extends V>... futures) {
    ImmutableList.Builder<ListenableFuture<? extends V>> listBuilder = ImmutableList.builder();
    for (Future<? extends V> future : futures) {
      listBuilder.add(JdkFutureAdapters.listenInPoolThread(future));
    }
    return Futures.allAsList(listBuilder.build());
  }
}