(TWILL-180) Reflects YARN application completion status via TwillController
This closes #54 on Github.
Signed-off-by: Terence Yim <chtyim@apache.org>
diff --git a/twill-api/src/main/java/org/apache/twill/api/ServiceController.java b/twill-api/src/main/java/org/apache/twill/api/ServiceController.java
index 1ea86b2..bb46290 100644
--- a/twill-api/src/main/java/org/apache/twill/api/ServiceController.java
+++ b/twill-api/src/main/java/org/apache/twill/api/ServiceController.java
@@ -22,6 +22,7 @@
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
+import javax.annotation.Nullable;
/**
* This interface is for controlling a remote running service.
@@ -57,7 +58,8 @@
* will be returned.
*
* @return a {@link Future} that represents the termination of the service. The future result will be
- * this {@link ServiceController}. If the service terminated due to exception, the future will carry the exception.
+ * this {@link ServiceController}. If the service terminated with a {@link TerminationStatus#FAILED} status,
+ * calling the {@link Future#get()} on the returning future will throw {@link ExecutionException}.
*/
Future<? extends ServiceController> terminate();
@@ -98,4 +100,32 @@
* @throws ExecutionException if the service terminated due to exception.
*/
void awaitTerminated(long timeout, TimeUnit timeoutUnit) throws TimeoutException, ExecutionException;
+
+ /**
+ * Gets the termination status of the application represented by this controller.
+ *
+ * @return the termination status or {@code null} if the application is still running
+ */
+ @Nullable
+ TerminationStatus getTerminationStatus();
+
+ /**
+ * Enum to represent termination status of the application when it completed.
+ */
+ enum TerminationStatus {
+ /**
+ * Application was completed successfully.
+ */
+ SUCCEEDED,
+
+ /**
+ * Application was killed explicitly.
+ */
+ KILLED,
+
+ /**
+ * Application failed.
+ */
+ FAILED
+ }
}
diff --git a/twill-core/src/main/java/org/apache/twill/internal/AbstractExecutionServiceController.java b/twill-core/src/main/java/org/apache/twill/internal/AbstractExecutionServiceController.java
index 580a88f..3ea27fc 100644
--- a/twill-core/src/main/java/org/apache/twill/internal/AbstractExecutionServiceController.java
+++ b/twill-core/src/main/java/org/apache/twill/internal/AbstractExecutionServiceController.java
@@ -36,6 +36,7 @@
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicReference;
+import javax.annotation.Nullable;
/**
* An abstract base class for implementing {@link ServiceController} that deal with Service state transition and
@@ -47,6 +48,7 @@
private final ListenerExecutors listenerExecutors;
private final Service serviceDelegate;
private final SettableFuture<State> terminationFuture;
+ private volatile TerminationStatus terminationStatus;
protected AbstractExecutionServiceController(RunId runId) {
this.runId = runId;
@@ -87,6 +89,12 @@
});
}
+ @Nullable
+ @Override
+ public TerminationStatus getTerminationStatus() {
+ return terminationStatus;
+ }
+
@Override
public void onRunning(final Runnable runnable, Executor executor) {
addListener(new ServiceListenerAdapter() {
@@ -168,6 +176,10 @@
};
}
+ protected final void setTerminationStatus(TerminationStatus status) {
+ this.terminationStatus = status;
+ }
+
private final class ServiceDelegate extends AbstractIdleService {
@Override
diff --git a/twill-core/src/main/java/org/apache/twill/internal/TwillContainerLauncher.java b/twill-core/src/main/java/org/apache/twill/internal/TwillContainerLauncher.java
index 9b6384c..0f8674b 100644
--- a/twill-core/src/main/java/org/apache/twill/internal/TwillContainerLauncher.java
+++ b/twill-core/src/main/java/org/apache/twill/internal/TwillContainerLauncher.java
@@ -246,6 +246,13 @@
@Override
public void completed(int exitStatus) {
// count down the shutdownLatch to inform any waiting threads that this container is complete
+ if (exitStatus == 0) {
+ setTerminationStatus(TerminationStatus.SUCCEEDED);
+ } else if (exitStatus == 143) {
+ setTerminationStatus(TerminationStatus.KILLED);
+ } else {
+ setTerminationStatus(TerminationStatus.FAILED);
+ }
shutdownLatch.countDown();
synchronized (this) {
forceShutDown();
diff --git a/twill-yarn/src/main/java/org/apache/twill/yarn/YarnTwillController.java b/twill-yarn/src/main/java/org/apache/twill/yarn/YarnTwillController.java
index 6ea7d8f..335d7ec 100644
--- a/twill-yarn/src/main/java/org/apache/twill/yarn/YarnTwillController.java
+++ b/twill-yarn/src/main/java/org/apache/twill/yarn/YarnTwillController.java
@@ -166,13 +166,14 @@
kill();
}
+ FinalApplicationStatus finalStatus;
// Poll application status from yarn
try (ProcessController<YarnApplicationReport> processController = this.processController) {
Stopwatch stopWatch = new Stopwatch().start();
long maxTime = TimeUnit.MILLISECONDS.convert(Constants.APPLICATION_MAX_STOP_SECONDS, TimeUnit.SECONDS);
YarnApplicationReport report = processController.getReport();
- FinalApplicationStatus finalStatus = report.getFinalApplicationStatus();
+ finalStatus = report.getFinalApplicationStatus();
ApplicationId appId = report.getApplicationId();
while (finalStatus == FinalApplicationStatus.UNDEFINED &&
stopWatch.elapsedTime(TimeUnit.MILLISECONDS) < maxTime) {
@@ -180,18 +181,28 @@
TimeUnit.SECONDS.sleep(1);
finalStatus = processController.getReport().getFinalApplicationStatus();
}
- LOG.debug("Yarn application {} {} completed with status {}", appName, appId, finalStatus);
// Application not finished after max stop time, kill the application
if (finalStatus == FinalApplicationStatus.UNDEFINED) {
kill();
+ finalStatus = FinalApplicationStatus.KILLED;
}
} catch (Exception e) {
LOG.warn("Exception while waiting for application report: {}", e.getMessage(), e);
kill();
+ finalStatus = FinalApplicationStatus.KILLED;
}
super.doShutDown();
+
+ if (finalStatus == FinalApplicationStatus.FAILED) {
+ // If we know the app status is failed, throw an exception to make this controller goes into error state.
+ // All other final status are not treated as failure as we can't be sure.
+ setTerminationStatus(TerminationStatus.FAILED);
+ throw new RuntimeException(String.format("Yarn application completed with failure %s, %s.", appName, getRunId()));
+ }
+ setTerminationStatus(finalStatus == FinalApplicationStatus.SUCCEEDED
+ ? TerminationStatus.SUCCEEDED : TerminationStatus.KILLED);
}
@Override
diff --git a/twill-yarn/src/test/java/org/apache/twill/yarn/TaskCompletedTestRun.java b/twill-yarn/src/test/java/org/apache/twill/yarn/TaskCompletedTestRun.java
index 51031d4..6fbdc2d 100644
--- a/twill-yarn/src/test/java/org/apache/twill/yarn/TaskCompletedTestRun.java
+++ b/twill-yarn/src/test/java/org/apache/twill/yarn/TaskCompletedTestRun.java
@@ -20,6 +20,7 @@
import com.google.common.base.Throwables;
import org.apache.twill.api.AbstractTwillRunnable;
import org.apache.twill.api.ResourceSpecification;
+import org.apache.twill.api.ServiceController;
import org.apache.twill.api.TwillController;
import org.apache.twill.api.TwillRunner;
import org.apache.twill.api.logging.PrinterLogHandler;
@@ -82,7 +83,32 @@
Assert.assertTrue(runLatch.await(1, TimeUnit.MINUTES));
controller.awaitTerminated(1, TimeUnit.MINUTES);
+ Assert.assertEquals(ServiceController.TerminationStatus.SUCCEEDED, controller.getTerminationStatus());
+ }
- TimeUnit.SECONDS.sleep(2);
+ @Test
+ public void testFailureComplete() throws TimeoutException, ExecutionException, InterruptedException {
+ TwillRunner twillRunner = getTwillRunner();
+
+ // Start the app with an invalid ClassLoader. This will cause the AM fails to start.
+ TwillController controller = twillRunner.prepare(new SleepTask(),
+ ResourceSpecification.Builder.with()
+ .setVirtualCores(1)
+ .setMemory(512, ResourceSpecification.SizeUnit.MEGA)
+ .setInstances(1).build())
+ .addLogHandler(new PrinterLogHandler(new PrintWriter(System.out, true)))
+ .setClassLoader("InvalidClassLoader")
+ .start();
+
+ final CountDownLatch terminateLatch = new CountDownLatch(1);
+ controller.onTerminated(new Runnable() {
+ @Override
+ public void run() {
+ terminateLatch.countDown();
+ }
+ }, Threads.SAME_THREAD_EXECUTOR);
+
+ Assert.assertTrue(terminateLatch.await(2, TimeUnit.MINUTES));
+ Assert.assertEquals(ServiceController.TerminationStatus.FAILED, controller.getTerminationStatus());
}
}