Remove unnecessary sleeps in invoker.
- a 3s sleep before calling /init for black box containers is not necessary since the HTTP call will retry.
- slack sleeps after init also not necessary since if container respond to inits, it's ready to run.
- sleep during nio channel draining should not be necessary.
Add tests using blackbox actions.
Tests that blackbox containers that misbehave will timeout with expected exception.
Three cases are tested:
1. blackbox that doesn't implement a proper proxy (doesn't run a proxy at all)
2. blackbox that doesn't respond to init (implements /init but does not respond)
3. blackbox that doesn't respond to run (implements /run but does not respond)
diff --git a/core/dispatcher/src/main/scala/whisk/core/container/ContainerPool.scala b/core/dispatcher/src/main/scala/whisk/core/container/ContainerPool.scala
index 99b022e..dd985a8 100644
--- a/core/dispatcher/src/main/scala/whisk/core/container/ContainerPool.scala
+++ b/core/dispatcher/src/main/scala/whisk/core/container/ContainerPool.scala
@@ -499,7 +499,7 @@
val key = ActionContainerId(auth.uuid, action.fullyQualifiedName, action.rev)
val warmedContainer = if (limits.memory == defaultMemoryLimit && imageName == nodeImageName) getWarmNodejsContainer(key) else None
val containerName = makeContainerName(action)
- val con = warmedContainer getOrElse makeGeneralContainer(key, containerName, imageName, limits, action.exec.isInstanceOf[BlackBoxExec])
+ val con = warmedContainer getOrElse makeGeneralContainer(key, containerName, imageName, limits, action.exec.kind == BlackBoxExec)
initWhiskContainer(action, con)
}
diff --git a/core/dispatcher/src/main/scala/whisk/core/container/ContainerUtils.scala b/core/dispatcher/src/main/scala/whisk/core/container/ContainerUtils.scala
index a842bf6..471eac8 100644
--- a/core/dispatcher/src/main/scala/whisk/core/container/ContainerUtils.scala
+++ b/core/dispatcher/src/main/scala/whisk/core/container/ContainerUtils.scala
@@ -143,7 +143,6 @@
val read = channel.read(buffer)
if (read > 0)
remain = read - read.toInt
- Thread.sleep(50) // TODO What is this for?
}
buffer.array
} catch {
diff --git a/core/dispatcher/src/main/scala/whisk/core/container/WhiskContainer.scala b/core/dispatcher/src/main/scala/whisk/core/container/WhiskContainer.scala
index e384ef4..0b4e03a 100644
--- a/core/dispatcher/src/main/scala/whisk/core/container/WhiskContainer.scala
+++ b/core/dispatcher/src/main/scala/whisk/core/container/WhiskContainer.scala
@@ -79,8 +79,6 @@
* Sends initialization payload to container.
*/
def init(args: JsObject)(implicit system: ActorSystem, transid: TransactionId): RunResult = {
- // this shouldn't be needed but leave it for now
- if (isBlackbox) Thread.sleep(3000)
info(this, s"sending initialization to ${this.details}")
// when invoking /init, don't wait longer than the timeout configured for this action
val timeout = initTimeoutMilli min limits.timeout.duration
diff --git a/core/dispatcher/src/main/scala/whisk/core/invoker/Invoker.scala b/core/dispatcher/src/main/scala/whisk/core/invoker/Invoker.scala
index 76254ce..3fc7b59 100644
--- a/core/dispatcher/src/main/scala/whisk/core/invoker/Invoker.scala
+++ b/core/dispatcher/src/main/scala/whisk/core/invoker/Invoker.scala
@@ -59,8 +59,8 @@
import whisk.connector.kafka.KafkaProducerConnector
import whisk.core.WhiskConfig
import whisk.core.WhiskConfig.consulServer
-import whisk.core.WhiskConfig.dockerRegistry
import whisk.core.WhiskConfig.dockerImagePrefix
+import whisk.core.WhiskConfig.dockerRegistry
import whisk.core.WhiskConfig.edgeHost
import whisk.core.WhiskConfig.kafkaHost
import whisk.core.WhiskConfig.logsDir
@@ -68,7 +68,10 @@
import whisk.core.WhiskConfig.whiskVersion
import whisk.core.connector.{ ActivationMessage => Message }
import whisk.core.connector.CompletionMessage
-import whisk.core.container._
+import whisk.core.container.ContainerPool
+import whisk.core.container.Interval
+import whisk.core.container.RunResult
+import whisk.core.container.WhiskContainer
import whisk.core.dispatcher.ActivationFeed.ActivationNotification
import whisk.core.dispatcher.ActivationFeed.ContainerReleased
import whisk.core.dispatcher.ActivationFeed.FailedActivation
@@ -77,6 +80,7 @@
import whisk.core.entity.ActivationId
import whisk.core.entity.ActivationLogs
import whisk.core.entity.ActivationResponse
+import whisk.core.entity.BlackBoxExec
import whisk.core.entity.DocId
import whisk.core.entity.DocInfo
import whisk.core.entity.DocRevision
@@ -217,7 +221,7 @@
}
/*
- * Create a whisk activation out of the errorMsg and finish the transaction.
+ * Creates a whisk activation out of the errorMsg and finish the transaction.
* Failing with an error can involve multiple futures but the effecting call is completeTransaction which is guarded.
*/
protected def completeTransactionWithError(actionDocInfo: DocInfo, tran: Transaction, errorMsg: String)(
@@ -226,9 +230,9 @@
val msg = tran.msg
val name = EntityName(actionDocInfo.id().split(Namespace.PATHSEP)(1))
val version = SemVer() // TODO: this is wrong, when the semver is passed from controller, fix this
- val payload = msg.content getOrElse JsObject()
- val response = Some(404, JsObject(ActivationResponse.ERROR_FIELD -> errorMsg.toJson).compactPrint)
- val activation = makeWhiskActivation(tran, false, msg, name, version, payload, response)
+ val response = ActivationResponse.whiskError(errorMsg)
+ val interval = computeActivationInterval(tran)
+ val activation = makeWhiskActivation(msg, name, version, response, interval)
completeTransaction(tran, activation, FailedActivation(transid))
}
@@ -260,10 +264,6 @@
}
}
- // These are related to initialization
- private val RegularSlack = 100.milliseconds
- private val BlackBoxSlack = 200.milliseconds
-
protected def invokeAction(action: WhiskAction, auth: WhiskAuth, payload: JsObject, tran: Transaction)(
implicit transid: TransactionId): Future[DocInfo] = {
val msg = tran.msg
@@ -278,12 +278,7 @@
action.fullyQualifiedName, msg.activationId.toString)) // cached
case Some(RunResult(interval, Some((200, _)))) => { // successful init
-
- // TODO: @perryibm update comment if this is still necessary else remove
- Thread.sleep((if (con.isBlackbox) BlackBoxSlack else RegularSlack).toMillis)
-
tran.initInterval = Some(interval)
-
(false, con.run(params, msg.meta, auth.compact, timeout,
action.fullyQualifiedName, msg.activationId.toString))
}
@@ -293,7 +288,9 @@
case (failedInit, RunResult(interval, response)) =>
if (!failedInit) tran.runInterval = Some(interval)
- val activationResult = makeWhiskActivation(tran, con.isBlackbox, msg, action, payload, response)
+ val activationInterval = computeActivationInterval(tran)
+ val activationResponse = getActivationResponse(activationInterval, action.limits.timeout.duration, response, con.isBlackbox)
+ val activationResult = makeWhiskActivation(msg, action.name, action.version, activationResponse, activationInterval)
val completeMsg = CompletionMessage(transid, activationResult)
producer.send("completed", completeMsg) map { status =>
@@ -314,10 +311,14 @@
case None => { // this corresponds to the container not even starting - not /init failing
info(this, s"failed to start or get a container")
- val response = Some(420, "Error starting container")
- val contents = JsArray(JsString("Error starting container"))
- val activation = makeWhiskActivation(tran, false, msg, action, payload, response)
- completeTransaction(tran, activation withLogs ActivationLogs.serdes.read(contents), FailedActivation(transid))
+ val response = if (action.exec.kind == BlackBoxExec) {
+ ActivationResponse.containerError("the container did to start")
+ } else {
+ ActivationResponse.whiskError("error starting container to run action")
+ }
+ val interval = computeActivationInterval(tran)
+ val activation = makeWhiskActivation(msg, action.name, action.version, response, interval)
+ completeTransaction(tran, activation, FailedActivation(transid))
}
}
}
@@ -457,10 +458,7 @@
ActivationResponse.containerError(s"the action 'result' value is not an object: ${notAnObj.toString}")
case Failure(t) =>
- if (isBlackbox)
- warn(this, s"response did not json parse: '$contents' led to $t")
- else
- error(this, s"response did not json parse: '$contents' led to $t")
+ (if (isBlackbox) warn _ else error _)(this, s"response did not json parse: '$contents' led to $t")
ActivationResponse.containerError("the action did not produce a valid JSON response")
}
} getOrElse ActivationResponse.whiskError("failed to obtain action invocation response")
@@ -491,33 +489,32 @@
}
// -------------------------------------------------------------------------------------------------------------
- private def makeWhiskActivation(transaction: Transaction, isBlackbox: Boolean, msg: Message, action: WhiskAction,
- payload: JsObject, response: Option[(Int, String)])(
- implicit transid: TransactionId): WhiskActivation = {
- makeWhiskActivation(transaction, isBlackbox, msg, action.name, action.version, payload, response, action.limits.timeout.duration)
- }
- private def makeWhiskActivation(transaction: Transaction, isBlackbox: Boolean, msg: Message, actionName: EntityName,
- actionVersion: SemVer, payload: JsObject, response: Option[(Int, String)], timeout: Duration = Duration.Inf)(
- implicit transid: TransactionId): WhiskActivation = {
-
- // We reconstruct a plausible interval based on the time spent in the various operations.
- // The goal is for the interval to have a duration corresponding to the sum of all durations
- // and an endtime corresponding to the latest endtime.
- val interval: Interval = (transaction.initInterval, transaction.runInterval) match {
- case (None, Some(run)) => run
- case (Some(init), None) => init
- case (None, None) => Interval(Instant.now(Clock.systemUTC()), Instant.now(Clock.systemUTC()))
- case (Some(init), Some(Interval(runStart, runEnd))) =>
- Interval(runStart.minusMillis(init.duration.toMillis), runEnd)
- }
-
- val activationResponse = if (interval.duration >= timeout) {
+ /**
+ * Interprets the responses from the container and maps it to an appropriate ActivationResponse.
+ */
+ private def getActivationResponse(
+ interval: Interval,
+ timeout: Duration,
+ response: Option[(Int, String)],
+ isBlackbox: Boolean)(
+ implicit transid: TransactionId): ActivationResponse = {
+ if (interval.duration >= timeout) {
ActivationResponse.applicationError(s"action exceeded its time limits of ${timeout.toMillis} milliseconds")
} else {
processResponseContent(isBlackbox, response)
}
+ }
+ /**
+ * Creates a WhiskActivation for the given action, response and duration.
+ */
+ private def makeWhiskActivation(
+ msg: Message,
+ actionName: EntityName,
+ actionVersion: SemVer,
+ activationResponse: ActivationResponse,
+ interval: Interval) = {
WhiskActivation(
namespace = msg.subject.namespace,
name = actionName,
@@ -532,6 +529,24 @@
logs = ActivationLogs())
}
+ /**
+ * Reconstructs an interval based on the time spent in the various operations.
+ * The goal is for the interval to have a duration corresponding to the sum of all durations
+ * and an endtime corresponding to the latest endtime.
+ *
+ * @param transaction the transaction object containing metadata
+ * @return interval for the transaction with start/end times computed
+ */
+ private def computeActivationInterval(transaction: Transaction): Interval = {
+ (transaction.initInterval, transaction.runInterval) match {
+ case (None, Some(run)) => run
+ case (Some(init), None) => init
+ case (None, None) => Interval(Instant.now(Clock.systemUTC()), Instant.now(Clock.systemUTC()))
+ case (Some(init), Some(Interval(runStart, runEnd))) =>
+ Interval(runStart.minusMillis(init.duration.toMillis), runEnd)
+ }
+ }
+
private val entityStore = WhiskEntityStore.datastore(config)
private val authStore = WhiskAuthStore.datastore(config)
private val activationStore = WhiskActivationStore.datastore(config)
diff --git a/settings.gradle b/settings.gradle
index 4550ce1..ed56929 100644
--- a/settings.gradle
+++ b/settings.gradle
@@ -19,6 +19,8 @@
include 'tools:go-cli'
include 'sdk:docker'
+include 'tests:dat:blackbox:badaction'
+include 'tests:dat:blackbox:badproxy'
include 'tests'
diff --git a/tests/build.gradle b/tests/build.gradle
index 0c0a1df..5029741 100644
--- a/tests/build.gradle
+++ b/tests/build.gradle
@@ -36,7 +36,9 @@
':core:javaAction:distDocker',
':core:swiftAction:distDocker',
':core:swift3Action:distDocker',
- ':sdk:docker:distDocker'
+ ':sdk:docker:distDocker',
+ 'tests:dat:blackbox:badaction',
+ 'tests:dat:blackbox:badproxy'
])
dependencies {
diff --git a/tests/src/actionContainers/DockerExampleContainerTests.scala b/tests/src/actionContainers/DockerExampleContainerTests.scala
index 54d93de..ddaaf53 100644
--- a/tests/src/actionContainers/DockerExampleContainerTests.scala
+++ b/tests/src/actionContainers/DockerExampleContainerTests.scala
@@ -16,15 +16,17 @@
package actionContainers
+import java.util.concurrent.TimeoutException
+
import org.junit.runner.RunWith
import org.scalatest.junit.JUnitRunner
import ActionContainer.withContainer
+import common.WskActorSystem
import spray.json.JsNumber
import spray.json.JsObject
import spray.json.JsString
-
-import common.WskActorSystem
+import spray.json.JsBoolean
@RunWith(classOf[JUnitRunner])
class DockerExampleContainerTests extends ActionProxyContainerTestUtils with WskActorSystem {
@@ -83,4 +85,64 @@
case (o, _) => o should include("This is an example log message from an arbitrary C program!")
}, 2)
}
+
+ behavior of "bad containers"
+
+ it should "timeout init with exception" in {
+ val (out, err) = withContainer("badaction") { c =>
+ a[TimeoutException] should be thrownBy {
+ val (code, out) = c.init(initPayload("sleep"))
+ println(code, out)
+ }
+ }
+
+ out should include("sleeping")
+ err shouldBe empty
+ }
+
+ it should "abort init with empty response" in {
+ val (out, err) = withContainer("badaction") { c =>
+ val (code, out) = c.init(initPayload("exit"))
+ code shouldBe 500
+ out shouldBe empty
+ }
+
+ out should include("exit")
+ err shouldBe empty
+ }
+
+ it should "timeout run with exception" in {
+ val (out, err) = withContainer("badaction") { c =>
+ a[TimeoutException] should be thrownBy {
+ val (code, out) = c.run(runPayload(JsObject("sleep" -> JsBoolean(true))))
+ println(code, out)
+ }
+ }
+
+ out should include("sleeping")
+ err shouldBe empty
+ }
+
+ it should "abort run with empty response" in {
+ val (out, err) = withContainer("badaction") { c =>
+ val (code, out) = c.run(runPayload(JsObject("exit" -> JsBoolean(true))))
+ code shouldBe 500
+ out shouldBe empty
+ }
+
+ out should include("exit")
+ err shouldBe empty
+ }
+
+ it should "timeout bad proxy with exception" in {
+ val (out, err) = withContainer("badproxy") { c =>
+ a[TimeoutException] should be thrownBy {
+ val (code, out) = c.init(JsObject())
+ println(code, out)
+ }
+ }
+
+ out shouldBe empty
+ err shouldBe empty
+ }
}
diff --git a/tests/src/services/KafkaConnectorTests.scala b/tests/src/services/KafkaConnectorTests.scala
index 73de4f9..1e127fe 100644
--- a/tests/src/services/KafkaConnectorTests.scala
+++ b/tests/src/services/KafkaConnectorTests.scala
@@ -102,7 +102,7 @@
if (i < 2) {
Thread.sleep((sessionTimeout + 1.second).toMillis)
- an[CommitFailedException] should be thrownBy {
+ a[CommitFailedException] should be thrownBy {
consumer.commit() // sleep should cause commit to fail
}
} else consumer.commit()
diff --git a/tests/src/whisk/core/cli/test/WskBasicUsageTests.scala b/tests/src/whisk/core/cli/test/WskBasicUsageTests.scala
index 4ee0391..96a2193 100644
--- a/tests/src/whisk/core/cli/test/WskBasicUsageTests.scala
+++ b/tests/src/whisk/core/cli/test/WskBasicUsageTests.scala
@@ -41,8 +41,7 @@
import common.Wsk
import common.WskProps
import common.WskTestHelpers
-import spray.json.DefaultJsonProtocol.IntJsonFormat
-import spray.json.DefaultJsonProtocol.LongJsonFormat
+import spray.json.DefaultJsonProtocol._
import spray.json._
import spray.json.JsObject
import spray.json.pimpAny
@@ -343,6 +342,21 @@
wsk.parseJsonString(stdout).fields("annotations") shouldBe getEscapedJSONTestArgOutput
}
+ it should "invoke an action that exits and get appropriate error" in withAssetCleaner(wskprops) {
+ (wp, assetHelper) =>
+ val name = "abort"
+ assetHelper.withCleaner(wsk.action, name) {
+ (action, _) => action.create(name, Some(TestUtils.getTestActionFilename("exit.py")))
+ }
+
+ withActivation(wsk.activation, wsk.action.invoke(name)) {
+ activation =>
+ val response = activation.fields("response").asJsObject
+ response.fields("result") shouldBe JsObject("error" -> "the action did not produce a valid JSON response".toJson)
+ response.fields("status") shouldBe "action developer error".toJson
+ }
+ }
+
behavior of "Wsk packages"
it should "reject create of a package without a package name" in {
diff --git a/tests/src/whisk/core/controller/test/AuthorizeTests.scala b/tests/src/whisk/core/controller/test/AuthorizeTests.scala
index 0f73891..ff8e567 100644
--- a/tests/src/whisk/core/controller/test/AuthorizeTests.scala
+++ b/tests/src/whisk/core/controller/test/AuthorizeTests.scala
@@ -127,7 +127,7 @@
val collections = Seq(PACKAGES)
val resources = collections map { Resource(someUser.namespace, _, Some("xyz")) }
resources foreach { r =>
- an[RejectRequest] should be thrownBy {
+ a[RejectRequest] should be thrownBy {
// read should fail because the lookup for the package will fail
Await.result(entitlementService.check(someUser, READ, r), requestTimeout)
}