[hotfix, yarn] Exit JVM after YARN actor system shut down
diff --git a/flink-yarn/src/main/scala/org/apache/flink/yarn/YarnJobManager.scala b/flink-yarn/src/main/scala/org/apache/flink/yarn/YarnJobManager.scala
index c8a9480..86ff906 100644
--- a/flink-yarn/src/main/scala/org/apache/flink/yarn/YarnJobManager.scala
+++ b/flink-yarn/src/main/scala/org/apache/flink/yarn/YarnJobManager.scala
@@ -185,6 +185,12 @@
 
       context.system.shutdown()
 
+      // Await actor system termination and shut down JVM
+      new YarnProcessShutDownThread(
+        log.logger,
+        context.system,
+        FiniteDuration(10, SECONDS)).start()
+
     case RegisterApplicationClient =>
       val client = sender()
 
diff --git a/flink-yarn/src/main/scala/org/apache/flink/yarn/YarnProcessShutDownThread.java b/flink-yarn/src/main/scala/org/apache/flink/yarn/YarnProcessShutDownThread.java
new file mode 100644
index 0000000..8e36ac0
--- /dev/null
+++ b/flink-yarn/src/main/scala/org/apache/flink/yarn/YarnProcessShutDownThread.java
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.yarn;
+
+import akka.actor.ActorSystem;
+import org.slf4j.Logger;
+import scala.concurrent.duration.Duration;
+
+import java.util.concurrent.TimeoutException;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+
+/**
+ * JVM shut down thread awaiting actor system shut down for a certain amount
+ * of time before exiting the JVM.
+ *
+ * <p>On some Linux distributions, YARN is not able to stop containers, because
+ * the <code>kill</code> command has different arguments. For example when
+ * running Flink on GCE ("Debian GNU/Linux 7.9 (wheezy)"), YARN containers will
+ * not properly shut down when we don't call <code>System.exit()</code>.
+ */
+class YarnProcessShutDownThread extends Thread {
+
+	/** Log of the corresponding YARN process. */
+	private final Logger log;
+
+	/** Actor system to await termination of. */
+	private final ActorSystem actorSystem;
+
+	/** Actor system termination timeout before shutting down the JVM. */
+	private final Duration terminationTimeout;
+
+	/**
+	 * Creates a shut down thread.
+	 *
+	 * @param log                Log of the corresponding YARN process.
+	 * @param actorSystem        Actor system to await termination of.
+	 * @param terminationTimeout Actor system termination timeout before
+	 *                           shutting down the JVM.
+	 */
+	public YarnProcessShutDownThread(
+			Logger log,
+			ActorSystem actorSystem,
+			Duration terminationTimeout) {
+
+		this.log = checkNotNull(log, "Logger");
+		this.actorSystem = checkNotNull(actorSystem, "Actor system");
+		this.terminationTimeout = checkNotNull(terminationTimeout, "Termination timeout");
+	}
+
+	@Override
+	public void run() {
+		try {
+			actorSystem.awaitTermination(terminationTimeout);
+		} catch (Exception e) {
+			if (e instanceof TimeoutException) {
+				log.error("Actor system shut down timed out.", e);
+			} else {
+				log.error("Failure during actor system shut down.", e);
+			}
+		} finally {
+			log.info("Shutdown completed. Stopping JVM.");
+			System.exit(0);
+		}
+	}
+}
diff --git a/flink-yarn/src/main/scala/org/apache/flink/yarn/YarnTaskManager.scala b/flink-yarn/src/main/scala/org/apache/flink/yarn/YarnTaskManager.scala
index ad5eace..828441b 100644
--- a/flink-yarn/src/main/scala/org/apache/flink/yarn/YarnTaskManager.scala
+++ b/flink-yarn/src/main/scala/org/apache/flink/yarn/YarnTaskManager.scala
@@ -23,9 +23,11 @@
 import org.apache.flink.runtime.io.network.NetworkEnvironment
 import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService
 import org.apache.flink.runtime.memory.MemoryManager
-import org.apache.flink.runtime.taskmanager.{NetworkEnvironmentConfiguration, TaskManagerConfiguration, TaskManager}
+import org.apache.flink.runtime.taskmanager.{TaskManagerConfiguration, TaskManager}
 import org.apache.flink.yarn.YarnMessages.StopYarnSession
 
+import scala.concurrent.duration._
+
 /** An extension of the TaskManager that listens for additional YARN related
   * messages.
   */
@@ -55,5 +57,11 @@
       log.info(s"Stopping YARN TaskManager with final application status $status " +
         s"and diagnostics: $diagnostics")
       context.system.shutdown()
+
+      // Await actor system termination and shut down JVM
+      new YarnProcessShutDownThread(
+        log.logger,
+        context.system,
+        FiniteDuration(10, SECONDS)).start()
   }
 }