[hotfix, yarn] Exit JVM after YARN actor system shut down
diff --git a/flink-yarn/src/main/scala/org/apache/flink/yarn/YarnJobManager.scala b/flink-yarn/src/main/scala/org/apache/flink/yarn/YarnJobManager.scala
index c8a9480..86ff906 100644
--- a/flink-yarn/src/main/scala/org/apache/flink/yarn/YarnJobManager.scala
+++ b/flink-yarn/src/main/scala/org/apache/flink/yarn/YarnJobManager.scala
@@ -185,6 +185,12 @@
context.system.shutdown()
+ // Await actor system termination and shut down JVM
+ new YarnProcessShutDownThread(
+ log.logger,
+ context.system,
+ FiniteDuration(10, SECONDS)).start()
+
case RegisterApplicationClient =>
val client = sender()
diff --git a/flink-yarn/src/main/scala/org/apache/flink/yarn/YarnProcessShutDownThread.java b/flink-yarn/src/main/scala/org/apache/flink/yarn/YarnProcessShutDownThread.java
new file mode 100644
index 0000000..8e36ac0
--- /dev/null
+++ b/flink-yarn/src/main/scala/org/apache/flink/yarn/YarnProcessShutDownThread.java
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.yarn;
+
+import akka.actor.ActorSystem;
+import org.slf4j.Logger;
+import scala.concurrent.duration.Duration;
+
+import java.util.concurrent.TimeoutException;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+
+/**
+ * JVM shut down thread awaiting actor system shut down for a certain amount
+ * of time before exiting the JVM.
+ *
+ * <p>On some Linux distributions, YARN is not able to stop containers, because
+ * the <code>kill</code> command has different arguments. For example when
+ * running Flink on GCE ("Debian GNU/Linux 7.9 (wheezy)"), YARN containers will
+ * not properly shut down when we don't call <code>System.exit()</code>.
+ */
+class YarnProcessShutDownThread extends Thread {
+
+ /** Log of the corresponding YARN process. */
+ private final Logger log;
+
+ /** Actor system to await termination of. */
+ private final ActorSystem actorSystem;
+
+ /** Actor system termination timeout before shutting down the JVM. */
+ private final Duration terminationTimeout;
+
+ /**
+ * Creates a shut down thread.
+ *
+ * @param log Log of the corresponding YARN process.
+ * @param actorSystem Actor system to await termination of.
+ * @param terminationTimeout Actor system termination timeout before
+ * shutting down the JVM.
+ */
+ public YarnProcessShutDownThread(
+ Logger log,
+ ActorSystem actorSystem,
+ Duration terminationTimeout) {
+
+ this.log = checkNotNull(log, "Logger");
+ this.actorSystem = checkNotNull(actorSystem, "Actor system");
+ this.terminationTimeout = checkNotNull(terminationTimeout, "Termination timeout");
+ }
+
+ @Override
+ public void run() {
+ try {
+ actorSystem.awaitTermination(terminationTimeout);
+ } catch (Exception e) {
+ if (e instanceof TimeoutException) {
+ log.error("Actor system shut down timed out.", e);
+ } else {
+ log.error("Failure during actor system shut down.", e);
+ }
+ } finally {
+ log.info("Shutdown completed. Stopping JVM.");
+ System.exit(0);
+ }
+ }
+}
diff --git a/flink-yarn/src/main/scala/org/apache/flink/yarn/YarnTaskManager.scala b/flink-yarn/src/main/scala/org/apache/flink/yarn/YarnTaskManager.scala
index ad5eace..828441b 100644
--- a/flink-yarn/src/main/scala/org/apache/flink/yarn/YarnTaskManager.scala
+++ b/flink-yarn/src/main/scala/org/apache/flink/yarn/YarnTaskManager.scala
@@ -23,9 +23,11 @@
import org.apache.flink.runtime.io.network.NetworkEnvironment
import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService
import org.apache.flink.runtime.memory.MemoryManager
-import org.apache.flink.runtime.taskmanager.{NetworkEnvironmentConfiguration, TaskManagerConfiguration, TaskManager}
+import org.apache.flink.runtime.taskmanager.{TaskManagerConfiguration, TaskManager}
import org.apache.flink.yarn.YarnMessages.StopYarnSession
+import scala.concurrent.duration._
+
/** An extension of the TaskManager that listens for additional YARN related
* messages.
*/
@@ -55,5 +57,11 @@
log.info(s"Stopping YARN TaskManager with final application status $status " +
s"and diagnostics: $diagnostics")
context.system.shutdown()
+
+ // Await actor system termination and shut down JVM
+ new YarnProcessShutDownThread(
+ log.logger,
+ context.system,
+ FiniteDuration(10, SECONDS)).start()
}
}