| /* |
| * Copyright 2015-2016 IBM Corporation |
| * |
| * Licensed under the Apache License, Version 2.0 (the "License"); |
| * you may not use this file except in compliance with the License. |
| * You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| |
| package system.basic |
| |
| import java.time.Instant |
| import java.util.Date |
| |
| import scala.concurrent.duration.DurationInt |
| import scala.language.postfixOps |
| import scala.util.matching.Regex |
| |
| import org.junit.runner.RunWith |
| import org.scalatest.junit.JUnitRunner |
| |
| import common.StreamLogging |
| import common.TestHelpers |
| import common.TestUtils |
| import common.TestUtils._ |
| import common.Wsk |
| import common.WskProps |
| import common.WskTestHelpers |
| import spray.json._ |
| import spray.json.DefaultJsonProtocol._ |
| import spray.testkit.ScalatestRouteTest |
| import whisk.core.WhiskConfig |
| import whisk.http.Messages.sequenceIsTooLong |
| |
| /** |
| * Tests sequence execution |
| */ |
| |
| @RunWith(classOf[JUnitRunner]) |
| class WskSequenceTests |
| extends TestHelpers |
| with ScalatestRouteTest |
| with WskTestHelpers |
| with StreamLogging { |
| |
| implicit val wskprops = WskProps() |
| val wsk = new Wsk |
| val allowedActionDuration = 120 seconds |
| val shortDuration = 10 seconds |
| |
| val whiskConfig = new WhiskConfig(Map(WhiskConfig.actionSequenceDefaultLimit -> null)) |
| assert(whiskConfig.isValid) |
| |
| behavior of "Wsk Sequence" |
| |
| it should "invoke a blocking sequence action and invoke the updated sequence with normal payload and payload with error field" in withAssetCleaner(wskprops) { |
| (wp, assetHelper) => |
| val name = "sequence" |
| val actions = Seq("split", "sort", "head", "cat") |
| for (actionName <- actions) { |
| val file = TestUtils.getTestActionFilename(s"$actionName.js") |
| assetHelper.withCleaner(wsk.action, actionName) { (action, _) => |
| action.create(name = actionName, artifact = Some(file), timeout = Some(allowedActionDuration)) |
| } |
| } |
| |
| println(s"Sequence $actions") |
| assetHelper.withCleaner(wsk.action, name) { |
| val sequence = actions.mkString(",") |
| (action, _) => action.create(name, Some(sequence), kind = Some("sequence"), timeout = Some(allowedActionDuration)) |
| } |
| |
| val now = "it is now " + new Date() |
| val args = Array("what time is it?", now) |
| val run = wsk.action.invoke(name, Map("payload" -> args.mkString("\n").toJson)) |
| withActivation(wsk.activation, run, totalWait = 4 * allowedActionDuration) { |
| activation => |
| checkSequenceLogsAndAnnotations(activation, 4) // 4 activations in this sequence |
| activation.cause shouldBe None // topmost sequence |
| val result = activation.response.result.get |
| result.fields.get("payload") shouldBe defined |
| result.fields.get("length") should not be defined |
| result.fields.get("lines") shouldBe Some(JsArray(Vector(now.toJson))) |
| } |
| |
| // update action sequence and run it with normal payload |
| val newSequence = Seq("split", "sort").mkString(",") |
| println(s"Update sequence to $newSequence") |
| wsk.action.create(name, Some(newSequence), kind = Some("sequence"), timeout = Some(allowedActionDuration), update = true) |
| val secondrun = wsk.action.invoke(name, Map("payload" -> args.mkString("\n").toJson)) |
| withActivation(wsk.activation, secondrun, totalWait = 2 * allowedActionDuration) { |
| activation => |
| checkSequenceLogsAndAnnotations(activation, 2) // 2 activations in this sequence |
| val result = activation.response.result.get |
| result.fields.get("length") shouldBe Some(2.toJson) |
| result.fields.get("lines") shouldBe Some(args.sortWith(_.compareTo(_) < 0).toArray.toJson) |
| } |
| |
| println("Run sequence with error in payload") |
| // run sequence with error in the payload; nothing should run |
| val payload = Map("error" -> JsString("irrelevant error string")) |
| val thirdrun = wsk.action.invoke(name, payload) |
| withActivation(wsk.activation, thirdrun, totalWait = allowedActionDuration) { |
| activation => |
| activation.logs shouldBe defined |
| // no activations should have run |
| activation.logs.get.size shouldBe (0) |
| activation.response.success shouldBe (false) |
| // the status should be error |
| activation.response.status shouldBe ("application error") |
| val result = activation.response.result.get |
| // the result of the activation should be the payload |
| result shouldBe (JsObject(payload)) |
| |
| } |
| } |
| |
| /** |
| * s -> echo, x, echo |
| * x -> echo |
| * |
| * update x -> <limit-1> echo -- should work |
| * run s -> should stop after <limit> echo |
| */ |
| it should "create a sequence, run it, update one of the atomic actions to a sequence and stop executing the outer sequence when limit reached" in withAssetCleaner(wskprops) { |
| (wp, assetHelper) => |
| val xName = "xSequence" |
| val sName = "sSequence" |
| val echo = "echo" |
| |
| // create echo action |
| val file = TestUtils.getTestActionFilename(s"$echo.js") |
| assetHelper.withCleaner(wsk.action, echo) { (action, actionName) => |
| action.create(name = actionName, artifact = Some(file), timeout = Some(allowedActionDuration)) |
| } |
| // create x |
| assetHelper.withCleaner(wsk.action, xName) { |
| (action, seqName) => action.create(seqName, Some(echo), kind = Some("sequence")) |
| } |
| // create s |
| assetHelper.withCleaner(wsk.action, sName) { |
| (action, seqName) => action.create(seqName, Some(s"$echo,$xName,$echo"), kind = Some("sequence")) |
| } |
| |
| // invoke s |
| val now = "it is now " + new Date() |
| val args = Array("what time is it?", now) |
| val argsJson = args.mkString("\n").toJson |
| val run = wsk.action.invoke(sName, Map("payload" -> argsJson)) |
| println(s"RUN: ${run.stdout}") |
| withActivation(wsk.activation, run, totalWait = 2 * allowedActionDuration) { |
| activation => |
| checkSequenceLogsAndAnnotations(activation, 3) // 3 activations in this sequence |
| val result = activation.response.result.get |
| result.fields.get("payload") shouldBe Some(argsJson) |
| } |
| // update x with limit echo |
| val limit = whiskConfig.actionSequenceLimit.toInt |
| val manyEcho = for (i <- 1 to limit) yield echo |
| |
| wsk.action.create(xName, Some(manyEcho.mkString(",")), kind = Some("sequence"), update = true) |
| |
| val updateRun = wsk.action.invoke(sName, Map("payload" -> argsJson)) |
| withActivation(wsk.activation, updateRun, totalWait = 2 * allowedActionDuration) { |
| activation => |
| activation.response.status shouldBe ("application error") |
| checkSequenceLogsAndAnnotations(activation, 2) |
| val result = activation.response.result.get |
| result.fields.get("error") shouldBe Some(JsString(sequenceIsTooLong)) |
| // check that inner sequence had only (limit - 1) activations |
| val innerSeq = activation.logs.get(1) // the id of the inner sequence activation |
| val getInnerSeq = wsk.activation.get(innerSeq) |
| withActivation(wsk.activation, getInnerSeq, totalWait = allowedActionDuration) { |
| innerSeqActivation => |
| innerSeqActivation.logs.get.size shouldBe (limit - 1) |
| innerSeqActivation.cause shouldBe defined |
| innerSeqActivation.cause.get shouldBe (activation.activationId) |
| } |
| } |
| } |
| |
| it should "invoke a blocking sequence action with an enclosing sequence action" in withAssetCleaner(wskprops) { |
| (wp, assetHelper) => |
| val inner_name = "inner_sequence" |
| val outer_name = "outer_sequence" |
| val inner_actions = Seq("sort", "head") |
| val actions = Seq("split") ++ inner_actions ++ Seq("cat") |
| // create atomic actions |
| for (actionName <- actions) { |
| val file = TestUtils.getTestActionFilename(s"$actionName.js") |
| assetHelper.withCleaner(wsk.action, actionName) { (action, _) => |
| action.create(name = actionName, artifact = Some(file), timeout = Some(allowedActionDuration)) |
| } |
| } |
| |
| // create inner sequence |
| assetHelper.withCleaner(wsk.action, inner_name) { |
| val inner_sequence = inner_actions.mkString(",") |
| (action, _) => action.create(inner_name, Some(inner_sequence), kind = Some("sequence")) |
| } |
| |
| // create outer sequence |
| assetHelper.withCleaner(wsk.action, outer_name) { |
| val outer_sequence = Seq("split", "inner_sequence", "cat").mkString(",") |
| (action, _) => action.create(outer_name, Some(outer_sequence), kind = Some("sequence")) |
| } |
| |
| val now = "it is now " + new Date() |
| val args = Array("what time is it?", now) |
| val run = wsk.action.invoke(outer_name, Map("payload" -> args.mkString("\n").toJson)) |
| withActivation(wsk.activation, run, totalWait = 4 * allowedActionDuration) { |
| activation => |
| checkSequenceLogsAndAnnotations(activation, 3) // 3 activations in this sequence |
| activation.cause shouldBe None // topmost sequence |
| val result = activation.response.result.get |
| result.fields.get("payload") shouldBe defined |
| result.fields.get("length") should not be defined |
| result.fields.get("lines") shouldBe Some(JsArray(Vector(now.toJson))) |
| } |
| } |
| |
| it should "create and run a sequence in a package with parameters" in withAssetCleaner(wskprops) { |
| (wp, assetHelper) => |
| val sName = "sSequence" |
| |
| // create a package |
| val pkgName = "echopackage" |
| val pkgStr = "LonelyPackage" |
| assetHelper.withCleaner(wsk.pkg, pkgName) { |
| (pkg, name) => pkg.create(name, Map("payload" -> JsString(pkgStr))) |
| } |
| val helloName = "hello" |
| val helloWithPkg = s"$pkgName/$helloName" |
| |
| // create hello action in package |
| val file = TestUtils.getTestActionFilename(s"$helloName.js") |
| val actionStr = "AtomicAction" |
| assetHelper.withCleaner(wsk.action, helloWithPkg) { (action, actionName) => |
| action.create(name = actionName, artifact = Some(file), timeout = Some(allowedActionDuration), parameters = Map("payload" -> JsString(actionStr))) |
| } |
| // create s |
| assetHelper.withCleaner(wsk.action, sName) { |
| (action, seqName) => action.create(seqName, Some(helloWithPkg), kind = Some("sequence")) |
| } |
| val run = wsk.action.invoke(sName) |
| // action params trump package params |
| checkLogsAtomicAction(0, run, new Regex(actionStr)) |
| // run with some parameters |
| val sequenceStr = "AlmightySequence" |
| val sequenceParamRun = wsk.action.invoke(sName, parameters = Map("payload" -> JsString(sequenceStr))) |
| // sequence param should be passed to the first atomic action and trump the action params |
| checkLogsAtomicAction(0, sequenceParamRun, new Regex(sequenceStr)) |
| // update action and remove the params by sending an unused param that overrides previous params |
| wsk.action.create(name = helloWithPkg, artifact = Some(file), timeout = Some(allowedActionDuration), parameters = Map("param" -> JsString("irrelevant")), update = true) |
| val sequenceParamSecondRun = wsk.action.invoke(sName, parameters = Map("payload" -> JsString(sequenceStr))) |
| // sequence param should be passed to the first atomic action and trump the package params |
| checkLogsAtomicAction(0, sequenceParamSecondRun, new Regex(sequenceStr)) |
| val pkgParamRun = wsk.action.invoke(sName) |
| // no sequence params, no atomic action params used, the pkg params should show up |
| checkLogsAtomicAction(0, pkgParamRun, new Regex(pkgStr)) |
| } |
| |
| it should "run a sequence with an action in a package binding with parameters" in withAssetCleaner(wskprops) { |
| (wp, assetHelper) => |
| val packageName = "package1" |
| val bindName = "package2" |
| val actionName = "print" |
| val packageActionName = packageName + "/" + actionName |
| val bindActionName = bindName + "/" + actionName |
| val packageParams = Map("key1a" -> "value1a".toJson, "key1b" -> "value1b".toJson) |
| val bindParams = Map("key2a" -> "value2a".toJson, "key1b" -> "value2b".toJson) |
| val actionParams = Map("key0" -> "value0".toJson) |
| val file = TestUtils.getTestActionFilename("printParams.js") |
| assetHelper.withCleaner(wsk.pkg, packageName) { (pkg, _) => |
| pkg.create(packageName, packageParams) |
| } |
| assetHelper.withCleaner(wsk.action, packageActionName) { (action, _) => |
| action.create(packageActionName, Some(file), parameters = actionParams) |
| } |
| assetHelper.withCleaner(wsk.pkg, bindName) { (pkg, _) => |
| pkg.bind(packageName, bindName, bindParams) |
| } |
| // sequence |
| val sName = "sequenceWithBindingParams" |
| assetHelper.withCleaner(wsk.action, sName) { |
| (action, seqName) => action.create(seqName, Some(bindActionName), kind = Some("sequence")) |
| } |
| // Check that inherited parameters are passed to the action. |
| val now = new Date().toString() |
| val run = wsk.action.invoke(sName, Map("payload" -> now.toJson)) |
| // action params trump package params |
| checkLogsAtomicAction(0, run, new Regex(String.format(".*key0: value0.*key1a: value1a.*key1b: value2b.*key2a: value2a.*payload: %s", now))) |
| } |
| /** |
| * s -> apperror, echo |
| * only apperror should run |
| */ |
| it should "stop execution of a sequence (with no payload) on error" in withAssetCleaner(wskprops) { |
| (wp, assetHelper) => |
| val sName = "sSequence" |
| val apperror = "applicationError" |
| val echo = "echo" |
| |
| // create actions |
| val actions = Seq(apperror, echo) |
| for (actionName <- actions) { |
| val file = TestUtils.getTestActionFilename(s"$actionName.js") |
| assetHelper.withCleaner(wsk.action, actionName) { (action, actionName) => |
| action.create(name = actionName, artifact = Some(file), timeout = Some(allowedActionDuration)) |
| } |
| } |
| // create sequence s |
| assetHelper.withCleaner(wsk.action, sName) { |
| (action, seqName) => action.create(seqName, artifact = Some(actions.mkString(",")), kind = Some("sequence")) |
| } |
| // run sequence s with no payload |
| val run = wsk.action.invoke(sName) |
| withActivation(wsk.activation, run, totalWait = 2 * allowedActionDuration) { |
| activation => |
| checkSequenceLogsAndAnnotations(activation, 1) // only the first action should have run |
| activation.response.success shouldBe (false) |
| // the status should be error |
| activation.response.status shouldBe ("application error") |
| val result = activation.response.result.get |
| // the result of the activation should be the application error |
| result shouldBe (JsObject("error" -> JsString("This error thrown on purpose by the action."))) |
| } |
| } |
| |
| /** |
| * s -> echo, initforever |
| * should run both, but error |
| */ |
| it should "propagate execution error (timeout) from atomic action to sequence" in withAssetCleaner(wskprops) { |
| (wp, assetHelper) => |
| val sName = "sSequence" |
| val initforever = "initforever" |
| val echo = "echo" |
| |
| // create actions |
| val actions = Seq(echo, initforever) |
| // timeouts for the action; make the one for initforever short |
| val timeout = Map(echo -> allowedActionDuration, initforever -> shortDuration) |
| for (actionName <- actions) { |
| val file = TestUtils.getTestActionFilename(s"$actionName.js") |
| assetHelper.withCleaner(wsk.action, actionName) { (action, actionName) => |
| action.create(name = actionName, artifact = Some(file), timeout = Some(timeout(actionName))) |
| } |
| } |
| // create sequence s |
| assetHelper.withCleaner(wsk.action, sName) { |
| (action, seqName) => action.create(seqName, artifact = Some(actions.mkString(",")), kind = Some("sequence")) |
| } |
| // run sequence s with no payload |
| val run = wsk.action.invoke(sName) |
| withActivation(wsk.activation, run, totalWait = 2 * allowedActionDuration) { |
| activation => |
| checkSequenceLogsAndAnnotations(activation, 2) // 2 actions |
| activation.response.success shouldBe (false) |
| // the status should be error |
| //activation.response.status shouldBe("application error") |
| val result = activation.response.result.get |
| // the result of the activation should be timeout |
| result shouldBe (JsObject("error" -> JsString("The action exceeded its time limits of 10000 milliseconds during initialization."))) |
| } |
| } |
| |
| /** |
| * s -> echo, sleep |
| * sleep sleeps for 90s, timeout set at 120s |
| * should run both, the blocking call should be transformed into a non-blocking call, but finish executing |
| */ |
| it should "execute a sequence in blocking fashion and finish execution even if longer than blocking response timeout" in withAssetCleaner(wskprops) { |
| (wp, assetHelper) => |
| val sName = "sSequence" |
| val sleep = "timeout" |
| val echo = "echo" |
| |
| // create actions |
| val actions = Seq(echo, sleep) |
| for (actionName <- actions) { |
| val file = TestUtils.getTestActionFilename(s"$actionName.js") |
| assetHelper.withCleaner(wsk.action, actionName) { (action, actionName) => |
| action.create(name = actionName, artifact = Some(file), timeout = Some(allowedActionDuration)) |
| } |
| } |
| // create sequence s |
| assetHelper.withCleaner(wsk.action, sName) { |
| (action, seqName) => action.create(seqName, artifact = Some(actions.mkString(",")), kind = Some("sequence")) |
| } |
| // run sequence s with sleep equal to payload |
| val payload = 65000 |
| val run = wsk.action.invoke(sName, parameters = Map("payload" -> JsNumber(payload)), blocking = true) |
| withActivation(wsk.activation, run, initialWait = 5 seconds, totalWait = 3 * allowedActionDuration) { |
| activation => |
| checkSequenceLogsAndAnnotations(activation, 2) // 2 actions |
| activation.response.success shouldBe (true) |
| // the status should be error |
| //activation.response.status shouldBe("application error") |
| val result = activation.response.result.get |
| // the result of the activation should be timeout |
| result shouldBe (JsObject("msg" -> JsString(s"[OK] message terminated successfully after $payload milliseconds."))) |
| } |
| } |
| |
| /** |
| * sequence s -> echo |
| * t trigger with payload |
| * rule r: t -> s |
| */ |
| it should "execute a sequence that is part of a rule and pass the trigger parameters to the sequence" in withAssetCleaner(wskprops) { |
| (wp, assetHelper) => |
| val seqName = "seqRule" |
| val actionName = "echo" |
| val triggerName = "trigSeq" |
| val ruleName = "ruleSeq" |
| |
| val itIsNow = "it is now " + new Date() |
| // set up all entities |
| // trigger |
| val triggerPayload: Map[String, JsValue] = Map("payload" -> JsString(itIsNow)) |
| assetHelper.withCleaner(wsk.trigger, triggerName) { |
| (trigger, name) => trigger.create(name, parameters = triggerPayload) |
| } |
| // action |
| val file = TestUtils.getTestActionFilename(s"$actionName.js") |
| assetHelper.withCleaner(wsk.action, actionName) { (action, actionName) => |
| action.create(name = actionName, artifact = Some(file), timeout = Some(allowedActionDuration)) |
| } |
| // sequence |
| assetHelper.withCleaner(wsk.action, seqName) { |
| (action, seqName) => action.create(seqName, artifact = Some(actionName), kind = Some("sequence")) |
| } |
| // rule |
| assetHelper.withCleaner(wsk.rule, ruleName) { |
| (rule, name) => rule.create(name, triggerName, seqName) |
| } |
| // fire trigger |
| val run = wsk.trigger.fire(triggerName) |
| // check that the sequence was invoked and that the echo action produced the expected result |
| checkEchoSeqRuleResult(run, seqName, JsObject(triggerPayload)) |
| // fire trigger with new payload |
| val now = "this is now: " + Instant.now |
| val newPayload = Map("payload" -> JsString(now)) |
| val newRun = wsk.trigger.fire(triggerName, newPayload) |
| checkEchoSeqRuleResult(newRun, seqName, JsObject(newPayload)) |
| } |
| |
| /** |
| * checks the result of an echo sequence connected to a trigger through a rule |
| * @param triggerFireRun the run result of firing the trigger |
| * @param seqName the sequence name |
| * @param triggerPayload the payload used for the trigger (that should be reflected in the sequence result) |
| */ |
| private def checkEchoSeqRuleResult(triggerFireRun: RunResult, seqName: String, triggerPayload: JsObject) = { |
| withActivation(wsk.activation, triggerFireRun) { |
| triggerActivation => |
| withActivationsFromEntity(wsk.activation, seqName, since = Some(triggerActivation.start)) { activationList => |
| activationList.head.response.result shouldBe Some(triggerPayload) |
| activationList.head.cause shouldBe None |
| } |
| } |
| } |
| |
| /** |
| * checks logs for the activation of a sequence (length/size and ids) |
| * checks that the cause field for composing atomic actions is set properly |
| * checks duration |
| * checks memory |
| */ |
| private def checkSequenceLogsAndAnnotations(activation: CliActivation, size: Int) = { |
| activation.logs shouldBe defined |
| // check that the logs are what they are supposed to be (activation ids) |
| // check that the cause field is properly set for these activations |
| activation.logs.get.size shouldBe (size) // the number of activations in this sequence |
| var totalTime: Long = 0 |
| var maxMemory: Long = 0 |
| for (id <- activation.logs.get) { |
| withActivation(wsk.activation, id, initialWait = 1 second, pollPeriod = 60 seconds, totalWait = allowedActionDuration) { |
| componentActivation => |
| componentActivation.cause shouldBe defined |
| componentActivation.cause.get shouldBe (activation.activationId) |
| // check causedBy |
| val causedBy = componentActivation.getAnnotationValue("causedBy") |
| causedBy shouldBe defined |
| causedBy.get shouldBe (JsString("sequence")) |
| totalTime += componentActivation.duration |
| // extract memory |
| val mem = extractMemoryAnnotation(componentActivation) |
| maxMemory = maxMemory max mem |
| } |
| } |
| // extract duration |
| activation.duration shouldBe (totalTime) |
| // extract memory |
| activation.annotations shouldBe defined |
| val memory = extractMemoryAnnotation(activation) |
| memory shouldBe (maxMemory) |
| } |
| |
| /** checks that the logs of the idx-th atomic action from a sequence contains logsStr */ |
| private def checkLogsAtomicAction(atomicActionIdx: Int, run: RunResult, regex: Regex) { |
| withActivation(wsk.activation, run, totalWait = 2 * allowedActionDuration) { activation => |
| checkSequenceLogsAndAnnotations(activation, 1) |
| val componentId = activation.logs.get(atomicActionIdx) |
| val getComponentActivation = wsk.activation.get(componentId) |
| withActivation(wsk.activation, getComponentActivation, totalWait = allowedActionDuration) { componentActivation => |
| println(componentActivation) |
| componentActivation.logs shouldBe defined |
| val logs = componentActivation.logs.get.mkString(" ") |
| regex.findFirstIn(logs) shouldBe defined |
| } |
| } |
| } |
| |
| private def extractMemoryAnnotation(activation: CliActivation): Long = { |
| val limits = activation.getAnnotationValue("limits") |
| limits shouldBe defined |
| limits.get.asJsObject.getFields("memory")(0).convertTo[Long] |
| } |
| } |