blob: 1645b887e1674d09da43c38e312ad304e4097dbf [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.openwhisk.core.controller.test
import java.time.Instant
import scala.concurrent.Future
import scala.concurrent.ExecutionContext
import scala.language.postfixOps
import org.junit.runner.RunWith
import org.scalatest.junit.JUnitRunner
import akka.http.scaladsl.model.StatusCodes._
import akka.http.scaladsl.marshallers.sprayjson.SprayJsonSupport.sprayJsonMarshaller
import akka.http.scaladsl.marshallers.sprayjson.SprayJsonSupport.sprayJsonUnmarshaller
import akka.http.scaladsl.server.Route
import spray.json._
import spray.json.DefaultJsonProtocol._
import org.apache.openwhisk.common.TransactionId
import org.apache.openwhisk.core.WhiskConfig
import org.apache.openwhisk.core.connector.ActivationMessage
import org.apache.openwhisk.core.controller.WhiskActionsApi
import org.apache.openwhisk.core.entity._
import org.apache.openwhisk.http.Messages._
import scala.util.Success
/**
* Tests Conductor Actions API.
*
* Unit tests of the controller service as a standalone component.
* These tests exercise a fresh instance of the service object in memory -- these
* tests do NOT communicate with a whisk deployment.
*/
@RunWith(classOf[JUnitRunner])
class ConductorsApiTests extends ControllerTestCommon with WhiskActionsApi {

  /** Conductors API tests */
  behavior of "Conductor"

  // identity used to authenticate every request issued by these tests, and the namespace holding the test actions
  val creds = WhiskAuthHelpers.newIdentity()
  val namespace = EntityPath(creds.subject.asString)
  val collectionPath = s"/${EntityPath.DEFAULT}/${collection.path}"

  // a second identity; an action stored in its namespace is used as the "forbidden" component
  val alternateCreds = WhiskAuthHelpers.newIdentity()
  val alternateNamespace = EntityPath(alternateCreds.subject.asString)

  // test actions
  val echo = MakeName.next("echo")
  val conductor = MakeName.next("conductor")
  val step = MakeName.next("step")
  val missing = MakeName.next("missingAction") // undefined
  val invalid = "invalid#Action" // invalid name
  val testString = "this is a test"

  // duration in milliseconds that the fake load balancer assigns to every single activation
  val duration = 42

  // maximum number of components in a composition, as configured for the controller under test
  val limit = whiskConfig.actionSequenceLimit.toInt

  override val loadBalancer = new FakeLoadBalancerService(whiskConfig)
  override val activationIdFactory = new ActivationId.ActivationIdGenerator() {}

  /**
   * Posts a blocking invocation of the named action, authenticated as `creds`, through the sealed routes.
   *
   * @param name the action to invoke, resolved under `collectionPath`
   * @param payload the JSON request body
   * @return the route test result, ready to be piped into `check { ... }`
   */
  private def postBlocking(name: EntityName, payload: JsObject)(implicit tid: TransactionId) =
    Post(s"$collectionPath/${name}?blocking=true", payload) ~> Route.seal(routes(creds))

  /** Extracts `response.result` from an activation record. */
  private def resultOf(activation: JsObject): JsValue =
    activation.fields("response").asJsObject.fields("result")

  /** Extracts `response.status` from an activation record. */
  private def statusOf(activation: JsObject): JsValue =
    activation.fields("response").asJsObject.fields("status")

  /** Counts the entries of the `logs` array of an activation record. */
  private def logCountOf(activation: JsObject): Int =
    activation.fields("logs").convertTo[JsArray].elements.size

  /** The duration expected for a composition made of the given number of component activations. */
  private def totalDuration(activations: Int): JsValue = (activations * duration).toJson

  /** Asserts that the activation record reports an application error carrying the given error value. */
  private def expectApplicationError(activation: JsObject, error: JsValue) = {
    statusOf(activation) shouldBe "application error".toJson
    resultOf(activation) shouldBe JsObject("error" -> error)
  }

  it should "invoke a conductor action with no dynamic steps" in {
    implicit val tid = transid()
    put(entityStore, WhiskAction(namespace, echo, jsDefault("??"), annotations = Parameters("conductor", "true")))

    // a normal result
    postBlocking(echo, JsObject("payload" -> testString.toJson)) ~> check {
      status shouldBe OK
      val activation = responseAs[JsObject]
      resultOf(activation) shouldBe JsObject("payload" -> testString.toJson)
      activation.fields("duration") shouldBe duration.toJson
      val annotations = activation.fields("annotations").convertTo[Parameters]
      annotations.getAs[Boolean]("conductor") shouldBe Success(true)
      annotations.getAs[String]("kind") shouldBe Success("sequence")
      annotations.getAs[Boolean]("topmost") shouldBe Success(true)
      annotations.get("limits") should not be None
      logCountOf(activation) shouldBe 1
    }

    // an error result
    postBlocking(echo, JsObject("error" -> testString.toJson)) ~> check {
      status should not be (OK)
      val activation = responseAs[JsObject]
      expectApplicationError(activation, testString.toJson)
      logCountOf(activation) shouldBe 1
    }

    // a wrapped result { params: result } is unwrapped by the controller
    postBlocking(echo, JsObject("params" -> JsObject("payload" -> testString.toJson))) ~> check {
      status shouldBe OK
      val activation = responseAs[JsObject]
      resultOf(activation) shouldBe JsObject("payload" -> testString.toJson)
      logCountOf(activation) shouldBe 1
    }

    // an invalid action name
    postBlocking(echo, JsObject("payload" -> testString.toJson, "action" -> invalid.toJson)) ~> check {
      status should not be (OK)
      val activation = responseAs[JsObject]
      expectApplicationError(activation, compositionComponentInvalid(invalid.toJson).toJson)
      logCountOf(activation) shouldBe 2
    }

    // an undefined action
    postBlocking(echo, JsObject("payload" -> testString.toJson, "action" -> missing.toJson)) ~> check {
      status should not be (OK)
      val activation = responseAs[JsObject]
      expectApplicationError(activation, compositionComponentNotFound(s"$namespace/$missing").toJson)
      logCountOf(activation) shouldBe 2
    }
  }

  it should "invoke a conductor action with dynamic steps" in {
    implicit val tid = transid()
    put(entityStore, WhiskAction(namespace, conductor, jsDefault("??"), annotations = Parameters("conductor", "true")))
    put(entityStore, WhiskAction(namespace, step, jsDefault("??")))
    put(entityStore, WhiskAction(alternateNamespace, step, jsDefault("??"))) // forbidden action
    val forbidden = s"/$alternateNamespace/$step" // forbidden action name

    // dynamically invoke step action
    postBlocking(conductor, JsObject("action" -> step.toJson, "params" -> JsObject("n" -> 1.toJson))) ~> check {
      status shouldBe OK
      val activation = responseAs[JsObject]
      resultOf(activation) shouldBe JsObject("n" -> 2.toJson)
      logCountOf(activation) shouldBe 3
      activation.fields("duration") shouldBe totalDuration(3)
    }

    // dynamically invoke step action with an error result
    postBlocking(conductor, JsObject("action" -> step.toJson)) ~> check {
      status should not be (OK)
      val activation = responseAs[JsObject]
      expectApplicationError(activation, "missing parameter".toJson)
      logCountOf(activation) shouldBe 3
      activation.fields("duration") shouldBe totalDuration(3)
    }

    // dynamically invoke step action, forwarding state
    postBlocking(
      conductor,
      JsObject("action" -> step.toJson, "state" -> JsObject("witness" -> 42.toJson), "n" -> 1.toJson)) ~> check {
      status shouldBe OK
      val activation = responseAs[JsObject]
      resultOf(activation) shouldBe JsObject("n" -> 2.toJson, "witness" -> 42.toJson)
      logCountOf(activation) shouldBe 3
      activation.fields("duration") shouldBe totalDuration(3)
    }

    // dynamically invoke a forbidden action
    postBlocking(conductor, JsObject("action" -> forbidden.toJson)) ~> check {
      status should not be (OK)
      val activation = responseAs[JsObject]
      expectApplicationError(activation, compositionComponentNotAccessible(forbidden.drop(1)).toJson)
      logCountOf(activation) shouldBe 2
    }

    // dynamically invoke step action twice, forwarding state
    postBlocking(
      conductor,
      JsObject("action" -> step.toJson, "state" -> JsObject("action" -> step.toJson), "n" -> 1.toJson)) ~> check {
      status shouldBe OK
      val activation = responseAs[JsObject]
      resultOf(activation) shouldBe JsObject("n" -> 3.toJson)
      logCountOf(activation) shouldBe 5
      activation.fields("duration") shouldBe totalDuration(5)
    }

    // invoke nested conductor with single step
    postBlocking(
      conductor,
      JsObject("action" -> conductor.toJson, "params" -> JsObject("action" -> step.toJson), "n" -> 1.toJson)) ~> check {
      status shouldBe OK
      val activation = responseAs[JsObject]
      resultOf(activation) shouldBe JsObject("n" -> 2.toJson)
      logCountOf(activation) shouldBe 3
      activation.fields("duration") shouldBe totalDuration(5)
    }

    // nested step followed by outer step
    postBlocking(
      conductor,
      JsObject(
        "action" -> conductor.toJson,
        "state" -> JsObject("action" -> step.toJson),
        "params" -> JsObject("action" -> step.toJson),
        "n" -> 1.toJson)) ~> check {
      status shouldBe OK
      val activation = responseAs[JsObject]
      resultOf(activation) shouldBe JsObject("n" -> 3.toJson)
      logCountOf(activation) shouldBe 5
      activation.fields("duration") shouldBe totalDuration(7)
    }

    // two levels of nesting, three steps
    postBlocking(
      conductor,
      JsObject(
        "action" -> conductor.toJson,
        "state" -> JsObject("action" -> step.toJson),
        "params" -> JsObject(
          "action" -> conductor.toJson,
          "state" -> JsObject("action" -> step.toJson),
          "params" -> JsObject("action" -> step.toJson)),
        "n" -> 1.toJson)) ~> check {
      status shouldBe OK
      val activation = responseAs[JsObject]
      resultOf(activation) shouldBe JsObject("n" -> 4.toJson)
      logCountOf(activation) shouldBe 5
      activation.fields("duration") shouldBe totalDuration(11)
    }
  }

  it should "abort if composition is too long" in {
    implicit val tid = transid()
    put(entityStore, WhiskAction(namespace, conductor, jsDefault("??"), annotations = Parameters("conductor", "true")))
    put(entityStore, WhiskAction(namespace, step, jsDefault("??")))

    // a chain of exactly `limit` steps: every level asks for `step` and carries the rest of the chain as state
    val stepChain = (1 to limit).foldLeft(Map.empty[String, JsValue]) { (inner, _) =>
      Map("action" -> step.toJson, "state" -> JsObject(inner))
    }

    // stay just below limit
    postBlocking(conductor, JsObject(stepChain + ("n" -> 0.toJson))) ~> check {
      status shouldBe OK
      val activation = responseAs[JsObject]
      resultOf(activation) shouldBe JsObject("n" -> limit.toJson)
      // `limit` step activations interleaved with `limit + 1` conductor activations
      activation.fields("duration") shouldBe totalDuration(2 * limit + 1)
    }

    // add one extra step
    postBlocking(conductor, JsObject("action" -> step.toJson, "state" -> JsObject(stepChain), "n" -> 0.toJson)) ~> check {
      status should not be (OK)
      val activation = responseAs[JsObject]
      expectApplicationError(activation, compositionIsTooLong.toJson)
    }

    // nesting a composition at the limit should be ok
    postBlocking(
      conductor,
      JsObject("action" -> conductor.toJson, "params" -> JsObject(stepChain), "n" -> 0.toJson)) ~> check {
      status shouldBe OK
      val activation = responseAs[JsObject]
      resultOf(activation) shouldBe JsObject("n" -> limit.toJson)
    }

    // nesting a composition beyond the limit should fail
    postBlocking(
      conductor,
      JsObject(
        "action" -> conductor.toJson,
        "params" -> JsObject("action" -> step.toJson, "state" -> JsObject(stepChain)),
        "n" -> 0.toJson)) ~> check {
      status should not be (OK)
      val activation = responseAs[JsObject]
      expectApplicationError(activation, compositionIsTooLong.toJson)
    }

    // a chain of exactly `limit` nested conductor invocations, each one passed as params of the previous one
    val recursionChain = (1 to limit).foldLeft(Map.empty[String, JsValue]) { (inner, _) =>
      Map("action" -> conductor.toJson, "params" -> JsObject(inner))
    }

    // recursing at the limit should be ok
    postBlocking(conductor, JsObject(recursionChain + ("n" -> 0.toJson))) ~> check {
      status shouldBe OK
      val activation = responseAs[JsObject]
      resultOf(activation) shouldBe JsObject("n" -> 0.toJson)
    }

    // recursing beyond the limit should fail
    postBlocking(
      conductor,
      JsObject("action" -> conductor.toJson, "params" -> JsObject(recursionChain), "n" -> 0.toJson)) ~> check {
      status should not be (OK)
      val activation = responseAs[JsObject]
      expectApplicationError(activation, compositionIsTooLong.toJson)
    }
  }

  // fake load balancer to emulate a handful of actions
  class FakeLoadBalancerService(config: WhiskConfig)(implicit ec: ExecutionContext)
      extends DegenerateLoadBalancerService(config) {

    /**
     * Builds the activation record for an emulated action.
     *
     * The activation is an application error when `result` has an "error" field and a success otherwise;
     * it always lasts exactly `duration` milliseconds.
     */
    private def respond(action: ExecutableWhiskActionMetaData, msg: ActivationMessage, result: JsObject) = {
      val response =
        if (result.fields.get("error") isDefined) ActivationResponse(ActivationResponse.ApplicationError, Some(result))
        else ActivationResponse.success(Some(result))
      val start = Instant.now
      WhiskActivation(
        action.namespace,
        action.name,
        msg.user.subject,
        msg.activationId,
        start,
        end = start.plusMillis(duration),
        response = response)
    }

    /**
     * Emulates the conductor action (see tests/dat/actions/conductor.js).
     *
     * Arguments containing an "error" field are returned untouched. Otherwise "action" and "state" are
     * forwarded when present, and "params" is rebuilt from the fields of the incoming "params" object
     * merged with every remaining top-level argument.
     */
    private def conductorResult(args: JsObject): JsObject =
      if (args.fields.contains("error")) args
      else {
        // forward a field under its own key when present, nothing otherwise
        def forwarded(key: String): Map[String, JsValue] =
          args.fields.get(key).map(value => Map(key -> value)).getOrElse(Map.empty)

        val wrappedParams = args.fields.getOrElse("params", JsObject.empty).asJsObject.fields
        val escapedParams = args.fields - "action" - "state" - "params"
        val params = Map("params" -> JsObject(wrappedParams ++ escapedParams))
        JsObject(params ++ forwarded("action") ++ forwarded("state"))
      }

    /**
     * Emulates the step action (see tests/dat/actions/step.js): increments "n" by one, or reports
     * a "missing parameter" error when "n" is absent.
     */
    private def stepResult(args: JsObject): JsObject =
      args.fields.get("n") match {
        case Some(n) => JsObject("n" -> (n.convertTo[BigDecimal] + 1).toJson)
        case None    => JsObject("error" -> "missing parameter".toJson)
      }

    /**
     * Emulates the invocation of the `echo`, `conductor` and `step` test actions.
     *
     * @return a completed outer future wrapping the activation of the emulated action; the inner future fails
     *         for any other action name, and the outer future fails when the message carries no content
     */
    override def publish(action: ExecutableWhiskActionMetaData, msg: ActivationMessage)(
      implicit transid: TransactionId): Future[Future[Either[ActivationId, WhiskActivation]]] =
      msg.content match {
        case Some(args) =>
          val activation: Future[Either[ActivationId, WhiskActivation]] = action.name match {
            case `echo` => // echo action
              Future(Right(respond(action, msg, args)))
            case `conductor` =>
              val result = conductorResult(args)
              Future(Right(respond(action, msg, result)))
            case `step` =>
              val result = stepResult(args)
              Future(Right(respond(action, msg, result)))
            case _ =>
              Future.failed(new IllegalArgumentException("Unknown action invoked in conductor test"))
          }
          Future.successful(activation)
        case None =>
          Future.failed(new IllegalArgumentException("No invocation parameters in conductor test"))
      }
  }
}