[LIVY-714][SERVER] Fix cannot remove the app in leakedAppTags when timeout

## What changes were proposed in this pull request?
1.`var isRemoved = false` should be in `while(iter.hasNext),` otherwise if there are two apps, the first app will be killApplication and the second app will timeout in this loop, and after removing the first app,` isRemoved = true`, and the second app cannot pass the` if(!isRemoved)` and only will be deleted in the next loop.

2.`entry.getValue - now` is negative, and never greater than `sessionLeakageCheckTimeout`.

![image](https://user-images.githubusercontent.com/51938049/69202431-99a81080-0b7c-11ea-8084-9801af5a75bd.png)

## How was this patch tested?

Existed IT and UT.

Author: runzhiwang <runzhiwang@tencent.com>

Closes #259 from runzhiwang/leakapp.
diff --git a/server/src/main/scala/org/apache/livy/utils/SparkYarnApp.scala b/server/src/main/scala/org/apache/livy/utils/SparkYarnApp.scala
index 14af9fa..a245823 100644
--- a/server/src/main/scala/org/apache/livy/utils/SparkYarnApp.scala
+++ b/server/src/main/scala/org/apache/livy/utils/SparkYarnApp.scala
@@ -37,7 +37,8 @@
 
 object SparkYarnApp extends Logging {
 
-  def init(livyConf: LivyConf): Unit = {
+  def init(livyConf: LivyConf, client: Option[YarnClient] = None): Unit = {
+    mockYarnClient = client
     sessionLeakageCheckInterval = livyConf.getTimeAsMs(LivyConf.YARN_APP_LEAKAGE_CHECK_INTERVAL)
     sessionLeakageCheckTimeout = livyConf.getTimeAsMs(LivyConf.YARN_APP_LEAKAGE_CHECK_TIMEOUT)
     leakedAppsGCThread.setDaemon(true)
@@ -45,6 +46,8 @@
     leakedAppsGCThread.start()
   }
 
+  private var mockYarnClient: Option[YarnClient] = None
+
   // YarnClient is thread safe. Create once, share it across threads.
   lazy val yarnClient = {
     val c = YarnClient.createYarnClient()
@@ -59,9 +62,9 @@
   private def getYarnPollInterval(livyConf: LivyConf): FiniteDuration =
     livyConf.getTimeAsMs(LivyConf.YARN_POLL_INTERVAL) milliseconds
 
-  private val appType = Set("SPARK").asJava
+  private[utils] val appType = Set("SPARK").asJava
 
-  private val leakedAppTags = new java.util.concurrent.ConcurrentHashMap[String, Long]()
+  private[utils] val leakedAppTags = new java.util.concurrent.ConcurrentHashMap[String, Long]()
 
   private var sessionLeakageCheckTimeout: Long = _
 
@@ -69,24 +72,34 @@
 
   private val leakedAppsGCThread = new Thread() {
     override def run(): Unit = {
+      val client = {
+        mockYarnClient match {
+          case Some(client) => client
+          case None => yarnClient
+        }
+      }
+
       while (true) {
         if (!leakedAppTags.isEmpty) {
           // kill the app if found it and remove it if exceeding a threshold
           val iter = leakedAppTags.entrySet().iterator()
-          var isRemoved = false
           val now = System.currentTimeMillis()
-          val apps = yarnClient.getApplications(appType).asScala
+          val apps = client.getApplications(appType).asScala
+
           while(iter.hasNext) {
+            var isRemoved = false
             val entry = iter.next()
+
             apps.find(_.getApplicationTags.contains(entry.getKey))
               .foreach({ e =>
                 info(s"Kill leaked app ${e.getApplicationId}")
-                yarnClient.killApplication(e.getApplicationId)
+                client.killApplication(e.getApplicationId)
                 iter.remove()
                 isRemoved = true
               })
+
             if (!isRemoved) {
-              if ((entry.getValue - now) > sessionLeakageCheckTimeout) {
+              if ((now - entry.getValue) > sessionLeakageCheckTimeout) {
                 iter.remove()
                 info(s"Remove leaked yarn app tag ${entry.getKey}")
               }
diff --git a/server/src/test/scala/org/apache/livy/utils/SparkYarnAppSpec.scala b/server/src/test/scala/org/apache/livy/utils/SparkYarnAppSpec.scala
index 064bb77..d43125d 100644
--- a/server/src/test/scala/org/apache/livy/utils/SparkYarnAppSpec.scala
+++ b/server/src/test/scala/org/apache/livy/utils/SparkYarnAppSpec.scala
@@ -16,6 +16,7 @@
  */
 package org.apache.livy.utils
 
+import java.util.ArrayList
 import java.util.concurrent.{CountDownLatch, TimeUnit}
 import java.util.concurrent.atomic.{AtomicBoolean, AtomicInteger}
 
@@ -461,5 +462,25 @@
         }
       }
     }
+
+    it("should delete leak app when timeout") {
+      Clock.withSleepMethod(mockSleep) {
+        livyConf.set(LivyConf.YARN_APP_LEAKAGE_CHECK_INTERVAL, "100ms")
+        livyConf.set(LivyConf.YARN_APP_LEAKAGE_CHECK_TIMEOUT, "1000ms")
+
+        val client = mock[YarnClient]
+        when(client.getApplications(SparkYarnApp.appType)).
+          thenReturn(new ArrayList[ApplicationReport]())
+
+        SparkYarnApp.init(livyConf, Some(client))
+
+        SparkYarnApp.leakedAppTags.clear()
+        SparkYarnApp.leakedAppTags.put("leakApp", System.currentTimeMillis())
+
+        Eventually.eventually(Eventually.timeout(TEST_TIMEOUT), Eventually.interval(100 millis)) {
+          assert(SparkYarnApp.leakedAppTags.size() == 0)
+        }
+      }
+    }
   }
 }