emit additional metrics about invoker ContainerPool (#4625)
* Emit additional metrics about the invoker's ContainerPool
* Add a counter increment for rescheduled run messages
diff --git a/common/scala/src/main/scala/org/apache/openwhisk/common/Logging.scala b/common/scala/src/main/scala/org/apache/openwhisk/common/Logging.scala
index c728c5a..42e9b12 100644
--- a/common/scala/src/main/scala/org/apache/openwhisk/common/Logging.scala
+++ b/common/scala/src/main/scala/org/apache/openwhisk/common/Logging.scala
@@ -350,6 +350,7 @@
private val kafka = "kafka"
private val loadbalancer = "loadbalancer"
private val containerClient = "containerClient"
+ private val containerPool = "containerPool"
/*
* The following markers are used to emit log messages as well as metrics. Add all LogMarkerTokens below to
@@ -489,6 +490,21 @@
val CONTAINER_CLIENT_RETRIES =
LogMarkerToken(containerClient, "retries", counter)(MeasurementUnit.none)
+ val CONTAINER_POOL_RESCHEDULED_ACTIVATION =
+ LogMarkerToken(containerPool, "rescheduledActivation", counter)(MeasurementUnit.none)
+ val CONTAINER_POOL_RUNBUFFER_COUNT =
+ LogMarkerToken(containerPool, "runBufferCount", counter)(MeasurementUnit.none)
+ val CONTAINER_POOL_RUNBUFFER_SIZE =
+ LogMarkerToken(containerPool, "runBufferSize", counter)(MeasurementUnit.information.megabytes)
+ val CONTAINER_POOL_ACTIVE_COUNT =
+ LogMarkerToken(containerPool, "activeCount", counter)(MeasurementUnit.none)
+ val CONTAINER_POOL_ACTIVE_SIZE =
+ LogMarkerToken(containerPool, "activeSize", counter)(MeasurementUnit.information.megabytes)
+ val CONTAINER_POOL_PREWARM_COUNT =
+ LogMarkerToken(containerPool, "prewarmCount", counter)(MeasurementUnit.none)
+ val CONTAINER_POOL_PREWARM_SIZE =
+ LogMarkerToken(containerPool, "prewarmSize", counter)(MeasurementUnit.information.megabytes)
+
val INVOKER_TOTALMEM_BLACKBOX = LogMarkerToken(loadbalancer, "totalCapacityBlackBox", counter)(MeasurementUnit.none)
val INVOKER_TOTALMEM_MANAGED = LogMarkerToken(loadbalancer, "totalCapacityManaged", counter)(MeasurementUnit.none)
diff --git a/core/invoker/src/main/scala/org/apache/openwhisk/core/containerpool/ContainerPool.scala b/core/invoker/src/main/scala/org/apache/openwhisk/core/containerpool/ContainerPool.scala
index e219ff6..1fecfbf 100644
--- a/core/invoker/src/main/scala/org/apache/openwhisk/core/containerpool/ContainerPool.scala
+++ b/core/invoker/src/main/scala/org/apache/openwhisk/core/containerpool/ContainerPool.scala
@@ -18,11 +18,11 @@
package org.apache.openwhisk.core.containerpool
import akka.actor.{Actor, ActorRef, ActorRefFactory, Props}
+import org.apache.openwhisk.common.MetricEmitter
import org.apache.openwhisk.common.{AkkaLogging, LoggingMarkers, TransactionId}
import org.apache.openwhisk.core.connector.MessageFeed
import org.apache.openwhisk.core.entity._
import org.apache.openwhisk.core.entity.size._
-
import scala.annotation.tailrec
import scala.collection.immutable
import scala.concurrent.duration._
@@ -34,6 +34,8 @@
case class WorkerData(data: ContainerData, state: WorkerState)
+case object EmitMetrics
+
/**
* A pool managing containers to run actions on.
*
@@ -62,6 +64,7 @@
import ContainerPool.memoryConsumptionOf
implicit val logging = new AkkaLogging(context.system.log)
+ implicit val ec = context.dispatcher
var freePool = immutable.Map.empty[ActorRef, ContainerData]
var busyPool = immutable.Map.empty[ActorRef, ContainerData]
@@ -71,6 +74,8 @@
// Otherwise actions with small memory-limits could block actions with large memory limits.
var runBuffer = immutable.Queue.empty[Run]
val logMessageInterval = 10.seconds
+ // Periodically emit metrics (no need to recompute these on every message)
+ context.system.scheduler.schedule(30.seconds, 10.seconds, self, EmitMetrics)
prewarmConfig.foreach { config =>
logging.info(this, s"pre-warming ${config.count} ${config.exec.kind} ${config.memoryLimit.toString}")(
@@ -187,6 +192,7 @@
s"userNamespace: ${r.msg.user.namespace.name}, action: ${r.action}, " +
s"needed memory: ${r.action.limits.memory.megabytes} MB, " +
s"waiting messages: ${runBuffer.size}")(r.msg.transid)
+ MetricEmitter.emitCounterMetric(LoggingMarkers.CONTAINER_POOL_RESCHEDULED_ACTIVATION)
Some(logMessageInterval.fromNow)
} else {
r.retryLogDeadline
@@ -257,6 +263,8 @@
case RescheduleJob =>
freePool = freePool - sender()
busyPool = busyPool - sender()
+ case EmitMetrics =>
+ emitMetrics()
}
/** Creates a new container and updates state accordingly. */
@@ -316,6 +324,25 @@
def hasPoolSpaceFor[A](pool: Map[A, ContainerData], memory: ByteSize): Boolean = {
memoryConsumptionOf(pool) + memory.toMB <= poolConfig.userMemory.toMB
}
+
+ /**
+ * Emit gauge metrics describing pool state: run-buffer count and memory, active container count and memory, prewarm container count and memory.
+ */
+ private def emitMetrics() = {
+ MetricEmitter.emitGaugeMetric(LoggingMarkers.CONTAINER_POOL_RUNBUFFER_COUNT, runBuffer.size)
+ MetricEmitter.emitGaugeMetric(
+ LoggingMarkers.CONTAINER_POOL_RUNBUFFER_SIZE,
+ runBuffer.map(_.action.limits.memory.megabytes).sum)
+ val containersInUse = freePool.filter(_._2.activeActivationCount > 0) ++ busyPool
+ MetricEmitter.emitGaugeMetric(LoggingMarkers.CONTAINER_POOL_ACTIVE_COUNT, containersInUse.size)
+ MetricEmitter.emitGaugeMetric(
+ LoggingMarkers.CONTAINER_POOL_ACTIVE_SIZE,
+ containersInUse.map(_._2.memoryLimit.toMB).sum)
+ MetricEmitter.emitGaugeMetric(LoggingMarkers.CONTAINER_POOL_PREWARM_COUNT, prewarmedPool.size)
+ MetricEmitter.emitGaugeMetric(
+ LoggingMarkers.CONTAINER_POOL_PREWARM_SIZE,
+ prewarmedPool.map(_._2.memoryLimit.toMB).sum)
+ }
}
object ContainerPool {