enable use of concurrency action limits via cli (#383)
* enable --concurrency flag to set intra-container concurrency limit via wsk cli
* formatting
* update incubator-openwhisk-client-go git version to latest
* missing ConcurrencyLimit
* formatting
* update client-go sha
* test env needs to override default max concurrency-limit in order to test concurrency > 1
* missing license
* missing EOL
* add concurrency limit to limits test assertions
diff --git a/build.gradle b/build.gradle
index 945f879..eca828e 100644
--- a/build.gradle
+++ b/build.gradle
@@ -57,7 +57,7 @@
build(['name':'golang.org/x/sys/unix', 'version':'7f918dd405547ecb864d14a8ecbbfe205b5f930f', 'transitive':false])
build(['name':'gopkg.in/yaml.v2', 'version':'cd8b52f8269e0feb286dfeef29f8fe4d5b397e0b', 'transitive':false])
build(['name':'github.com/ghodss/yaml', 'version':'0ca9ea5df5451ffdf184b4428c902747c2c11cd7', 'transitive':false])
- build(['name':'github.com/apache/incubator-openwhisk-client-go/whisk','version':'d7cee96e83a1f38413a1f5286bd524dac72686c9','transitive':false])
+ build(['name':'github.com/apache/incubator-openwhisk-client-go/whisk','version':'c7013c9f456f35be34b3bf166535aaa03b834331','transitive':false])
// END - Imported from Godeps
test name:'github.com/stretchr/testify', version:'b91bfb9ebec76498946beb6af7c0230c7cc7ba6c', transitive:false //, tag: 'v1.2.0'
test name:'github.com/spf13/viper', version:'aafc9e6bc7b7bb53ddaa75a5ef49a17d6e654be5', transitive:false
diff --git a/commands/action.go b/commands/action.go
index 4d0ad4e..e6429bc 100644
--- a/commands/action.go
+++ b/commands/action.go
@@ -42,6 +42,7 @@
MEMORY_LIMIT = 256
TIMEOUT_LIMIT = 60000
LOGSIZE_LIMIT = 10
+ CONCURRENCY_LIMIT = 1
ACTIVATION_ID = "activationId"
WEB_EXPORT_ANNOT = "web-export"
RAW_HTTP_ANNOT = "raw-http"
@@ -413,9 +414,11 @@
cmd.LocalFlags().Changed(MEMORY_FLAG),
cmd.LocalFlags().Changed(LOG_SIZE_FLAG),
cmd.LocalFlags().Changed(TIMEOUT_FLAG),
+ cmd.LocalFlags().Changed(CONCURRENCY_FLAG),
Flags.action.memory,
Flags.action.logsize,
- Flags.action.timeout)
+ Flags.action.timeout,
+ Flags.action.concurrency)
paramArgs = Flags.common.param
annotArgs = Flags.common.annotation
@@ -886,10 +889,10 @@
}
}
-func getLimits(memorySet bool, logSizeSet bool, timeoutSet bool, memory int, logSize int, timeout int) *whisk.Limits {
+func getLimits(memorySet bool, logSizeSet bool, timeoutSet bool, concurrencySet bool, memory int, logSize int, timeout int, concurrency int) *whisk.Limits {
var limits *whisk.Limits
- if memorySet || logSizeSet || timeoutSet {
+ if memorySet || logSizeSet || timeoutSet || concurrencySet {
limits = new(whisk.Limits)
if memorySet {
@@ -903,6 +906,10 @@
if timeoutSet {
limits.Timeout = &timeout
}
+
+ if concurrencySet {
+ limits.Concurrency = &concurrency
+ }
}
return limits
@@ -1268,6 +1275,7 @@
actionCreateCmd.Flags().IntVarP(&Flags.action.timeout, TIMEOUT_FLAG, "t", TIMEOUT_LIMIT, wski18n.T("the timeout `LIMIT` in milliseconds after which the action is terminated"))
actionCreateCmd.Flags().IntVarP(&Flags.action.memory, MEMORY_FLAG, "m", MEMORY_LIMIT, wski18n.T("the maximum memory `LIMIT` in MB for the action"))
actionCreateCmd.Flags().IntVarP(&Flags.action.logsize, LOG_SIZE_FLAG, "l", LOGSIZE_LIMIT, wski18n.T("the maximum log size `LIMIT` in MB for the action"))
+ actionCreateCmd.Flags().IntVarP(&Flags.action.concurrency, CONCURRENCY_FLAG, "c", CONCURRENCY_LIMIT, wski18n.T("the maximum intra-container concurrent activation `LIMIT` for the action"))
actionCreateCmd.Flags().StringSliceVarP(&Flags.common.annotation, "annotation", "a", nil, wski18n.T("annotation values in `KEY VALUE` format"))
actionCreateCmd.Flags().StringVarP(&Flags.common.annotFile, "annotation-file", "A", "", wski18n.T("`FILE` containing annotation values in JSON format"))
actionCreateCmd.Flags().StringSliceVarP(&Flags.common.param, "param", "p", nil, wski18n.T("parameter values in `KEY VALUE` format"))
@@ -1284,6 +1292,7 @@
actionUpdateCmd.Flags().IntVarP(&Flags.action.timeout, TIMEOUT_FLAG, "t", TIMEOUT_LIMIT, wski18n.T("the timeout `LIMIT` in milliseconds after which the action is terminated"))
actionUpdateCmd.Flags().IntVarP(&Flags.action.memory, MEMORY_FLAG, "m", MEMORY_LIMIT, wski18n.T("the maximum memory `LIMIT` in MB for the action"))
actionUpdateCmd.Flags().IntVarP(&Flags.action.logsize, LOG_SIZE_FLAG, "l", LOGSIZE_LIMIT, wski18n.T("the maximum log size `LIMIT` in MB for the action"))
+ actionUpdateCmd.Flags().IntVarP(&Flags.action.concurrency, CONCURRENCY_FLAG, "c", CONCURRENCY_LIMIT, wski18n.T("the maximum intra-container concurrent activation `LIMIT` for the action"))
actionUpdateCmd.Flags().StringSliceVarP(&Flags.common.annotation, "annotation", "a", []string{}, wski18n.T("annotation values in `KEY VALUE` format"))
actionUpdateCmd.Flags().StringVarP(&Flags.common.annotFile, "annotation-file", "A", "", wski18n.T("`FILE` containing annotation values in JSON format"))
actionUpdateCmd.Flags().StringSliceVarP(&Flags.common.param, "param", "p", []string{}, wski18n.T("parameter values in `KEY VALUE` format"))
diff --git a/commands/flags.go b/commands/flags.go
index 223ccfe..d9761d2 100644
--- a/commands/flags.go
+++ b/commands/flags.go
@@ -26,13 +26,14 @@
///////////
const (
- MEMORY_FLAG = "memory"
- LOG_SIZE_FLAG = "logsize"
- TIMEOUT_FLAG = "timeout"
- WEB_FLAG = "web"
- WEB_SECURE_FLAG = "web-secure"
- SAVE_FLAG = "save"
- SAVE_AS_FLAG = "save-as"
+ MEMORY_FLAG = "memory"
+ LOG_SIZE_FLAG = "logsize"
+ CONCURRENCY_FLAG = "concurrency"
+ TIMEOUT_FLAG = "timeout"
+ WEB_FLAG = "web"
+ WEB_SECURE_FLAG = "web-secure"
+ SAVE_FLAG = "save"
+ SAVE_AS_FLAG = "save-as"
)
var cliDebug = os.Getenv("WSK_CLI_DEBUG") // Useful for tracing init() code
@@ -130,21 +131,22 @@
}
type ActionFlags struct {
- docker string
- native bool
- copy bool
- web string
- websecure string
- sequence bool
- timeout int
- memory int
- logsize int
- result bool
- kind string
- main string
- url bool
- save bool
- saveAs string
+ docker string
+ native bool
+ copy bool
+ web string
+ websecure string
+ sequence bool
+ timeout int
+ memory int
+ logsize int
+ concurrency int
+ result bool
+ kind string
+ main string
+ url bool
+ save bool
+ saveAs string
}
func IsVerbose() bool {
diff --git a/tests/src/test/resources/application.conf b/tests/src/test/resources/application.conf
new file mode 100644
index 0000000..0abcab4
--- /dev/null
+++ b/tests/src/test/resources/application.conf
@@ -0,0 +1,11 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more contributor
+# license agreements; and to You under the Apache License, Version 2.0.
+
+#test-only overrides so that tests can override defaults in application.conf (todo: move all defaults to reference.conf)
+test {
+ whisk {
+ concurrency-limit {
+ max = 500
+ }
+ }
+}
diff --git a/tests/src/test/scala/org/apache/openwhisk/core/cli/test/WskCliBasicUsageTests.scala b/tests/src/test/scala/org/apache/openwhisk/core/cli/test/WskCliBasicUsageTests.scala
index 5e44197..c7681eb 100644
--- a/tests/src/test/scala/org/apache/openwhisk/core/cli/test/WskCliBasicUsageTests.scala
+++ b/tests/src/test/scala/org/apache/openwhisk/core/cli/test/WskCliBasicUsageTests.scala
@@ -39,6 +39,7 @@
import spray.json.DefaultJsonProtocol._
import spray.json._
import org.apache.openwhisk.core.entity._
+import org.apache.openwhisk.core.entity.ConcurrencyLimit._
import org.apache.openwhisk.core.entity.LogLimit._
import org.apache.openwhisk.core.entity.MemoryLimit._
import org.apache.openwhisk.core.entity.TimeLimit._
@@ -390,6 +391,7 @@
val memoryLimit = 512 MB
val logLimit = 1 MB
val timeLimit = 60 seconds
+ val concurrencyLimit = 500
assetHelper.withCleaner(wsk.action, name) { (action, _) =>
action.create(
@@ -397,7 +399,8 @@
Some(TestUtils.getTestActionFilename("helloAsync.js")),
memory = Some(memoryLimit),
timeout = Some(timeLimit),
- logsize = Some(logLimit))
+ logsize = Some(logLimit),
+ concurrency = Some(concurrencyLimit))
}
val run = wsk.action.invoke(name, Map("payload" -> "this is a test".toJson))
@@ -408,7 +411,11 @@
val limitsObj =
JsObject(
"key" -> JsString("limits"),
- "value" -> ActionLimits(TimeLimit(timeLimit), MemoryLimit(memoryLimit), LogLimit(logLimit)).toJson)
+ "value" -> ActionLimits(
+ TimeLimit(timeLimit),
+ MemoryLimit(memoryLimit),
+ LogLimit(logLimit),
+ ConcurrencyLimit(concurrencyLimit)).toJson)
val path = annotations.find {
_.fields("key").convertTo[String] == "path"
@@ -2041,12 +2048,14 @@
def testLimit(timeout: Option[Duration] = None,
memory: Option[ByteSize] = None,
logs: Option[ByteSize] = None,
+ concurrency: Option[Int] = None,
ec: Int = SUCCESS_EXIT) = {
// Limits to assert, standard values if CLI omits certain values
val limits = JsObject(
"timeout" -> timeout.getOrElse(STD_DURATION).toMillis.toJson,
"memory" -> memory.getOrElse(stdMemory).toMB.toInt.toJson,
- "logs" -> logs.getOrElse(stdLogSize).toMB.toInt.toJson)
+ "logs" -> logs.getOrElse(stdLogSize).toMB.toInt.toJson,
+ "concurrency" -> concurrency.getOrElse(stdConcurrent).toJson)
val name = "ActionLimitTests" + Instant.now.toEpochMilli
val createResult =
@@ -2057,8 +2066,10 @@
logsize = logs,
memory = memory,
timeout = timeout,
+ concurrency = concurrency,
expectedExitCode = DONTCARE_EXIT)
- withClue(s"create failed for parameters: timeout = $timeout, memory = $memory, logsize = $logs:") {
+ withClue(
+ s"create failed for parameters: timeout = $timeout, memory = $memory, logsize = $logs, concurrency = $concurrency:") {
result.exitCode should be(ec)
}
result
@@ -2084,13 +2095,15 @@
time <- Seq(None, Some(MIN_DURATION), Some(MAX_DURATION))
mem <- Seq(None, Some(minMemory), Some(maxMemory))
log <- Seq(None, Some(minLogSize), Some(maxLogSize))
- } testLimit(time, mem, log)
+ concurrency <- Seq(None, Some(minConcurrent), Some(maxConcurrent))
+ } testLimit(time, mem, log, concurrency)
// Assert that invalid permutation are rejected
- testLimit(Some(0.milliseconds), None, None, BAD_REQUEST)
- testLimit(Some(100.minutes), None, None, BAD_REQUEST)
- testLimit(None, Some(0.MB), None, BAD_REQUEST)
- testLimit(None, Some(32768.MB), None, BAD_REQUEST)
- testLimit(None, None, Some(32768.MB), BAD_REQUEST)
+ testLimit(Some(0.milliseconds), None, None, None, BAD_REQUEST)
+ testLimit(Some(100.minutes), None, None, None, BAD_REQUEST)
+ testLimit(None, Some(0.MB), None, None, BAD_REQUEST)
+ testLimit(None, Some(32768.MB), None, None, BAD_REQUEST)
+ testLimit(None, None, Some(32768.MB), None, BAD_REQUEST)
+ testLimit(None, None, None, Some(5000), BAD_REQUEST)
}
}
diff --git a/tests/src/test/scala/system/basic/WskCliBasicTests.scala b/tests/src/test/scala/system/basic/WskCliBasicTests.scala
index ba98949..2acbfaf 100644
--- a/tests/src/test/scala/system/basic/WskCliBasicTests.scala
+++ b/tests/src/test/scala/system/basic/WskCliBasicTests.scala
@@ -313,8 +313,8 @@
.get(name, fieldFilter = Some("annotations"))
.stdout should include regex (s"""$successMsg annotations\n\\[\\s+\\{\\s+"key":\\s+"exec",\\s+"value":\\s+"nodejs:6"\\s+\\}\\s+\\]""")
wsk.action
- .get(name, fieldFilter = Some("limits"))
- .stdout should include regex (s"""$successMsg limits\n\\{\\s+"timeout":\\s+60000,\\s+"memory":\\s+256,\\s+"logs":\\s+10\\s+\\}""")
+ .get(name, fieldFilter = Some("limits")) //
+ .stdout should include regex (s"""$successMsg limits\n\\{\\s+"timeout":\\s+60000,\\s+"memory":\\s+256,\\s+"logs":\\s+10,\\s+"concurrency":\\s+1\\s+\\}""")
wsk.action
.get(name, fieldFilter = Some("namespace"))
.stdout should include regex (s"""(?i)$successMsg namespace\n"$ns"""")
diff --git a/wski18n/resources/en_US.all.json b/wski18n/resources/en_US.all.json
index 3e7024e..a837d5e 100644
--- a/wski18n/resources/en_US.all.json
+++ b/wski18n/resources/en_US.all.json
@@ -928,6 +928,10 @@
"translation": "the maximum memory `LIMIT` in MB for the action"
},
{
+ "id": "the maximum intra-container concurrent activation `LIMIT` for the action",
+ "translation": "the maximum intra-container concurrent activation `LIMIT` for the action"
+ },
+ {
"id": "the maximum log size `LIMIT` in MB for the action",
"translation": "the maximum log size `LIMIT` in MB for the action"
},