Fix race in controller/invoker active ack.

Add a trait to generate activation ids. This can be mixed into tests to have fixed ids where necessary.
Use custom activation id generator for testing blocking invokes.
Lift listener for active ack responses in action activations before the post to loadbalancer - this avoids a race where the response comes back before the listener becomes active.
Add unit test for active ack/fast path.

Also in this commit, removing deadcode:
- No longer using direct invoke API in tests.
- Load balancer requests are no longer happening over HTTP.

Fixes #1067.

Consolidate active ack/db poll promise completion in the Actions API handler.
Simplify the active ack timeout logic to that it is clearer what is happening.
Factored out timeout on active ack so that caller can decide how long it wants to wait for.

Signed off by Markus Thoemmes <markus.thoemmes@de.ibm.com>
diff --git a/core/dispatcher/src/main/scala/whisk/core/invoker/Invoker.scala b/core/dispatcher/src/main/scala/whisk/core/invoker/Invoker.scala
index 7a9ba4b..216f5fc 100644
--- a/core/dispatcher/src/main/scala/whisk/core/invoker/Invoker.scala
+++ b/core/dispatcher/src/main/scala/whisk/core/invoker/Invoker.scala
@@ -585,15 +585,6 @@
      */
     def requiredProperties = Invoker.requiredProperties
 
-    private class ServiceBuilder(invoker: Invoker)(
-        implicit val ec: ExecutionContext)
-        extends Creator[InvokerServer] {
-        def create = new InvokerServer {
-            override val invokerInstance = invoker
-            override val executionContext = ec
-        }
-    }
-
     def main(args: Array[String]): Unit = {
         implicit val ec = ExecutionContextFactory.makeCachedThreadPoolExecutionContext()
         implicit val system: ActorSystem = ActorSystem(
@@ -620,7 +611,9 @@
             dispatcher.start()
 
             val port = config.servicePort.toInt
-            BasicHttpService.startService(system, "invoker", "0.0.0.0", port, new ServiceBuilder(invoker))
+            BasicHttpService.startService(system, "invoker", "0.0.0.0", port, new Creator[InvokerServer] {
+                def create = new InvokerServer {}
+            })
         }
     }
 }
diff --git a/core/dispatcher/src/main/scala/whisk/core/invoker/InvokerServer.scala b/core/dispatcher/src/main/scala/whisk/core/invoker/InvokerServer.scala
index 35b8d09..2d06618 100644
--- a/core/dispatcher/src/main/scala/whisk/core/invoker/InvokerServer.scala
+++ b/core/dispatcher/src/main/scala/whisk/core/invoker/InvokerServer.scala
@@ -17,30 +17,12 @@
 package whisk.core.invoker
 
 import akka.actor.Actor
-import spray.http.StatusCodes.OK
-import spray.http.StatusCodes.NotFound
-import spray.http.StatusCodes.InternalServerError
-import spray.httpx.SprayJsonSupport._
-import spray.json.DefaultJsonProtocol._
-import spray.json._
-import spray.json.JsString
-import spray.json.pimpAny
-import spray.routing.Directive.pimpApply
 import whisk.common.Logging
-import whisk.common.TransactionId
-import whisk.core.connector.{ ActivationMessage => Message }
-import whisk.core.entity.ActivationId
-import whisk.core.entity.DocId
-import whisk.core.entity.Subject
 import whisk.http.BasicRasService
-import scala.util.Try
-import scala.util.Failure
-import scala.util.Success
-import scala.concurrent.ExecutionContext
-import scala.language.postfixOps
 
 /**
- * Implements web server to handle certain REST API calls for testing.
+ * Implements web server to handle certain REST API calls.
+ * Currently provides a health ping route, only.
  */
 trait InvokerServer
     extends BasicRasService
@@ -48,42 +30,4 @@
     with Logging {
 
     override def actorRefFactory = context
-    override def routes(implicit transid: TransactionId) = super.routes ~ getContainer(transid) ~ invoke(transid)
-    protected val invokerInstance: Invoker
-
-    // this is used by wskadmin
-    def getContainer(implicit transid: TransactionId) = {
-        (get & path("api" / "getContainer" / """[\w-]+""".r)) {
-            id =>
-                complete {
-                    val activationId = Try { ActivationId(id) } toOption
-                    val container = activationId flatMap { invokerInstance.getContainerName(_) }
-                    container match {
-                        case Some(name) => (OK, JsObject("name" -> name.toJson))
-                        case None       => (NotFound, JsObject("error" -> s"'$id' is not recognized".toJson))
-                    }
-                }
-        }
-    }
-
-    // this is used by wskadmin
-    def invoke(transid: TransactionId) = {
-        (post & path("api" / "invoke" / Rest) & entity(as[JsObject])) {
-            (topic, body) =>
-                Try { invokeImpl(topic, body, transid) } match {
-                    case Success(id) => complete(OK, id.toJsObject)
-                    case Failure(t)  => complete(InternalServerError, JsObject("error" -> t.getMessage.toJson))
-                }
-        }
-    }
-
-    private def invokeImpl(topic: String, body: JsObject, transid: TransactionId) = {
-        val JsString(subject) = body.fields("subject")
-        val args = body.fields("args").asJsObject
-        val msg = Message(transid, "", Subject(subject), ActivationId(), Some(args), None)
-        invokerInstance.fetchFromStoreAndInvoke(DocId(topic).asDocInfo, msg)(transid) // asynchronous
-        msg.activationId
-    }
-
-    protected[invoker] implicit val executionContext: ExecutionContext
 }
diff --git a/tests/src/whisk/core/controller/test/ActionsApiTests.scala b/tests/src/whisk/core/controller/test/ActionsApiTests.scala
index 402292d..eebd08e 100644
--- a/tests/src/whisk/core/controller/test/ActionsApiTests.scala
+++ b/tests/src/whisk/core/controller/test/ActionsApiTests.scala
@@ -599,52 +599,95 @@
         val action = WhiskAction(namespace, aname, Exec.js("??"), limits = ActionLimits(TimeLimit(1 second), MemoryLimit(), LogLimit()))
         put(entityStore, action)
         Post(s"$collectionPath/${action.name}?blocking=true") ~> sealRoute(routes(creds)) ~> check {
+            // status shold be accepted because there is no active ack response and
+            // db polling will fail since there is no record of the activation
             status should be(Accepted)
             val response = responseAs[JsObject]
             response.fields("activationId") should not be None
         }
     }
 
-    it should "invoke an action, blocking" in {
+    it should "invoke an action, blocking and retrieve result via db polling" in {
         implicit val tid = transid()
         val action = WhiskAction(namespace, aname, Exec.js("??"))
-        val activation = WhiskActivation(action.namespace, action.name, creds.subject, activationId,
+        val activation = WhiskActivation(action.namespace, action.name, creds.subject, activationId.make(),
             start = Instant.now,
             end = Instant.now,
             response = ActivationResponse.success(Some(JsObject("test" -> "yes".toJson))))
         put(entityStore, action)
+        // storing the activation in the db will allow the db polling to retrieve it
+        // the test harness makes sure the activaiton id observed by the test matches
+        // the one generated by the api handler
         put(activationStore, activation)
-        Post(s"$collectionPath/${action.name}?blocking=true") ~> sealRoute(routes(creds)) ~> check {
-            status should be(OK)
-            val response = responseAs[JsObject]
-            response should be(activation.toExtendedJson)
-        }
+        try {
+            Post(s"$collectionPath/${action.name}?blocking=true") ~> sealRoute(routes(creds)) ~> check {
+                status should be(OK)
+                val response = responseAs[JsObject]
+                response should be(activation.toExtendedJson)
+            }
 
-        // repeat invoke, get only result back
-        Post(s"$collectionPath/${action.name}?blocking=true&result=true") ~> sealRoute(routes(creds)) ~> check {
-            status should be(OK)
-            val response = responseAs[JsObject]
-            response should be(activation.resultAsJson)
+            // repeat invoke, get only result back
+            Post(s"$collectionPath/${action.name}?blocking=true&result=true") ~> sealRoute(routes(creds)) ~> check {
+                status should be(OK)
+                val response = responseAs[JsObject]
+                response should be(activation.resultAsJson)
+            }
+        } finally {
+            deleteActivation(activation.docid)
         }
+    }
 
-        deleteActivation(activation.docid)
+    it should "invoke an action, blocking and retrieve result via active ack" in {
+        implicit val tid = transid()
+        val action = WhiskAction(namespace, aname, Exec.js("??"))
+        val activation = WhiskActivation(action.namespace, action.name, creds.subject, activationId.make(),
+            start = Instant.now,
+            end = Instant.now,
+            response = ActivationResponse.success(Some(JsObject("test" -> "yes".toJson))))
+        put(entityStore, action)
+
+        try {
+            // do not store the activation in the db, instead register it as the response to generate on active ack
+            whiskActivationStub = Some(activation)
+
+            Post(s"$collectionPath/${action.name}?blocking=true") ~> sealRoute(routes(creds)) ~> check {
+                status should be(OK)
+                val response = responseAs[JsObject]
+                response should be(activation.toExtendedJson)
+            }
+
+            // repeat invoke, get only result back
+            Post(s"$collectionPath/${action.name}?blocking=true&result=true") ~> sealRoute(routes(creds)) ~> check {
+                status should be(OK)
+                val response = responseAs[JsObject]
+                response should be(activation.resultAsJson)
+            }
+        } finally {
+            whiskActivationStub = None
+        }
     }
 
     it should "invoke a blocking action and return error response when activation fails" in {
         implicit val tid = transid()
         val action = WhiskAction(namespace, aname, Exec.js("??"))
-        val activation = WhiskActivation(action.namespace, action.name, creds.subject, activationId,
+        val activation = WhiskActivation(action.namespace, action.name, creds.subject, activationId.make(),
             start = Instant.now,
             end = Instant.now,
             response = ActivationResponse.whiskError("test"))
         put(entityStore, action)
+        // storing the activation in the db will allow the db polling to retrieve it
+        // the test harness makes sure the activaiton id observed by the test matches
+        // the one generated by the api handler
         put(activationStore, activation)
+        try {
         Post(s"$collectionPath/${action.name}?blocking=true") ~> sealRoute(routes(creds)) ~> check {
             status should be(InternalServerError)
             val response = responseAs[JsObject]
             response should be(activation.toExtendedJson)
         }
-
-        deleteActivation(activation.docid)
+        } finally {
+            deleteActivation(activation.docid)
+        }
     }
+
 }
diff --git a/tests/src/whisk/core/controller/test/ControllerTestCommon.scala b/tests/src/whisk/core/controller/test/ControllerTestCommon.scala
index 0fed91d..0a9c7f1 100644
--- a/tests/src/whisk/core/controller/test/ControllerTestCommon.scala
+++ b/tests/src/whisk/core/controller/test/ControllerTestCommon.scala
@@ -34,7 +34,6 @@
 import whisk.common.TransactionCounter
 import whisk.common.TransactionId
 import whisk.core.WhiskConfig
-import whisk.core.connector.LoadBalancerResponse
 import whisk.core.controller.WhiskActionsApi
 import whisk.core.controller.WhiskServices
 import whisk.core.database.test.DbUtils
@@ -42,6 +41,7 @@
 import whisk.core.entitlement.EntitlementService
 import whisk.core.entitlement.LocalEntitlementService
 import whisk.core.entity.ActivationId
+import whisk.core.entity.ActivationId.ActivationIdGenerator
 import whisk.core.entity.AuthKey
 import whisk.core.entity.DocId
 import whisk.core.entity.EntityName
@@ -55,6 +55,7 @@
 import whisk.core.entity.WhiskPackage
 import whisk.core.entity.WhiskRule
 import whisk.core.entity.WhiskTrigger
+import scala.concurrent.duration.FiniteDuration
 
 protected trait ControllerTestCommon
     extends FlatSpec
@@ -74,22 +75,35 @@
     implicit val actorSystem = system // defined in ScalatestRouteTest
     val executionContext = actorSystem.dispatcher
 
-    val whiskConfig = new WhiskConfig(WhiskActionsApi.requiredProperties)
+    override val whiskConfig = new WhiskConfig(WhiskActionsApi.requiredProperties)
     assert(whiskConfig.isValid)
 
+    override val entitlementService: EntitlementService = new LocalEntitlementService(whiskConfig)
+
+    override val activationId = new ActivationIdGenerator() {
+        // need a static activation id to test activations api
+        private val fixedId = ActivationId()
+        override def make = fixedId
+    }
+
+    override val performLoadBalancerRequest = (lbr: WhiskServices.LoadBalancerReq) => Future.successful {}
+
+    override val queryActivationResponse = (activationId: ActivationId, timeout: FiniteDuration, transid: TransactionId) => {
+        whiskActivationStub map {
+            activation => Future.successful(activation)
+        } getOrElse (Future.failed {
+            new IllegalArgumentException("Unit test does not need fast path")
+        })
+    }
+
+    override val consulServer = "???"
+
     val entityStore = WhiskEntityStore.datastore(whiskConfig)
     val activationStore = WhiskActivationStore.datastore(whiskConfig)
     val authStore = WhiskAuthStore.datastore(whiskConfig)
-    val entitlementService: EntitlementService = new LocalEntitlementService(whiskConfig)
 
-    val activationId = ActivationId() // need a static activation id to test activations api
-    val performLoadBalancerRequest = (lbr: WhiskServices.LoadBalancerReq) => Future {
-        LoadBalancerResponse.id(activationId)
-    }
-    val queryActivationResponse = (activationId: ActivationId, transid: TransactionId) => Future.failed {
-        new IllegalArgumentException("Unit test does not need fast path")
-    }
-    val consulServer = "???"
+    // unit tests that need an activation via active ack/fast path should set this to value expected
+    protected var whiskActivationStub: Option[WhiskActivation] = None
 
     def createTempCredentials(implicit transid: TransactionId) = {
         val auth = WhiskAuth(Subject(), AuthKey())