emit additional metrics about invoker ContainerPool (#4625)

* emit additional metrics about invoker ContainerPool
* added counter increment for rescheduled run message
diff --git a/common/scala/src/main/scala/org/apache/openwhisk/common/Logging.scala b/common/scala/src/main/scala/org/apache/openwhisk/common/Logging.scala
index c728c5a..42e9b12 100644
--- a/common/scala/src/main/scala/org/apache/openwhisk/common/Logging.scala
+++ b/common/scala/src/main/scala/org/apache/openwhisk/common/Logging.scala
@@ -350,6 +350,7 @@
   private val kafka = "kafka"
   private val loadbalancer = "loadbalancer"
   private val containerClient = "containerClient"
+  private val containerPool = "containerPool"
 
   /*
    * The following markers are used to emit log messages as well as metrics. Add all LogMarkerTokens below to
@@ -489,6 +490,21 @@
   val CONTAINER_CLIENT_RETRIES =
     LogMarkerToken(containerClient, "retries", counter)(MeasurementUnit.none)
 
+  val CONTAINER_POOL_RESCHEDULED_ACTIVATION =
+    LogMarkerToken(containerPool, "rescheduledActivation", counter)(MeasurementUnit.none)
+  val CONTAINER_POOL_RUNBUFFER_COUNT =
+    LogMarkerToken(containerPool, "runBufferCount", counter)(MeasurementUnit.none)
+  val CONTAINER_POOL_RUNBUFFER_SIZE =
+    LogMarkerToken(containerPool, "runBufferSize", counter)(MeasurementUnit.information.megabytes)
+  val CONTAINER_POOL_ACTIVE_COUNT =
+    LogMarkerToken(containerPool, "activeCount", counter)(MeasurementUnit.none)
+  val CONTAINER_POOL_ACTIVE_SIZE =
+    LogMarkerToken(containerPool, "activeSize", counter)(MeasurementUnit.information.megabytes)
+  val CONTAINER_POOL_PREWARM_COUNT =
+    LogMarkerToken(containerPool, "prewarmCount", counter)(MeasurementUnit.none)
+  val CONTAINER_POOL_PREWARM_SIZE =
+    LogMarkerToken(containerPool, "prewarmSize", counter)(MeasurementUnit.information.megabytes)
+
   val INVOKER_TOTALMEM_BLACKBOX = LogMarkerToken(loadbalancer, "totalCapacityBlackBox", counter)(MeasurementUnit.none)
   val INVOKER_TOTALMEM_MANAGED = LogMarkerToken(loadbalancer, "totalCapacityManaged", counter)(MeasurementUnit.none)
 
diff --git a/core/invoker/src/main/scala/org/apache/openwhisk/core/containerpool/ContainerPool.scala b/core/invoker/src/main/scala/org/apache/openwhisk/core/containerpool/ContainerPool.scala
index e219ff6..1fecfbf 100644
--- a/core/invoker/src/main/scala/org/apache/openwhisk/core/containerpool/ContainerPool.scala
+++ b/core/invoker/src/main/scala/org/apache/openwhisk/core/containerpool/ContainerPool.scala
@@ -18,11 +18,11 @@
 package org.apache.openwhisk.core.containerpool
 
 import akka.actor.{Actor, ActorRef, ActorRefFactory, Props}
+import org.apache.openwhisk.common.MetricEmitter
 import org.apache.openwhisk.common.{AkkaLogging, LoggingMarkers, TransactionId}
 import org.apache.openwhisk.core.connector.MessageFeed
 import org.apache.openwhisk.core.entity._
 import org.apache.openwhisk.core.entity.size._
-
 import scala.annotation.tailrec
 import scala.collection.immutable
 import scala.concurrent.duration._
@@ -34,6 +34,8 @@
 
 case class WorkerData(data: ContainerData, state: WorkerState)
 
+case object EmitMetrics
+
 /**
  * A pool managing containers to run actions on.
  *
@@ -62,6 +64,7 @@
   import ContainerPool.memoryConsumptionOf
 
   implicit val logging = new AkkaLogging(context.system.log)
+  implicit val ec = context.dispatcher
 
   var freePool = immutable.Map.empty[ActorRef, ContainerData]
   var busyPool = immutable.Map.empty[ActorRef, ContainerData]
@@ -71,6 +74,8 @@
   // Otherwise actions with small memory-limits could block actions with large memory limits.
   var runBuffer = immutable.Queue.empty[Run]
   val logMessageInterval = 10.seconds
+  //periodically emit metrics (don't need to do this for each message!)
+  context.system.scheduler.schedule(30.seconds, 10.seconds, self, EmitMetrics)
 
   prewarmConfig.foreach { config =>
     logging.info(this, s"pre-warming ${config.count} ${config.exec.kind} ${config.memoryLimit.toString}")(
@@ -187,6 +192,7 @@
                   s"userNamespace: ${r.msg.user.namespace.name}, action: ${r.action}, " +
                   s"needed memory: ${r.action.limits.memory.megabytes} MB, " +
                   s"waiting messages: ${runBuffer.size}")(r.msg.transid)
+              MetricEmitter.emitCounterMetric(LoggingMarkers.CONTAINER_POOL_RESCHEDULED_ACTIVATION)
               Some(logMessageInterval.fromNow)
             } else {
               r.retryLogDeadline
@@ -257,6 +263,8 @@
     case RescheduleJob =>
       freePool = freePool - sender()
       busyPool = busyPool - sender()
+    case EmitMetrics =>
+      emitMetrics()
   }
 
   /** Creates a new container and updates state accordingly. */
@@ -316,6 +324,25 @@
   def hasPoolSpaceFor[A](pool: Map[A, ContainerData], memory: ByteSize): Boolean = {
     memoryConsumptionOf(pool) + memory.toMB <= poolConfig.userMemory.toMB
   }
+
+  /**
+   * Log metrics about pool state (buffer size, buffer memory requirements, active number, active memory, prewarm number, prewarm memory)
+   */
+  private def emitMetrics() = {
+    MetricEmitter.emitGaugeMetric(LoggingMarkers.CONTAINER_POOL_RUNBUFFER_COUNT, runBuffer.size)
+    MetricEmitter.emitGaugeMetric(
+      LoggingMarkers.CONTAINER_POOL_RUNBUFFER_SIZE,
+      runBuffer.map(_.action.limits.memory.megabytes).sum)
+    val containersInUse = freePool.filter(_._2.activeActivationCount > 0) ++ busyPool
+    MetricEmitter.emitGaugeMetric(LoggingMarkers.CONTAINER_POOL_ACTIVE_COUNT, containersInUse.size)
+    MetricEmitter.emitGaugeMetric(
+      LoggingMarkers.CONTAINER_POOL_ACTIVE_SIZE,
+      containersInUse.map(_._2.memoryLimit.toMB).sum)
+    MetricEmitter.emitGaugeMetric(LoggingMarkers.CONTAINER_POOL_PREWARM_COUNT, prewarmedPool.size)
+    MetricEmitter.emitGaugeMetric(
+      LoggingMarkers.CONTAINER_POOL_PREWARM_SIZE,
+      prewarmedPool.map(_._2.memoryLimit.toMB).sum)
+  }
 }
 
 object ContainerPool {