Add a ticket based queue to ensure fairness in container acquisition
diff --git a/core/dispatcher/src/main/scala/whisk/core/container/ContainerPool.scala b/core/dispatcher/src/main/scala/whisk/core/container/ContainerPool.scala
index 3c0e4b2..c91e136 100644
--- a/core/dispatcher/src/main/scala/whisk/core/container/ContainerPool.scala
+++ b/core/dispatcher/src/main/scala/whisk/core/container/ContainerPool.scala
@@ -123,7 +123,14 @@
     private val startingCounter = new Counter()
     private var shuttingDown = false
 
-    /**
+    /*
+     * Tracks requests for getting containers.
+     * The first value doled out via nextPosition.next() will be 1 and completedPosition.cur remains at 0 until completion.
+     */
+    private val nextPosition = new Counter()
+    private val completedPosition = new Counter()
+
+    /*
      * Lists ALL containers at this docker point with "docker ps -a --no-trunc".
      * This could include containers not in this pool at all.
      */
@@ -144,18 +151,25 @@
         } else {
             info(this, s"Getting container for ${action.fullyQualifiedName} with ${auth.uuid}")
             val key = makeKey(action, auth)
-            getImpl(key, { () => makeWhiskContainer(action, auth) }) map {
-                case (c, initResult) =>
-                    val cacheMsg = if (!initResult.isDefined) "(Cache Hit)" else "(Cache Miss)"
-                    info(this, s"ContainerPool.getAction obtained container ${c.id} ${cacheMsg}")
-                    (c.asInstanceOf[WhiskContainer], initResult)
+            try {
+                getImpl(nextPosition.next(), key, { () => makeWhiskContainer(action, auth) }) map {
+                    case (c, initResult) =>
+                        val cacheMsg = if (!initResult.isDefined) "(Cache Hit)" else "(Cache Miss)"
+                        info(this, s"ContainerPool.getAction obtained container ${c.id} ${cacheMsg}")
+                        (c.asInstanceOf[WhiskContainer], initResult)
+                }
+            } finally {
+                completedPosition.next()
             }
         }
 
+    /*
+     * For testing
+     */
     def getByImageName(imageName: String, args: Array[String])(implicit transid: TransactionId): Option[Container] = {
         info(this, s"Getting container for image $imageName with args " + args.mkString(" "))
         val key = makeKey(imageName, args)
-        getImpl(key, { () => makeContainer(imageName, args) }) map { _._1 }
+        getImpl(0, key, { () => makeContainer(imageName, args) }) map { _._1 }
     }
 
     /**
@@ -163,8 +177,12 @@
      * This method will apply retry so that the caller is blocked until retry succeeds.
      */
     @tailrec
-    final def getImpl(key: String, conMaker: () => ContainerResult)(implicit transid: TransactionId): Option[(Container, Option[RunResult])] = {
-        getOrMake(key, conMaker) match {
+    final def getImpl(position: Int, key: String, conMaker: () => ContainerResult)(implicit transid: TransactionId): Option[(Container, Option[RunResult])] = {
+        val positionInLine = position - completedPosition.cur  // this will be 1 if at the front of the line
+        val available = slack()
+        if (positionInLine > available) {  // e.g. if there is 1 available, then I wait if I am second in line (positionInLine = 2)
+            Thread.sleep(50)               // TODO: replace with wait/notify but tricky to get right because of desire for maximal concurrency
+        } else getOrMake(key, conMaker) match {
             case Success(con, initResult) =>
                 info(this, s"Obtained container ${con.containerId.getOrElse("unknown")}")
                 return Some(con, initResult)
@@ -172,9 +190,9 @@
                 error(this, s"Error starting container: $str")
                 return None
             case Busy() =>
-                Thread.sleep(100)
-                getImpl(key, conMaker)
+                // This will not cause a busy loop because only those that could be productive will get a chance
         }
+        getImpl(position, key, conMaker)
     }
 
     def getNumberOfIdleContainers(key: String)(implicit transid: TransactionId): Int = {
@@ -183,8 +201,14 @@
         }
     }
 
-    /**
-     * Tries to get or create a container, returning None if there are too many
+    /*
+     * How many containers can we start?  Someone could have fully started a container so we must include startingCounter.
+     * The use of a method rather than a getter is meant to signify the synchronization in the implementation.
+     */
+    private def slack() = _maxActive - (activeCount() + startingCounter.cur)
+
+    /*
+     * Try to get or create a container, returning None if there are too many
      * active containers.
      *
      * The multiple synchronization block, and the use of startingCounter,
@@ -198,7 +222,7 @@
         retrieve(key) match {
             case CacheMiss() => {
                 this.synchronized {
-                    if (activeCount() + startingCounter.cur >= _maxActive) // Someone could have fully started a container
+                    if (slack() <= 0)
                         return Busy()
                     if (startingCounter.cur >= 1) // Limit concurrent starting of containers
                         return Busy()