Fix race in controller/invoker active ack.
Add a trait to generate activation ids. This can be mixed into tests to have fixed ids where necessary.
Use custom activation id generator for testing blocking invokes.
Lift listener for active ack responses in action activations before the post to loadbalancer - this avoids a race where the response comes back before the listener becomes active.
Add unit test for active ack/fast path.
Also in this commit, removing deadcode:
- No longer using direct invoke API in tests.
- Load balancer requests are no longer happening over HTTP.
Fixes #1067.
Consolidate active ack/db poll promise completion in the Actions API handler.
Simplify the active ack timeout logic to that it is clearer what is happening.
Factored out timeout on active ack so that caller can decide how long it wants to wait for.
Signed off by Markus Thoemmes <markus.thoemmes@de.ibm.com>
diff --git a/core/dispatcher/src/main/scala/whisk/core/invoker/Invoker.scala b/core/dispatcher/src/main/scala/whisk/core/invoker/Invoker.scala
index 7a9ba4b..216f5fc 100644
--- a/core/dispatcher/src/main/scala/whisk/core/invoker/Invoker.scala
+++ b/core/dispatcher/src/main/scala/whisk/core/invoker/Invoker.scala
@@ -585,15 +585,6 @@
*/
def requiredProperties = Invoker.requiredProperties
- private class ServiceBuilder(invoker: Invoker)(
- implicit val ec: ExecutionContext)
- extends Creator[InvokerServer] {
- def create = new InvokerServer {
- override val invokerInstance = invoker
- override val executionContext = ec
- }
- }
-
def main(args: Array[String]): Unit = {
implicit val ec = ExecutionContextFactory.makeCachedThreadPoolExecutionContext()
implicit val system: ActorSystem = ActorSystem(
@@ -620,7 +611,9 @@
dispatcher.start()
val port = config.servicePort.toInt
- BasicHttpService.startService(system, "invoker", "0.0.0.0", port, new ServiceBuilder(invoker))
+ BasicHttpService.startService(system, "invoker", "0.0.0.0", port, new Creator[InvokerServer] {
+ def create = new InvokerServer {}
+ })
}
}
}
diff --git a/core/dispatcher/src/main/scala/whisk/core/invoker/InvokerServer.scala b/core/dispatcher/src/main/scala/whisk/core/invoker/InvokerServer.scala
index 35b8d09..2d06618 100644
--- a/core/dispatcher/src/main/scala/whisk/core/invoker/InvokerServer.scala
+++ b/core/dispatcher/src/main/scala/whisk/core/invoker/InvokerServer.scala
@@ -17,30 +17,12 @@
package whisk.core.invoker
import akka.actor.Actor
-import spray.http.StatusCodes.OK
-import spray.http.StatusCodes.NotFound
-import spray.http.StatusCodes.InternalServerError
-import spray.httpx.SprayJsonSupport._
-import spray.json.DefaultJsonProtocol._
-import spray.json._
-import spray.json.JsString
-import spray.json.pimpAny
-import spray.routing.Directive.pimpApply
import whisk.common.Logging
-import whisk.common.TransactionId
-import whisk.core.connector.{ ActivationMessage => Message }
-import whisk.core.entity.ActivationId
-import whisk.core.entity.DocId
-import whisk.core.entity.Subject
import whisk.http.BasicRasService
-import scala.util.Try
-import scala.util.Failure
-import scala.util.Success
-import scala.concurrent.ExecutionContext
-import scala.language.postfixOps
/**
- * Implements web server to handle certain REST API calls for testing.
+ * Implements web server to handle certain REST API calls.
+ * Currently provides a health ping route, only.
*/
trait InvokerServer
extends BasicRasService
@@ -48,42 +30,4 @@
with Logging {
override def actorRefFactory = context
- override def routes(implicit transid: TransactionId) = super.routes ~ getContainer(transid) ~ invoke(transid)
- protected val invokerInstance: Invoker
-
- // this is used by wskadmin
- def getContainer(implicit transid: TransactionId) = {
- (get & path("api" / "getContainer" / """[\w-]+""".r)) {
- id =>
- complete {
- val activationId = Try { ActivationId(id) } toOption
- val container = activationId flatMap { invokerInstance.getContainerName(_) }
- container match {
- case Some(name) => (OK, JsObject("name" -> name.toJson))
- case None => (NotFound, JsObject("error" -> s"'$id' is not recognized".toJson))
- }
- }
- }
- }
-
- // this is used by wskadmin
- def invoke(transid: TransactionId) = {
- (post & path("api" / "invoke" / Rest) & entity(as[JsObject])) {
- (topic, body) =>
- Try { invokeImpl(topic, body, transid) } match {
- case Success(id) => complete(OK, id.toJsObject)
- case Failure(t) => complete(InternalServerError, JsObject("error" -> t.getMessage.toJson))
- }
- }
- }
-
- private def invokeImpl(topic: String, body: JsObject, transid: TransactionId) = {
- val JsString(subject) = body.fields("subject")
- val args = body.fields("args").asJsObject
- val msg = Message(transid, "", Subject(subject), ActivationId(), Some(args), None)
- invokerInstance.fetchFromStoreAndInvoke(DocId(topic).asDocInfo, msg)(transid) // asynchronous
- msg.activationId
- }
-
- protected[invoker] implicit val executionContext: ExecutionContext
}
diff --git a/tests/src/whisk/core/controller/test/ActionsApiTests.scala b/tests/src/whisk/core/controller/test/ActionsApiTests.scala
index 402292d..eebd08e 100644
--- a/tests/src/whisk/core/controller/test/ActionsApiTests.scala
+++ b/tests/src/whisk/core/controller/test/ActionsApiTests.scala
@@ -599,52 +599,95 @@
val action = WhiskAction(namespace, aname, Exec.js("??"), limits = ActionLimits(TimeLimit(1 second), MemoryLimit(), LogLimit()))
put(entityStore, action)
Post(s"$collectionPath/${action.name}?blocking=true") ~> sealRoute(routes(creds)) ~> check {
+ // status shold be accepted because there is no active ack response and
+ // db polling will fail since there is no record of the activation
status should be(Accepted)
val response = responseAs[JsObject]
response.fields("activationId") should not be None
}
}
- it should "invoke an action, blocking" in {
+ it should "invoke an action, blocking and retrieve result via db polling" in {
implicit val tid = transid()
val action = WhiskAction(namespace, aname, Exec.js("??"))
- val activation = WhiskActivation(action.namespace, action.name, creds.subject, activationId,
+ val activation = WhiskActivation(action.namespace, action.name, creds.subject, activationId.make(),
start = Instant.now,
end = Instant.now,
response = ActivationResponse.success(Some(JsObject("test" -> "yes".toJson))))
put(entityStore, action)
+ // storing the activation in the db will allow the db polling to retrieve it
+ // the test harness makes sure the activaiton id observed by the test matches
+ // the one generated by the api handler
put(activationStore, activation)
- Post(s"$collectionPath/${action.name}?blocking=true") ~> sealRoute(routes(creds)) ~> check {
- status should be(OK)
- val response = responseAs[JsObject]
- response should be(activation.toExtendedJson)
- }
+ try {
+ Post(s"$collectionPath/${action.name}?blocking=true") ~> sealRoute(routes(creds)) ~> check {
+ status should be(OK)
+ val response = responseAs[JsObject]
+ response should be(activation.toExtendedJson)
+ }
- // repeat invoke, get only result back
- Post(s"$collectionPath/${action.name}?blocking=true&result=true") ~> sealRoute(routes(creds)) ~> check {
- status should be(OK)
- val response = responseAs[JsObject]
- response should be(activation.resultAsJson)
+ // repeat invoke, get only result back
+ Post(s"$collectionPath/${action.name}?blocking=true&result=true") ~> sealRoute(routes(creds)) ~> check {
+ status should be(OK)
+ val response = responseAs[JsObject]
+ response should be(activation.resultAsJson)
+ }
+ } finally {
+ deleteActivation(activation.docid)
}
+ }
- deleteActivation(activation.docid)
+ it should "invoke an action, blocking and retrieve result via active ack" in {
+ implicit val tid = transid()
+ val action = WhiskAction(namespace, aname, Exec.js("??"))
+ val activation = WhiskActivation(action.namespace, action.name, creds.subject, activationId.make(),
+ start = Instant.now,
+ end = Instant.now,
+ response = ActivationResponse.success(Some(JsObject("test" -> "yes".toJson))))
+ put(entityStore, action)
+
+ try {
+ // do not store the activation in the db, instead register it as the response to generate on active ack
+ whiskActivationStub = Some(activation)
+
+ Post(s"$collectionPath/${action.name}?blocking=true") ~> sealRoute(routes(creds)) ~> check {
+ status should be(OK)
+ val response = responseAs[JsObject]
+ response should be(activation.toExtendedJson)
+ }
+
+ // repeat invoke, get only result back
+ Post(s"$collectionPath/${action.name}?blocking=true&result=true") ~> sealRoute(routes(creds)) ~> check {
+ status should be(OK)
+ val response = responseAs[JsObject]
+ response should be(activation.resultAsJson)
+ }
+ } finally {
+ whiskActivationStub = None
+ }
}
it should "invoke a blocking action and return error response when activation fails" in {
implicit val tid = transid()
val action = WhiskAction(namespace, aname, Exec.js("??"))
- val activation = WhiskActivation(action.namespace, action.name, creds.subject, activationId,
+ val activation = WhiskActivation(action.namespace, action.name, creds.subject, activationId.make(),
start = Instant.now,
end = Instant.now,
response = ActivationResponse.whiskError("test"))
put(entityStore, action)
+ // storing the activation in the db will allow the db polling to retrieve it
+ // the test harness makes sure the activaiton id observed by the test matches
+ // the one generated by the api handler
put(activationStore, activation)
+ try {
Post(s"$collectionPath/${action.name}?blocking=true") ~> sealRoute(routes(creds)) ~> check {
status should be(InternalServerError)
val response = responseAs[JsObject]
response should be(activation.toExtendedJson)
}
-
- deleteActivation(activation.docid)
+ } finally {
+ deleteActivation(activation.docid)
+ }
}
+
}
diff --git a/tests/src/whisk/core/controller/test/ControllerTestCommon.scala b/tests/src/whisk/core/controller/test/ControllerTestCommon.scala
index 0fed91d..0a9c7f1 100644
--- a/tests/src/whisk/core/controller/test/ControllerTestCommon.scala
+++ b/tests/src/whisk/core/controller/test/ControllerTestCommon.scala
@@ -34,7 +34,6 @@
import whisk.common.TransactionCounter
import whisk.common.TransactionId
import whisk.core.WhiskConfig
-import whisk.core.connector.LoadBalancerResponse
import whisk.core.controller.WhiskActionsApi
import whisk.core.controller.WhiskServices
import whisk.core.database.test.DbUtils
@@ -42,6 +41,7 @@
import whisk.core.entitlement.EntitlementService
import whisk.core.entitlement.LocalEntitlementService
import whisk.core.entity.ActivationId
+import whisk.core.entity.ActivationId.ActivationIdGenerator
import whisk.core.entity.AuthKey
import whisk.core.entity.DocId
import whisk.core.entity.EntityName
@@ -55,6 +55,7 @@
import whisk.core.entity.WhiskPackage
import whisk.core.entity.WhiskRule
import whisk.core.entity.WhiskTrigger
+import scala.concurrent.duration.FiniteDuration
protected trait ControllerTestCommon
extends FlatSpec
@@ -74,22 +75,35 @@
implicit val actorSystem = system // defined in ScalatestRouteTest
val executionContext = actorSystem.dispatcher
- val whiskConfig = new WhiskConfig(WhiskActionsApi.requiredProperties)
+ override val whiskConfig = new WhiskConfig(WhiskActionsApi.requiredProperties)
assert(whiskConfig.isValid)
+ override val entitlementService: EntitlementService = new LocalEntitlementService(whiskConfig)
+
+ override val activationId = new ActivationIdGenerator() {
+ // need a static activation id to test activations api
+ private val fixedId = ActivationId()
+ override def make = fixedId
+ }
+
+ override val performLoadBalancerRequest = (lbr: WhiskServices.LoadBalancerReq) => Future.successful {}
+
+ override val queryActivationResponse = (activationId: ActivationId, timeout: FiniteDuration, transid: TransactionId) => {
+ whiskActivationStub map {
+ activation => Future.successful(activation)
+ } getOrElse (Future.failed {
+ new IllegalArgumentException("Unit test does not need fast path")
+ })
+ }
+
+ override val consulServer = "???"
+
val entityStore = WhiskEntityStore.datastore(whiskConfig)
val activationStore = WhiskActivationStore.datastore(whiskConfig)
val authStore = WhiskAuthStore.datastore(whiskConfig)
- val entitlementService: EntitlementService = new LocalEntitlementService(whiskConfig)
- val activationId = ActivationId() // need a static activation id to test activations api
- val performLoadBalancerRequest = (lbr: WhiskServices.LoadBalancerReq) => Future {
- LoadBalancerResponse.id(activationId)
- }
- val queryActivationResponse = (activationId: ActivationId, transid: TransactionId) => Future.failed {
- new IllegalArgumentException("Unit test does not need fast path")
- }
- val consulServer = "???"
+ // unit tests that need an activation via active ack/fast path should set this to value expected
+ protected var whiskActivationStub: Option[WhiskActivation] = None
def createTempCredentials(implicit transid: TransactionId) = {
val auth = WhiskAuth(Subject(), AuthKey())