YARN-3877. YarnClientImpl.submitApplication swallows exceptions. Contributed by Varun Saxena
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/impl/YarnClientImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/impl/YarnClientImpl.java
index 7760521..80e453f 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/impl/YarnClientImpl.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/impl/YarnClientImpl.java
@@ -306,9 +306,10 @@
         try {
           Thread.sleep(submitPollIntervalMillis);
         } catch (InterruptedException ie) {
-          LOG.error("Interrupted while waiting for application "
-              + applicationId
-              + " to be successfully submitted.");
+          String msg = "Interrupted while waiting for application "
+              + applicationId + " to be successfully submitted.";
+          LOG.error(msg);
+          throw new YarnException(msg, ie);
         }
       } catch (ApplicationNotFoundException ex) {
         // FailOver or RM restart happens before RMStateStore saves
@@ -446,8 +447,10 @@
         Thread.sleep(asyncApiPollIntervalMillis);
       }
     } catch (InterruptedException e) {
-      LOG.error("Interrupted while waiting for application " + applicationId
-          + " to be killed.");
+      String msg = "Interrupted while waiting for application "
+          + applicationId + " to be killed.";
+      LOG.error(msg);
+      throw new YarnException(msg, e);
     }
   }
 
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestYarnClient.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestYarnClient.java
index e462be1..19966ad 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestYarnClient.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestYarnClient.java
@@ -27,6 +27,7 @@
 import static org.mockito.Mockito.when;
 
 import java.io.IOException;
+import java.lang.Thread.State;
 import java.nio.ByteBuffer;
 import java.security.PrivilegedExceptionAction;
 import java.util.ArrayList;
@@ -201,6 +202,57 @@
     client.stop();
   }
 
+  @SuppressWarnings("deprecation")
+  @Test (timeout = 20000)
+  public void testSubmitApplicationInterrupted() throws IOException {
+    Configuration conf = new Configuration();
+    int pollIntervalMs = 1000;
+    conf.setLong(YarnConfiguration.YARN_CLIENT_APP_SUBMISSION_POLL_INTERVAL_MS,
+        pollIntervalMs);
+    try (final YarnClient client = new MockYarnClient()) {
+      client.init(conf);
+      client.start();
+      // Submit the application and then interrupt it while its waiting
+      // for submission to be successful.
+      final class SubmitThread extends Thread {
+        private boolean isInterrupted  = false;
+        @Override
+        public void run() {
+          ApplicationSubmissionContext context =
+              mock(ApplicationSubmissionContext.class);
+          ApplicationId applicationId = ApplicationId.newInstance(
+              System.currentTimeMillis(), 1);
+          when(context.getApplicationId()).thenReturn(applicationId);
+          ((MockYarnClient) client).setYarnApplicationState(
+              YarnApplicationState.NEW);
+          try {
+            client.submitApplication(context);
+          } catch (YarnException | IOException e) {
+            if (e instanceof YarnException && e.getCause() != null &&
+                e.getCause() instanceof InterruptedException) {
+              isInterrupted = true;
+            }
+          }
+        }
+      }
+      SubmitThread appSubmitThread = new SubmitThread();
+      appSubmitThread.start();
+      try {
+        // Wait for thread to start and begin to sleep
+        // (enter TIMED_WAITING state).
+        while (appSubmitThread.getState() != State.TIMED_WAITING) {
+          Thread.sleep(pollIntervalMs / 2);
+        }
+        // Interrupt the thread.
+        appSubmitThread.interrupt();
+        appSubmitThread.join();
+      } catch (InterruptedException e) {
+      }
+      Assert.assertTrue("Expected an InterruptedException wrapped inside a " +
+          "YarnException", appSubmitThread.isInterrupted);
+    }
+  }
+
   @Test (timeout = 30000)
   public void testSubmitIncorrectQueueToCapacityScheduler() throws IOException {
     MiniYARNCluster cluster = new MiniYARNCluster("testMRAMTokens", 1, 1, 1);