Delete pod when creating timeout (#4424)
diff --git a/core/invoker/src/main/scala/org/apache/openwhisk/core/containerpool/kubernetes/KubernetesClient.scala b/core/invoker/src/main/scala/org/apache/openwhisk/core/containerpool/kubernetes/KubernetesClient.scala
index 4dabc4f..777f069 100644
--- a/core/invoker/src/main/scala/org/apache/openwhisk/core/containerpool/kubernetes/KubernetesClient.scala
+++ b/core/invoker/src/main/scala/org/apache/openwhisk/core/containerpool/kubernetes/KubernetesClient.scala
@@ -19,47 +19,37 @@
import java.io.IOException
import java.net.SocketTimeoutException
-import java.time.{Instant, ZoneId}
import java.time.format.DateTimeFormatterBuilder
+import java.time.{Instant, ZoneId}
import akka.actor.ActorSystem
import akka.http.scaladsl.model.Uri
-import akka.http.scaladsl.model.Uri.Path
-import akka.http.scaladsl.model.Uri.Query
-import akka.stream.{Attributes, Outlet, SourceShape}
-import akka.stream.ActorMaterializer
+import akka.http.scaladsl.model.Uri.{Path, Query}
import akka.stream.scaladsl.Source
import akka.stream.stage._
+import akka.stream.{ActorMaterializer, Attributes, Outlet, SourceShape}
import akka.util.ByteString
import io.fabric8.kubernetes.api.model._
-import pureconfig.loadConfigOrThrow
-import org.apache.openwhisk.common.Logging
-import org.apache.openwhisk.common.TransactionId
-import org.apache.openwhisk.core.ConfigKeys
-import org.apache.openwhisk.core.containerpool.ContainerId
-import org.apache.openwhisk.core.containerpool.ContainerAddress
-import org.apache.openwhisk.core.containerpool.docker.ProcessRunner
-import org.apache.openwhisk.core.entity.ByteSize
-import org.apache.openwhisk.core.entity.size._
-
-import scala.concurrent.duration._
-import scala.concurrent.ExecutionContext
-import scala.concurrent.Future
-import scala.concurrent.blocking
-import scala.util.Failure
-import scala.util.Success
-import scala.util.Try
-import spray.json._
-import spray.json.DefaultJsonProtocol._
-import collection.JavaConverters._
-import io.fabric8.kubernetes.client.ConfigBuilder
-import io.fabric8.kubernetes.client.DefaultKubernetesClient
+import io.fabric8.kubernetes.client.{ConfigBuilder, DefaultKubernetesClient}
import okhttp3.{Call, Callback, Request, Response}
import okio.BufferedSource
+import org.apache.openwhisk.common.{Logging, TransactionId}
+import org.apache.openwhisk.core.ConfigKeys
+import org.apache.openwhisk.core.containerpool.docker.ProcessRunner
+import org.apache.openwhisk.core.containerpool.{ContainerAddress, ContainerId}
+import org.apache.openwhisk.core.entity.ByteSize
+import org.apache.openwhisk.core.entity.size._
+import pureconfig.loadConfigOrThrow
+import spray.json.DefaultJsonProtocol._
+import spray.json._
import scala.annotation.tailrec
+import scala.collection.JavaConverters._
import scala.collection.mutable
+import scala.concurrent.duration._
+import scala.concurrent.{blocking, ExecutionContext, Future}
import scala.util.control.NonFatal
+import scala.util.{Failure, Success, Try}
/**
* Configuration for kubernetes client command timeouts.
@@ -186,6 +176,17 @@
}
}.map(_ => ())
}
+ def rm(podName: String): Future[Unit] = {
+ Future {
+ blocking {
+ kubeRestClient
+ .inNamespace(kubeRestClient.getNamespace)
+ .pods()
+ .withName(podName)
+ .delete()
+ }
+ }.map(_ => ())
+ }
def rm(key: String, value: String, ensureUnpaused: Boolean = false)(implicit transid: TransactionId): Future[Unit] = {
Future {
@@ -249,6 +250,7 @@
}
trait KubernetesApi {
+
def run(name: String,
image: String,
memory: ByteSize,
@@ -256,7 +258,7 @@
labels: Map[String, String] = Map.empty)(implicit transid: TransactionId): Future[KubernetesContainer]
def rm(container: KubernetesContainer)(implicit transid: TransactionId): Future[Unit]
-
+ def rm(podName: String): Future[Unit]
def rm(key: String, value: String, ensureUnpaused: Boolean)(implicit transid: TransactionId): Future[Unit]
def suspend(container: KubernetesContainer)(implicit transid: TransactionId): Future[Unit]
diff --git a/core/invoker/src/main/scala/org/apache/openwhisk/core/containerpool/kubernetes/KubernetesContainer.scala b/core/invoker/src/main/scala/org/apache/openwhisk/core/containerpool/kubernetes/KubernetesContainer.scala
index 74fd292..d37139b 100644
--- a/core/invoker/src/main/scala/org/apache/openwhisk/core/containerpool/kubernetes/KubernetesContainer.scala
+++ b/core/invoker/src/main/scala/org/apache/openwhisk/core/containerpool/kubernetes/KubernetesContainer.scala
@@ -37,6 +37,8 @@
import org.apache.openwhisk.core.entity.size._
import org.apache.openwhisk.http.Messages
+import scala.util.Failure
+
object KubernetesContainer {
/**
@@ -67,7 +69,16 @@
for {
container <- kubernetes.run(podName, image, memory, environment, labels).recoverWith {
- case _ => Future.failed(WhiskContainerStartupError(s"Failed to run container with image '${image}'."))
+ case _ =>
+ kubernetes
+ .rm(podName)
+ .andThen {
+ case Failure(e) =>
+ log.error(this, s"Failed delete pod for '$name': ${e.getClass} - ${e.getMessage}")
+ }
+ .transformWith { _ =>
+ Future.failed(WhiskContainerStartupError(s"Failed to run container with image '${image}'."))
+ }
}
} yield container
}
diff --git a/tests/src/test/scala/org/apache/openwhisk/core/containerpool/kubernetes/test/KubernetesClientTests.scala b/tests/src/test/scala/org/apache/openwhisk/core/containerpool/kubernetes/test/KubernetesClientTests.scala
index 74accbe..0afaefa 100644
--- a/tests/src/test/scala/org/apache/openwhisk/core/containerpool/kubernetes/test/KubernetesClientTests.scala
+++ b/tests/src/test/scala/org/apache/openwhisk/core/containerpool/kubernetes/test/KubernetesClientTests.scala
@@ -212,6 +212,10 @@
Future.successful(())
}
+ override def rm(podName: String): Future[Unit] = {
+ rms += ContainerId(podName)
+ Future.successful(())
+ }
def rm(key: String, value: String, ensureUnpause: Boolean = false)(
implicit transid: TransactionId): Future[Unit] = {
rmByLabels += ((key, value))
diff --git a/tests/src/test/scala/org/apache/openwhisk/core/containerpool/kubernetes/test/KubernetesContainerTests.scala b/tests/src/test/scala/org/apache/openwhisk/core/containerpool/kubernetes/test/KubernetesContainerTests.scala
index 1b1352a..20a48d1 100644
--- a/tests/src/test/scala/org/apache/openwhisk/core/containerpool/kubernetes/test/KubernetesContainerTests.scala
+++ b/tests/src/test/scala/org/apache/openwhisk/core/containerpool/kubernetes/test/KubernetesContainerTests.scala
@@ -173,7 +173,6 @@
memory: ByteSize = 256.MB,
env: Map[String, String] = Map.empty,
labels: Map[String, String] = Map.empty)(implicit transid: TransactionId): Future[KubernetesContainer] = {
- runs += ((name, image, env, labels))
Future.failed(ProcessUnsuccessfulException(ExitStatus(1), "", ""))
}
}
@@ -182,8 +181,8 @@
KubernetesContainer.create(transid = transid, name = "name", image = "image", userProvidedImage = true)
a[WhiskContainerStartupError] should be thrownBy await(container)
- kubernetes.runs should have size 1
- kubernetes.rms should have size 0
+ kubernetes.runs should have size 0
+ kubernetes.rms should have size 1
}
/*