Add healthcheck for Invoker -> Action Container (#4698)

Validate that prewarm containers are reachable before use; for paused containers, a connection failure on the first activation after resume causes the activation to be rescheduled to the ContainerPool
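
The check is disabled by default. As an example, a deployment can opt in via the new
container-proxy settings added to the invoker's application.conf in this change (the snippet
below enables the check and keeps the other defaults introduced here):

    whisk.container-proxy.action-health-check {
      enabled = true             # ping prewarm containers periodically; verify warm containers on their first activation after resume
      check-period = 3 seconds   # interval between tcp connection attempts to prewarm containers
      max-fails = 3              # prewarm containers failing this many consecutive pings are destroyed and replaced
    }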

Co-authored-by: Cosmin Stanciu <selfxp@users.noreply.github.com>
diff --git a/common/scala/src/main/scala/org/apache/openwhisk/common/Logging.scala b/common/scala/src/main/scala/org/apache/openwhisk/common/Logging.scala
index 404e0c1..37591be 100644
--- a/common/scala/src/main/scala/org/apache/openwhisk/common/Logging.scala
+++ b/common/scala/src/main/scala/org/apache/openwhisk/common/Logging.scala
@@ -474,8 +474,23 @@
     LogMarkerToken(invoker, "mesos", start, Some(cmd), Map("cmd" -> cmd))(MeasurementUnit.time.milliseconds)
   def INVOKER_MESOS_CMD_TIMEOUT(cmd: String) =
     LogMarkerToken(invoker, "mesos", timeout, Some(cmd), Map("cmd" -> cmd))(MeasurementUnit.none)
-  def INVOKER_CONTAINER_START(containerState: String) =
-    LogMarkerToken(invoker, "containerStart", counter, Some(containerState), Map("containerState" -> containerState))(
+  def INVOKER_CONTAINER_START(containerState: String, invocationNamespace: String, namespace: String, action: String) =
+    LogMarkerToken(
+      invoker,
+      "containerStart",
+      counter,
+      Some(containerState),
+      Map(
+        "containerState" -> containerState,
+        "initiator" -> invocationNamespace,
+        "namespace" -> namespace,
+        "action" -> action))(MeasurementUnit.none)
+  val INVOKER_CONTAINER_HEALTH = LogMarkerToken(invoker, "containerHealth", start)(MeasurementUnit.time.milliseconds)
+  val INVOKER_CONTAINER_HEALTH_FAILED_WARM =
+    LogMarkerToken(invoker, "containerHealthFailed", counter, Some("warm"), Map("containerState" -> "warm"))(
+      MeasurementUnit.none)
+  val INVOKER_CONTAINER_HEALTH_FAILED_PREWARM =
+    LogMarkerToken(invoker, "containerHealthFailed", counter, Some("prewarm"), Map("containerState" -> "prewarm"))(
       MeasurementUnit.none)
   val CONTAINER_CLIENT_RETRIES =
     LogMarkerToken(containerClient, "retries", counter)(MeasurementUnit.none)
@@ -494,6 +509,10 @@
     LogMarkerToken(containerPool, "prewarmCount", counter)(MeasurementUnit.none)
   val CONTAINER_POOL_PREWARM_SIZE =
     LogMarkerToken(containerPool, "prewarmSize", counter)(MeasurementUnit.information.megabytes)
+  val CONTAINER_POOL_IDLES_COUNT =
+    LogMarkerToken(containerPool, "idlesCount", counter)(MeasurementUnit.none)
+  val CONTAINER_POOL_IDLES_SIZE =
+    LogMarkerToken(containerPool, "idlesSize", counter)(MeasurementUnit.information.megabytes)
 
   val INVOKER_TOTALMEM_BLACKBOX = LogMarkerToken(loadbalancer, "totalCapacityBlackBox", counter)(MeasurementUnit.none)
   val INVOKER_TOTALMEM_MANAGED = LogMarkerToken(loadbalancer, "totalCapacityManaged", counter)(MeasurementUnit.none)
diff --git a/common/scala/src/main/scala/org/apache/openwhisk/common/TransactionId.scala b/common/scala/src/main/scala/org/apache/openwhisk/common/TransactionId.scala
index 85951a5..6fcc566 100644
--- a/common/scala/src/main/scala/org/apache/openwhisk/common/TransactionId.scala
+++ b/common/scala/src/main/scala/org/apache/openwhisk/common/TransactionId.scala
@@ -225,6 +225,7 @@
   val invokerHealth = TransactionId(systemPrefix + "invokerHealth") // Invoker supervision
   val controller = TransactionId(systemPrefix + "controller") // Controller startup
   val dbBatcher = TransactionId(systemPrefix + "dbBatcher") // Database batcher
+  val actionHealthPing = TransactionId(systemPrefix + "actionHealth")
 
   def apply(tid: String, extraLogging: Boolean = false): TransactionId = {
     val now = Instant.now(Clock.systemUTC()).inMills
diff --git a/common/scala/src/main/scala/org/apache/openwhisk/core/WhiskConfig.scala b/common/scala/src/main/scala/org/apache/openwhisk/core/WhiskConfig.scala
index b6fa765..62a4c98 100644
--- a/common/scala/src/main/scala/org/apache/openwhisk/core/WhiskConfig.scala
+++ b/common/scala/src/main/scala/org/apache/openwhisk/core/WhiskConfig.scala
@@ -247,6 +247,7 @@
 
   val containerProxy = "whisk.container-proxy"
   val containerProxyTimeouts = s"$containerProxy.timeouts"
+  val containerProxyHealth = s"$containerProxy.action-health-check"
 
   val s3 = "whisk.s3"
   val query = "whisk.query-limit"
diff --git a/common/scala/src/main/scala/org/apache/openwhisk/core/containerpool/AkkaContainerClient.scala b/common/scala/src/main/scala/org/apache/openwhisk/core/containerpool/AkkaContainerClient.scala
index 8eaab12..3dea6db 100644
--- a/common/scala/src/main/scala/org/apache/openwhisk/core/containerpool/AkkaContainerClient.scala
+++ b/common/scala/src/main/scala/org/apache/openwhisk/core/containerpool/AkkaContainerClient.scala
@@ -87,9 +87,10 @@
    * @param endpoint the path the api call relative to hostname
    * @param body the JSON value to post (this is usually a JSON objecT)
    * @param retry whether or not to retry on connection failure
+   * @param reschedule whether or not to throw ContainerHealthError (triggers reschedule) on connection failure
    * @return Left(Error Message) or Right(Status Code, Response as UTF-8 String)
    */
-  def post(endpoint: String, body: JsValue, retry: Boolean)(
+  def post(endpoint: String, body: JsValue, retry: Boolean, reschedule: Boolean = false)(
     implicit tid: TransactionId): Future[Either[ContainerHttpError, ContainerResponse]] = {
 
     //create the request
@@ -98,7 +99,7 @@
         .withHeaders(Accept(MediaTypes.`application/json`))
     }
 
-    retryingRequest(req, timeout, retry)
+    retryingRequest(req, timeout, retry, reschedule, endpoint)
       .flatMap {
         case (response, retries) => {
           if (retries > 0) {
@@ -126,26 +127,35 @@
           }
         }
       }
-      .recover {
-        case t: TimeoutException => Left(Timeout(t))
-        case NonFatal(t)         => Left(ConnectionError(t))
+      .recoverWith {
+        case t: TimeoutException =>
+          Future.successful(Left(Timeout(t)))
+        case t: ContainerHealthError =>
+          //propagate as a failed future; clients can retry at a different container
+          Future.failed(t)
+        case NonFatal(t) =>
+          Future.successful(Left(ConnectionError(t)))
       }
   }
   //returns a Future HttpResponse -> Int (where Int is the retryCount)
   private def retryingRequest(req: Future[HttpRequest],
                               timeout: FiniteDuration,
                               retry: Boolean,
-                              retryCount: Int = 0): Future[(HttpResponse, Int)] = {
+                              reschedule: Boolean,
+                              endpoint: String,
+                              retryCount: Int = 0)(implicit tid: TransactionId): Future[(HttpResponse, Int)] = {
     val start = Instant.now
 
     request(req)
       .map((_, retryCount))
       .recoverWith {
+        case _: StreamTcpException if reschedule =>
+          Future.failed(ContainerHealthError(tid, endpoint))
         case t: StreamTcpException if retry =>
           if (timeout > Duration.Zero) {
             akka.pattern.after(retryInterval, as.scheduler)({
               val newTimeout = timeout - (Instant.now.toEpochMilli - start.toEpochMilli).milliseconds
-              retryingRequest(req, newTimeout, retry, retryCount + 1)
+              retryingRequest(req, newTimeout, retry, reschedule, endpoint, retryCount + 1)
             })
           } else {
             logging.warn(
diff --git a/common/scala/src/main/scala/org/apache/openwhisk/core/containerpool/ApacheBlockingContainerClient.scala b/common/scala/src/main/scala/org/apache/openwhisk/core/containerpool/ApacheBlockingContainerClient.scala
index b4452a8..ecfda42 100644
--- a/common/scala/src/main/scala/org/apache/openwhisk/core/containerpool/ApacheBlockingContainerClient.scala
+++ b/common/scala/src/main/scala/org/apache/openwhisk/core/containerpool/ApacheBlockingContainerClient.scala
@@ -90,7 +90,7 @@
    * @param retry whether or not to retry on connection failure
    * @return Left(Error Message) or Right(Status Code, Response as UTF-8 String)
    */
-  def post(endpoint: String, body: JsValue, retry: Boolean)(
+  def post(endpoint: String, body: JsValue, retry: Boolean, reschedule: Boolean = false)(
     implicit tid: TransactionId): Future[Either[ContainerHttpError, ContainerResponse]] = {
     val entity = new StringEntity(body.compactPrint, StandardCharsets.UTF_8)
     entity.setContentType("application/json")
@@ -101,14 +101,18 @@
 
     Future {
       blocking {
-        execute(request, timeout, maxConcurrent, retry)
+        execute(request, timeout, maxConcurrent, retry, reschedule)
       }
     }
   }
 
   // Annotation will make the compiler complain if no tail recursion is possible
-  @tailrec private def execute(request: HttpRequestBase, timeout: FiniteDuration, maxConcurrent: Int, retry: Boolean)(
-    implicit tid: TransactionId): Either[ContainerHttpError, ContainerResponse] = {
+  @tailrec private def execute(
+    request: HttpRequestBase,
+    timeout: FiniteDuration,
+    maxConcurrent: Int,
+    retry: Boolean,
+    reschedule: Boolean = false)(implicit tid: TransactionId): Either[ContainerHttpError, ContainerResponse] = {
     val start = Instant.now
 
     Try(connection.execute(request)).map { response =>
@@ -158,7 +162,10 @@
       case t: NoHttpResponseException if ApacheBlockingContainerClient.clientConfig.retryNoHttpResponseException =>
         Failure(RetryableConnectionError(t))
     } match {
-      case Success(response) => response
+      case Success(response)                                  => response
+      case Failure(_: RetryableConnectionError) if reschedule =>
+        //propagate as a failed future; clients can retry at a different container
+        throw ContainerHealthError(tid, request.getURI.toString)
       case Failure(t: RetryableConnectionError) if retry =>
         if (timeout > Duration.Zero) {
           Thread.sleep(50) // Sleep for 50 milliseconds
diff --git a/common/scala/src/main/scala/org/apache/openwhisk/core/containerpool/Container.scala b/common/scala/src/main/scala/org/apache/openwhisk/core/containerpool/Container.scala
index 59c0804..20627fb 100644
--- a/common/scala/src/main/scala/org/apache/openwhisk/core/containerpool/Container.scala
+++ b/common/scala/src/main/scala/org/apache/openwhisk/core/containerpool/Container.scala
@@ -48,6 +48,7 @@
 }
 case class ContainerAddress(host: String, port: Int = 8080) {
   require(host.nonEmpty, "ContainerIp must not be empty")
+  def asString() = s"${host}:${port}"
 }
 
 object Container {
@@ -72,7 +73,7 @@
 
   implicit protected val as: ActorSystem
   protected val id: ContainerId
-  protected val addr: ContainerAddress
+  protected[core] val addr: ContainerAddress
   protected implicit val logging: Logging
   protected implicit val ec: ExecutionContext
 
@@ -149,8 +150,11 @@
   }
 
   /** Runs code in the container. Thread-safe - caller may invoke concurrently for concurrent activation processing. */
-  def run(parameters: JsObject, environment: JsObject, timeout: FiniteDuration, maxConcurrent: Int)(
-    implicit transid: TransactionId): Future[(Interval, ActivationResponse)] = {
+  def run(parameters: JsObject,
+          environment: JsObject,
+          timeout: FiniteDuration,
+          maxConcurrent: Int,
+          reschedule: Boolean = false)(implicit transid: TransactionId): Future[(Interval, ActivationResponse)] = {
     val actionName = environment.fields.get("action_name").map(_.convertTo[String]).getOrElse("")
     val start =
       transid.started(
@@ -161,7 +165,7 @@
 
     val parameterWrapper = JsObject("value" -> parameters)
     val body = JsObject(parameterWrapper.fields ++ environment.fields)
-    callContainer("/run", body, timeout, maxConcurrent, retry = false)
+    callContainer("/run", body, timeout, maxConcurrent, retry = false, reschedule)
       .andThen { // never fails
         case Success(r: RunResult) =>
           transid.finished(
@@ -193,12 +197,14 @@
    * @param body body to send
    * @param timeout timeout of the request
    * @param retry whether or not to retry the request
+   * @param reschedule whether or not to throw ContainerHealthError (triggers reschedule) on connection failure
    */
   protected def callContainer(path: String,
                               body: JsObject,
                               timeout: FiniteDuration,
                               maxConcurrent: Int,
-                              retry: Boolean = false)(implicit transid: TransactionId): Future[RunResult] = {
+                              retry: Boolean = false,
+                              reschedule: Boolean = false)(implicit transid: TransactionId): Future[RunResult] = {
     val started = Instant.now()
     val http = httpConnection.getOrElse {
       val conn = openConnections(timeout, maxConcurrent)
@@ -206,7 +212,7 @@
       conn
     }
     http
-      .post(path, body, retry)
+      .post(path, body, retry, reschedule)
       .map { response =>
         val finished = Instant.now()
         RunResult(Interval(started, finished), response)
@@ -247,6 +253,9 @@
 /** Indicates an error while initializing a container */
 case class InitializationError(interval: Interval, response: ActivationResponse) extends Exception(response.toString)
 
+/** Indicates a connection error after resuming a container */
+case class ContainerHealthError(tid: TransactionId, msg: String) extends Exception(msg)
+
 case class Interval(start: Instant, end: Instant) {
   def duration = Duration.create(end.toEpochMilli() - start.toEpochMilli(), MILLISECONDS)
 }
diff --git a/common/scala/src/main/scala/org/apache/openwhisk/core/containerpool/ContainerClient.scala b/common/scala/src/main/scala/org/apache/openwhisk/core/containerpool/ContainerClient.scala
index 0720a03..597376c 100644
--- a/common/scala/src/main/scala/org/apache/openwhisk/core/containerpool/ContainerClient.scala
+++ b/common/scala/src/main/scala/org/apache/openwhisk/core/containerpool/ContainerClient.scala
@@ -24,7 +24,7 @@
 import org.apache.openwhisk.core.entity.ActivationResponse._
 
 trait ContainerClient {
-  def post(endpoint: String, body: JsValue, retry: Boolean)(
+  def post(endpoint: String, body: JsValue, retry: Boolean, reschedule: Boolean)(
     implicit tid: TransactionId): Future[Either[ContainerHttpError, ContainerResponse]]
   def close(): Future[Unit]
 }
diff --git a/common/scala/src/main/scala/org/apache/openwhisk/core/mesos/MesosTask.scala b/common/scala/src/main/scala/org/apache/openwhisk/core/mesos/MesosTask.scala
index 968f942..a6abec4 100644
--- a/common/scala/src/main/scala/org/apache/openwhisk/core/mesos/MesosTask.scala
+++ b/common/scala/src/main/scala/org/apache/openwhisk/core/mesos/MesosTask.scala
@@ -188,7 +188,7 @@
 }
 
 class MesosTask(override protected val id: ContainerId,
-                override protected val addr: ContainerAddress,
+                override protected[core] val addr: ContainerAddress,
                 override protected implicit val ec: ExecutionContext,
                 override protected implicit val logging: Logging,
                 override protected val as: ActorSystem,
diff --git a/common/scala/src/main/scala/org/apache/openwhisk/core/yarn/YARNTask.scala b/common/scala/src/main/scala/org/apache/openwhisk/core/yarn/YARNTask.scala
index d7fcf1f..714a04e 100644
--- a/common/scala/src/main/scala/org/apache/openwhisk/core/yarn/YARNTask.scala
+++ b/common/scala/src/main/scala/org/apache/openwhisk/core/yarn/YARNTask.scala
@@ -43,7 +43,7 @@
  * (external log collection and retrieval must be enabled via LogStore SPI to expose logs to wsk cli)
  */
 class YARNTask(override protected val id: ContainerId,
-               override protected val addr: ContainerAddress,
+               override protected[core] val addr: ContainerAddress,
                override protected val ec: ExecutionContext,
                override protected val logging: Logging,
                override protected val as: ActorSystem,
diff --git a/core/invoker/src/main/resources/application.conf b/core/invoker/src/main/resources/application.conf
index a998ecb..5760a4b 100644
--- a/core/invoker/src/main/resources/application.conf
+++ b/core/invoker/src/main/resources/application.conf
@@ -122,6 +122,11 @@
       idle-container = 10 minutes
       pause-grace = 50 milliseconds
     }
+    action-health-check {
+      enabled = false # if true, prewarm containers are pinged periodically and warm containers are verified on their first activation after resume
+      check-period = 3 seconds # how often should prewarm containers be pinged (tcp connection attempt)
+      max-fails = 3 # prewarm containers that fail this number of times will be destroyed and replaced
+    }
   }
 
   # tracing configuration
diff --git a/core/invoker/src/main/scala/org/apache/openwhisk/core/containerpool/ContainerPool.scala b/core/invoker/src/main/scala/org/apache/openwhisk/core/containerpool/ContainerPool.scala
index ea33006..1e267fe 100644
--- a/core/invoker/src/main/scala/org/apache/openwhisk/core/containerpool/ContainerPool.scala
+++ b/core/invoker/src/main/scala/org/apache/openwhisk/core/containerpool/ContainerPool.scala
@@ -69,6 +69,7 @@
   var freePool = immutable.Map.empty[ActorRef, ContainerData]
   var busyPool = immutable.Map.empty[ActorRef, ContainerData]
   var prewarmedPool = immutable.Map.empty[ActorRef, ContainerData]
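+  //tracks prewarm containers that have been started but have not yet reported NeedWork(PreWarmedData), keyed by (kind, memoryLimit)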
+  var prewarmStartingPool = immutable.Map.empty[ActorRef, (String, ByteSize)]
   // If all memory slots are occupied and if there is currently no container to be removed, than the actions will be
   // buffered here to keep order of computation.
   // Otherwise actions with small memory-limits could block actions with large memory limits.
@@ -79,13 +80,7 @@
   //periodically emit metrics (don't need to do this for each message!)
   context.system.scheduler.schedule(30.seconds, 10.seconds, self, EmitMetrics)
 
-  prewarmConfig.foreach { config =>
-    logging.info(this, s"pre-warming ${config.count} ${config.exec.kind} ${config.memoryLimit.toString}")(
-      TransactionId.invokerWarmup)
-    (1 to config.count).foreach { _ =>
-      prewarmContainer(config.exec, config.memoryLimit)
-    }
-  }
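+  //create the initial complement of prewarm containers; later backfills are triggered by ContainerRemoved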
+  backfillPrewarms(true)
 
   def logContainerStart(r: Run, containerState: String, activeActivations: Int, container: Option[Container]): Unit = {
     val namespaceName = r.msg.user.namespace.name
@@ -95,7 +90,11 @@
 
     r.msg.transid.mark(
       this,
-      LoggingMarkers.INVOKER_CONTAINER_START(containerState),
+      LoggingMarkers.INVOKER_CONTAINER_START(
+        containerState,
+        r.msg.user.namespace.toString,
+        r.msg.action.namespace.toString,
+        r.msg.action.name.toString),
       s"containerStart containerState: $containerState container: $container activations: $activeActivations of max $maxConcurrent action: $actionName namespace: $namespaceName activationId: $activationId",
       akka.event.Logging.InfoLevel)
   }
@@ -242,6 +241,7 @@
       processBufferOrFeed()
     // Container is prewarmed and ready to take work
     case NeedWork(data: PreWarmedData) =>
+      prewarmStartingPool = prewarmStartingPool - sender()
       prewarmedPool = prewarmedPool + (sender() -> data)
 
     // Container got removed
@@ -256,6 +256,21 @@
         busyPool = busyPool - sender()
       }
       processBufferOrFeed()
+
+      //in case this was a prewarm
+      prewarmedPool.get(sender()).foreach { _ =>
+        logging.info(this, "failed prewarm removed")
+        prewarmedPool = prewarmedPool - sender()
+      }
+      //in case this was a starting prewarm
+      prewarmStartingPool.get(sender()).foreach { _ =>
+        logging.info(this, "failed starting prewarm removed")
+        prewarmStartingPool = prewarmStartingPool - sender()
+      }
+
+      //backfill prewarms on every ContainerRemoved, e.g. when a prewarm is removed due to a health failure or crash
+      backfillPrewarms(false)
+
     // This message is received for one of these reasons:
     // 1. Container errored while resuming a warm container, could not process the job, and sent the job back
     // 2. The container aged, is destroying itself, and was assigned a job which it had to send back
@@ -287,6 +302,29 @@
     }
   }
 
+  /** Install prewarm containers up to the configured requirements for each kind/memory combination. */
+  def backfillPrewarms(init: Boolean) = {
+    prewarmConfig.foreach { config =>
+      val kind = config.exec.kind
+      val memory = config.memoryLimit
+      val currentCount = prewarmedPool.count {
+        case (_, PreWarmedData(_, `kind`, `memory`, _)) => true //done starting
+        case _                                          => false //ignore prewarms of other kinds or memory sizes
+      }
+      val startingCount = prewarmStartingPool.count(p => p._2._1 == kind && p._2._2 == memory)
+      val containerCount = currentCount + startingCount
+      if (containerCount < config.count) {
+        logging.info(
+          this,
+          s"found ${currentCount} started and ${startingCount} starting; ${if (init) "initing" else "backfilling"} ${config.count - containerCount} pre-warms to desired count: ${config.count} for kind:${config.exec.kind} mem:${config.memoryLimit.toString}")(
+          TransactionId.invokerWarmup)
+        (containerCount until config.count).foreach { _ =>
+          prewarmContainer(config.exec, config.memoryLimit)
+        }
+      }
+    }
+  }
+
   /** Creates a new container and updates state accordingly. */
   def createContainer(memoryLimit: ByteSize): (ActorRef, ContainerData) = {
     val ref = childFactory(context)
@@ -296,8 +334,11 @@
   }
 
   /** Creates a new prewarmed container */
-  def prewarmContainer(exec: CodeExec[_], memoryLimit: ByteSize): Unit =
-    childFactory(context) ! Start(exec, memoryLimit)
+  def prewarmContainer(exec: CodeExec[_], memoryLimit: ByteSize): Unit = {
+    val newContainer = childFactory(context)
+    prewarmStartingPool = prewarmStartingPool + (newContainer -> (exec.kind, memoryLimit))
+    newContainer ! Start(exec, memoryLimit)
+  }
 
   /**
    * Takes a prewarm container out of the prewarmed pool
@@ -362,6 +403,10 @@
     MetricEmitter.emitGaugeMetric(
       LoggingMarkers.CONTAINER_POOL_PREWARM_SIZE,
       prewarmedPool.map(_._2.memoryLimit.toMB).sum)
+    val unused = freePool.filter(_._2.activeActivationCount == 0)
+    val unusedMB = unused.map(_._2.memoryLimit.toMB).sum
+    MetricEmitter.emitGaugeMetric(LoggingMarkers.CONTAINER_POOL_IDLES_COUNT, unused.size)
+    MetricEmitter.emitGaugeMetric(LoggingMarkers.CONTAINER_POOL_IDLES_SIZE, unusedMB)
   }
 }
 
@@ -397,8 +442,8 @@
                                            idles: Map[A, ContainerData]): Option[(A, ContainerData)] = {
     idles
       .find {
-        case (_, c @ WarmedData(_, `invocationNamespace`, `action`, _, _)) if c.hasCapacity() => true
-        case _                                                                                => false
+        case (_, c @ WarmedData(_, `invocationNamespace`, `action`, _, _, _)) if c.hasCapacity() => true
+        case _                                                                                   => false
       }
       .orElse {
         idles.find {
diff --git a/core/invoker/src/main/scala/org/apache/openwhisk/core/containerpool/ContainerProxy.scala b/core/invoker/src/main/scala/org/apache/openwhisk/core/containerpool/ContainerProxy.scala
index 50024fd..a35d999 100644
--- a/core/invoker/src/main/scala/org/apache/openwhisk/core/containerpool/ContainerProxy.scala
+++ b/core/invoker/src/main/scala/org/apache/openwhisk/core/containerpool/ContainerProxy.scala
@@ -17,15 +17,28 @@
 
 package org.apache.openwhisk.core.containerpool
 
+import akka.actor.Actor
+import akka.actor.ActorRef
+import akka.actor.Cancellable
 import java.time.Instant
-
 import akka.actor.Status.{Failure => FailureMessage}
 import akka.actor.{FSM, Props, Stash}
 import akka.event.Logging.InfoLevel
+import akka.io.IO
+import akka.io.Tcp
+import akka.io.Tcp.Close
+import akka.io.Tcp.CommandFailed
+import akka.io.Tcp.Connect
+import akka.io.Tcp.Connected
 import akka.pattern.pipe
 import pureconfig._
 import pureconfig.generic.auto._
 
+import akka.stream.ActorMaterializer
+import java.net.InetSocketAddress
+import java.net.SocketException
+import org.apache.openwhisk.common.MetricEmitter
+import org.apache.openwhisk.common.TransactionId.systemPrefix
 import scala.collection.immutable
 import spray.json.DefaultJsonProtocol._
 import spray.json._
@@ -44,7 +57,6 @@
 import org.apache.openwhisk.core.entity.size._
 import org.apache.openwhisk.core.invoker.InvokerReactive.{ActiveAck, LogsCollector}
 import org.apache.openwhisk.http.Messages
-
 import scala.concurrent.Future
 import scala.concurrent.duration._
 import scala.util.{Failure, Success}
@@ -165,17 +177,22 @@
                       invocationNamespace: EntityName,
                       action: ExecutableWhiskAction,
                       override val lastUsed: Instant,
-                      override val activeActivationCount: Int = 0)
+                      override val activeActivationCount: Int = 0,
+                      resumeRun: Option[Run] = None)
     extends ContainerStarted(container, lastUsed, action.limits.memory.megabytes.MB, activeActivationCount)
     with ContainerInUse {
   override val initingState = "warmed"
   override def nextRun(r: Run) = copy(lastUsed = Instant.now, activeActivationCount = activeActivationCount + 1)
+  //track the run that triggered the resume so it can be resent if the resumed container turns out to be unhealthy
+  def withoutResumeRun() = this.copy(resumeRun = None)
+  def withResumeRun(job: Run) = this.copy(resumeRun = Some(job))
 }
 
 // Events received by the actor
 case class Start(exec: CodeExec[_], memoryLimit: ByteSize)
 case class Run(action: ExecutableWhiskAction, msg: ActivationMessage, retryLogDeadline: Option[Deadline] = None)
 case object Remove
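+// received by the TCPPingClient actor to enable or disable periodic health pings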
+case class HealthPingEnabled(enabled: Boolean)
 
 // Events sent by the actor
 case class NeedWork(data: ContainerData)
@@ -234,12 +251,16 @@
                      collectLogs: LogsCollector,
                      instance: InvokerInstanceId,
                      poolConfig: ContainerPoolConfig,
+                     healthCheckConfig: ContainerProxyHealthCheckConfig,
                      unusedTimeout: FiniteDuration,
-                     pauseGrace: FiniteDuration)
+                     pauseGrace: FiniteDuration,
+                     testTcp: Option[ActorRef])
     extends FSM[ContainerState, ContainerData]
     with Stash {
   implicit val ec = context.system.dispatcher
   implicit val logging = new AkkaLogging(context.system.log)
+  implicit val ac = context.system
+  implicit val materializer = ActorMaterializer()
   var rescheduleJob = false // true iff actor receives a job but cannot process it because actor will destroy itself
   var runBuffer = immutable.Queue.empty[Run] //does not retain order, but does manage jobs that would have pushed past action concurrency limit
   //track buffer processing state to avoid extra transitions near end of buffer - this provides a pseudo-state between Running and Ready
@@ -247,6 +268,8 @@
 
   //keep a separate count to avoid confusion with ContainerState.activeActivationCount that is tracked/modified only in ContainerPool
   var activeCount = 0;
+  var healthPingActor: Option[ActorRef] = None //set up after the prewarm container starts
+  val tcp: ActorRef = testTcp.getOrElse(IO(Tcp)) //allows testing interaction with the Tcp extension
   startWith(Uninitialized, NoData())
 
   when(Uninitialized) {
@@ -346,7 +369,12 @@
         .pipeTo(self)
       goto(Running) using PreWarmedData(data.container, data.kind, data.memoryLimit, 1)
 
-    case Event(Remove, data: PreWarmedData) => destroyContainer(data.container)
+    case Event(Remove, data: PreWarmedData) => destroyContainer(data)
+
+    // prewarm container failed
+    case Event(_: FailureMessage, data: PreWarmedData) =>
+      MetricEmitter.emitCounterMetric(LoggingMarkers.INVOKER_CONTAINER_HEALTH_FAILED_PREWARM)
+      destroyContainer(data)
   }
 
   when(Running) {
@@ -382,12 +410,12 @@
     // Run was successful
     case Event(RunCompleted, data: WarmedData) =>
       activeCount -= 1
-
+      val newData = data.withoutResumeRun()
       //if there are items in runbuffer, process them if there is capacity, and stay; otherwise if we have any pending activations, also stay
       if (requestWork(data) || activeCount > 0) {
-        stay using data
+        stay using newData
       } else {
-        goto(Ready) using data
+        goto(Ready) using newData
       }
     case Event(job: Run, data: WarmedData)
         if activeCount >= data.action.limits.concurrency.maxConcurrent && !rescheduleJob => //if we are over concurrency limit, and not a failure on resume
@@ -405,15 +433,31 @@
         .pipeTo(self)
       stay() using data
 
+    //ContainerHealthError should cause rescheduling of the job
+    case Event(FailureMessage(e: ContainerHealthError), data: WarmedData) =>
+      implicit val tid = e.tid
+      MetricEmitter.emitCounterMetric(LoggingMarkers.INVOKER_CONTAINER_HEALTH_FAILED_WARM)
+      //resending the run to self will forward it to the parent once we reach the Removing state
+      val newData = data.resumeRun
+        .map { run =>
+          logging.warn(this, "Ready warm container unhealthy, will retry activation.")
+          self ! run
+          data.withoutResumeRun()
+        }
+        .getOrElse(data)
+      rescheduleJob = true
+      rejectBuffered()
+      destroyContainer(newData)
+
     // Failed after /init (the first run failed)
     case Event(_: FailureMessage, data: PreWarmedData) =>
       activeCount -= 1
-      destroyContainer(data.container)
+      destroyContainer(data)
 
     // Failed for a subsequent /run
     case Event(_: FailureMessage, data: WarmedData) =>
       activeCount -= 1
-      destroyContainer(data.container)
+      destroyContainer(data)
 
     // Failed at getting a container for a cold-start run
     case Event(_: FailureMessage, _) =>
@@ -429,24 +473,28 @@
     case Event(job: Run, data: WarmedData) =>
       implicit val transid = job.msg.transid
       activeCount += 1
-
-      initializeAndRun(data.container, job)
+      val newData = data.withResumeRun(job)
+      initializeAndRun(data.container, job, true)
         .map(_ => RunCompleted)
         .pipeTo(self)
 
-      goto(Running) using data
+      goto(Running) using newData
 
     // pause grace timed out
     case Event(StateTimeout, data: WarmedData) =>
       data.container.suspend()(TransactionId.invokerNanny).map(_ => ContainerPaused).pipeTo(self)
       goto(Pausing)
 
-    case Event(Remove, data: WarmedData) => destroyContainer(data.container)
+    case Event(Remove, data: WarmedData) => destroyContainer(data)
+
+    // warm container failed
+    case Event(_: FailureMessage, data: WarmedData) =>
+      destroyContainer(data)
   }
 
   when(Pausing) {
     case Event(ContainerPaused, data: WarmedData)   => goto(Paused)
-    case Event(_: FailureMessage, data: WarmedData) => destroyContainer(data.container)
+    case Event(_: FailureMessage, data: WarmedData) => destroyContainer(data)
     case _                                          => delay
   }
 
@@ -454,7 +502,7 @@
     case Event(job: Run, data: WarmedData) =>
       implicit val transid = job.msg.transid
       activeCount += 1
-
+      val newData = data.withResumeRun(job)
       data.container
         .resume()
         .andThen {
@@ -465,16 +513,15 @@
             rescheduleJob = true
             self ! job
         }
-        .flatMap(_ => initializeAndRun(data.container, job))
+        .flatMap(_ => initializeAndRun(data.container, job, true))
         .map(_ => RunCompleted)
         .pipeTo(self)
-
-      goto(Running) using data
+      goto(Running) using newData
 
     // container is reclaimed by the pool or it has become too old
     case Event(StateTimeout | Remove, data: WarmedData) =>
       rescheduleJob = true // to supress sending message to the pool and not double count
-      destroyContainer(data.container)
+      destroyContainer(data)
   }
 
   when(Removing) {
@@ -488,10 +535,25 @@
 
   // Unstash all messages stashed while in intermediate state
   onTransition {
-    case _ -> Started  => unstashAll()
-    case _ -> Ready    => unstashAll()
-    case _ -> Paused   => unstashAll()
-    case _ -> Removing => unstashAll()
+    case _ -> Started =>
+      if (healthCheckConfig.enabled) {
+        logging.debug(this, "enabling health ping on Started")
+        nextStateData.getContainer.foreach { c =>
+          enableHealthPing(c)
+        }
+      }
+      unstashAll()
+    case _ -> Running =>
+      if (healthCheckConfig.enabled && healthPingActor.isDefined) {
+        logging.debug(this, "disabling health ping on Running")
+        disableHealthPing()
+      }
+    case _ -> Ready =>
+      unstashAll()
+    case _ -> Paused =>
+      unstashAll()
+    case _ -> Removing =>
+      unstashAll()
   }
 
   initialize()
@@ -545,9 +607,10 @@
    * Destroys the container after unpausing it if needed. Can be used
    * as a state progression as it goes to Removing.
    *
-   * @param container the container to destroy
+   * @param newData the ContainerStarted data whose container will be destroyed
    */
-  def destroyContainer(container: Container) = {
+  def destroyContainer(newData: ContainerStarted) = {
+    val container = newData.container
     if (!rescheduleJob) {
       context.parent ! ContainerRemoved
     } else {
@@ -565,8 +628,8 @@
       .flatMap(_ => container.destroy()(TransactionId.invokerNanny))
       .map(_ => ContainerRemoved)
       .pipeTo(self)
-
-    goto(Removing)
+
+    goto(Removing) using newData
   }
 
   /**
@@ -581,6 +644,21 @@
     }
   }
 
+  private def enableHealthPing(c: Container) = {
+    val hpa = healthPingActor.getOrElse {
+      logging.info(this, s"creating health ping actor for ${c.addr.asString()}")
+      val hp = context.actorOf(
+        TCPPingClient
+          .props(tcp, c.toString(), healthCheckConfig, new InetSocketAddress(c.addr.host, c.addr.port)))
+      healthPingActor = Some(hp)
+      hp
+    }
+    hpa ! HealthPingEnabled(true)
+  }
+  private def disableHealthPing() = {
+    healthPingActor.foreach(_ ! HealthPingEnabled(false))
+  }
+
   /**
    * Runs the job, initialize first if necessary.
    * Completes the job by:
@@ -594,7 +672,8 @@
    * @return a future completing after logs have been collected and
    *         added to the WhiskActivation
    */
-  def initializeAndRun(container: Container, job: Run)(implicit tid: TransactionId): Future[WhiskActivation] = {
+  def initializeAndRun(container: Container, job: Run, reschedule: Boolean = false)(
+    implicit tid: TransactionId): Future[WhiskActivation] = {
     val actionTimeout = job.action.limits.timeout.duration
     val (env, parameters) = ContainerProxy.partitionArguments(job.msg.content, job.msg.initArgs)
 
@@ -643,8 +722,12 @@
           "deadline" -> (Instant.now.toEpochMilli + actionTimeout.toMillis).toString.toJson)
 
         container
-          .run(parameters, env.toJson.asJsObject, actionTimeout, job.action.limits.concurrency.maxConcurrent)(
-            job.msg.transid)
+          .run(
+            parameters,
+            env.toJson.asJsObject,
+            actionTimeout,
+            job.action.limits.concurrency.maxConcurrent,
+            reschedule)(job.msg.transid)
           .map {
             case (runInterval, response) =>
               val initRunInterval = initInterval
@@ -658,23 +741,23 @@
                 response)
           }
       }
-      .recover {
+      .recoverWith {
+        case h: ContainerHealthError =>
+          Future.failed(h)
         case InitializationError(interval, response) =>
-          ContainerProxy.constructWhiskActivation(
-            job,
-            Some(interval),
-            interval,
-            interval.duration >= actionTimeout,
-            response)
+          Future.successful(
+            ContainerProxy
+              .constructWhiskActivation(job, Some(interval), interval, interval.duration >= actionTimeout, response))
         case t =>
           // Actually, this should never happen - but we want to make sure to not miss a problem
           logging.error(this, s"caught unexpected error while running activation: ${t}")
-          ContainerProxy.constructWhiskActivation(
-            job,
-            None,
-            Interval.zero,
-            false,
-            ActivationResponse.whiskError(Messages.abnormalRun))
+          Future.successful(
+            ContainerProxy.constructWhiskActivation(
+              job,
+              None,
+              Interval.zero,
+              false,
+              ActivationResponse.whiskError(Messages.abnormalRun)))
       }
 
     val splitAckMessagesPendingLogCollection = collectLogs.logsToBeCollected(job.action)
@@ -755,6 +838,7 @@
 }
 
 final case class ContainerProxyTimeoutConfig(idleContainer: FiniteDuration, pauseGrace: FiniteDuration)
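+/** Health check configuration, bound to whisk.container-proxy.action-health-check. */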
+final case class ContainerProxyHealthCheckConfig(enabled: Boolean, checkPeriod: FiniteDuration, maxFails: Int)
 
 object ContainerProxy {
   def props(factory: (TransactionId,
@@ -769,9 +853,23 @@
             collectLogs: LogsCollector,
             instance: InvokerInstanceId,
             poolConfig: ContainerPoolConfig,
+            healthCheckConfig: ContainerProxyHealthCheckConfig =
+              loadConfigOrThrow[ContainerProxyHealthCheckConfig](ConfigKeys.containerProxyHealth),
             unusedTimeout: FiniteDuration = timeouts.idleContainer,
-            pauseGrace: FiniteDuration = timeouts.pauseGrace) =
-    Props(new ContainerProxy(factory, ack, store, collectLogs, instance, poolConfig, unusedTimeout, pauseGrace))
+            pauseGrace: FiniteDuration = timeouts.pauseGrace,
+            tcp: Option[ActorRef] = None) =
+    Props(
+      new ContainerProxy(
+        factory,
+        ack,
+        store,
+        collectLogs,
+        instance,
+        poolConfig,
+        healthCheckConfig,
+        unusedTimeout,
+        pauseGrace,
+        tcp))
 
   // Needs to be thread-safe as it's used by multiple proxies concurrently.
   private val containerCount = new Counter
@@ -870,6 +968,74 @@
   }
 }
 
+object TCPPingClient {
+  def props(tcp: ActorRef, containerId: String, config: ContainerProxyHealthCheckConfig, remote: InetSocketAddress) =
+    Props(new TCPPingClient(tcp, containerId, remote, config))
+}
+
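+/**
+ * Periodically attempts a TCP connection to the action container and, after the configured
+ * number of consecutive failures (config.maxFails), notifies the parent ContainerProxy via a
+ * FailureMessage so the container can be destroyed.
+ */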
+class TCPPingClient(tcp: ActorRef,
+                    containerId: String,
+                    remote: InetSocketAddress,
+                    config: ContainerProxyHealthCheckConfig)
+    extends Actor {
+  implicit val logging = new AkkaLogging(context.system.log)
+  implicit val ec = context.system.dispatcher
+  implicit var healthPingTx = TransactionId.actionHealthPing
+  case object HealthPingSend
+
+  var scheduledPing: Option[Cancellable] = None
+  var failedCount = 0
+  val addressString = s"${remote.getHostString}:${remote.getPort}"
+  restartPing()
+
+  private def restartPing() = {
+    cancelPing() //just in case restart is called twice
+    scheduledPing = Some(
+      context.system.scheduler.schedule(config.checkPeriod, config.checkPeriod, self, HealthPingSend))
+  }
+  private def cancelPing() = {
+    scheduledPing.foreach(_.cancel())
+  }
+  def receive = {
+    case HealthPingEnabled(enabled) =>
+      if (enabled) {
+        restartPing()
+      } else {
+        cancelPing()
+      }
+    case HealthPingSend =>
+      healthPingTx = TransactionId(systemPrefix + "actionHealth") //reset the tx id each iteration
+      tcp ! Connect(remote)
+    case CommandFailed(_: Connect) =>
+      failedCount += 1
+      if (failedCount == config.maxFails) {
+        logging.error(
+          this,
+          s"Failed health connection to $containerId ($addressString) $failedCount times - exceeded max ${config.maxFails} failures")
+        //destroy this container since we cannot communicate with it
+        context.parent ! FailureMessage(
+          new SocketException(s"Health connection to $containerId ($addressString) failed $failedCount times"))
+        cancelPing()
+        context.stop(self)
+      } else {
+        logging.warn(this, s"Failed health connection to $containerId ($addressString) $failedCount times")
+      }
+
+    case Connected(_, _) =>
+      sender() ! Close
+      if (failedCount > 0) {
+        //reset in case of temp failure
+        logging.info(
+          this,
+          s"Succeeded health connection to $containerId ($addressString) after $failedCount previous failures")
+        failedCount = 0
+      } else {
+        logging.debug(this, s"Succeeded health connection to $containerId ($addressString)")
+      }
+
+  }
+}
+
 /** Indicates that something went wrong with an activation and the container should be removed */
 trait ActivationError extends Exception {
   val activation: WhiskActivation
diff --git a/core/invoker/src/main/scala/org/apache/openwhisk/core/containerpool/docker/DockerContainer.scala b/core/invoker/src/main/scala/org/apache/openwhisk/core/containerpool/docker/DockerContainer.scala
index bca3674..a0163b8 100644
--- a/core/invoker/src/main/scala/org/apache/openwhisk/core/containerpool/docker/DockerContainer.scala
+++ b/core/invoker/src/main/scala/org/apache/openwhisk/core/containerpool/docker/DockerContainer.scala
@@ -164,7 +164,7 @@
  * @param addr the ip of the container
  */
 class DockerContainer(protected val id: ContainerId,
-                      protected val addr: ContainerAddress,
+                      protected[core] val addr: ContainerAddress,
                       protected val useRunc: Boolean)(implicit docker: DockerApiWithFileAccess,
                                                       runc: RuncApi,
                                                       override protected val as: ActorSystem,
@@ -210,11 +210,13 @@
     }
   }
 
-  override protected def callContainer(path: String,
-                                       body: JsObject,
-                                       timeout: FiniteDuration,
-                                       maxConcurrent: Int,
-                                       retry: Boolean = false)(implicit transid: TransactionId): Future[RunResult] = {
+  override protected def callContainer(
+    path: String,
+    body: JsObject,
+    timeout: FiniteDuration,
+    maxConcurrent: Int,
+    retry: Boolean = false,
+    reschedule: Boolean = false)(implicit transid: TransactionId): Future[RunResult] = {
     val started = Instant.now()
     val http = httpConnection.getOrElse {
       val conn = if (Container.config.akkaClient) {
@@ -231,7 +233,7 @@
     }
 
     http
-      .post(path, body, retry)
+      .post(path, body, retry, reschedule)
       .flatMap { response =>
         val finished = Instant.now()
 
diff --git a/core/monitoring/user-events/compose/grafana/dashboards/global-metrics.json b/core/monitoring/user-events/compose/grafana/dashboards/global-metrics.json
index d3c06d8..7d5e247 100644
--- a/core/monitoring/user-events/compose/grafana/dashboards/global-metrics.json
+++ b/core/monitoring/user-events/compose/grafana/dashboards/global-metrics.json
@@ -15,6 +15,7 @@
   "editable": true,
   "gnetId": null,
   "graphTooltip": 0,
+  "iteration": 1580164337194,
   "links": [],
   "panels": [
     {
@@ -282,7 +283,9 @@
       "recent": false,
       "search": true,
       "starred": false,
-      "tags": ["openwhisk"],
+      "tags": [
+        "openwhisk"
+      ],
       "title": "Related Dashboards",
       "type": "dashlist"
     },
@@ -557,6 +560,177 @@
         "align": false,
         "alignLevel": null
       }
+    },
+    {
+      "aliasColors": {},
+      "bars": false,
+      "dashLength": 10,
+      "dashes": false,
+      "fill": 1,
+      "gridPos": {
+        "h": 8,
+        "w": 12,
+        "x": 0,
+        "y": 19
+      },
+      "id": 13,
+      "legend": {
+        "avg": false,
+        "current": false,
+        "max": false,
+        "min": false,
+        "show": true,
+        "total": false,
+        "values": false
+      },
+      "lines": true,
+      "linewidth": 1,
+      "links": [],
+      "nullPointMode": "null",
+      "percentage": false,
+      "pointradius": 2,
+      "points": false,
+      "renderer": "flot",
+      "seriesOverrides": [],
+      "spaceLength": 10,
+      "stack": false,
+      "steppedLine": false,
+      "targets": [
+        {
+          "expr": "sum(increase(counter_invoker_containerStart_counter_total{containerState!=\"warmed\"}[$interval])) by (containerState)",
+          "format": "time_series",
+          "interval": "",
+          "intervalFactor": 1,
+          "legendFormat": "{{containerState}}",
+          "refId": "A"
+        }
+      ],
+      "thresholds": [],
+      "timeFrom": null,
+      "timeRegions": [],
+      "timeShift": null,
+      "title": "Non-warm container starts [$interval]",
+      "tooltip": {
+        "shared": true,
+        "sort": 0,
+        "value_type": "individual"
+      },
+      "type": "graph",
+      "xaxis": {
+        "buckets": null,
+        "mode": "time",
+        "name": null,
+        "show": true,
+        "values": []
+      },
+      "yaxes": [
+        {
+          "format": "short",
+          "label": null,
+          "logBase": 1,
+          "max": null,
+          "min": null,
+          "show": true
+        },
+        {
+          "format": "short",
+          "label": null,
+          "logBase": 1,
+          "max": null,
+          "min": null,
+          "show": true
+        }
+      ],
+      "yaxis": {
+        "align": false,
+        "alignLevel": null
+      }
+    },
+    {
+      "aliasColors": {},
+      "bars": false,
+      "dashLength": 10,
+      "dashes": false,
+      "fill": 1,
+      "gridPos": {
+        "h": 8,
+        "w": 12,
+        "x": 12,
+        "y": 19
+      },
+      "id": 14,
+      "interval": "",
+      "legend": {
+        "avg": false,
+        "current": false,
+        "max": false,
+        "min": false,
+        "show": true,
+        "total": false,
+        "values": false
+      },
+      "lines": true,
+      "linewidth": 1,
+      "links": [],
+      "nullPointMode": "null",
+      "percentage": false,
+      "pointradius": 2,
+      "points": false,
+      "renderer": "flot",
+      "seriesOverrides": [],
+      "spaceLength": 10,
+      "stack": false,
+      "steppedLine": false,
+      "targets": [
+        {
+          "expr": "sum(increase(counter_invoker_containerStart_counter_total{containerState=\"warmed\"}[$interval])) by (containerState)",
+          "format": "time_series",
+          "interval": "",
+          "intervalFactor": 1,
+          "legendFormat": "{{ containerState }}",
+          "refId": "A"
+        }
+      ],
+      "thresholds": [],
+      "timeFrom": null,
+      "timeRegions": [],
+      "timeShift": null,
+      "title": "Warm container starts [$interval]",
+      "tooltip": {
+        "shared": true,
+        "sort": 0,
+        "value_type": "individual"
+      },
+      "type": "graph",
+      "xaxis": {
+        "buckets": null,
+        "mode": "time",
+        "name": null,
+        "show": true,
+        "values": []
+      },
+      "yaxes": [
+        {
+          "format": "short",
+          "label": null,
+          "logBase": 1,
+          "max": null,
+          "min": null,
+          "show": true
+        },
+        {
+          "format": "short",
+          "label": null,
+          "logBase": 1,
+          "max": null,
+          "min": null,
+          "show": true
+        }
+      ],
+      "yaxis": {
+        "align": false,
+        "alignLevel": null
+      }
     }
   ],
   "refresh": false,
diff --git a/core/monitoring/user-events/compose/grafana/dashboards/openwhisk_events.json b/core/monitoring/user-events/compose/grafana/dashboards/openwhisk_events.json
index 389d2e5..9962218 100644
--- a/core/monitoring/user-events/compose/grafana/dashboards/openwhisk_events.json
+++ b/core/monitoring/user-events/compose/grafana/dashboards/openwhisk_events.json
@@ -1,46 +1,4 @@
 {
-  "__inputs": [
-    {
-      "name": "DS_PROMETHEUS",
-      "label": "Prometheus",
-      "description": "",
-      "type": "datasource",
-      "pluginId": "prometheus",
-      "pluginName": "Prometheus"
-    }
-  ],
-  "__requires": [
-    {
-      "type": "grafana",
-      "id": "grafana",
-      "name": "Grafana",
-      "version": "6.1.6"
-    },
-    {
-      "type": "panel",
-      "id": "graph",
-      "name": "Graph",
-      "version": ""
-    },
-    {
-      "type": "datasource",
-      "id": "prometheus",
-      "name": "Prometheus",
-      "version": "1.0.0"
-    },
-    {
-      "type": "panel",
-      "id": "singlestat",
-      "name": "Singlestat",
-      "version": ""
-    },
-    {
-      "type": "panel",
-      "id": "table",
-      "name": "Table",
-      "version": ""
-    }
-  ],
   "annotations": {
     "list": [
       {
@@ -58,8 +16,7 @@
   "editable": true,
   "gnetId": 9564,
   "graphTooltip": 0,
-  "id": null,
-  "iteration": 1574127067913,
+  "iteration": 1580164808936,
   "links": [],
   "panels": [
     {
@@ -1476,6 +1433,174 @@
         "align": false,
         "alignLevel": null
       }
+    },
+    {
+      "aliasColors": {},
+      "bars": false,
+      "dashLength": 10,
+      "dashes": false,
+      "fill": 1,
+      "gridPos": {
+        "h": 8,
+        "w": 12,
+        "x": 0,
+        "y": 47
+      },
+      "id": 41,
+      "legend": {
+        "avg": false,
+        "current": false,
+        "max": false,
+        "min": false,
+        "show": true,
+        "total": false,
+        "values": false
+      },
+      "lines": true,
+      "linewidth": 1,
+      "links": [],
+      "nullPointMode": "null",
+      "percentage": false,
+      "pointradius": 2,
+      "points": false,
+      "renderer": "flot",
+      "seriesOverrides": [],
+      "spaceLength": 10,
+      "stack": false,
+      "steppedLine": false,
+      "targets": [
+        {
+          "expr": "sum(increase(counter_invoker_containerStart_counter_total{containerState!=\"warmed\",namespace=~\"$namespace\",action=~\"$action\",initiator=~\"$initiator\",region=~\"$region\",stack=~\"$stack\"}[$interval])) by (containerState)",
+          "format": "time_series",
+          "intervalFactor": 1,
+          "legendFormat": "{{containerState}}",
+          "refId": "A"
+        }
+      ],
+      "thresholds": [],
+      "timeFrom": null,
+      "timeRegions": [],
+      "timeShift": null,
+      "title": "Non-warm container starts [$interval]",
+      "tooltip": {
+        "shared": true,
+        "sort": 0,
+        "value_type": "individual"
+      },
+      "type": "graph",
+      "xaxis": {
+        "buckets": null,
+        "mode": "time",
+        "name": null,
+        "show": true,
+        "values": []
+      },
+      "yaxes": [
+        {
+          "format": "short",
+          "label": null,
+          "logBase": 1,
+          "max": null,
+          "min": null,
+          "show": true
+        },
+        {
+          "format": "short",
+          "label": null,
+          "logBase": 1,
+          "max": null,
+          "min": null,
+          "show": true
+        }
+      ],
+      "yaxis": {
+        "align": false,
+        "alignLevel": null
+      }
+    },
+    {
+      "aliasColors": {},
+      "bars": false,
+      "dashLength": 10,
+      "dashes": false,
+      "fill": 1,
+      "gridPos": {
+        "h": 8,
+        "w": 12,
+        "x": 12,
+        "y": 47
+      },
+      "id": 42,
+      "legend": {
+        "avg": false,
+        "current": false,
+        "max": false,
+        "min": false,
+        "show": true,
+        "total": false,
+        "values": false
+      },
+      "lines": true,
+      "linewidth": 1,
+      "links": [],
+      "nullPointMode": "null",
+      "percentage": false,
+      "pointradius": 2,
+      "points": false,
+      "renderer": "flot",
+      "seriesOverrides": [],
+      "spaceLength": 10,
+      "stack": false,
+      "steppedLine": false,
+      "targets": [
+        {
+          "expr": "sum(increase(counter_invoker_containerStart_counter_total{containerState=\"warmed\",namespace=~\"$namespace\",action=~\"$action\",initiator=~\"$initiator\",region=~\"$region\",stack=~\"$stack\"}[$interval])) by (containerState)",
+          "format": "time_series",
+          "intervalFactor": 1,
+          "legendFormat": "{{containerState}}",
+          "refId": "A"
+        }
+      ],
+      "thresholds": [],
+      "timeFrom": null,
+      "timeRegions": [],
+      "timeShift": null,
+      "title": "Warm container starts [$interval]",
+      "tooltip": {
+        "shared": true,
+        "sort": 0,
+        "value_type": "individual"
+      },
+      "type": "graph",
+      "xaxis": {
+        "buckets": null,
+        "mode": "time",
+        "name": null,
+        "show": true,
+        "values": []
+      },
+      "yaxes": [
+        {
+          "format": "short",
+          "label": null,
+          "logBase": 1,
+          "max": null,
+          "min": null,
+          "show": true
+        },
+        {
+          "format": "short",
+          "label": null,
+          "logBase": 1,
+          "max": null,
+          "min": null,
+          "show": true
+        }
+      ],
+      "yaxis": {
+        "align": false,
+        "alignLevel": null
+      }
     }
   ],
   "refresh": "5s",
diff --git a/tests/src/test/scala/org/apache/openwhisk/core/containerpool/docker/test/AkkaContainerClientTests.scala b/tests/src/test/scala/org/apache/openwhisk/core/containerpool/docker/test/AkkaContainerClientTests.scala
index 3c6b14e..c2da266 100644
--- a/tests/src/test/scala/org/apache/openwhisk/core/containerpool/docker/test/AkkaContainerClientTests.scala
+++ b/tests/src/test/scala/org/apache/openwhisk/core/containerpool/docker/test/AkkaContainerClientTests.scala
@@ -39,6 +39,7 @@
 import spray.json.JsObject
 import org.apache.openwhisk.common.TransactionId
 import org.apache.openwhisk.core.containerpool.AkkaContainerClient
+import org.apache.openwhisk.core.containerpool.ContainerHealthError
 import org.apache.openwhisk.core.entity.ActivationResponse._
 import org.apache.openwhisk.core.entity.size._
 
@@ -143,6 +144,14 @@
     waited should be < (timeout * 2).toMillis
   }
 
+  it should "throw ContainerHealthError on HttpHostConnectException if reschedule==true" in {
+    val timeout = 5.seconds
+    val connection = new AkkaContainerClient("0.0.0.0", 12345, timeout, 1.B, 100)
+    assertThrows[ContainerHealthError] {
+      Await.result(connection.post("/run", JsObject.empty, retry = false, reschedule = true), 10.seconds)
+    }
+  }
+
   it should "retry till success within timeout limit" in {
     val timeout = 5.seconds
     val retryInterval = 500.milliseconds
diff --git a/tests/src/test/scala/org/apache/openwhisk/core/containerpool/docker/test/ApacheBlockingContainerClientTests.scala b/tests/src/test/scala/org/apache/openwhisk/core/containerpool/docker/test/ApacheBlockingContainerClientTests.scala
index 4675953..eee65a3 100644
--- a/tests/src/test/scala/org/apache/openwhisk/core/containerpool/docker/test/ApacheBlockingContainerClientTests.scala
+++ b/tests/src/test/scala/org/apache/openwhisk/core/containerpool/docker/test/ApacheBlockingContainerClientTests.scala
@@ -39,6 +39,7 @@
 import scala.concurrent.Await
 import org.apache.openwhisk.common.TransactionId
 import org.apache.openwhisk.core.containerpool.ApacheBlockingContainerClient
+import org.apache.openwhisk.core.containerpool.ContainerHealthError
 import org.apache.openwhisk.core.containerpool.RetryableConnectionError
 import org.apache.openwhisk.core.entity.ActivationResponse.Timeout
 import org.apache.openwhisk.core.entity.size._
@@ -138,6 +139,15 @@
     waited should be < (timeout * 2).toMillis
   }
 
+  it should "throw ContainerHealthError on HttpHostConnectException if reschedule==true" in {
+    val timeout = 5.seconds
+    val badHostAndPort = "0.0.0.0:12345"
+    val connection = new ApacheBlockingContainerClient(badHostAndPort, timeout, 1.B)
+    assertThrows[ContainerHealthError] {
+      Await.result(connection.post("/run", JsObject.empty, retry = false, reschedule = true), 10.seconds)
+    }
+  }
+
   it should "not truncate responses within limit" in {
     val timeout = 1.minute.toMillis
     val connection = new ApacheBlockingContainerClient(hostWithPort, timeout.millis, 50.B)
diff --git a/tests/src/test/scala/org/apache/openwhisk/core/containerpool/docker/test/DockerContainerTests.scala b/tests/src/test/scala/org/apache/openwhisk/core/containerpool/docker/test/DockerContainerTests.scala
index 624c544..2c25eac 100644
--- a/tests/src/test/scala/org/apache/openwhisk/core/containerpool/docker/test/DockerContainerTests.scala
+++ b/tests/src/test/scala/org/apache/openwhisk/core/containerpool/docker/test/DockerContainerTests.scala
@@ -112,7 +112,8 @@
         body: JsObject,
         timeout: FiniteDuration,
         concurrent: Int,
-        retry: Boolean = false)(implicit transid: TransactionId): Future[RunResult] = {
+        retry: Boolean = false,
+        reschedule: Boolean = false)(implicit transid: TransactionId): Future[RunResult] = {
         ccRes
       }
       override protected val logCollectingIdleTimeout = awaitLogs
diff --git a/tests/src/test/scala/org/apache/openwhisk/core/containerpool/kubernetes/test/KubernetesContainerTests.scala b/tests/src/test/scala/org/apache/openwhisk/core/containerpool/kubernetes/test/KubernetesContainerTests.scala
index db806b9..c580b5d 100644
--- a/tests/src/test/scala/org/apache/openwhisk/core/containerpool/kubernetes/test/KubernetesContainerTests.scala
+++ b/tests/src/test/scala/org/apache/openwhisk/core/containerpool/kubernetes/test/KubernetesContainerTests.scala
@@ -108,7 +108,8 @@
         body: JsObject,
         timeout: FiniteDuration,
         concurrent: Int,
-        retry: Boolean = false)(implicit transid: TransactionId): Future[RunResult] = {
+        retry: Boolean = false,
+        reschedule: Boolean = false)(implicit transid: TransactionId): Future[RunResult] = {
         ccRes
       }
       override protected val waitForLogs = awaitLogs
diff --git a/tests/src/test/scala/org/apache/openwhisk/core/containerpool/test/ContainerPoolTests.scala b/tests/src/test/scala/org/apache/openwhisk/core/containerpool/test/ContainerPoolTests.scala
index 5a7b22c..4efbe85 100644
--- a/tests/src/test/scala/org/apache/openwhisk/core/containerpool/test/ContainerPoolTests.scala
+++ b/tests/src/test/scala/org/apache/openwhisk/core/containerpool/test/ContainerPoolTests.scala
@@ -110,11 +110,11 @@
 
   /** Helper to create PreWarmedData */
   def preWarmedData(kind: String, memoryLimit: ByteSize = memoryLimit) =
-    PreWarmedData(stub[Container], kind, memoryLimit)
+    PreWarmedData(stub[MockableContainer], kind, memoryLimit)
 
   /** Helper to create WarmedData */
   def warmedData(run: Run, lastUsed: Instant = Instant.now) = {
-    WarmedData(stub[Container], run.msg.user.namespace.name, run.action, lastUsed)
+    WarmedData(stub[MockableContainer], run.msg.user.namespace.name, run.action, lastUsed)
   }
 
   /** Creates a sequence of containers and a factory returning this sequence. */
@@ -741,6 +741,29 @@
     pool ! runMessageConcurrent
     containers(0).expectMsg(runMessageConcurrent)
   }
+
+  it should "backfill prewarms when prewarm containers are removed" in {
+    val (containers, factory) = testContainers(6)
+    val feed = TestProbe()
+
+    val pool =
+      system.actorOf(ContainerPool
+        .props(factory, poolConfig(MemoryLimit.STD_MEMORY * 5), feed.ref, List(PrewarmingConfig(2, exec, memoryLimit))))
+    containers(0).expectMsg(Start(exec, memoryLimit))
+    containers(1).expectMsg(Start(exec, memoryLimit))
+
+    //removing 2 prewarm containers will start 2 containers via backfill
+    containers(0).send(pool, ContainerRemoved)
+    containers(1).send(pool, ContainerRemoved)
+    containers(2).expectMsg(Start(exec, memoryLimit))
+    containers(3).expectMsg(Start(exec, memoryLimit))
+    //make sure extra prewarms are not started
+    containers(4).expectNoMessage(100.milliseconds)
+    containers(5).expectNoMessage(100.milliseconds)
+  }
+}
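+
+/** Stub-friendly Container base: provides a concrete addr so that stubbed containers
+  * have a usable address now that Container.addr is protected[core] (read by healthchecks). */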
+abstract class MockableContainer extends Container {
+  protected[core] val addr: ContainerAddress = ContainerAddress("nohost")
 }
 
 /**
@@ -765,14 +788,14 @@
                  namespace: String = standardNamespace.asString,
                  lastUsed: Instant = Instant.now,
                  active: Int = 0) =
-    WarmedData(stub[Container], EntityName(namespace), action, lastUsed, active)
+    WarmedData(stub[MockableContainer], EntityName(namespace), action, lastUsed, active)
 
   /** Helper to create WarmingData with sensible defaults */
   def warmingData(action: ExecutableWhiskAction = createAction(),
                   namespace: String = standardNamespace.asString,
                   lastUsed: Instant = Instant.now,
                   active: Int = 0) =
-    WarmingData(stub[Container], EntityName(namespace), action, lastUsed, active)
+    WarmingData(stub[MockableContainer], EntityName(namespace), action, lastUsed, active)
 
   /** Helper to create WarmingData with sensible defaults */
   def warmingColdData(action: ExecutableWhiskAction = createAction(),
@@ -782,7 +805,7 @@
     WarmingColdData(EntityName(namespace), action, lastUsed, active)
 
   /** Helper to create PreWarmedData with sensible defaults */
-  def preWarmedData(kind: String = "anyKind") = PreWarmedData(stub[Container], kind, 256.MB)
+  def preWarmedData(kind: String = "anyKind") = PreWarmedData(stub[MockableContainer], kind, 256.MB)
 
   /** Helper to create NoData */
   def noData() = NoData()
diff --git a/tests/src/test/scala/org/apache/openwhisk/core/containerpool/test/ContainerProxyTests.scala b/tests/src/test/scala/org/apache/openwhisk/core/containerpool/test/ContainerProxyTests.scala
index 464be2d..5666555 100644
--- a/tests/src/test/scala/org/apache/openwhisk/core/containerpool/test/ContainerProxyTests.scala
+++ b/tests/src/test/scala/org/apache/openwhisk/core/containerpool/test/ContainerProxyTests.scala
@@ -17,18 +17,19 @@
 
 package org.apache.openwhisk.core.containerpool.test
 
+import java.net.InetSocketAddress
 import java.time.Instant
 
 import akka.actor.FSM.{CurrentState, SubscribeTransitionCallBack, Transition}
 import akka.actor.{ActorRef, ActorSystem, FSM}
 import akka.stream.scaladsl.Source
-import akka.testkit.CallingThreadDispatcher
-import akka.testkit.{ImplicitSender, TestKit}
+import akka.testkit.{CallingThreadDispatcher, ImplicitSender, TestKit, TestProbe}
 import akka.util.ByteString
 import common.{LoggedFunction, StreamLogging, SynchronizedLoggedFunction, WhiskProperties}
 import java.time.temporal.ChronoUnit
 import java.util.concurrent.atomic.AtomicInteger
 
+import akka.io.Tcp.{Close, CommandFailed, Connect, Connected}
 import org.junit.runner.RunWith
 import org.scalatest.junit.JUnitRunner
 import org.scalatest.{BeforeAndAfterAll, FlatSpecLike, Matchers}
@@ -161,7 +162,7 @@
   def expectWarmed(namespace: String, action: ExecutableWhiskAction) = {
     val test = EntityName(namespace)
     expectMsgPF() {
-      case a @ NeedWork(WarmedData(_, `test`, `action`, _, _)) => //matched, otherwise will fail
+      case a @ NeedWork(WarmedData(_, `test`, `action`, _, _, _)) => //matched, otherwise will fail
     }
   }
 
@@ -273,6 +274,7 @@
       Future.successful(())
   }
   val poolConfig = ContainerPoolConfig(2.MB, 0.5, false)
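+  /** Helper to create a ContainerProxyHealthCheckConfig: enabled as given, 100ms ping interval, at most 2 failed pings */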
+  def healthchecksConfig(enabled: Boolean = false) = ContainerProxyHealthCheckConfig(enabled, 100.milliseconds, 2)
   val filterEnvVar = (k: String) => Character.isUpperCase(k.charAt(0))
 
   behavior of "ContainerProxy"
@@ -309,6 +311,7 @@
             createCollector(),
             InvokerInstanceId(0, Some("myname"), userMemory = defaultUserMemory),
             poolConfig,
+            healthchecksConfig(),
             pauseGrace = pauseGrace))
     registerCallback(machine)
     preWarm(machine)
@@ -338,6 +341,7 @@
             collector,
             InvokerInstanceId(0, userMemory = defaultUserMemory),
             poolConfig,
+            healthchecksConfig(),
             pauseGrace = pauseGrace))
     registerCallback(machine)
 
@@ -382,6 +386,7 @@
             collector,
             InvokerInstanceId(0, userMemory = defaultUserMemory),
             poolConfig,
+            healthchecksConfig(),
             pauseGrace = pauseGrace))
     registerCallback(machine)
     preWarm(machine)
@@ -445,6 +450,7 @@
             collector,
             InvokerInstanceId(0, userMemory = defaultUserMemory),
             poolConfig,
+            healthchecksConfig(),
             pauseGrace = pauseGrace))
     registerCallback(machine)
     preWarm(machine)
@@ -495,6 +501,7 @@
             collector,
             InvokerInstanceId(0, userMemory = defaultUserMemory),
             poolConfig,
+            healthchecksConfig(),
             pauseGrace = pauseGrace))
     registerCallback(machine)
     run(machine, Uninitialized)
@@ -535,6 +542,7 @@
             collector,
             InvokerInstanceId(0, userMemory = defaultUserMemory),
             poolConfig,
+            healthchecksConfig(),
             pauseGrace = pauseGrace))
     registerCallback(machine)
 
@@ -554,6 +562,110 @@
     }
   }
 
+  it should "resend a failed Run when it is first Run after Ready state" in within(timeout) {
+    val noLogsAction = action.copy(limits = ActionLimits(logs = LogLimit(0.MB)))
+    val container = new TestContainer {
+      override def run(
+        parameters: JsObject,
+        environment: JsObject,
+        timeout: FiniteDuration,
+        concurrent: Int,
+        reschedule: Boolean = false)(implicit transid: TransactionId): Future[(Interval, ActivationResponse)] = {
+        atomicRunCount.incrementAndGet()
+        //every run after the first fails
+        if (runCount > 1) {
+          Future.failed(ContainerHealthError(messageTransId, "intentional failure"))
+        } else {
+          Future.successful((runInterval, ActivationResponse.success()))
+        }
+      }
+    }
+    val factory = createFactory(Future.successful(container))
+    val acker = createAcker(noLogsAction)
+    val store = createStore
+    val collector = createCollector()
+
+    val machine =
+      childActorOf(
+        ContainerProxy
+          .props(
+            factory,
+            acker,
+            store,
+            collector,
+            InvokerInstanceId(0, userMemory = defaultUserMemory),
+            poolConfig,
+            healthchecksConfig(),
+            pauseGrace = pauseGrace))
+    registerCallback(machine)
+
+    machine ! Run(noLogsAction, message)
+    expectMsg(Transition(machine, Uninitialized, Running))
+    expectWarmed(invocationNamespace.name, noLogsAction)
+    expectMsg(Transition(machine, Running, Ready))
+
+    val failingRun = Run(noLogsAction, message)
+    val runAfterFail = Run(noLogsAction, message)
+    //the first run fails; both runs are then resent to the parent
+    machine ! failingRun
+    machine ! runAfterFail //will be buffered first, and then resent
+    expectMsg(Transition(machine, Ready, Running))
+    //on failure, buffered runs are resent first
+    expectMsg(runAfterFail)
+    //resend the first run to the parent, and start the removal process
+    expectMsg(RescheduleJob)
+    expectMsg(Transition(machine, Running, Removing))
+    expectMsg(failingRun)
+    expectNoMessage(100.milliseconds)
+
+    awaitAssert {
+      factory.calls should have size 1
+      container.initializeCount shouldBe 1
+      container.runCount shouldBe 2
+      collector.calls should have size 0
+      acker.calls should have size 1
+      store.calls should have size 1
+      acker.calls.head._6 shouldBe a[CompletionMessage]
+    }
+  }
+
+  it should "start tcp ping to containers when action healthcheck enabled" in within(timeout) {
+    val noLogsAction = action.copy(limits = ActionLimits(logs = LogLimit(0.MB)))
+    val container = new TestContainer()
+    val factory = createFactory(Future.successful(container))
+    val acker = createAcker(noLogsAction)
+    val store = createStore
+    val collector = createCollector()
+    val tcpProbe = TestProbe()
+    val healthchecks = healthchecksConfig(true)
+    val machine =
+      childActorOf(
+        ContainerProxy
+          .props(
+            factory,
+            acker,
+            store,
+            collector,
+            InvokerInstanceId(0, userMemory = defaultUserMemory),
+            poolConfig,
+            healthchecks,
+            pauseGrace = pauseGrace,
+            tcp = Some(tcpProbe.ref)))
+    registerCallback(machine)
+    preWarm(machine)
+
+    tcpProbe.expectMsg(Connect(new InetSocketAddress("0.0.0.0", 8080)))
+    tcpProbe.expectMsg(Connect(new InetSocketAddress("0.0.0.0", 8080)))
+    tcpProbe.expectMsg(Connect(new InetSocketAddress("0.0.0.0", 8080)))
+    //pings should repeat till the container goes into Running state
+    run(machine, Started)
+    tcpProbe.expectNoMessage(healthchecks.checkPeriod + 100.milliseconds)
+
+    awaitAssert {
+      factory.calls should have size 1
+    }
+  }
+
   it should "respond with CombinedCompletionAndResultMessage for blocking invocation with no logs" in within(timeout) {
     val noLogsAction = action.copy(limits = ActionLimits(logs = LogLimit(0.MB)))
     val blockingMessage = message.copy(blocking = true)
@@ -663,6 +775,7 @@
             collector,
             InvokerInstanceId(0, userMemory = defaultUserMemory),
             poolConfig,
+            healthchecksConfig(),
             pauseGrace = pauseGrace))
     registerCallback(machine)
     (factory, container, acker, store, collector, machine)
@@ -706,6 +819,7 @@
             collector,
             InvokerInstanceId(0, userMemory = defaultUserMemory),
             poolConfig,
+            healthchecksConfig(),
             pauseGrace = pauseGrace)
           .withDispatcher(CallingThreadDispatcher.Id))
     registerCallback(machine)
@@ -801,8 +915,12 @@
   it should "complete the transaction and reuse the container on a failed run IFF failure was applicationError" in within(
     timeout) {
     val container = new TestContainer {
-      override def run(parameters: JsObject, environment: JsObject, timeout: FiniteDuration, concurrent: Int)(
-        implicit transid: TransactionId): Future[(Interval, ActivationResponse)] = {
+      override def run(
+        parameters: JsObject,
+        environment: JsObject,
+        timeout: FiniteDuration,
+        concurrent: Int,
+        reschedule: Boolean = false)(implicit transid: TransactionId): Future[(Interval, ActivationResponse)] = {
         atomicRunCount.incrementAndGet()
         //every other run fails
         if (runCount % 2 == 0) {
@@ -827,6 +945,7 @@
             collector,
             InvokerInstanceId(0, userMemory = defaultUserMemory),
             poolConfig,
+            healthchecksConfig(),
             pauseGrace = timeout))
     registerCallback(machine)
     preWarm(machine)
@@ -900,6 +1019,7 @@
             collector,
             InvokerInstanceId(0, userMemory = defaultUserMemory),
             poolConfig,
+            healthchecksConfig(),
             pauseGrace = pauseGrace))
     registerCallback(machine)
     machine ! Run(action, message)
@@ -944,6 +1064,7 @@
             collector,
             InvokerInstanceId(0, userMemory = defaultUserMemory),
             poolConfig,
+            healthchecksConfig(),
             pauseGrace = pauseGrace))
     registerCallback(machine)
     machine ! Run(action, message)
@@ -971,8 +1092,12 @@
   it should "complete the transaction and destroy the container on a failed run IFF failure was containerError" in within(
     timeout) {
     val container = new TestContainer {
-      override def run(parameters: JsObject, environment: JsObject, timeout: FiniteDuration, concurrent: Int)(
-        implicit transid: TransactionId): Future[(Interval, ActivationResponse)] = {
+      override def run(
+        parameters: JsObject,
+        environment: JsObject,
+        timeout: FiniteDuration,
+        concurrent: Int,
+        reschedule: Boolean = false)(implicit transid: TransactionId): Future[(Interval, ActivationResponse)] = {
         atomicRunCount.incrementAndGet()
         Future.successful((initInterval, ActivationResponse.developerError(("boom"))))
       }
@@ -992,6 +1117,7 @@
             collector,
             InvokerInstanceId(0, userMemory = defaultUserMemory),
             poolConfig,
+            healthchecksConfig(),
             pauseGrace = pauseGrace))
     registerCallback(machine)
     machine ! Run(action, message)
@@ -1030,6 +1156,7 @@
             collector,
             InvokerInstanceId(0, userMemory = defaultUserMemory),
             poolConfig,
+            healthchecksConfig(),
             pauseGrace = pauseGrace))
     registerCallback(machine)
     machine ! Run(action, message)
@@ -1067,6 +1194,7 @@
             collector,
             InvokerInstanceId(0, userMemory = defaultUserMemory),
             poolConfig,
+            healthchecksConfig(),
             pauseGrace = pauseGrace))
     registerCallback(machine)
     machine ! Run(action, message)
@@ -1108,6 +1236,7 @@
             createCollector(),
             InvokerInstanceId(0, userMemory = defaultUserMemory),
             poolConfig,
+            healthchecksConfig(),
             pauseGrace = pauseGrace))
     registerCallback(machine)
     run(machine, Uninitialized) // first run an activation
@@ -1130,6 +1259,145 @@
     }
   }
 
+  it should "resend the job to the parent if /run fails connection after Paused -> Running" in within(timeout) {
+    val container = new TestContainer {
+      override def run(
+        parameters: JsObject,
+        environment: JsObject,
+        timeout: FiniteDuration,
+        concurrent: Int,
+        reschedule: Boolean = false)(implicit transid: TransactionId): Future[(Interval, ActivationResponse)] = {
+
+        if (reschedule) {
+          throw ContainerHealthError(transid, "reconnect failed to xyz")
+        }
+        super.run(parameters, environment, timeout, concurrent, reschedule)
+      }
+    }
+    val factory = createFactory(Future.successful(container))
+    val acker = createAcker()
+    val store = createStore
+
+    val machine =
+      childActorOf(
+        ContainerProxy
+          .props(
+            factory,
+            acker,
+            store,
+            createCollector(),
+            InvokerInstanceId(0, userMemory = defaultUserMemory),
+            poolConfig,
+            healthchecksConfig(),
+            pauseGrace = pauseGrace))
+    registerCallback(machine)
+    run(machine, Uninitialized) // first run an activation
+    timeout(machine) // times out Ready state so container suspends
+    expectPause(machine)
+
+    val runMessage = Run(action, message)
+    machine ! runMessage
+    expectMsg(Transition(machine, Paused, Running))
+    expectMsg(RescheduleJob)
+    expectMsg(Transition(machine, Running, Removing))
+    expectMsg(runMessage)
+
+    awaitAssert {
+      factory.calls should have size 1
+      container.runCount shouldBe 1
+      container.suspendCount shouldBe 1
+      container.resumeCount shouldBe 1
+      container.destroyCount shouldBe 1
+    }
+  }
+
+  it should "resend the job to the parent if /run fails connection after Ready -> Running" in within(timeout) {
+    val container = new TestContainer {
+      override def run(
+        parameters: JsObject,
+        environment: JsObject,
+        timeout: FiniteDuration,
+        concurrent: Int,
+        reschedule: Boolean = false)(implicit transid: TransactionId): Future[(Interval, ActivationResponse)] = {
+
+        if (reschedule) {
+          throw ContainerHealthError(transid, "reconnect failed to xyz")
+        }
+        super.run(parameters, environment, timeout, concurrent, reschedule)
+      }
+    }
+    val factory = createFactory(Future.successful(container))
+    val acker = createAcker()
+    val store = createStore
+
+    val machine =
+      childActorOf(
+        ContainerProxy
+          .props(
+            factory,
+            acker,
+            store,
+            createCollector(),
+            InvokerInstanceId(0, userMemory = defaultUserMemory),
+            poolConfig,
+            healthchecksConfig(),
+            pauseGrace = pauseGrace))
+    registerCallback(machine)
+    run(machine, Uninitialized) // first run an activation
+    //will be in Ready state now
+
+    val runMessage = Run(action, message)
+    machine ! runMessage
+    expectMsg(Transition(machine, Ready, Running))
+    expectMsg(RescheduleJob)
+    expectMsg(Transition(machine, Running, Removing))
+    expectMsg(runMessage)
+
+    awaitAssert {
+      factory.calls should have size 1
+      container.runCount shouldBe 1
+      container.suspendCount shouldBe 0
+      container.resumeCount shouldBe 0
+      container.destroyCount shouldBe 1
+    }
+  }
+
+  it should "remove and replace a prewarm container if it fails healthcheck after startup" in within(timeout) {
+    val container = new TestContainer
+    val factory = createFactory(Future.successful(container))
+    val acker = createAcker()
+    val store = createStore
+    val collector = createCollector()
+
+    val machine =
+      childActorOf(
+        ContainerProxy
+          .props(
+            factory,
+            acker,
+            store,
+            collector,
+            InvokerInstanceId(0, userMemory = defaultUserMemory),
+            poolConfig,
+            healthchecksConfig(true),
+            pauseGrace = pauseGrace))
+    registerCallback(machine)
+    preWarm(machine)
+
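+    //assuming nothing listens at the TestContainer address (0.0.0.0:8080), the real TCP pings fail and the proxy removes the prewarm container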
+    //expect failure after healthchecks fail
+    expectMsg(ContainerRemoved)
+    expectMsg(Transition(machine, Started, Removing))
+
+    awaitAssert {
+      factory.calls should have size 1
+      container.initializeCount shouldBe 0
+      container.runCount shouldBe 0
+      collector.calls should have size 0
+      container.suspendCount shouldBe 0
+      container.resumeCount shouldBe 0
+      acker.calls should have size 0
+    }
+  }
+
   it should "remove the container if suspend fails" in within(timeout) {
     val container = new TestContainer {
       override def suspend()(implicit transid: TransactionId) = {
@@ -1151,6 +1419,7 @@
             createCollector(),
             InvokerInstanceId(0, userMemory = defaultUserMemory),
             poolConfig,
+            healthchecksConfig(),
             pauseGrace = pauseGrace))
     registerCallback(machine)
     run(machine, Uninitialized)
@@ -1196,6 +1465,7 @@
             collector,
             InvokerInstanceId(0, userMemory = defaultUserMemory),
             poolConfig,
+            healthchecksConfig(),
             pauseGrace = pauseGrace))
     registerCallback(machine)
 
@@ -1255,6 +1525,7 @@
             collector,
             InvokerInstanceId(0, userMemory = defaultUserMemory),
             poolConfig,
+            healthchecksConfig(),
             pauseGrace = pauseGrace))
     registerCallback(machine)
     run(machine, Uninitialized)
@@ -1303,6 +1574,7 @@
             collector,
             InvokerInstanceId(0, userMemory = defaultUserMemory),
             poolConfig,
+            healthchecksConfig(),
             pauseGrace = pauseGrace))
     registerCallback(machine)
 
@@ -1377,7 +1649,7 @@
       initialCount)
     val nextWarmedData = warmedData.nextRun(Run(action, message))
     nextWarmedData should matchPattern {
-      case WarmedData(pwData.container, message.user.namespace.name, action, _, newCount) =>
+      case WarmedData(pwData.container, message.user.namespace.name, action, _, newCount, _) =>
     }
     warmedData.lastUsed.until(nextWarmedData.lastUsed, ChronoUnit.SECONDS) should be >= timeDiffSeconds.toLong
   }
@@ -1390,7 +1662,7 @@
                       apiKeyMustBePresent: Boolean = true)
       extends Container {
     protected[core] val id = ContainerId("testcontainer")
-    protected val addr = ContainerAddress("0.0.0.0")
+    protected[core] val addr = ContainerAddress("0.0.0.0")
     protected implicit val logging: Logging = log
     protected implicit val ec: ExecutionContext = system.dispatcher
     override implicit protected val as: ActorSystem = system
@@ -1458,8 +1730,12 @@
 
       initPromise.map(_.future).getOrElse(Future.successful(initInterval))
     }
-    override def run(parameters: JsObject, environment: JsObject, timeout: FiniteDuration, concurrent: Int)(
-      implicit transid: TransactionId): Future[(Interval, ActivationResponse)] = {
+    override def run(
+      parameters: JsObject,
+      environment: JsObject,
+      timeout: FiniteDuration,
+      concurrent: Int,
+      reschedule: Boolean = false)(implicit transid: TransactionId): Future[(Interval, ActivationResponse)] = {
 
       // the "init" arguments are not passed on run
       parameters shouldBe JsObject(activationArguments.fields.filter(k => !filterEnvVar(k._1)))
@@ -1496,3 +1772,66 @@
     }
   }
 }
+
+@RunWith(classOf[JUnitRunner])
+class TCPPingClientTests extends TestKit(ActorSystem("TCPPingClient")) with Matchers with FlatSpecLike {
+  val config = ContainerProxyHealthCheckConfig(true, 200.milliseconds, 2)
+  val addr = new InetSocketAddress("1.2.3.4", 12345)
+  val localAddr = new InetSocketAddress("localhost", 5432)
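+  //addr is the target address the client is asked to ping; localAddr is only used to fabricate the Connected reply below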
+
+  behavior of "TCPPingClient"
+  it should "start the ping on HealthPingEnabled(true) and stop on HealthPingEnabled(false)" in {
+    val tcpProbe = TestProbe()
+    val pingClient = system.actorOf(TCPPingClient.props(tcpProbe.ref, "1234", config, addr))
+    pingClient ! HealthPingEnabled(true)
+    tcpProbe.expectMsg(Connect(addr))
+    //measure the delay between connections
+    val start = System.currentTimeMillis()
+    tcpProbe.expectMsg(Connect(addr))
+    val delay = System.currentTimeMillis() - start
+    delay should be > config.checkPeriod.toMillis - 25 //allow 25ms slop
+    tcpProbe.expectMsg(Connect(addr))
+    //make sure disable works
+    pingClient ! HealthPingEnabled(false)
+    //make sure no Connect msg for at least the check period
+    tcpProbe.expectNoMessage(config.checkPeriod)
+  }
+  it should "send FailureMessage and cancel the ping on CommandFailed" in {
+    val tcpProbe = TestProbe()
+    val pingClient = system.actorOf(TCPPingClient.props(tcpProbe.ref, "1234", config, addr))
+    val clientProbe = TestProbe()
+    clientProbe watch pingClient
+    pingClient ! HealthPingEnabled(true)
+    val c = Connect(addr)
+    //send config.maxFails CommandFailed messages
+    (1 to config.maxFails).foreach { _ =>
+      tcpProbe.expectMsg(c)
+      pingClient ! CommandFailed(c)
+    }
+    //now we expect termination
+    clientProbe.expectTerminated(pingClient)
+  }
+  it should "reset failedCount on Connected" in {
+    val tcpProbe = TestProbe()
+    val pingClient = system.actorOf(TCPPingClient.props(tcpProbe.ref, "1234", config, addr))
+    val clientProbe = TestProbe()
+    clientProbe watch pingClient
+    pingClient ! HealthPingEnabled(true)
+    val c = Connect(addr)
+    //send maxFails-1 (should not fail)
+    (1 to config.maxFails - 1).foreach { _ =>
+      tcpProbe.expectMsg(c)
+      pingClient ! CommandFailed(c)
+    }
+    tcpProbe.expectMsg(c)
+    tcpProbe.send(pingClient, Connected(addr, localAddr))
+    //counter should be reset
+    tcpProbe.expectMsg(Close)
+    //send maxFails (will fail, but counter is reset so we get maxFails tries)
+    (1 to config.maxFails).foreach { _ =>
+      tcpProbe.expectMsg(c)
+      pingClient ! CommandFailed(c)
+    }
+    //now we expect termination
+    clientProbe.expectTerminated(pingClient)
+  }
+}