Add healthcheck for Invoker -> Action Container (#4698)
Validate that prewarm containers are reachable before use. For paused containers, a connection failure on the first activation after resume causes the job to be rescheduled to the ContainerPool.
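
The check is disabled by default and is configured under
whisk.container-proxy.action-health-check. The snippet below mirrors the
defaults this change adds to core/invoker/src/main/resources/application.conf:

    action-health-check {
      enabled = false          # ping prewarm containers periodically; ping warm containers once after resume
      check-period = 3 seconds # how often to attempt a TCP connection to prewarm containers
      max-fails = 3            # prewarm containers that fail this many pings are destroyed and replaced
    }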
Co-authored-by: Cosmin Stanciu <selfxp@users.noreply.github.com>
diff --git a/common/scala/src/main/scala/org/apache/openwhisk/common/Logging.scala b/common/scala/src/main/scala/org/apache/openwhisk/common/Logging.scala
index 404e0c1..37591be 100644
--- a/common/scala/src/main/scala/org/apache/openwhisk/common/Logging.scala
+++ b/common/scala/src/main/scala/org/apache/openwhisk/common/Logging.scala
@@ -474,8 +474,23 @@
LogMarkerToken(invoker, "mesos", start, Some(cmd), Map("cmd" -> cmd))(MeasurementUnit.time.milliseconds)
def INVOKER_MESOS_CMD_TIMEOUT(cmd: String) =
LogMarkerToken(invoker, "mesos", timeout, Some(cmd), Map("cmd" -> cmd))(MeasurementUnit.none)
- def INVOKER_CONTAINER_START(containerState: String) =
- LogMarkerToken(invoker, "containerStart", counter, Some(containerState), Map("containerState" -> containerState))(
+ def INVOKER_CONTAINER_START(containerState: String, invocationNamespace: String, namespace: String, action: String) =
+ LogMarkerToken(
+ invoker,
+ "containerStart",
+ counter,
+ Some(containerState),
+ Map(
+ "containerState" -> containerState,
+ "initiator" -> invocationNamespace,
+ "namespace" -> namespace,
+ "action" -> action))(MeasurementUnit.none)
+ val INVOKER_CONTAINER_HEALTH = LogMarkerToken(invoker, "containerHealth", start)(MeasurementUnit.time.milliseconds)
+ val INVOKER_CONTAINER_HEALTH_FAILED_WARM =
+ LogMarkerToken(invoker, "containerHealthFailed", counter, Some("warm"), Map("containerState" -> "warm"))(
+ MeasurementUnit.none)
+ val INVOKER_CONTAINER_HEALTH_FAILED_PREWARM =
+ LogMarkerToken(invoker, "containerHealthFailed", counter, Some("prewarm"), Map("containerState" -> "prewarm"))(
MeasurementUnit.none)
val CONTAINER_CLIENT_RETRIES =
LogMarkerToken(containerClient, "retries", counter)(MeasurementUnit.none)
@@ -494,6 +509,10 @@
LogMarkerToken(containerPool, "prewarmCount", counter)(MeasurementUnit.none)
val CONTAINER_POOL_PREWARM_SIZE =
LogMarkerToken(containerPool, "prewarmSize", counter)(MeasurementUnit.information.megabytes)
+ val CONTAINER_POOL_IDLES_COUNT =
+ LogMarkerToken(containerPool, "idlesCount", counter)(MeasurementUnit.none)
+ val CONTAINER_POOL_IDLES_SIZE =
+ LogMarkerToken(containerPool, "idlesSize", counter)(MeasurementUnit.information.megabytes)
val INVOKER_TOTALMEM_BLACKBOX = LogMarkerToken(loadbalancer, "totalCapacityBlackBox", counter)(MeasurementUnit.none)
val INVOKER_TOTALMEM_MANAGED = LogMarkerToken(loadbalancer, "totalCapacityManaged", counter)(MeasurementUnit.none)
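
(Annotation, not part of the patch: INVOKER_CONTAINER_START now carries initiator,
namespace and action tags, and the containerHealthFailed counters and the
idlesCount/idlesSize pool gauges are new. Via the Prometheus-exported user events,
the start counter surfaces as a series like

    counter_invoker_containerStart_counter_total{containerState="warmed", initiator="...", namespace="...", action="..."}

which is exactly what the dashboard changes below query.)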
diff --git a/common/scala/src/main/scala/org/apache/openwhisk/common/TransactionId.scala b/common/scala/src/main/scala/org/apache/openwhisk/common/TransactionId.scala
index 85951a5..6fcc566 100644
--- a/common/scala/src/main/scala/org/apache/openwhisk/common/TransactionId.scala
+++ b/common/scala/src/main/scala/org/apache/openwhisk/common/TransactionId.scala
@@ -225,6 +225,7 @@
val invokerHealth = TransactionId(systemPrefix + "invokerHealth") // Invoker supervision
val controller = TransactionId(systemPrefix + "controller") // Controller startup
val dbBatcher = TransactionId(systemPrefix + "dbBatcher") // Database batcher
+ val actionHealthPing = TransactionId(systemPrefix + "actionHealth")
def apply(tid: String, extraLogging: Boolean = false): TransactionId = {
val now = Instant.now(Clock.systemUTC()).inMills
diff --git a/common/scala/src/main/scala/org/apache/openwhisk/core/WhiskConfig.scala b/common/scala/src/main/scala/org/apache/openwhisk/core/WhiskConfig.scala
index b6fa765..62a4c98 100644
--- a/common/scala/src/main/scala/org/apache/openwhisk/core/WhiskConfig.scala
+++ b/common/scala/src/main/scala/org/apache/openwhisk/core/WhiskConfig.scala
@@ -247,6 +247,7 @@
val containerProxy = "whisk.container-proxy"
val containerProxyTimeouts = s"$containerProxy.timeouts"
+ val containerProxyHealth = s"$containerProxy.action-health-check"
val s3 = "whisk.s3"
val query = "whisk.query-limit"
diff --git a/common/scala/src/main/scala/org/apache/openwhisk/core/containerpool/AkkaContainerClient.scala b/common/scala/src/main/scala/org/apache/openwhisk/core/containerpool/AkkaContainerClient.scala
index 8eaab12..3dea6db 100644
--- a/common/scala/src/main/scala/org/apache/openwhisk/core/containerpool/AkkaContainerClient.scala
+++ b/common/scala/src/main/scala/org/apache/openwhisk/core/containerpool/AkkaContainerClient.scala
@@ -87,9 +87,10 @@
* @param endpoint the path the api call relative to hostname
* @param body the JSON value to post (this is usually a JSON object)
* @param retry whether or not to retry on connection failure
+ * @param reschedule whether or not to throw ContainerHealthError (triggers reschedule) on connection failure
* @return Left(Error Message) or Right(Status Code, Response as UTF-8 String)
*/
- def post(endpoint: String, body: JsValue, retry: Boolean)(
+ def post(endpoint: String, body: JsValue, retry: Boolean, reschedule: Boolean = false)(
implicit tid: TransactionId): Future[Either[ContainerHttpError, ContainerResponse]] = {
//create the request
@@ -98,7 +99,7 @@
.withHeaders(Accept(MediaTypes.`application/json`))
}
- retryingRequest(req, timeout, retry)
+ retryingRequest(req, timeout, retry, reschedule, endpoint)
.flatMap {
case (response, retries) => {
if (retries > 0) {
@@ -126,26 +127,35 @@
}
}
}
- .recover {
- case t: TimeoutException => Left(Timeout(t))
- case NonFatal(t) => Left(ConnectionError(t))
+ .recoverWith {
+ case t: TimeoutException =>
+ Future.successful(Left(Timeout(t)))
+ case t: ContainerHealthError =>
+ //propagate as a failed future; clients can retry at a different container
+ Future.failed(t)
+ case NonFatal(t) =>
+ Future.successful(Left(ConnectionError(t)))
}
}
//returns a Future HttpResponse -> Int (where Int is the retryCount)
private def retryingRequest(req: Future[HttpRequest],
timeout: FiniteDuration,
retry: Boolean,
- retryCount: Int = 0): Future[(HttpResponse, Int)] = {
+ reschedule: Boolean,
+ endpoint: String,
+ retryCount: Int = 0)(implicit tid: TransactionId): Future[(HttpResponse, Int)] = {
val start = Instant.now
request(req)
.map((_, retryCount))
.recoverWith {
+ case _: StreamTcpException if reschedule =>
+ Future.failed(ContainerHealthError(tid, endpoint))
case t: StreamTcpException if retry =>
if (timeout > Duration.Zero) {
akka.pattern.after(retryInterval, as.scheduler)({
val newTimeout = timeout - (Instant.now.toEpochMilli - start.toEpochMilli).milliseconds
- retryingRequest(req, newTimeout, retry, retryCount + 1)
+ retryingRequest(req, newTimeout, retry, reschedule, endpoint, retryCount + 1)
})
} else {
logging.warn(
diff --git a/common/scala/src/main/scala/org/apache/openwhisk/core/containerpool/ApacheBlockingContainerClient.scala b/common/scala/src/main/scala/org/apache/openwhisk/core/containerpool/ApacheBlockingContainerClient.scala
index b4452a8..ecfda42 100644
--- a/common/scala/src/main/scala/org/apache/openwhisk/core/containerpool/ApacheBlockingContainerClient.scala
+++ b/common/scala/src/main/scala/org/apache/openwhisk/core/containerpool/ApacheBlockingContainerClient.scala
@@ -90,7 +90,7 @@
* @param retry whether or not to retry on connection failure
* @return Left(Error Message) or Right(Status Code, Response as UTF-8 String)
*/
- def post(endpoint: String, body: JsValue, retry: Boolean)(
+ def post(endpoint: String, body: JsValue, retry: Boolean, reschedule: Boolean = false)(
implicit tid: TransactionId): Future[Either[ContainerHttpError, ContainerResponse]] = {
val entity = new StringEntity(body.compactPrint, StandardCharsets.UTF_8)
entity.setContentType("application/json")
@@ -101,14 +101,18 @@
Future {
blocking {
- execute(request, timeout, maxConcurrent, retry)
+ execute(request, timeout, maxConcurrent, retry, reschedule)
}
}
}
// Annotation will make the compiler complain if no tail recursion is possible
- @tailrec private def execute(request: HttpRequestBase, timeout: FiniteDuration, maxConcurrent: Int, retry: Boolean)(
- implicit tid: TransactionId): Either[ContainerHttpError, ContainerResponse] = {
+ @tailrec private def execute(
+ request: HttpRequestBase,
+ timeout: FiniteDuration,
+ maxConcurrent: Int,
+ retry: Boolean,
+ reschedule: Boolean = false)(implicit tid: TransactionId): Either[ContainerHttpError, ContainerResponse] = {
val start = Instant.now
Try(connection.execute(request)).map { response =>
@@ -158,7 +162,10 @@
case t: NoHttpResponseException if ApacheBlockingContainerClient.clientConfig.retryNoHttpResponseException =>
Failure(RetryableConnectionError(t))
} match {
- case Success(response) => response
+ case Success(response) => response
+ case Failure(_: RetryableConnectionError) if reschedule =>
+ //propagate as a failed future; clients can retry at a different container
+ throw ContainerHealthError(tid, request.getURI.toString)
case Failure(t: RetryableConnectionError) if retry =>
if (timeout > Duration.Zero) {
Thread.sleep(50) // Sleep for 50 milliseconds
diff --git a/common/scala/src/main/scala/org/apache/openwhisk/core/containerpool/Container.scala b/common/scala/src/main/scala/org/apache/openwhisk/core/containerpool/Container.scala
index 59c0804..20627fb 100644
--- a/common/scala/src/main/scala/org/apache/openwhisk/core/containerpool/Container.scala
+++ b/common/scala/src/main/scala/org/apache/openwhisk/core/containerpool/Container.scala
@@ -48,6 +48,7 @@
}
case class ContainerAddress(host: String, port: Int = 8080) {
require(host.nonEmpty, "ContainerIp must not be empty")
+ def asString() = s"${host}:${port}"
}
object Container {
@@ -72,7 +73,7 @@
implicit protected val as: ActorSystem
protected val id: ContainerId
- protected val addr: ContainerAddress
+ protected[core] val addr: ContainerAddress
protected implicit val logging: Logging
protected implicit val ec: ExecutionContext
@@ -149,8 +150,11 @@
}
/** Runs code in the container. Thread-safe - caller may invoke concurrently for concurrent activation processing. */
- def run(parameters: JsObject, environment: JsObject, timeout: FiniteDuration, maxConcurrent: Int)(
- implicit transid: TransactionId): Future[(Interval, ActivationResponse)] = {
+ def run(parameters: JsObject,
+ environment: JsObject,
+ timeout: FiniteDuration,
+ maxConcurrent: Int,
+ reschedule: Boolean = false)(implicit transid: TransactionId): Future[(Interval, ActivationResponse)] = {
val actionName = environment.fields.get("action_name").map(_.convertTo[String]).getOrElse("")
val start =
transid.started(
@@ -161,7 +165,7 @@
val parameterWrapper = JsObject("value" -> parameters)
val body = JsObject(parameterWrapper.fields ++ environment.fields)
- callContainer("/run", body, timeout, maxConcurrent, retry = false)
+ callContainer("/run", body, timeout, maxConcurrent, retry = false, reschedule)
.andThen { // never fails
case Success(r: RunResult) =>
transid.finished(
@@ -193,12 +197,14 @@
* @param body body to send
* @param timeout timeout of the request
* @param retry whether or not to retry the request
+ * @param reschedule whether to throw ContainerHealthError (triggering a reschedule) on connection failure
*/
protected def callContainer(path: String,
body: JsObject,
timeout: FiniteDuration,
maxConcurrent: Int,
- retry: Boolean = false)(implicit transid: TransactionId): Future[RunResult] = {
+ retry: Boolean = false,
+ reschedule: Boolean = false)(implicit transid: TransactionId): Future[RunResult] = {
val started = Instant.now()
val http = httpConnection.getOrElse {
val conn = openConnections(timeout, maxConcurrent)
@@ -206,7 +212,7 @@
conn
}
http
- .post(path, body, retry)
+ .post(path, body, retry, reschedule)
.map { response =>
val finished = Instant.now()
RunResult(Interval(started, finished), response)
@@ -247,6 +253,9 @@
/** Indicates an error while initializing a container */
case class InitializationError(interval: Interval, response: ActivationResponse) extends Exception(response.toString)
+/** Indicates a connection error after resuming a container */
+case class ContainerHealthError(tid: TransactionId, msg: String) extends Exception(msg)
+
case class Interval(start: Instant, end: Instant) {
def duration = Duration.create(end.toEpochMilli() - start.toEpochMilli(), MILLISECONDS)
}
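
(Annotation, not part of the patch — a minimal caller sketch of the new flag,
with names like client assumed; the signatures match the ones added above:

    // retry against the same container until the timeout budget is exhausted
    client.post("/run", body, retry = true, reschedule = false)
    // fail fast with ContainerHealthError so the job can be rescheduled elsewhere
    client.post("/run", body, retry = false, reschedule = true)

Container.run forwards its own reschedule flag to post the same way, so a resumed
container that has become unreachable surfaces as a failed Future carrying
ContainerHealthError instead of an error activation.)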
diff --git a/common/scala/src/main/scala/org/apache/openwhisk/core/containerpool/ContainerClient.scala b/common/scala/src/main/scala/org/apache/openwhisk/core/containerpool/ContainerClient.scala
index 0720a03..597376c 100644
--- a/common/scala/src/main/scala/org/apache/openwhisk/core/containerpool/ContainerClient.scala
+++ b/common/scala/src/main/scala/org/apache/openwhisk/core/containerpool/ContainerClient.scala
@@ -24,7 +24,7 @@
import org.apache.openwhisk.core.entity.ActivationResponse._
trait ContainerClient {
- def post(endpoint: String, body: JsValue, retry: Boolean)(
+ def post(endpoint: String, body: JsValue, retry: Boolean, reschedule: Boolean)(
implicit tid: TransactionId): Future[Either[ContainerHttpError, ContainerResponse]]
def close(): Future[Unit]
}
diff --git a/common/scala/src/main/scala/org/apache/openwhisk/core/mesos/MesosTask.scala b/common/scala/src/main/scala/org/apache/openwhisk/core/mesos/MesosTask.scala
index 968f942..a6abec4 100644
--- a/common/scala/src/main/scala/org/apache/openwhisk/core/mesos/MesosTask.scala
+++ b/common/scala/src/main/scala/org/apache/openwhisk/core/mesos/MesosTask.scala
@@ -188,7 +188,7 @@
}
class MesosTask(override protected val id: ContainerId,
- override protected val addr: ContainerAddress,
+ override protected[core] val addr: ContainerAddress,
override protected implicit val ec: ExecutionContext,
override protected implicit val logging: Logging,
override protected val as: ActorSystem,
diff --git a/common/scala/src/main/scala/org/apache/openwhisk/core/yarn/YARNTask.scala b/common/scala/src/main/scala/org/apache/openwhisk/core/yarn/YARNTask.scala
index d7fcf1f..714a04e 100644
--- a/common/scala/src/main/scala/org/apache/openwhisk/core/yarn/YARNTask.scala
+++ b/common/scala/src/main/scala/org/apache/openwhisk/core/yarn/YARNTask.scala
@@ -43,7 +43,7 @@
* (external log collection and retrieval must be enabled via LogStore SPI to expose logs to wsk cli)
*/
class YARNTask(override protected val id: ContainerId,
- override protected val addr: ContainerAddress,
+ override protected[core] val addr: ContainerAddress,
override protected val ec: ExecutionContext,
override protected val logging: Logging,
override protected val as: ActorSystem,
diff --git a/core/invoker/src/main/resources/application.conf b/core/invoker/src/main/resources/application.conf
index a998ecb..5760a4b 100644
--- a/core/invoker/src/main/resources/application.conf
+++ b/core/invoker/src/main/resources/application.conf
@@ -122,6 +122,11 @@
idle-container = 10 minutes
pause-grace = 50 milliseconds
}
+ action-health-check {
+ enabled = false # if true, prewarm containers will be pinged periodically and warm containers will be pinged once after being resumed
+ check-period = 3 seconds # how often should prewarm containers be pinged (tcp connection attempt)
+ max-fails = 3 # prewarm containers that fail this number of times will be destroyed and replaced
+ }
}
# tracing configuration
diff --git a/core/invoker/src/main/scala/org/apache/openwhisk/core/containerpool/ContainerPool.scala b/core/invoker/src/main/scala/org/apache/openwhisk/core/containerpool/ContainerPool.scala
index ea33006..1e267fe 100644
--- a/core/invoker/src/main/scala/org/apache/openwhisk/core/containerpool/ContainerPool.scala
+++ b/core/invoker/src/main/scala/org/apache/openwhisk/core/containerpool/ContainerPool.scala
@@ -69,6 +69,7 @@
var freePool = immutable.Map.empty[ActorRef, ContainerData]
var busyPool = immutable.Map.empty[ActorRef, ContainerData]
var prewarmedPool = immutable.Map.empty[ActorRef, ContainerData]
+ var prewarmStartingPool = immutable.Map.empty[ActorRef, (String, ByteSize)]
// If all memory slots are occupied and if there is currently no container to be removed, than the actions will be
// buffered here to keep order of computation.
// Otherwise actions with small memory-limits could block actions with large memory limits.
@@ -79,13 +80,7 @@
//periodically emit metrics (don't need to do this for each message!)
context.system.scheduler.schedule(30.seconds, 10.seconds, self, EmitMetrics)
- prewarmConfig.foreach { config =>
- logging.info(this, s"pre-warming ${config.count} ${config.exec.kind} ${config.memoryLimit.toString}")(
- TransactionId.invokerWarmup)
- (1 to config.count).foreach { _ =>
- prewarmContainer(config.exec, config.memoryLimit)
- }
- }
+ backfillPrewarms(true)
def logContainerStart(r: Run, containerState: String, activeActivations: Int, container: Option[Container]): Unit = {
val namespaceName = r.msg.user.namespace.name
@@ -95,7 +90,11 @@
r.msg.transid.mark(
this,
- LoggingMarkers.INVOKER_CONTAINER_START(containerState),
+ LoggingMarkers.INVOKER_CONTAINER_START(
+ containerState,
+ r.msg.user.namespace.toString,
+ r.msg.action.namespace.toString,
+ r.msg.action.name.toString),
s"containerStart containerState: $containerState container: $container activations: $activeActivations of max $maxConcurrent action: $actionName namespace: $namespaceName activationId: $activationId",
akka.event.Logging.InfoLevel)
}
@@ -242,6 +241,7 @@
processBufferOrFeed()
// Container is prewarmed and ready to take work
case NeedWork(data: PreWarmedData) =>
+ prewarmStartingPool = prewarmStartingPool - sender()
prewarmedPool = prewarmedPool + (sender() -> data)
// Container got removed
@@ -256,6 +256,21 @@
busyPool = busyPool - sender()
}
processBufferOrFeed()
+
+ //in case this was a prewarm
+ prewarmedPool.get(sender()).foreach { _ =>
+ logging.info(this, "failed prewarm removed")
+ prewarmedPool = prewarmedPool - sender()
+ }
+ //in case this was a starting prewarm
+ prewarmStartingPool.get(sender()).foreach { _ =>
+ logging.info(this, "failed starting prewarm removed")
+ prewarmStartingPool = prewarmStartingPool - sender()
+ }
+
+ //backfill prewarms on every ContainerRemoved, just in case
+ backfillPrewarms(false) //in case a prewarm is removed due to health failure or crash
+
// This message is received for one of these reasons:
// 1. Container errored while resuming a warm container, could not process the job, and sent the job back
// 2. The container aged, is destroying itself, and was assigned a job which it had to send back
@@ -287,6 +302,29 @@
}
}
+ /** Install prewarm containers up to the configured requirements for each kind/memory combination. */
+ def backfillPrewarms(init: Boolean) = {
+ prewarmConfig.foreach { config =>
+ val kind = config.exec.kind
+ val memory = config.memoryLimit
+ val currentCount = prewarmedPool.count {
+ case (_, PreWarmedData(_, `kind`, `memory`, _)) => true //done starting
+ case _ => false //different kind or memory
+ }
+ val startingCount = prewarmStartingPool.count(p => p._2._1 == kind && p._2._2 == memory)
+ val containerCount = currentCount + startingCount
+ if (containerCount < config.count) {
+ logging.info(
+ this,
+ s"found ${currentCount} started and ${startingCount} starting; ${if (init) "initing" else "backfilling"} ${config.count - containerCount} pre-warms to desired count: ${config.count} for kind:${config.exec.kind} mem:${config.memoryLimit.toString}")(
+ TransactionId.invokerWarmup)
+ (containerCount until config.count).foreach { _ =>
+ prewarmContainer(config.exec, config.memoryLimit)
+ }
+ }
+ }
+ }
+
/** Creates a new container and updates state accordingly. */
def createContainer(memoryLimit: ByteSize): (ActorRef, ContainerData) = {
val ref = childFactory(context)
@@ -296,8 +334,11 @@
}
/** Creates a new prewarmed container */
- def prewarmContainer(exec: CodeExec[_], memoryLimit: ByteSize): Unit =
- childFactory(context) ! Start(exec, memoryLimit)
+ def prewarmContainer(exec: CodeExec[_], memoryLimit: ByteSize): Unit = {
+ val newContainer = childFactory(context)
+ prewarmStartingPool = prewarmStartingPool + (newContainer -> (exec.kind, memoryLimit))
+ newContainer ! Start(exec, memoryLimit)
+ }
/**
* Takes a prewarm container out of the prewarmed pool
@@ -362,6 +403,10 @@
MetricEmitter.emitGaugeMetric(
LoggingMarkers.CONTAINER_POOL_PREWARM_SIZE,
prewarmedPool.map(_._2.memoryLimit.toMB).sum)
+ val unused = freePool.filter(_._2.activeActivationCount == 0)
+ val unusedMB = unused.map(_._2.memoryLimit.toMB).sum
+ MetricEmitter.emitGaugeMetric(LoggingMarkers.CONTAINER_POOL_IDLES_COUNT, unused.size)
+ MetricEmitter.emitGaugeMetric(LoggingMarkers.CONTAINER_POOL_IDLES_SIZE, unusedMB)
}
}
@@ -397,8 +442,8 @@
idles: Map[A, ContainerData]): Option[(A, ContainerData)] = {
idles
.find {
- case (_, c @ WarmedData(_, `invocationNamespace`, `action`, _, _)) if c.hasCapacity() => true
- case _ => false
+ case (_, c @ WarmedData(_, `invocationNamespace`, `action`, _, _, _)) if c.hasCapacity() => true
+ case _ => false
}
.orElse {
idles.find {
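
(Annotation, not part of the patch: backfillPrewarms counts both fully started
prewarms and those still starting, so a burst of ContainerRemoved messages cannot
over-provision. Worked example: with config.count = 2, currentCount = 1 and
startingCount = 1, containerCount is 2 and the range (2 until 2) is empty, so
nothing extra is started.)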
diff --git a/core/invoker/src/main/scala/org/apache/openwhisk/core/containerpool/ContainerProxy.scala b/core/invoker/src/main/scala/org/apache/openwhisk/core/containerpool/ContainerProxy.scala
index 50024fd..a35d999 100644
--- a/core/invoker/src/main/scala/org/apache/openwhisk/core/containerpool/ContainerProxy.scala
+++ b/core/invoker/src/main/scala/org/apache/openwhisk/core/containerpool/ContainerProxy.scala
@@ -17,15 +17,28 @@
package org.apache.openwhisk.core.containerpool
+import akka.actor.Actor
+import akka.actor.ActorRef
+import akka.actor.Cancellable
import java.time.Instant
-
import akka.actor.Status.{Failure => FailureMessage}
import akka.actor.{FSM, Props, Stash}
import akka.event.Logging.InfoLevel
+import akka.io.IO
+import akka.io.Tcp
+import akka.io.Tcp.Close
+import akka.io.Tcp.CommandFailed
+import akka.io.Tcp.Connect
+import akka.io.Tcp.Connected
import akka.pattern.pipe
import pureconfig._
import pureconfig.generic.auto._
+import akka.stream.ActorMaterializer
+import java.net.InetSocketAddress
+import java.net.SocketException
+import org.apache.openwhisk.common.MetricEmitter
+import org.apache.openwhisk.common.TransactionId.systemPrefix
import scala.collection.immutable
import spray.json.DefaultJsonProtocol._
import spray.json._
@@ -44,7 +57,6 @@
import org.apache.openwhisk.core.entity.size._
import org.apache.openwhisk.core.invoker.InvokerReactive.{ActiveAck, LogsCollector}
import org.apache.openwhisk.http.Messages
-
import scala.concurrent.Future
import scala.concurrent.duration._
import scala.util.{Failure, Success}
@@ -165,17 +177,22 @@
invocationNamespace: EntityName,
action: ExecutableWhiskAction,
override val lastUsed: Instant,
- override val activeActivationCount: Int = 0)
+ override val activeActivationCount: Int = 0,
+ resumeRun: Option[Run] = None)
extends ContainerStarted(container, lastUsed, action.limits.memory.megabytes.MB, activeActivationCount)
with ContainerInUse {
override val initingState = "warmed"
override def nextRun(r: Run) = copy(lastUsed = Instant.now, activeActivationCount = activeActivationCount + 1)
+ //track the resuming run so we can easily refer to the action being resumed (it may fail and be resent)
+ def withoutResumeRun() = this.copy(resumeRun = None)
+ def withResumeRun(job: Run) = this.copy(resumeRun = Some(job))
}
// Events received by the actor
case class Start(exec: CodeExec[_], memoryLimit: ByteSize)
case class Run(action: ExecutableWhiskAction, msg: ActivationMessage, retryLogDeadline: Option[Deadline] = None)
case object Remove
+case class HealthPingEnabled(enabled: Boolean)
// Events sent by the actor
case class NeedWork(data: ContainerData)
@@ -234,12 +251,16 @@
collectLogs: LogsCollector,
instance: InvokerInstanceId,
poolConfig: ContainerPoolConfig,
+ healthCheckConfig: ContainerProxyHealthCheckConfig,
unusedTimeout: FiniteDuration,
- pauseGrace: FiniteDuration)
+ pauseGrace: FiniteDuration,
+ testTcp: Option[ActorRef])
extends FSM[ContainerState, ContainerData]
with Stash {
implicit val ec = context.system.dispatcher
implicit val logging = new AkkaLogging(context.system.log)
+ implicit val ac = context.system
+ implicit val materializer = ActorMaterializer()
var rescheduleJob = false // true iff actor receives a job but cannot process it because actor will destroy itself
var runBuffer = immutable.Queue.empty[Run] //does not retain order, but does manage jobs that would have pushed past action concurrency limit
//track buffer processing state to avoid extra transitions near end of buffer - this provides a pseudo-state between Running and Ready
@@ -247,6 +268,8 @@
//keep a separate count to avoid confusion with ContainerState.activeActivationCount that is tracked/modified only in ContainerPool
var activeCount = 0;
+ var healthPingActor: Option[ActorRef] = None //set up after prewarm starts
+ val tcp: ActorRef = testTcp.getOrElse(IO(Tcp)) //allows testing interaction with the Tcp extension
startWith(Uninitialized, NoData())
when(Uninitialized) {
@@ -346,7 +369,12 @@
.pipeTo(self)
goto(Running) using PreWarmedData(data.container, data.kind, data.memoryLimit, 1)
- case Event(Remove, data: PreWarmedData) => destroyContainer(data.container)
+ case Event(Remove, data: PreWarmedData) => destroyContainer(data)
+
+ // prewarm container failed
+ case Event(_: FailureMessage, data: PreWarmedData) =>
+ MetricEmitter.emitCounterMetric(LoggingMarkers.INVOKER_CONTAINER_HEALTH_FAILED_PREWARM)
+ destroyContainer(data)
}
when(Running) {
@@ -382,12 +410,12 @@
// Run was successful
case Event(RunCompleted, data: WarmedData) =>
activeCount -= 1
-
+ val newData = data.withoutResumeRun()
//if there are items in runbuffer, process them if there is capacity, and stay; otherwise if we have any pending activations, also stay
if (requestWork(data) || activeCount > 0) {
- stay using data
+ stay using newData
} else {
- goto(Ready) using data
+ goto(Ready) using newData
}
case Event(job: Run, data: WarmedData)
if activeCount >= data.action.limits.concurrency.maxConcurrent && !rescheduleJob => //if we are over concurrency limit, and not a failure on resume
@@ -405,15 +433,31 @@
.pipeTo(self)
stay() using data
+ //ContainerHealthError should cause rescheduling of the job
+ case Event(FailureMessage(e: ContainerHealthError), data: WarmedData) =>
+ implicit val tid = e.tid
+ MetricEmitter.emitCounterMetric(LoggingMarkers.INVOKER_CONTAINER_HEALTH_FAILED_WARM)
+ //resend to self will send to parent once we get to Removing state
+ val newData = data.resumeRun
+ .map { run =>
+ logging.warn(this, "Ready warm container unhealthy, will retry activation.")
+ self ! run
+ data.withoutResumeRun()
+ }
+ .getOrElse(data)
+ rescheduleJob = true
+ rejectBuffered()
+ destroyContainer(newData)
+
// Failed after /init (the first run failed)
case Event(_: FailureMessage, data: PreWarmedData) =>
activeCount -= 1
- destroyContainer(data.container)
+ destroyContainer(data)
// Failed for a subsequent /run
case Event(_: FailureMessage, data: WarmedData) =>
activeCount -= 1
- destroyContainer(data.container)
+ destroyContainer(data)
// Failed at getting a container for a cold-start run
case Event(_: FailureMessage, _) =>
@@ -429,24 +473,28 @@
case Event(job: Run, data: WarmedData) =>
implicit val transid = job.msg.transid
activeCount += 1
-
- initializeAndRun(data.container, job)
+ val newData = data.withResumeRun(job)
+ initializeAndRun(data.container, job, true)
.map(_ => RunCompleted)
.pipeTo(self)
- goto(Running) using data
+ goto(Running) using newData
// pause grace timed out
case Event(StateTimeout, data: WarmedData) =>
data.container.suspend()(TransactionId.invokerNanny).map(_ => ContainerPaused).pipeTo(self)
goto(Pausing)
- case Event(Remove, data: WarmedData) => destroyContainer(data.container)
+ case Event(Remove, data: WarmedData) => destroyContainer(data)
+
+ // warm container failed
+ case Event(_: FailureMessage, data: WarmedData) =>
+ destroyContainer(data)
}
when(Pausing) {
case Event(ContainerPaused, data: WarmedData) => goto(Paused)
- case Event(_: FailureMessage, data: WarmedData) => destroyContainer(data.container)
+ case Event(_: FailureMessage, data: WarmedData) => destroyContainer(data)
case _ => delay
}
@@ -454,7 +502,7 @@
case Event(job: Run, data: WarmedData) =>
implicit val transid = job.msg.transid
activeCount += 1
-
+ val newData = data.withResumeRun(job)
data.container
.resume()
.andThen {
@@ -465,16 +513,15 @@
rescheduleJob = true
self ! job
}
- .flatMap(_ => initializeAndRun(data.container, job))
+ .flatMap(_ => initializeAndRun(data.container, job, true))
.map(_ => RunCompleted)
.pipeTo(self)
-
- goto(Running) using data
+ goto(Running) using newData
// container is reclaimed by the pool or it has become too old
case Event(StateTimeout | Remove, data: WarmedData) =>
rescheduleJob = true // to suppress sending a message to the pool and avoid double counting
- destroyContainer(data.container)
+ destroyContainer(data)
}
when(Removing) {
@@ -488,10 +535,25 @@
// Unstash all messages stashed while in intermediate state
onTransition {
- case _ -> Started => unstashAll()
- case _ -> Ready => unstashAll()
- case _ -> Paused => unstashAll()
- case _ -> Removing => unstashAll()
+ case _ -> Started =>
+ if (healthCheckConfig.enabled) {
+ logging.debug(this, "enabling health ping on Started")
+ nextStateData.getContainer.foreach { c =>
+ enableHealthPing(c)
+ }
+ }
+ unstashAll()
+ case _ -> Running =>
+ if (healthCheckConfig.enabled && healthPingActor.isDefined) {
+ logging.debug(this, "disabling health ping on Running")
+ disableHealthPing()
+ }
+ case _ -> Ready =>
+ unstashAll()
+ case _ -> Paused =>
+ unstashAll()
+ case _ -> Removing =>
+ unstashAll()
}
initialize()
@@ -545,9 +607,10 @@
* Destroys the container after unpausing it if needed. Can be used
* as a state progression as it goes to Removing.
*
- * @param container the container to destroy
+ * @param newData the ContainerStarted whose container will be destroyed
*/
- def destroyContainer(container: Container) = {
+ def destroyContainer(newData: ContainerStarted) = {
+ val container = newData.container
if (!rescheduleJob) {
context.parent ! ContainerRemoved
} else {
@@ -565,8 +628,8 @@
.flatMap(_ => container.destroy()(TransactionId.invokerNanny))
.map(_ => ContainerRemoved)
.pipeTo(self)
-
- goto(Removing)
+ goto(Removing) using newData
}
/**
@@ -581,6 +644,21 @@
}
}
+ private def enableHealthPing(c: Container) = {
+ val hpa = healthPingActor.getOrElse {
+ logging.info(this, s"creating health ping actor for ${c.addr.asString()}")
+ val hp = context.actorOf(
+ TCPPingClient
+ .props(tcp, c.toString(), healthCheckConfig, new InetSocketAddress(c.addr.host, c.addr.port)))
+ healthPingActor = Some(hp)
+ hp
+ }
+ hpa ! HealthPingEnabled(true)
+ }
+ private def disableHealthPing() = {
+ healthPingActor.foreach(_ ! HealthPingEnabled(false))
+ }
+
/**
* Runs the job, initialize first if necessary.
* Completes the job by:
@@ -594,7 +672,8 @@
* @return a future completing after logs have been collected and
* added to the WhiskActivation
*/
- def initializeAndRun(container: Container, job: Run)(implicit tid: TransactionId): Future[WhiskActivation] = {
+ def initializeAndRun(container: Container, job: Run, reschedule: Boolean = false)(
+ implicit tid: TransactionId): Future[WhiskActivation] = {
val actionTimeout = job.action.limits.timeout.duration
val (env, parameters) = ContainerProxy.partitionArguments(job.msg.content, job.msg.initArgs)
@@ -643,8 +722,12 @@
"deadline" -> (Instant.now.toEpochMilli + actionTimeout.toMillis).toString.toJson)
container
- .run(parameters, env.toJson.asJsObject, actionTimeout, job.action.limits.concurrency.maxConcurrent)(
- job.msg.transid)
+ .run(
+ parameters,
+ env.toJson.asJsObject,
+ actionTimeout,
+ job.action.limits.concurrency.maxConcurrent,
+ reschedule)(job.msg.transid)
.map {
case (runInterval, response) =>
val initRunInterval = initInterval
@@ -658,23 +741,23 @@
response)
}
}
- .recover {
+ .recoverWith {
+ case h: ContainerHealthError =>
+ Future.failed(h)
case InitializationError(interval, response) =>
- ContainerProxy.constructWhiskActivation(
- job,
- Some(interval),
- interval,
- interval.duration >= actionTimeout,
- response)
+ Future.successful(
+ ContainerProxy
+ .constructWhiskActivation(job, Some(interval), interval, interval.duration >= actionTimeout, response))
case t =>
// Actually, this should never happen - but we want to make sure to not miss a problem
logging.error(this, s"caught unexpected error while running activation: ${t}")
- ContainerProxy.constructWhiskActivation(
- job,
- None,
- Interval.zero,
- false,
- ActivationResponse.whiskError(Messages.abnormalRun))
+ Future.successful(
+ ContainerProxy.constructWhiskActivation(
+ job,
+ None,
+ Interval.zero,
+ false,
+ ActivationResponse.whiskError(Messages.abnormalRun)))
}
val splitAckMessagesPendingLogCollection = collectLogs.logsToBeCollected(job.action)
@@ -755,6 +838,7 @@
}
final case class ContainerProxyTimeoutConfig(idleContainer: FiniteDuration, pauseGrace: FiniteDuration)
+final case class ContainerProxyHealthCheckConfig(enabled: Boolean, checkPeriod: FiniteDuration, maxFails: Int)
object ContainerProxy {
def props(factory: (TransactionId,
@@ -769,9 +853,23 @@
collectLogs: LogsCollector,
instance: InvokerInstanceId,
poolConfig: ContainerPoolConfig,
+ healthCheckConfig: ContainerProxyHealthCheckConfig =
+ loadConfigOrThrow[ContainerProxyHealthCheckConfig](ConfigKeys.containerProxyHealth),
unusedTimeout: FiniteDuration = timeouts.idleContainer,
- pauseGrace: FiniteDuration = timeouts.pauseGrace) =
- Props(new ContainerProxy(factory, ack, store, collectLogs, instance, poolConfig, unusedTimeout, pauseGrace))
+ pauseGrace: FiniteDuration = timeouts.pauseGrace,
+ tcp: Option[ActorRef] = None) =
+ Props(
+ new ContainerProxy(
+ factory,
+ ack,
+ store,
+ collectLogs,
+ instance,
+ poolConfig,
+ healthCheckConfig,
+ unusedTimeout,
+ pauseGrace,
+ tcp))
// Needs to be thread-safe as it's used by multiple proxies concurrently.
private val containerCount = new Counter
@@ -870,6 +968,74 @@
}
}
+object TCPPingClient {
+ def props(tcp: ActorRef, containerId: String, config: ContainerProxyHealthCheckConfig, remote: InetSocketAddress) =
+ Props(new TCPPingClient(tcp, containerId, remote, config))
+}
+
+class TCPPingClient(tcp: ActorRef,
+ containerId: String,
+ remote: InetSocketAddress,
+ config: ContainerProxyHealthCheckConfig)
+ extends Actor {
+ implicit val logging = new AkkaLogging(context.system.log)
+ implicit val ec = context.system.dispatcher
+ implicit var healthPingTx = TransactionId.actionHealthPing
+ case object HealthPingSend
+
+ var scheduledPing: Option[Cancellable] = None
+ var failedCount = 0
+ val addressString = s"${remote.getHostString}:${remote.getPort}"
+ restartPing()
+
+ private def restartPing() = {
+ cancelPing() //just in case restart is called twice
+ scheduledPing = Some(
+ context.system.scheduler.schedule(config.checkPeriod, config.checkPeriod, self, HealthPingSend))
+ }
+ private def cancelPing() = {
+ scheduledPing.foreach(_.cancel())
+ }
+ def receive = {
+ case HealthPingEnabled(enabled) =>
+ if (enabled) {
+ restartPing()
+ } else {
+ cancelPing()
+ }
+ case HealthPingSend =>
+ healthPingTx = TransactionId(systemPrefix + "actionHealth") //reset the tx id each iteration
+ tcp ! Connect(remote)
+ case CommandFailed(_: Connect) =>
+ failedCount += 1
+ if (failedCount == config.maxFails) {
+ logging.error(
+ this,
+ s"Failed health connection to $containerId ($addressString) $failedCount times - exceeded max ${config.maxFails} failures")
+ //destroy this container since we cannot communicate with it
+ context.parent ! FailureMessage(
+ new SocketException(s"Health connection to $containerId ($addressString) failed $failedCount times"))
+ cancelPing()
+ context.stop(self)
+ } else {
+ logging.warn(this, s"Failed health connection to $containerId ($addressString) $failedCount times")
+ }
+
+ case Connected(_, _) =>
+ sender() ! Close
+ if (failedCount > 0) {
+ //reset in case of temp failure
+ logging.info(
+ this,
+ s"Succeeded health connection to $containerId ($addressString) after $failedCount previous failures")
+ failedCount = 0
+ } else {
+ logging.debug(this, s"Succeeded health connection to $containerId ($addressString)")
+ }
+
+ }
+}
+
/** Indicates that something went wrong with an activation and the container should be removed */
trait ActivationError extends Exception {
val activation: WhiskActivation
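
(Annotation, not part of the patch — a wiring sketch for the ping actor using the
props signature and the application.conf defaults shown above; inside an actor,
IO(Tcp) resolves against the implicit context.system, and the tests below inject
a TestProbe in its place:

    // create the pinger for one container; toggle it via HealthPingEnabled messages
    val ping = context.actorOf(
      TCPPingClient.props(
        IO(Tcp), // akka.io Tcp extension manager
        container.toString(),
        ContainerProxyHealthCheckConfig(enabled = true, checkPeriod = 3.seconds, maxFails = 3),
        new InetSocketAddress(container.addr.host, container.addr.port)))
    ping ! HealthPingEnabled(true) // HealthPingEnabled(false) cancels the scheduled pings

The FSM performs the same toggling: pings are enabled on the transition into
Started and disabled on the transition into Running.)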
diff --git a/core/invoker/src/main/scala/org/apache/openwhisk/core/containerpool/docker/DockerContainer.scala b/core/invoker/src/main/scala/org/apache/openwhisk/core/containerpool/docker/DockerContainer.scala
index bca3674..a0163b8 100644
--- a/core/invoker/src/main/scala/org/apache/openwhisk/core/containerpool/docker/DockerContainer.scala
+++ b/core/invoker/src/main/scala/org/apache/openwhisk/core/containerpool/docker/DockerContainer.scala
@@ -164,7 +164,7 @@
* @param addr the ip of the container
*/
class DockerContainer(protected val id: ContainerId,
- protected val addr: ContainerAddress,
+ protected[core] val addr: ContainerAddress,
protected val useRunc: Boolean)(implicit docker: DockerApiWithFileAccess,
runc: RuncApi,
override protected val as: ActorSystem,
@@ -210,11 +210,13 @@
}
}
- override protected def callContainer(path: String,
- body: JsObject,
- timeout: FiniteDuration,
- maxConcurrent: Int,
- retry: Boolean = false)(implicit transid: TransactionId): Future[RunResult] = {
+ override protected def callContainer(
+ path: String,
+ body: JsObject,
+ timeout: FiniteDuration,
+ maxConcurrent: Int,
+ retry: Boolean = false,
+ reschedule: Boolean = false)(implicit transid: TransactionId): Future[RunResult] = {
val started = Instant.now()
val http = httpConnection.getOrElse {
val conn = if (Container.config.akkaClient) {
@@ -231,7 +233,7 @@
}
http
- .post(path, body, retry)
+ .post(path, body, retry, reschedule)
.flatMap { response =>
val finished = Instant.now()
diff --git a/core/monitoring/user-events/compose/grafana/dashboards/global-metrics.json b/core/monitoring/user-events/compose/grafana/dashboards/global-metrics.json
index d3c06d8..7d5e247 100644
--- a/core/monitoring/user-events/compose/grafana/dashboards/global-metrics.json
+++ b/core/monitoring/user-events/compose/grafana/dashboards/global-metrics.json
@@ -15,6 +15,7 @@
"editable": true,
"gnetId": null,
"graphTooltip": 0,
+ "iteration": 1580164337194,
"links": [],
"panels": [
{
@@ -282,7 +283,9 @@
"recent": false,
"search": true,
"starred": false,
- "tags": ["openwhisk"],
+ "tags": [
+ "openwhisk"
+ ],
"title": "Related Dashboards",
"type": "dashlist"
},
@@ -557,6 +560,177 @@
"align": false,
"alignLevel": null
}
+ },
+ {
+ "aliasColors": {},
+ "bars": false,
+ "dashLength": 10,
+ "dashes": false,
+ "fill": 1,
+ "gridPos": {
+ "h": 8,
+ "w": 12,
+ "x": 0,
+ "y": 19
+ },
+ "id": 13,
+ "legend": {
+ "avg": false,
+ "current": false,
+ "max": false,
+ "min": false,
+ "show": true,
+ "total": false,
+ "values": false
+ },
+ "lines": true,
+ "linewidth": 1,
+ "links": [],
+ "nullPointMode": "null",
+ "percentage": false,
+ "pointradius": 2,
+ "points": false,
+ "renderer": "flot",
+ "seriesOverrides": [],
+ "spaceLength": 10,
+ "stack": false,
+ "steppedLine": false,
+ "targets": [
+ {
+ "expr": "sum(increase(counter_invoker_containerStart_counter_total{containerState!=\"warmed\"}[$interval])) by (containerState)",
+ "format": "time_series",
+ "interval": "",
+ "intervalFactor": 1,
+ "legendFormat": "{{containerState}}",
+ "refId": "A"
+ }
+ ],
+ "thresholds": [],
+ "timeFrom": null,
+ "timeRegions": [],
+ "timeShift": null,
+ "title": "Non-warm container starts [$interval]",
+ "tooltip": {
+ "shared": true,
+ "sort": 0,
+ "value_type": "individual"
+ },
+ "type": "graph",
+ "xaxis": {
+ "buckets": null,
+ "mode": "time",
+ "name": null,
+ "show": true,
+ "values": []
+ },
+ "yaxes": [
+ {
+ "format": "short",
+ "label": null,
+ "logBase": 1,
+ "max": null,
+ "min": null,
+ "show": true
+ },
+ {
+ "format": "short",
+ "label": null,
+ "logBase": 1,
+ "max": null,
+ "min": null,
+ "show": true
+ }
+ ],
+ "yaxis": {
+ "align": false,
+ "alignLevel": null
+ }
+ },
+ {
+ "aliasColors": {},
+ "bars": false,
+ "dashLength": 10,
+ "dashes": false,
+ "fill": 1,
+ "gridPos": {
+ "h": 8,
+ "w": 12,
+ "x": 12,
+ "y": 19
+ },
+ "id": 14,
+ "interval": "",
+ "legend": {
+ "avg": false,
+ "current": false,
+ "max": false,
+ "min": false,
+ "show": true,
+ "total": false,
+ "values": false
+ },
+ "lines": true,
+ "linewidth": 1,
+ "links": [],
+ "nullPointMode": "null",
+ "percentage": false,
+ "pointradius": 2,
+ "points": false,
+ "renderer": "flot",
+ "seriesOverrides": [],
+ "spaceLength": 10,
+ "stack": false,
+ "steppedLine": false,
+ "targets": [
+ {
+ "expr": "sum(increase(counter_invoker_containerStart_counter_total{containerState=\"warmed\"}[$interval])) by (containerState)",
+ "format": "time_series",
+ "interval": "",
+ "intervalFactor": 1,
+ "legendFormat": "{{ containerState }}",
+ "refId": "A"
+ }
+ ],
+ "thresholds": [],
+ "timeFrom": null,
+ "timeRegions": [],
+ "timeShift": null,
+ "title": "Warm container starts [$interval]",
+ "tooltip": {
+ "shared": true,
+ "sort": 0,
+ "value_type": "individual"
+ },
+ "type": "graph",
+ "xaxis": {
+ "buckets": null,
+ "mode": "time",
+ "name": null,
+ "show": true,
+ "values": []
+ },
+ "yaxes": [
+ {
+ "format": "short",
+ "label": null,
+ "logBase": 1,
+ "max": null,
+ "min": null,
+ "show": true
+ },
+ {
+ "format": "short",
+ "label": null,
+ "logBase": 1,
+ "max": null,
+ "min": null,
+ "show": true
+ }
+ ],
+ "yaxis": {
+ "align": false,
+ "alignLevel": null
+ }
}
],
"refresh": false,
diff --git a/core/monitoring/user-events/compose/grafana/dashboards/openwhisk_events.json b/core/monitoring/user-events/compose/grafana/dashboards/openwhisk_events.json
index 389d2e5..9962218 100644
--- a/core/monitoring/user-events/compose/grafana/dashboards/openwhisk_events.json
+++ b/core/monitoring/user-events/compose/grafana/dashboards/openwhisk_events.json
@@ -1,46 +1,4 @@
{
- "__inputs": [
- {
- "name": "DS_PROMETHEUS",
- "label": "Prometheus",
- "description": "",
- "type": "datasource",
- "pluginId": "prometheus",
- "pluginName": "Prometheus"
- }
- ],
- "__requires": [
- {
- "type": "grafana",
- "id": "grafana",
- "name": "Grafana",
- "version": "6.1.6"
- },
- {
- "type": "panel",
- "id": "graph",
- "name": "Graph",
- "version": ""
- },
- {
- "type": "datasource",
- "id": "prometheus",
- "name": "Prometheus",
- "version": "1.0.0"
- },
- {
- "type": "panel",
- "id": "singlestat",
- "name": "Singlestat",
- "version": ""
- },
- {
- "type": "panel",
- "id": "table",
- "name": "Table",
- "version": ""
- }
- ],
"annotations": {
"list": [
{
@@ -58,8 +16,7 @@
"editable": true,
"gnetId": 9564,
"graphTooltip": 0,
- "id": null,
- "iteration": 1574127067913,
+ "iteration": 1580164808936,
"links": [],
"panels": [
{
@@ -1476,6 +1433,174 @@
"align": false,
"alignLevel": null
}
+ },
+ {
+ "aliasColors": {},
+ "bars": false,
+ "dashLength": 10,
+ "dashes": false,
+ "fill": 1,
+ "gridPos": {
+ "h": 8,
+ "w": 12,
+ "x": 0,
+ "y": 47
+ },
+ "id": 41,
+ "legend": {
+ "avg": false,
+ "current": false,
+ "max": false,
+ "min": false,
+ "show": true,
+ "total": false,
+ "values": false
+ },
+ "lines": true,
+ "linewidth": 1,
+ "links": [],
+ "nullPointMode": "null",
+ "percentage": false,
+ "pointradius": 2,
+ "points": false,
+ "renderer": "flot",
+ "seriesOverrides": [],
+ "spaceLength": 10,
+ "stack": false,
+ "steppedLine": false,
+ "targets": [
+ {
+ "expr": "sum(increase(counter_invoker_containerStart_counter_total{containerState!=\"warmed\",namespace=~\"$namespace\",action=~\"$action\",initiator=~\"$initiator\",region=~\"$region\",stack=~\"$stack\"}[$interval])) by (containerState)",
+ "format": "time_series",
+ "intervalFactor": 1,
+ "legendFormat": "{{containerState}}",
+ "refId": "A"
+ }
+ ],
+ "thresholds": [],
+ "timeFrom": null,
+ "timeRegions": [],
+ "timeShift": null,
+ "title": "Non-warm container starts [$interval]",
+ "tooltip": {
+ "shared": true,
+ "sort": 0,
+ "value_type": "individual"
+ },
+ "type": "graph",
+ "xaxis": {
+ "buckets": null,
+ "mode": "time",
+ "name": null,
+ "show": true,
+ "values": []
+ },
+ "yaxes": [
+ {
+ "format": "short",
+ "label": null,
+ "logBase": 1,
+ "max": null,
+ "min": null,
+ "show": true
+ },
+ {
+ "format": "short",
+ "label": null,
+ "logBase": 1,
+ "max": null,
+ "min": null,
+ "show": true
+ }
+ ],
+ "yaxis": {
+ "align": false,
+ "alignLevel": null
+ }
+ },
+ {
+ "aliasColors": {},
+ "bars": false,
+ "dashLength": 10,
+ "dashes": false,
+ "fill": 1,
+ "gridPos": {
+ "h": 8,
+ "w": 12,
+ "x": 12,
+ "y": 47
+ },
+ "id": 42,
+ "legend": {
+ "avg": false,
+ "current": false,
+ "max": false,
+ "min": false,
+ "show": true,
+ "total": false,
+ "values": false
+ },
+ "lines": true,
+ "linewidth": 1,
+ "links": [],
+ "nullPointMode": "null",
+ "percentage": false,
+ "pointradius": 2,
+ "points": false,
+ "renderer": "flot",
+ "seriesOverrides": [],
+ "spaceLength": 10,
+ "stack": false,
+ "steppedLine": false,
+ "targets": [
+ {
+ "expr": "sum(increase(counter_invoker_containerStart_counter_total{containerState=\"warmed\",namespace=~\"$namespace\",action=~\"$action\",initiator=~\"$initiator\",region=~\"$region\",stack=~\"$stack\"}[$interval])) by (containerState)",
+ "format": "time_series",
+ "intervalFactor": 1,
+ "legendFormat": "{{containerState}}",
+ "refId": "A"
+ }
+ ],
+ "thresholds": [],
+ "timeFrom": null,
+ "timeRegions": [],
+ "timeShift": null,
+ "title": "Warm container starts [$interval]",
+ "tooltip": {
+ "shared": true,
+ "sort": 0,
+ "value_type": "individual"
+ },
+ "type": "graph",
+ "xaxis": {
+ "buckets": null,
+ "mode": "time",
+ "name": null,
+ "show": true,
+ "values": []
+ },
+ "yaxes": [
+ {
+ "format": "short",
+ "label": null,
+ "logBase": 1,
+ "max": null,
+ "min": null,
+ "show": true
+ },
+ {
+ "format": "short",
+ "label": null,
+ "logBase": 1,
+ "max": null,
+ "min": null,
+ "show": true
+ }
+ ],
+ "yaxis": {
+ "align": false,
+ "alignLevel": null
+ }
}
],
"refresh": "5s",
diff --git a/tests/src/test/scala/org/apache/openwhisk/core/containerpool/docker/test/AkkaContainerClientTests.scala b/tests/src/test/scala/org/apache/openwhisk/core/containerpool/docker/test/AkkaContainerClientTests.scala
index 3c6b14e..c2da266 100644
--- a/tests/src/test/scala/org/apache/openwhisk/core/containerpool/docker/test/AkkaContainerClientTests.scala
+++ b/tests/src/test/scala/org/apache/openwhisk/core/containerpool/docker/test/AkkaContainerClientTests.scala
@@ -39,6 +39,7 @@
import spray.json.JsObject
import org.apache.openwhisk.common.TransactionId
import org.apache.openwhisk.core.containerpool.AkkaContainerClient
+import org.apache.openwhisk.core.containerpool.ContainerHealthError
import org.apache.openwhisk.core.entity.ActivationResponse._
import org.apache.openwhisk.core.entity.size._
@@ -143,6 +144,14 @@
waited should be < (timeout * 2).toMillis
}
+ it should "throw ContainerHealthError on HttpHostConnectException if reschedule==true" in {
+ val timeout = 5.seconds
+ val connection = new AkkaContainerClient("0.0.0.0", 12345, timeout, 1.B, 100)
+ assertThrows[ContainerHealthError] {
+ Await.result(connection.post("/run", JsObject.empty, retry = false, reschedule = true), 10.seconds)
+ }
+ }
+
it should "retry till success within timeout limit" in {
val timeout = 5.seconds
val retryInterval = 500.milliseconds
diff --git a/tests/src/test/scala/org/apache/openwhisk/core/containerpool/docker/test/ApacheBlockingContainerClientTests.scala b/tests/src/test/scala/org/apache/openwhisk/core/containerpool/docker/test/ApacheBlockingContainerClientTests.scala
index 4675953..eee65a3 100644
--- a/tests/src/test/scala/org/apache/openwhisk/core/containerpool/docker/test/ApacheBlockingContainerClientTests.scala
+++ b/tests/src/test/scala/org/apache/openwhisk/core/containerpool/docker/test/ApacheBlockingContainerClientTests.scala
@@ -39,6 +39,7 @@
import scala.concurrent.Await
import org.apache.openwhisk.common.TransactionId
import org.apache.openwhisk.core.containerpool.ApacheBlockingContainerClient
+import org.apache.openwhisk.core.containerpool.ContainerHealthError
import org.apache.openwhisk.core.containerpool.RetryableConnectionError
import org.apache.openwhisk.core.entity.ActivationResponse.Timeout
import org.apache.openwhisk.core.entity.size._
@@ -138,6 +139,15 @@
waited should be < (timeout * 2).toMillis
}
+ it should "throw ContainerHealthError on HttpHostConnectException if reschedule==true" in {
+ val timeout = 5.seconds
+ val badHostAndPort = "0.0.0.0:12345"
+ val connection = new ApacheBlockingContainerClient(badHostAndPort, timeout, 1.B)
+ assertThrows[ContainerHealthError] {
+ Await.result(connection.post("/run", JsObject.empty, retry = false, reschedule = true), 10.seconds)
+ }
+ }
+
it should "not truncate responses within limit" in {
val timeout = 1.minute.toMillis
val connection = new ApacheBlockingContainerClient(hostWithPort, timeout.millis, 50.B)
diff --git a/tests/src/test/scala/org/apache/openwhisk/core/containerpool/docker/test/DockerContainerTests.scala b/tests/src/test/scala/org/apache/openwhisk/core/containerpool/docker/test/DockerContainerTests.scala
index 624c544..2c25eac 100644
--- a/tests/src/test/scala/org/apache/openwhisk/core/containerpool/docker/test/DockerContainerTests.scala
+++ b/tests/src/test/scala/org/apache/openwhisk/core/containerpool/docker/test/DockerContainerTests.scala
@@ -112,7 +112,8 @@
body: JsObject,
timeout: FiniteDuration,
concurrent: Int,
- retry: Boolean = false)(implicit transid: TransactionId): Future[RunResult] = {
+ retry: Boolean = false,
+ reschedule: Boolean = false)(implicit transid: TransactionId): Future[RunResult] = {
ccRes
}
override protected val logCollectingIdleTimeout = awaitLogs
diff --git a/tests/src/test/scala/org/apache/openwhisk/core/containerpool/kubernetes/test/KubernetesContainerTests.scala b/tests/src/test/scala/org/apache/openwhisk/core/containerpool/kubernetes/test/KubernetesContainerTests.scala
index db806b9..c580b5d 100644
--- a/tests/src/test/scala/org/apache/openwhisk/core/containerpool/kubernetes/test/KubernetesContainerTests.scala
+++ b/tests/src/test/scala/org/apache/openwhisk/core/containerpool/kubernetes/test/KubernetesContainerTests.scala
@@ -108,7 +108,8 @@
body: JsObject,
timeout: FiniteDuration,
concurrent: Int,
- retry: Boolean = false)(implicit transid: TransactionId): Future[RunResult] = {
+ retry: Boolean = false,
+ reschedule: Boolean = false)(implicit transid: TransactionId): Future[RunResult] = {
ccRes
}
override protected val waitForLogs = awaitLogs
diff --git a/tests/src/test/scala/org/apache/openwhisk/core/containerpool/test/ContainerPoolTests.scala b/tests/src/test/scala/org/apache/openwhisk/core/containerpool/test/ContainerPoolTests.scala
index 5a7b22c..4efbe85 100644
--- a/tests/src/test/scala/org/apache/openwhisk/core/containerpool/test/ContainerPoolTests.scala
+++ b/tests/src/test/scala/org/apache/openwhisk/core/containerpool/test/ContainerPoolTests.scala
@@ -110,11 +110,11 @@
/** Helper to create PreWarmedData */
def preWarmedData(kind: String, memoryLimit: ByteSize = memoryLimit) =
- PreWarmedData(stub[Container], kind, memoryLimit)
+ PreWarmedData(stub[MockableContainer], kind, memoryLimit)
/** Helper to create WarmedData */
def warmedData(run: Run, lastUsed: Instant = Instant.now) = {
- WarmedData(stub[Container], run.msg.user.namespace.name, run.action, lastUsed)
+ WarmedData(stub[MockableContainer], run.msg.user.namespace.name, run.action, lastUsed)
}
/** Creates a sequence of containers and a factory returning this sequence. */
@@ -741,6 +741,29 @@
pool ! runMessageConcurrent
containers(0).expectMsg(runMessageConcurrent)
}
+
+ it should "backfill prewarms when prewarm containers are removed" in {
+ val (containers, factory) = testContainers(6)
+ val feed = TestProbe()
+
+ val pool =
+ system.actorOf(ContainerPool
+ .props(factory, poolConfig(MemoryLimit.STD_MEMORY * 5), feed.ref, List(PrewarmingConfig(2, exec, memoryLimit))))
+ containers(0).expectMsg(Start(exec, memoryLimit))
+ containers(1).expectMsg(Start(exec, memoryLimit))
+
+ //removing 2 prewarm containers will start 2 containers via backfill
+ containers(0).send(pool, ContainerRemoved)
+ containers(1).send(pool, ContainerRemoved)
+ containers(2).expectMsg(Start(exec, memoryLimit))
+ containers(3).expectMsg(Start(exec, memoryLimit))
+ //make sure extra prewarms are not started
+ containers(4).expectNoMessage(100.milliseconds)
+ containers(5).expectNoMessage(100.milliseconds)
+ }
+}
+abstract class MockableContainer extends Container {
+ protected[core] val addr: ContainerAddress = ContainerAddress("nohost")
}
/**
@@ -765,14 +788,14 @@
namespace: String = standardNamespace.asString,
lastUsed: Instant = Instant.now,
active: Int = 0) =
- WarmedData(stub[Container], EntityName(namespace), action, lastUsed, active)
+ WarmedData(stub[MockableContainer], EntityName(namespace), action, lastUsed, active)
/** Helper to create WarmingData with sensible defaults */
def warmingData(action: ExecutableWhiskAction = createAction(),
namespace: String = standardNamespace.asString,
lastUsed: Instant = Instant.now,
active: Int = 0) =
- WarmingData(stub[Container], EntityName(namespace), action, lastUsed, active)
+ WarmingData(stub[MockableContainer], EntityName(namespace), action, lastUsed, active)
/** Helper to create WarmingData with sensible defaults */
def warmingColdData(action: ExecutableWhiskAction = createAction(),
@@ -782,7 +805,7 @@
WarmingColdData(EntityName(namespace), action, lastUsed, active)
/** Helper to create PreWarmedData with sensible defaults */
- def preWarmedData(kind: String = "anyKind") = PreWarmedData(stub[Container], kind, 256.MB)
+ def preWarmedData(kind: String = "anyKind") = PreWarmedData(stub[MockableContainer], kind, 256.MB)
/** Helper to create NoData */
def noData() = NoData()
diff --git a/tests/src/test/scala/org/apache/openwhisk/core/containerpool/test/ContainerProxyTests.scala b/tests/src/test/scala/org/apache/openwhisk/core/containerpool/test/ContainerProxyTests.scala
index 464be2d..5666555 100644
--- a/tests/src/test/scala/org/apache/openwhisk/core/containerpool/test/ContainerProxyTests.scala
+++ b/tests/src/test/scala/org/apache/openwhisk/core/containerpool/test/ContainerProxyTests.scala
@@ -17,18 +17,19 @@
package org.apache.openwhisk.core.containerpool.test
+import java.net.InetSocketAddress
import java.time.Instant
import akka.actor.FSM.{CurrentState, SubscribeTransitionCallBack, Transition}
import akka.actor.{ActorRef, ActorSystem, FSM}
import akka.stream.scaladsl.Source
-import akka.testkit.CallingThreadDispatcher
-import akka.testkit.{ImplicitSender, TestKit}
+import akka.testkit.{CallingThreadDispatcher, ImplicitSender, TestKit, TestProbe}
import akka.util.ByteString
import common.{LoggedFunction, StreamLogging, SynchronizedLoggedFunction, WhiskProperties}
import java.time.temporal.ChronoUnit
import java.util.concurrent.atomic.AtomicInteger
+import akka.io.Tcp.{Close, CommandFailed, Connect, Connected}
import org.junit.runner.RunWith
import org.scalatest.junit.JUnitRunner
import org.scalatest.{BeforeAndAfterAll, FlatSpecLike, Matchers}
@@ -161,7 +162,7 @@
def expectWarmed(namespace: String, action: ExecutableWhiskAction) = {
val test = EntityName(namespace)
expectMsgPF() {
- case a @ NeedWork(WarmedData(_, `test`, `action`, _, _)) => //matched, otherwise will fail
+ case a @ NeedWork(WarmedData(_, `test`, `action`, _, _, _)) => //matched, otherwise will fail
}
}
@@ -273,6 +274,7 @@
Future.successful(())
}
val poolConfig = ContainerPoolConfig(2.MB, 0.5, false)
+ def healthchecksConfig(enabled: Boolean = false) = ContainerProxyHealthCheckConfig(enabled, 100.milliseconds, 2)
val filterEnvVar = (k: String) => Character.isUpperCase(k.charAt(0))
behavior of "ContainerProxy"
@@ -309,6 +311,7 @@
createCollector(),
InvokerInstanceId(0, Some("myname"), userMemory = defaultUserMemory),
poolConfig,
+ healthchecksConfig(),
pauseGrace = pauseGrace))
registerCallback(machine)
preWarm(machine)
@@ -338,6 +341,7 @@
collector,
InvokerInstanceId(0, userMemory = defaultUserMemory),
poolConfig,
+ healthchecksConfig(),
pauseGrace = pauseGrace))
registerCallback(machine)
@@ -382,6 +386,7 @@
collector,
InvokerInstanceId(0, userMemory = defaultUserMemory),
poolConfig,
+ healthchecksConfig(),
pauseGrace = pauseGrace))
registerCallback(machine)
preWarm(machine)
@@ -445,6 +450,7 @@
collector,
InvokerInstanceId(0, userMemory = defaultUserMemory),
poolConfig,
+ healthchecksConfig(),
pauseGrace = pauseGrace))
registerCallback(machine)
preWarm(machine)
@@ -495,6 +501,7 @@
collector,
InvokerInstanceId(0, userMemory = defaultUserMemory),
poolConfig,
+ healthchecksConfig(),
pauseGrace = pauseGrace))
registerCallback(machine)
run(machine, Uninitialized)
@@ -535,6 +542,7 @@
collector,
InvokerInstanceId(0, userMemory = defaultUserMemory),
poolConfig,
+ healthchecksConfig(),
pauseGrace = pauseGrace))
registerCallback(machine)
@@ -554,6 +562,110 @@
}
}
+ it should "resend a failed Run when it is first Run after Ready state" in within(timeout) {
+ val noLogsAction = action.copy(limits = ActionLimits(logs = LogLimit(0.MB)))
+ val container = new TestContainer {
+ override def run(
+ parameters: JsObject,
+ environment: JsObject,
+ timeout: FiniteDuration,
+ concurrent: Int,
+ reschedule: Boolean = false)(implicit transid: TransactionId): Future[(Interval, ActivationResponse)] = {
+ atomicRunCount.incrementAndGet()
+ //every run after the first one fails
+ if (runCount > 1) {
+ Future.failed(ContainerHealthError(messageTransId, "intentional failure"))
+ } else {
+ Future.successful((runInterval, ActivationResponse.success()))
+ }
+ }
+ }
+ val factory = createFactory(Future.successful(container))
+ val acker = createAcker(noLogsAction)
+ val store = createStore
+ val collector = createCollector()
+
+ val machine =
+ childActorOf(
+ ContainerProxy
+ .props(
+ factory,
+ acker,
+ store,
+ collector,
+ InvokerInstanceId(0, userMemory = defaultUserMemory),
+ poolConfig,
+ healthchecksConfig(),
+ pauseGrace = pauseGrace))
+ registerCallback(machine)
+
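+ //first run succeeds and moves the proxy to Ready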
+ machine ! Run(noLogsAction, message)
+ expectMsg(Transition(machine, Uninitialized, Running))
+ expectWarmed(invocationNamespace.name, noLogsAction)
+ expectMsg(Transition(machine, Running, Ready))
+
+ val failingRun = Run(noLogsAction, message)
+ val runAfterFail = Run(noLogsAction, message)
+ //this run fails and should trigger rescheduling
+ machine ! failingRun
+ machine ! runAfterFail //buffered first, and then resent
+ expectMsg(Transition(machine, Ready, Running))
+ //on failure, buffered runs are resent to the parent first
+ expectMsg(runAfterFail)
+ //then the failed run is resent to the parent, and the removal process starts
+ expectMsg(RescheduleJob)
+ expectMsg(Transition(machine, Running, Removing))
+ expectMsg(failingRun)
+ expectNoMessage(100.milliseconds)
+
+ awaitAssert {
+ factory.calls should have size 1
+ container.initializeCount shouldBe 1
+ container.runCount shouldBe 2
+ collector.calls should have size 0
+ acker.calls should have size 1
+ store.calls should have size 1
+ acker.calls.head._6 shouldBe a[CompletionMessage]
+ }
+ }
+
+ it should "start tcp ping to containers when action healthcheck enabled" in within(timeout) {
+ val noLogsAction = action.copy(limits = ActionLimits(logs = LogLimit(0.MB)))
+ val container = new TestContainer()
+ val factory = createFactory(Future.successful(container))
+ val acker = createAcker(noLogsAction)
+ val store = createStore
+ val collector = createCollector()
+ val tcpProbe = TestProbe()
+ val healthchecks = healthchecksConfig(true)
+ val machine =
+ childActorOf(
+ ContainerProxy
+ .props(
+ factory,
+ acker,
+ store,
+ collector,
+ InvokerInstanceId(0, userMemory = defaultUserMemory),
+ poolConfig,
+ healthchecks,
+ pauseGrace = pauseGrace,
+ tcp = Some(tcpProbe.ref)))
+ registerCallback(machine)
+ preWarm(machine)
+
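+ //prewarming starts the TCP health ping; expect repeated Connect attempts to the container address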
+ tcpProbe.expectMsg(Connect(new InetSocketAddress("0.0.0.0", 8080)))
+ tcpProbe.expectMsg(Connect(new InetSocketAddress("0.0.0.0", 8080)))
+ tcpProbe.expectMsg(Connect(new InetSocketAddress("0.0.0.0", 8080)))
+ //pings repeat until the container goes into Running state, then stop
+ run(machine, Started)
+ tcpProbe.expectNoMessage(healthchecks.checkPeriod + 100.milliseconds)
+
+ awaitAssert {
+ factory.calls should have size 1
+ }
+ }
+
it should "respond with CombinedCompletionAndResultMessage for blocking invocation with no logs" in within(timeout) {
val noLogsAction = action.copy(limits = ActionLimits(logs = LogLimit(0.MB)))
val blockingMessage = message.copy(blocking = true)
@@ -663,6 +775,7 @@
collector,
InvokerInstanceId(0, userMemory = defaultUserMemory),
poolConfig,
+ healthchecksConfig(),
pauseGrace = pauseGrace))
registerCallback(machine)
(factory, container, acker, store, collector, machine)
@@ -706,6 +819,7 @@
collector,
InvokerInstanceId(0, userMemory = defaultUserMemory),
poolConfig,
+ healthchecksConfig(),
pauseGrace = pauseGrace)
.withDispatcher(CallingThreadDispatcher.Id))
registerCallback(machine)
@@ -801,8 +915,12 @@
it should "complete the transaction and reuse the container on a failed run IFF failure was applicationError" in within(
timeout) {
val container = new TestContainer {
- override def run(parameters: JsObject, environment: JsObject, timeout: FiniteDuration, concurrent: Int)(
- implicit transid: TransactionId): Future[(Interval, ActivationResponse)] = {
+ override def run(
+ parameters: JsObject,
+ environment: JsObject,
+ timeout: FiniteDuration,
+ concurrent: Int,
+ reschedule: Boolean = false)(implicit transid: TransactionId): Future[(Interval, ActivationResponse)] = {
atomicRunCount.incrementAndGet()
//every other run fails
if (runCount % 2 == 0) {
@@ -827,6 +945,7 @@
collector,
InvokerInstanceId(0, userMemory = defaultUserMemory),
poolConfig,
+ healthchecksConfig(),
pauseGrace = timeout))
registerCallback(machine)
preWarm(machine)
@@ -900,6 +1019,7 @@
collector,
InvokerInstanceId(0, userMemory = defaultUserMemory),
poolConfig,
+ healthchecksConfig(),
pauseGrace = pauseGrace))
registerCallback(machine)
machine ! Run(action, message)
@@ -944,6 +1064,7 @@
collector,
InvokerInstanceId(0, userMemory = defaultUserMemory),
poolConfig,
+ healthchecksConfig(),
pauseGrace = pauseGrace))
registerCallback(machine)
machine ! Run(action, message)
@@ -971,8 +1092,12 @@
it should "complete the transaction and destroy the container on a failed run IFF failure was containerError" in within(
timeout) {
val container = new TestContainer {
- override def run(parameters: JsObject, environment: JsObject, timeout: FiniteDuration, concurrent: Int)(
- implicit transid: TransactionId): Future[(Interval, ActivationResponse)] = {
+ override def run(
+ parameters: JsObject,
+ environment: JsObject,
+ timeout: FiniteDuration,
+ concurrent: Int,
+ reschedule: Boolean = false)(implicit transid: TransactionId): Future[(Interval, ActivationResponse)] = {
atomicRunCount.incrementAndGet()
Future.successful((initInterval, ActivationResponse.developerError(("boom"))))
}
@@ -992,6 +1117,7 @@
collector,
InvokerInstanceId(0, userMemory = defaultUserMemory),
poolConfig,
+ healthchecksConfig(),
pauseGrace = pauseGrace))
registerCallback(machine)
machine ! Run(action, message)
@@ -1030,6 +1156,7 @@
collector,
InvokerInstanceId(0, userMemory = defaultUserMemory),
poolConfig,
+ healthchecksConfig(),
pauseGrace = pauseGrace))
registerCallback(machine)
machine ! Run(action, message)
@@ -1067,6 +1194,7 @@
collector,
InvokerInstanceId(0, userMemory = defaultUserMemory),
poolConfig,
+ healthchecksConfig(),
pauseGrace = pauseGrace))
registerCallback(machine)
machine ! Run(action, message)
@@ -1108,6 +1236,7 @@
createCollector(),
InvokerInstanceId(0, userMemory = defaultUserMemory),
poolConfig,
+ healthchecksConfig(),
pauseGrace = pauseGrace))
registerCallback(machine)
run(machine, Uninitialized) // first run an activation
@@ -1130,6 +1259,145 @@
}
}
+ it should "resend the job to the parent if /run fails connection after Paused -> Running" in within(timeout) {
+ val container = new TestContainer {
+ override def run(
+ parameters: JsObject,
+ environment: JsObject,
+ timeout: FiniteDuration,
+ concurrent: Int,
+ reschedule: Boolean = false)(implicit transid: TransactionId): Future[(Interval, ActivationResponse)] = {
+
+ if (reschedule) {
+ throw ContainerHealthError(transid, "reconnect failed to xyz")
+ }
+ super.run(parameters, environment, timeout, concurrent, reschedule)
+ }
+ }
+ val factory = createFactory(Future.successful(container))
+ val acker = createAcker()
+ val store = createStore
+
+ val machine =
+ childActorOf(
+ ContainerProxy
+ .props(
+ factory,
+ acker,
+ store,
+ createCollector(),
+ InvokerInstanceId(0, userMemory = defaultUserMemory),
+ poolConfig,
+ healthchecksConfig(),
+ pauseGrace = pauseGrace))
+ registerCallback(machine)
+ run(machine, Uninitialized) // first run an activation
+ timeout(machine) // times out Ready state so container suspends
+ expectPause(machine)
+
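+ //the first run after resume is sent with reschedule=true, which this container rejects with ContainerHealthError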
+ val runMessage = Run(action, message)
+ machine ! runMessage
+ expectMsg(Transition(machine, Paused, Running))
+ expectMsg(RescheduleJob)
+ expectMsg(Transition(machine, Running, Removing))
+ expectMsg(runMessage)
+
+ awaitAssert {
+ factory.calls should have size 1
+ container.runCount shouldBe 1
+ container.suspendCount shouldBe 1
+ container.resumeCount shouldBe 1
+ container.destroyCount shouldBe 1
+ }
+ }
+
+ it should "resend the job to the parent if /run fails connection after Ready -> Running" in within(timeout) {
+ val container = new TestContainer {
+ override def run(
+ parameters: JsObject,
+ environment: JsObject,
+ timeout: FiniteDuration,
+ concurrent: Int,
+ reschedule: Boolean = false)(implicit transid: TransactionId): Future[(Interval, ActivationResponse)] = {
+
+ if (reschedule) {
+ throw ContainerHealthError(transid, "reconnect failed to xyz")
+ }
+ super.run(parameters, environment, timeout, concurrent, reschedule)
+ }
+ }
+ val factory = createFactory(Future.successful(container))
+ val acker = createAcker()
+ val store = createStore
+
+ val machine =
+ childActorOf(
+ ContainerProxy
+ .props(
+ factory,
+ acker,
+ store,
+ createCollector(),
+ InvokerInstanceId(0, userMemory = defaultUserMemory),
+ poolConfig,
+ healthchecksConfig(),
+ pauseGrace = pauseGrace))
+ registerCallback(machine)
+ run(machine, Uninitialized) // first run an activation
+ //machine is now in Ready state
+
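+ //the first run after Ready is sent with reschedule=true, which this container rejects with ContainerHealthError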
+ val runMessage = Run(action, message)
+ machine ! runMessage
+ expectMsg(Transition(machine, Ready, Running))
+ expectMsg(RescheduleJob)
+ expectMsg(Transition(machine, Running, Removing))
+ expectMsg(runMessage)
+
+ awaitAssert {
+ factory.calls should have size 1
+ container.runCount shouldBe 1
+ container.suspendCount shouldBe 0
+ container.resumeCount shouldBe 0
+ container.destroyCount shouldBe 1
+ }
+ }
+
+ it should "remove and replace a prewarm container if it fails healthcheck after startup" in within(timeout) {
+ val container = new TestContainer
+ val factory = createFactory(Future.successful(container))
+ val acker = createAcker()
+ val store = createStore
+ val collector = createCollector()
+
+ val machine =
+ childActorOf(
+ ContainerProxy
+ .props(
+ factory,
+ acker,
+ store,
+ collector,
+ InvokerInstanceId(0, userMemory = defaultUserMemory),
+ poolConfig,
+ healthchecksConfig(true),
+ pauseGrace = pauseGrace))
+ registerCallback(machine)
+ preWarm(machine)
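+ //healthchecks are enabled and nothing listens at the test container address, so every ping fails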
+
+ //expect the prewarmed container to be removed after healthchecks fail
+ expectMsg(ContainerRemoved)
+ expectMsg(Transition(machine, Started, Removing))
+
+ awaitAssert {
+ factory.calls should have size 1
+ container.initializeCount shouldBe 0
+ container.runCount shouldBe 0
+ collector.calls should have size 0
+ container.suspendCount shouldBe 0
+ container.resumeCount shouldBe 0
+ acker.calls should have size 0
+ }
+ }
it should "remove the container if suspend fails" in within(timeout) {
val container = new TestContainer {
override def suspend()(implicit transid: TransactionId) = {
@@ -1151,6 +1419,7 @@
createCollector(),
InvokerInstanceId(0, userMemory = defaultUserMemory),
poolConfig,
+ healthchecksConfig(),
pauseGrace = pauseGrace))
registerCallback(machine)
run(machine, Uninitialized)
@@ -1196,6 +1465,7 @@
collector,
InvokerInstanceId(0, userMemory = defaultUserMemory),
poolConfig,
+ healthchecksConfig(),
pauseGrace = pauseGrace))
registerCallback(machine)
@@ -1255,6 +1525,7 @@
collector,
InvokerInstanceId(0, userMemory = defaultUserMemory),
poolConfig,
+ healthchecksConfig(),
pauseGrace = pauseGrace))
registerCallback(machine)
run(machine, Uninitialized)
@@ -1303,6 +1574,7 @@
collector,
InvokerInstanceId(0, userMemory = defaultUserMemory),
poolConfig,
+ healthchecksConfig(),
pauseGrace = pauseGrace))
registerCallback(machine)
@@ -1377,7 +1649,7 @@
initialCount)
val nextWarmedData = warmedData.nextRun(Run(action, message))
nextWarmedData should matchPattern {
- case WarmedData(pwData.container, message.user.namespace.name, action, _, newCount) =>
+ case WarmedData(pwData.container, message.user.namespace.name, action, _, newCount, _) =>
}
warmedData.lastUsed.until(nextWarmedData.lastUsed, ChronoUnit.SECONDS) should be >= timeDiffSeconds.toLong
}
@@ -1390,7 +1662,7 @@
apiKeyMustBePresent: Boolean = true)
extends Container {
protected[core] val id = ContainerId("testcontainer")
- protected val addr = ContainerAddress("0.0.0.0")
+ protected[core] val addr = ContainerAddress("0.0.0.0")
protected implicit val logging: Logging = log
protected implicit val ec: ExecutionContext = system.dispatcher
override implicit protected val as: ActorSystem = system
@@ -1458,8 +1730,12 @@
initPromise.map(_.future).getOrElse(Future.successful(initInterval))
}
- override def run(parameters: JsObject, environment: JsObject, timeout: FiniteDuration, concurrent: Int)(
- implicit transid: TransactionId): Future[(Interval, ActivationResponse)] = {
+ override def run(
+ parameters: JsObject,
+ environment: JsObject,
+ timeout: FiniteDuration,
+ concurrent: Int,
+ reschedule: Boolean = false)(implicit transid: TransactionId): Future[(Interval, ActivationResponse)] = {
// the "init" arguments are not passed on run
parameters shouldBe JsObject(activationArguments.fields.filter(k => !filterEnvVar(k._1)))
@@ -1496,3 +1772,66 @@
}
}
}
+@RunWith(classOf[JUnitRunner])
+class TCPPingClientTests extends TestKit(ActorSystem("TCPPingClient")) with Matchers with FlatSpecLike {
+ val config = ContainerProxyHealthCheckConfig(true, 200.milliseconds, 2)
+ val addr = new InetSocketAddress("1.2.3.4", 12345)
+ val localAddr = new InetSocketAddress("localhost", 5432)
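+ //addresses are arbitrary; a TestProbe stands in for the TCP manager, so no real connections are made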
+
+ behavior of "TCPPingClient"
+ it should "start the ping on HealthPingEnabled(true) and stop on HealthPingEnabled(false)" in {
+ val tcpProbe = TestProbe()
+ val pingClient = system.actorOf(TCPPingClient.props(tcpProbe.ref, "1234", config, addr))
+ pingClient ! HealthPingEnabled(true)
+ tcpProbe.expectMsg(Connect(addr))
+ //measure the delay between connections
+ val start = System.currentTimeMillis()
+ tcpProbe.expectMsg(Connect(addr))
+ val delay = System.currentTimeMillis() - start
+ delay should be > config.checkPeriod.toMillis - 25 //allow 25ms slop
+ tcpProbe.expectMsg(Connect(addr))
+ //make sure disable works
+ pingClient ! HealthPingEnabled(false)
+ //make sure no Connect msg for at least the check period
+ tcpProbe.expectNoMessage(config.checkPeriod)
+ }
+ it should "send FailureMessage and cancel the ping on CommandFailed" in {
+ val tcpProbe = TestProbe()
+ val pingClient = system.actorOf(TCPPingClient.props(tcpProbe.ref, "1234", config, addr))
+ val clientProbe = TestProbe()
+ clientProbe watch pingClient
+ pingClient ! HealthPingEnabled(true)
+ val c = Connect(addr)
+ //send config.maxFails CommandFailed messages
+ (1 to config.maxFails).foreach { _ =>
+ tcpProbe.expectMsg(c)
+ pingClient ! CommandFailed(c)
+ }
+ //now we expect termination
+ clientProbe.expectTerminated(pingClient)
+ }
+ it should "reset failedCount on Connected" in {
+ val tcpProbe = TestProbe()
+ val pingClient = system.actorOf(TCPPingClient.props(tcpProbe.ref, "1234", config, addr))
+ val clientProbe = TestProbe()
+ clientProbe watch pingClient
+ pingClient ! HealthPingEnabled(true)
+ val c = Connect(addr)
+ //send maxFails - 1 failures (not enough to terminate the client)
+ (1 to config.maxFails - 1).foreach { _ =>
+ tcpProbe.expectMsg(c)
+ pingClient ! CommandFailed(c)
+ }
+ tcpProbe.expectMsg(c)
+ tcpProbe.send(pingClient, Connected(addr, localAddr))
+ //on success the client closes the connection and resets its failure counter
+ tcpProbe.expectMsg(Close)
+ //send maxFails more failures; since the counter was reset, it takes all maxFails tries to terminate
+ (1 to config.maxFails).foreach { _ =>
+ tcpProbe.expectMsg(c)
+ pingClient ! CommandFailed(c)
+ }
+ //now we expect termination
+ clientProbe.expectTerminated(pingClient)
+ }
+}