Add a ticket based queue to ensure fairness in container acquisition
diff --git a/core/dispatcher/src/main/scala/whisk/core/container/ContainerPool.scala b/core/dispatcher/src/main/scala/whisk/core/container/ContainerPool.scala
index 3c0e4b2..c91e136 100644
--- a/core/dispatcher/src/main/scala/whisk/core/container/ContainerPool.scala
+++ b/core/dispatcher/src/main/scala/whisk/core/container/ContainerPool.scala
@@ -123,7 +123,14 @@
private val startingCounter = new Counter()
private var shuttingDown = false
- /**
+ /*
+ * Tracks requests for getting containers.
+ * The first value doled out via nextPosition.next() will be 1 and completedPosition.cur remains at 0 until completion.
+ */
+ private val nextPosition = new Counter()
+ private val completedPosition = new Counter()
+
+ /*
* Lists ALL containers at this docker point with "docker ps -a --no-trunc".
* This could include containers not in this pool at all.
*/
@@ -144,18 +151,25 @@
} else {
info(this, s"Getting container for ${action.fullyQualifiedName} with ${auth.uuid}")
val key = makeKey(action, auth)
- getImpl(key, { () => makeWhiskContainer(action, auth) }) map {
- case (c, initResult) =>
- val cacheMsg = if (!initResult.isDefined) "(Cache Hit)" else "(Cache Miss)"
- info(this, s"ContainerPool.getAction obtained container ${c.id} ${cacheMsg}")
- (c.asInstanceOf[WhiskContainer], initResult)
+ try {
+ getImpl(nextPosition.next(), key, { () => makeWhiskContainer(action, auth) }) map {
+ case (c, initResult) =>
+ val cacheMsg = if (!initResult.isDefined) "(Cache Hit)" else "(Cache Miss)"
+ info(this, s"ContainerPool.getAction obtained container ${c.id} ${cacheMsg}")
+ (c.asInstanceOf[WhiskContainer], initResult)
+ }
+ } finally {
+ completedPosition.next()
}
}
+ /*
+ * For testing
+ */
def getByImageName(imageName: String, args: Array[String])(implicit transid: TransactionId): Option[Container] = {
info(this, s"Getting container for image $imageName with args " + args.mkString(" "))
val key = makeKey(imageName, args)
- getImpl(key, { () => makeContainer(imageName, args) }) map { _._1 }
+ getImpl(0, key, { () => makeContainer(imageName, args) }) map { _._1 }
}
/**
@@ -163,8 +177,12 @@
* This method will apply retry so that the caller is blocked until retry succeeds.
*/
@tailrec
- final def getImpl(key: String, conMaker: () => ContainerResult)(implicit transid: TransactionId): Option[(Container, Option[RunResult])] = {
- getOrMake(key, conMaker) match {
+ final def getImpl(position: Int, key: String, conMaker: () => ContainerResult)(implicit transid: TransactionId): Option[(Container, Option[RunResult])] = {
+ val positionInLine = position - completedPosition.cur // this will be 1 if at the front of the line
+ val available = slack()
+ if (positionInLine > available) { // e.g. if there is 1 available, then I wait if I am second in line (positionInLine = 2)
+ Thread.sleep(50) // TODO: replace with wait/notify but tricky to get right because of desire for maximal concurrency
+ } else getOrMake(key, conMaker) match {
case Success(con, initResult) =>
info(this, s"Obtained container ${con.containerId.getOrElse("unknown")}")
return Some(con, initResult)
@@ -172,9 +190,9 @@
error(this, s"Error starting container: $str")
return None
case Busy() =>
- Thread.sleep(100)
- getImpl(key, conMaker)
+ // This will not cause a busy loop because only those that could be productive will get a chance
}
+ getImpl(position, key, conMaker)
}
def getNumberOfIdleContainers(key: String)(implicit transid: TransactionId): Int = {
@@ -183,8 +201,14 @@
}
}
- /**
- * Tries to get or create a container, returning None if there are too many
+ /*
+ * How many containers can we start? Someone could have fully started a container so we must include startingCounter.
+ * The use of a method rather than a getter is meant to signify the synchronization in the implementation.
+ */
+ private def slack() = _maxActive - (activeCount() + startingCounter.cur)
+
+ /*
+ * Try to get or create a container, returning None if there are too many
* active containers.
*
* The multiple synchronization block, and the use of startingCounter,
@@ -198,7 +222,7 @@
retrieve(key) match {
case CacheMiss() => {
this.synchronized {
- if (activeCount() + startingCounter.cur >= _maxActive) // Someone could have fully started a container
+ if (slack() <= 0)
return Busy()
if (startingCounter.cur >= 1) // Limit concurrent starting of containers
return Busy()