Delete pod when creating timeout (#4424)

diff --git a/core/invoker/src/main/scala/org/apache/openwhisk/core/containerpool/kubernetes/KubernetesClient.scala b/core/invoker/src/main/scala/org/apache/openwhisk/core/containerpool/kubernetes/KubernetesClient.scala
index 4dabc4f..777f069 100644
--- a/core/invoker/src/main/scala/org/apache/openwhisk/core/containerpool/kubernetes/KubernetesClient.scala
+++ b/core/invoker/src/main/scala/org/apache/openwhisk/core/containerpool/kubernetes/KubernetesClient.scala
@@ -19,47 +19,37 @@
 
 import java.io.IOException
 import java.net.SocketTimeoutException
-import java.time.{Instant, ZoneId}
 import java.time.format.DateTimeFormatterBuilder
+import java.time.{Instant, ZoneId}
 
 import akka.actor.ActorSystem
 import akka.http.scaladsl.model.Uri
-import akka.http.scaladsl.model.Uri.Path
-import akka.http.scaladsl.model.Uri.Query
-import akka.stream.{Attributes, Outlet, SourceShape}
-import akka.stream.ActorMaterializer
+import akka.http.scaladsl.model.Uri.{Path, Query}
 import akka.stream.scaladsl.Source
 import akka.stream.stage._
+import akka.stream.{ActorMaterializer, Attributes, Outlet, SourceShape}
 import akka.util.ByteString
 import io.fabric8.kubernetes.api.model._
-import pureconfig.loadConfigOrThrow
-import org.apache.openwhisk.common.Logging
-import org.apache.openwhisk.common.TransactionId
-import org.apache.openwhisk.core.ConfigKeys
-import org.apache.openwhisk.core.containerpool.ContainerId
-import org.apache.openwhisk.core.containerpool.ContainerAddress
-import org.apache.openwhisk.core.containerpool.docker.ProcessRunner
-import org.apache.openwhisk.core.entity.ByteSize
-import org.apache.openwhisk.core.entity.size._
-
-import scala.concurrent.duration._
-import scala.concurrent.ExecutionContext
-import scala.concurrent.Future
-import scala.concurrent.blocking
-import scala.util.Failure
-import scala.util.Success
-import scala.util.Try
-import spray.json._
-import spray.json.DefaultJsonProtocol._
-import collection.JavaConverters._
-import io.fabric8.kubernetes.client.ConfigBuilder
-import io.fabric8.kubernetes.client.DefaultKubernetesClient
+import io.fabric8.kubernetes.client.{ConfigBuilder, DefaultKubernetesClient}
 import okhttp3.{Call, Callback, Request, Response}
 import okio.BufferedSource
+import org.apache.openwhisk.common.{Logging, TransactionId}
+import org.apache.openwhisk.core.ConfigKeys
+import org.apache.openwhisk.core.containerpool.docker.ProcessRunner
+import org.apache.openwhisk.core.containerpool.{ContainerAddress, ContainerId}
+import org.apache.openwhisk.core.entity.ByteSize
+import org.apache.openwhisk.core.entity.size._
+import pureconfig.loadConfigOrThrow
+import spray.json.DefaultJsonProtocol._
+import spray.json._
 
 import scala.annotation.tailrec
+import scala.collection.JavaConverters._
 import scala.collection.mutable
+import scala.concurrent.duration._
+import scala.concurrent.{blocking, ExecutionContext, Future}
 import scala.util.control.NonFatal
+import scala.util.{Failure, Success, Try}
 
 /**
  * Configuration for kubernetes client command timeouts.
@@ -186,6 +176,17 @@
       }
     }.map(_ => ())
   }
+  def rm(podName: String): Future[Unit] = {
+    Future {
+      blocking {
+        kubeRestClient
+          .inNamespace(kubeRestClient.getNamespace)
+          .pods()
+          .withName(podName)
+          .delete()
+      }
+    }.map(_ => ())
+  }
 
   def rm(key: String, value: String, ensureUnpaused: Boolean = false)(implicit transid: TransactionId): Future[Unit] = {
     Future {
@@ -249,6 +250,7 @@
 }
 
 trait KubernetesApi {
+
   def run(name: String,
           image: String,
           memory: ByteSize,
@@ -256,7 +258,7 @@
           labels: Map[String, String] = Map.empty)(implicit transid: TransactionId): Future[KubernetesContainer]
 
   def rm(container: KubernetesContainer)(implicit transid: TransactionId): Future[Unit]
-
+  def rm(podName: String): Future[Unit]
   def rm(key: String, value: String, ensureUnpaused: Boolean)(implicit transid: TransactionId): Future[Unit]
 
   def suspend(container: KubernetesContainer)(implicit transid: TransactionId): Future[Unit]
diff --git a/core/invoker/src/main/scala/org/apache/openwhisk/core/containerpool/kubernetes/KubernetesContainer.scala b/core/invoker/src/main/scala/org/apache/openwhisk/core/containerpool/kubernetes/KubernetesContainer.scala
index 74fd292..d37139b 100644
--- a/core/invoker/src/main/scala/org/apache/openwhisk/core/containerpool/kubernetes/KubernetesContainer.scala
+++ b/core/invoker/src/main/scala/org/apache/openwhisk/core/containerpool/kubernetes/KubernetesContainer.scala
@@ -37,6 +37,8 @@
 import org.apache.openwhisk.core.entity.size._
 import org.apache.openwhisk.http.Messages
 
+import scala.util.Failure
+
 object KubernetesContainer {
 
   /**
@@ -67,7 +69,16 @@
 
     for {
       container <- kubernetes.run(podName, image, memory, environment, labels).recoverWith {
-        case _ => Future.failed(WhiskContainerStartupError(s"Failed to run container with image '${image}'."))
+        case _ =>
+          kubernetes
+            .rm(podName)
+            .andThen {
+              case Failure(e) =>
+                log.error(this, s"Failed delete pod for '$name': ${e.getClass} - ${e.getMessage}")
+            }
+            .transformWith { _ =>
+              Future.failed(WhiskContainerStartupError(s"Failed to run container with image '${image}'."))
+            }
       }
     } yield container
   }
diff --git a/tests/src/test/scala/org/apache/openwhisk/core/containerpool/kubernetes/test/KubernetesClientTests.scala b/tests/src/test/scala/org/apache/openwhisk/core/containerpool/kubernetes/test/KubernetesClientTests.scala
index 74accbe..0afaefa 100644
--- a/tests/src/test/scala/org/apache/openwhisk/core/containerpool/kubernetes/test/KubernetesClientTests.scala
+++ b/tests/src/test/scala/org/apache/openwhisk/core/containerpool/kubernetes/test/KubernetesClientTests.scala
@@ -212,6 +212,10 @@
       Future.successful(())
     }
 
+    override def rm(podName: String): Future[Unit] = {
+      rms += ContainerId(podName)
+      Future.successful(())
+    }
     def rm(key: String, value: String, ensureUnpause: Boolean = false)(
       implicit transid: TransactionId): Future[Unit] = {
       rmByLabels += ((key, value))
diff --git a/tests/src/test/scala/org/apache/openwhisk/core/containerpool/kubernetes/test/KubernetesContainerTests.scala b/tests/src/test/scala/org/apache/openwhisk/core/containerpool/kubernetes/test/KubernetesContainerTests.scala
index 1b1352a..20a48d1 100644
--- a/tests/src/test/scala/org/apache/openwhisk/core/containerpool/kubernetes/test/KubernetesContainerTests.scala
+++ b/tests/src/test/scala/org/apache/openwhisk/core/containerpool/kubernetes/test/KubernetesContainerTests.scala
@@ -173,7 +173,6 @@
         memory: ByteSize = 256.MB,
         env: Map[String, String] = Map.empty,
         labels: Map[String, String] = Map.empty)(implicit transid: TransactionId): Future[KubernetesContainer] = {
-        runs += ((name, image, env, labels))
         Future.failed(ProcessUnsuccessfulException(ExitStatus(1), "", ""))
       }
     }
@@ -182,8 +181,8 @@
       KubernetesContainer.create(transid = transid, name = "name", image = "image", userProvidedImage = true)
     a[WhiskContainerStartupError] should be thrownBy await(container)
 
-    kubernetes.runs should have size 1
-    kubernetes.rms should have size 0
+    kubernetes.runs should have size 0
+    kubernetes.rms should have size 1
   }
 
   /*