Remove redundant Busy on container get. Clean up and extend logging. Warn on slow docker op.
diff --git a/core/dispatcher/src/main/scala/whisk/core/container/ContainerPool.scala b/core/dispatcher/src/main/scala/whisk/core/container/ContainerPool.scala
index caa2617..65628e2 100644
--- a/core/dispatcher/src/main/scala/whisk/core/container/ContainerPool.scala
+++ b/core/dispatcher/src/main/scala/whisk/core/container/ContainerPool.scala
@@ -28,6 +28,7 @@
import akka.actor.ActorSystem
import whisk.common.Counter
import whisk.common.Logging
+import whisk.common.TimingUtil
import whisk.common.TransactionId
import whisk.core.WhiskConfig
import whisk.core.WhiskConfig.dockerImageTag
@@ -118,7 +119,10 @@
def maxActive_=(value: Int): Unit = _maxActive = Math.max(0, value)
def resetMaxIdle() = _maxIdle = defaultMaxIdle
- def resetMaxActive() = _maxActive = ContainerPool.getDefaultMaxActive(config)
+ def resetMaxActive() = {
+ _maxActive = ContainerPool.getDefaultMaxActive(config)
+ info(this, s"maxActive set to ${_maxActive}")
+ }
def resetGCThreshold() = _gcThreshold = defaultGCThreshold
/*
@@ -162,12 +166,15 @@
} else {
try {
val myPos = nextPosition.next()
- info(this, s"Getting container for ${action.fullyQualifiedName} with ${auth.uuid}. myPos = $myPos comp = ${completedPosition.cur} slack = ${slack()}")
+ info(this, s"""Getting container for ${action.fullyQualifiedName} of kind ${action.exec.kind} with ${auth.uuid}:
+ | myPos = $myPos
+ | completed = ${completedPosition.cur}
+ | slack = ${slack()}
+ | startingCounter = ${startingCounter.cur}""".stripMargin)
val key = ActionContainerId(auth.uuid, action.fullyQualifiedName, action.rev)
getImpl(0, myPos, key, () => makeWhiskContainer(action, auth)) map {
case (c, initResult) =>
val cacheMsg = if (!initResult.isDefined) "(Cache Hit)" else "(Cache Miss)"
- info(this, s"getAction obtained container ${c.id} ${cacheMsg}")
(c.asInstanceOf[WhiskContainer], initResult)
}
} finally {
@@ -193,17 +200,17 @@
final def getImpl(tryCount: Int, position: Int, key: ActionContainerId, conMaker: () => FinalContainerResult)(implicit transid: TransactionId): Option[(Container, Option[RunResult])] = {
val positionInLine = position - completedPosition.cur // this will be 1 if at the front of the line
val available = slack()
+ if (tryCount == 100) { // diagnostics are emitted only when tryCount is exactly 100; the message dumps the queue/pool counters read above
+ warn(this, s"""getImpl possibly stuck because still in line:
+ | position = $position
+ | completed = ${completedPosition.cur}
+ | slack = $available
+ | maxActive = ${_maxActive}
+ | activeCount = ${activeCount()}
+ | startingCounter = ${startingCounter.cur}""".stripMargin)
+ }
if (positionInLine > available) { // e.g. if there is 1 available, then I wait if I am second in line (positionInLine = 2)
Thread.sleep(50) // TODO: replace with wait/notify but tricky to get right because of desire for maximal concurrency
- if (tryCount == 200) {
- warn(this, s"""getImpl possibly stuck:
- | position = $position
- | completed = ${completedPosition.cur}")
- | slack = $available
- | maxActive = ${_maxActive}
- | activeCount = ${activeCount()}
- | startingCounter = ${startingCounter.cur}""".stripMargin)
- }
} else getOrMake(key, conMaker) match {
case Success(con, initResult) =>
info(this, s"Obtained container ${con.containerId.getOrElse("unknown")}")
@@ -243,13 +250,7 @@
def getOrMake(key: ActionContainerId, conMaker: () => FinalContainerResult)(implicit transid: TransactionId): FinalContainerResult = {
retrieve(key) match {
case CacheMiss => {
- this.synchronized {
- if (slack() <= 0)
- return Busy
- if (startingCounter.cur >= 1) // Limit concurrent starting of containers
- return Busy
- startingCounter.next()
- }
+ startingCounter.next()
try {
conMaker() match { /* We make the container outside synchronization */
// Unfortunately, variables are not allowed in pattern alternatives even when the types line up.
@@ -452,9 +453,15 @@
* All docker operations from the pool must pass through here (except for pull).
*/
private def runDockerOp[T](dockerOp: => T): T = {
- dockerLock.synchronized {
- dockerOp
+ val (elapsed, result) = TimingUtil.time {
+ dockerLock.synchronized {
+ dockerOp
+ }
}
+ if (elapsed > slowDockerThreshold) {
+ warn(this, s"Docker operation took $elapsed")
+ }
+ result
}
/**
@@ -575,13 +582,15 @@
private val defaultMaxIdle = 10
private val defaultGCThreshold = 600.seconds
+ private val slowDockerThreshold = 500.millis
val gcFrequency = 1000.milliseconds // this should not be leaked but a test needs this until GC count is implemented
private var _maxIdle = defaultMaxIdle
- private var _maxActive = ContainerPool.getDefaultMaxActive(config)
+ private var _maxActive = 0
private var _gcThreshold = defaultGCThreshold
private var gcOn = true
private val gcSync = new Object()
+ resetMaxActive()
private val timer = new Timer()
private val gcTask = new TimerTask {
diff --git a/core/dispatcher/src/main/scala/whisk/core/dispatcher/Dispatcher.scala b/core/dispatcher/src/main/scala/whisk/core/dispatcher/Dispatcher.scala
index 848dd5f..b5b7776 100644
--- a/core/dispatcher/src/main/scala/whisk/core/dispatcher/Dispatcher.scala
+++ b/core/dispatcher/src/main/scala/whisk/core/dispatcher/Dispatcher.scala
@@ -112,17 +112,17 @@
if (matches.nonEmpty) Future {
val count = counter.next()
- info(this, s"activeCount = $count while handling ${rule.name}")
+ debug(this, s"activeCount = $count while handling ${rule.name}")
rule.doit(topic, msg, matches) // returns a future which is flat-mapped to hang onComplete
} flatMap (identity) onComplete {
- case Success(a) => info(this, s"activeCount = ${counter.prev()} after handling $rule")
+ case Success(a) => debug(this, s"activeCount = ${counter.prev()} after handling $rule")
case Failure(t) => error(this, s"activeCount = ${counter.prev()} ${errorMsg(rule, t)}")
}
}
private def inform(matchers: TrieMap[String, DispatchRule])(implicit transid: TransactionId) = {
val names = matchers map { _._2.name } reduce (_ + "," + _)
- info(this, s"matching message to ${matchers.size} handlers: $names")
+ debug(this, s"matching message to ${matchers.size} handlers: $names")
matchers
}