YARN-3877. YarnClientImpl.submitApplication swallows exceptions. Contributed by Varun Saxena
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/impl/YarnClientImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/impl/YarnClientImpl.java
index 7760521..80e453f 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/impl/YarnClientImpl.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/impl/YarnClientImpl.java
@@ -306,9 +306,10 @@
try {
Thread.sleep(submitPollIntervalMillis);
} catch (InterruptedException ie) {
- LOG.error("Interrupted while waiting for application "
- + applicationId
- + " to be successfully submitted.");
+ String msg = "Interrupted while waiting for application "
+ + applicationId + " to be successfully submitted.";
+ LOG.error(msg);
+ throw new YarnException(msg, ie);
}
} catch (ApplicationNotFoundException ex) {
// FailOver or RM restart happens before RMStateStore saves
@@ -446,8 +447,10 @@
Thread.sleep(asyncApiPollIntervalMillis);
}
} catch (InterruptedException e) {
- LOG.error("Interrupted while waiting for application " + applicationId
- + " to be killed.");
+ String msg = "Interrupted while waiting for application "
+ + applicationId + " to be killed.";
+ LOG.error(msg);
+ throw new YarnException(msg, e);
}
}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestYarnClient.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestYarnClient.java
index e462be1..19966ad 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestYarnClient.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestYarnClient.java
@@ -27,6 +27,7 @@
import static org.mockito.Mockito.when;
import java.io.IOException;
+import java.lang.Thread.State;
import java.nio.ByteBuffer;
import java.security.PrivilegedExceptionAction;
import java.util.ArrayList;
@@ -201,6 +202,57 @@
client.stop();
}
+ @SuppressWarnings("deprecation")
+ @Test (timeout = 20000)
+ public void testSubmitApplicationInterrupted() throws IOException {
+ Configuration conf = new Configuration();
+ int pollIntervalMs = 1000;
+ conf.setLong(YarnConfiguration.YARN_CLIENT_APP_SUBMISSION_POLL_INTERVAL_MS,
+ pollIntervalMs);
+ try (final YarnClient client = new MockYarnClient()) {
+ client.init(conf);
+ client.start();
+ // Submit the application and then interrupt it while its waiting
+ // for submission to be successful.
+ final class SubmitThread extends Thread {
+ private boolean isInterrupted = false;
+ @Override
+ public void run() {
+ ApplicationSubmissionContext context =
+ mock(ApplicationSubmissionContext.class);
+ ApplicationId applicationId = ApplicationId.newInstance(
+ System.currentTimeMillis(), 1);
+ when(context.getApplicationId()).thenReturn(applicationId);
+ ((MockYarnClient) client).setYarnApplicationState(
+ YarnApplicationState.NEW);
+ try {
+ client.submitApplication(context);
+ } catch (YarnException | IOException e) {
+ if (e instanceof YarnException && e.getCause() != null &&
+ e.getCause() instanceof InterruptedException) {
+ isInterrupted = true;
+ }
+ }
+ }
+ }
+ SubmitThread appSubmitThread = new SubmitThread();
+ appSubmitThread.start();
+ try {
+ // Wait for thread to start and begin to sleep
+ // (enter TIMED_WAITING state).
+ while (appSubmitThread.getState() != State.TIMED_WAITING) {
+ Thread.sleep(pollIntervalMs / 2);
+ }
+ // Interrupt the thread.
+ appSubmitThread.interrupt();
+ appSubmitThread.join();
+ } catch (InterruptedException e) {
+ }
+ Assert.assertTrue("Expected an InterruptedException wrapped inside a " +
+ "YarnException", appSubmitThread.isInterrupted);
+ }
+ }
+
@Test (timeout = 30000)
public void testSubmitIncorrectQueueToCapacityScheduler() throws IOException {
MiniYARNCluster cluster = new MiniYARNCluster("testMRAMTokens", 1, 1, 1);