[New Scheduler] Add duration checker (#4984)
* Add a duration checker for Elasticsearch.
* Add configurations for the ElasticSearchDurationCheckerTests class
* Use a private helper function to execute queries.
* Add an Ansible variable for the duration checker.
* Apply scalaFmt
* Include test cases for duration checker to system tests.
* Setup ElasticSearch for system tests.
* Increase patience config to wait for response longer.
* Add postfixOps
diff --git a/ansible/group_vars/all b/ansible/group_vars/all
index 342ad1d..564a021 100644
--- a/ansible/group_vars/all
+++ b/ansible/group_vars/all
@@ -421,3 +421,6 @@
port: "{{ metrics_kamon_statsd_port | default('8125') }}"
user_events: "{{ user_events_enabled | default(false) | lower }}"
+
+durationChecker:
+ timeWindow: "{{ duration_checker_time_window | default('1 d') }}"
diff --git a/common/scala/src/main/scala/org/apache/openwhisk/core/WhiskConfig.scala b/common/scala/src/main/scala/org/apache/openwhisk/core/WhiskConfig.scala
index 7c0ec2b..84e75e8 100644
--- a/common/scala/src/main/scala/org/apache/openwhisk/core/WhiskConfig.scala
+++ b/common/scala/src/main/scala/org/apache/openwhisk/core/WhiskConfig.scala
@@ -272,6 +272,8 @@
val metrics = "whisk.metrics"
val featureFlags = "whisk.feature-flags"
+ val durationChecker = s"whisk.duration-checker"
+
val whiskConfig = "whisk.config"
val sharedPackageExecuteOnly = s"whisk.shared-packages-execute-only"
val swaggerUi = "whisk.swagger-ui"
diff --git a/common/scala/src/main/scala/org/apache/openwhisk/core/database/elasticsearch/ElasticSearchActivationStore.scala b/common/scala/src/main/scala/org/apache/openwhisk/core/database/elasticsearch/ElasticSearchActivationStore.scala
index 5d110a7..8f25712 100644
--- a/common/scala/src/main/scala/org/apache/openwhisk/core/database/elasticsearch/ElasticSearchActivationStore.scala
+++ b/common/scala/src/main/scala/org/apache/openwhisk/core/database/elasticsearch/ElasticSearchActivationStore.scala
@@ -57,27 +57,17 @@
class ElasticSearchActivationStore(
httpFlow: Option[Flow[(HttpRequest, Promise[HttpResponse]), (Try[HttpResponse], Promise[HttpResponse]), Any]] = None,
- elasticSearchConfig: ElasticSearchActivationStoreConfig =
- loadConfigOrThrow[ElasticSearchActivationStoreConfig](ConfigKeys.elasticSearchActivationStore),
+ elasticSearchConfig: ElasticSearchActivationStoreConfig,
useBatching: Boolean = false)(implicit actorSystem: ActorSystem,
actorMaterializer: ActorMaterializer,
logging: Logging)
extends ActivationStore {
import com.sksamuel.elastic4s.http.ElasticDsl._
+ import ElasticSearchActivationStore.{generateIndex, httpClientCallback}
private implicit val executionContextExecutor: ExecutionContextExecutor = actorSystem.dispatcher
- private val httpClientCallback = new HttpClientConfigCallback {
- override def customizeHttpClient(httpClientBuilder: HttpAsyncClientBuilder): HttpAsyncClientBuilder = {
- val provider = new BasicCredentialsProvider
- provider.setCredentials(
- AuthScope.ANY,
- new UsernamePasswordCredentials(elasticSearchConfig.username, elasticSearchConfig.password))
- httpClientBuilder.setDefaultCredentialsProvider(provider)
- }
- }
-
private val client =
ElasticClient(
ElasticProperties(s"${elasticSearchConfig.protocol}://${elasticSearchConfig.hosts}"),
@@ -407,10 +397,6 @@
activationId.toString.split("/")(0)
}
- private def generateIndex(namespace: String): String = {
- elasticSearchConfig.indexPattern.dropWhile(_ == '/') format namespace.toLowerCase
- }
-
private def generateRangeQuery(key: String, since: Option[Instant], upto: Option[Instant]): RangeQuery = {
rangeQuery(key)
.gte(since.map(_.toEpochMilli).getOrElse(minStart))
@@ -418,7 +404,31 @@
}
}
+object ElasticSearchActivationStore {
+ val elasticSearchConfig: ElasticSearchActivationStoreConfig =
+ loadConfigOrThrow[ElasticSearchActivationStoreConfig](ConfigKeys.elasticSearchActivationStore)
+
+ val httpClientCallback = new HttpClientConfigCallback {
+ override def customizeHttpClient(httpClientBuilder: HttpAsyncClientBuilder): HttpAsyncClientBuilder = {
+ val provider = new BasicCredentialsProvider
+ provider.setCredentials(
+ AuthScope.ANY,
+ new UsernamePasswordCredentials(elasticSearchConfig.username, elasticSearchConfig.password))
+ httpClientBuilder.setDefaultCredentialsProvider(provider)
+ }
+ }
+
+ def generateIndex(namespace: String): String = {
+ elasticSearchConfig.indexPattern.dropWhile(_ == '/') format namespace.toLowerCase
+ }
+}
+
object ElasticSearchActivationStoreProvider extends ActivationStoreProvider {
+ import ElasticSearchActivationStore.elasticSearchConfig
+
override def instance(actorSystem: ActorSystem, actorMaterializer: ActorMaterializer, logging: Logging) =
- new ElasticSearchActivationStore(useBatching = true)(actorSystem, actorMaterializer, logging)
+ new ElasticSearchActivationStore(elasticSearchConfig = elasticSearchConfig, useBatching = true)(
+ actorSystem,
+ actorMaterializer,
+ logging)
}
diff --git a/core/scheduler/src/main/scala/org/apache/openwhisk/core/scheduler/queue/ElasticSearchDurationChecker.scala b/core/scheduler/src/main/scala/org/apache/openwhisk/core/scheduler/queue/ElasticSearchDurationChecker.scala
new file mode 100644
index 0000000..88e1f2f
--- /dev/null
+++ b/core/scheduler/src/main/scala/org/apache/openwhisk/core/scheduler/queue/ElasticSearchDurationChecker.scala
@@ -0,0 +1,245 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.openwhisk.core.scheduler.queue
+
+import akka.actor.ActorSystem
+import com.sksamuel.elastic4s.http.ElasticDsl._
+import com.sksamuel.elastic4s.http.{ElasticClient, ElasticProperties, NoOpRequestConfigCallback}
+import com.sksamuel.elastic4s.searches.queries.Query
+import com.sksamuel.elastic4s.{ElasticDate, ElasticDateMath, Seconds}
+import org.apache.openwhisk.common.Logging
+import org.apache.openwhisk.core.ConfigKeys
+import org.apache.openwhisk.core.entity.WhiskActionMetaData
+import org.apache.openwhisk.spi.Spi
+import pureconfig.loadConfigOrThrow
+import spray.json.{JsArray, JsNumber, JsValue, RootJsonFormat, deserializationError, _}
+
+import scala.concurrent.Future
+import scala.concurrent.duration.FiniteDuration
+import scala.language.implicitConversions
+import scala.util.{Failure, Try}
+import pureconfig.generic.auto._
+
+trait DurationChecker {
+ def checkAverageDuration(invocationNamespace: String, actionMetaData: WhiskActionMetaData)(
+ callback: DurationCheckResult => DurationCheckResult): Future[DurationCheckResult]
+}
+
+case class DurationCheckResult(averageDuration: Option[Double], hitCount: Long, took: Long)
+
+object ElasticSearchDurationChecker {
+ val FilterAggregationName = "filterAggregation"
+ val AverageAggregationName = "averageAggregation"
+
+ implicit val serde = new ElasticSearchDurationCheckResultFormat()
+
+ def getFromDate(timeWindow: FiniteDuration): ElasticDateMath =
+ ElasticDate.now minus (timeWindow.toSeconds.toInt, Seconds)
+}
+
+class ElasticSearchDurationChecker(private val client: ElasticClient, val timeWindow: FiniteDuration)(
+ implicit val actorSystem: ActorSystem,
+ implicit val logging: Logging)
+ extends DurationChecker {
+ import ElasticSearchDurationChecker._
+ import org.apache.openwhisk.core.database.elasticsearch.ElasticSearchActivationStore.generateIndex
+
+ implicit val ec = actorSystem.getDispatcher
+
+ override def checkAverageDuration(invocationNamespace: String, actionMetaData: WhiskActionMetaData)(
+ callback: DurationCheckResult => DurationCheckResult): Future[DurationCheckResult] = {
+ val index = generateIndex(invocationNamespace)
+ val fqn = actionMetaData.fullyQualifiedName(false)
+ val fromDate = getFromDate(timeWindow)
+
+ logging.info(this, s"check average duration for $fqn in $index for last $timeWindow")
+
+ actionMetaData.binding match {
+ case Some(binding) =>
+ val boolQueryResult = List(
+ matchQuery("annotations.binding", s"$binding"),
+ matchQuery("name", actionMetaData.name),
+ rangeQuery("@timestamp").gte(fromDate))
+
+ executeQuery(boolQueryResult, callback, index)
+
+ case None =>
+ val queryResult = List(matchQuery("path.keyword", fqn.toString), rangeQuery("@timestamp").gte(fromDate))
+
+ executeQuery(queryResult, callback, index)
+ }
+ }
+
+ private def executeQuery(boolQueryResult: List[Query],
+ callback: DurationCheckResult => DurationCheckResult,
+ index: String) = {
+ client
+ .execute {
+ (search(index) query {
+ boolQuery must {
+ boolQueryResult
+ }
+ } aggregations
+ avgAgg(AverageAggregationName, "duration")).size(0)
+ }
+ .map { res =>
+ logging.debug(this, s"ElasticSearch query results: $res")
+ Try(serde.read(res.body.getOrElse("").parseJson))
+ }
+ .flatMap(Future.fromTry)
+ .map(callback(_))
+ .andThen {
+ case Failure(t) =>
+ logging.error(this, s"failed to check the average duration: ${t}")
+ }
+ }
+}
+
+object ElasticSearchDurationCheckerProvider extends DurationCheckerProvider {
+ import org.apache.openwhisk.core.database.elasticsearch.ElasticSearchActivationStore._
+
+ override def instance(actorSystem: ActorSystem, log: Logging): ElasticSearchDurationChecker = {
+ implicit val as: ActorSystem = actorSystem
+ implicit val logging: Logging = log
+
+ val elasticClient =
+ ElasticClient(
+ ElasticProperties(s"${elasticSearchConfig.protocol}://${elasticSearchConfig.hosts}"),
+ NoOpRequestConfigCallback,
+ httpClientCallback)
+
+ new ElasticSearchDurationChecker(elasticClient, durationCheckerConfig.timeWindow)
+ }
+}
+
+trait DurationCheckerProvider extends Spi {
+
+ val durationCheckerConfig: DurationCheckerConfig =
+ loadConfigOrThrow[DurationCheckerConfig](ConfigKeys.durationChecker)
+
+ def instance(actorSystem: ActorSystem, logging: Logging): DurationChecker
+}
+
+class ElasticSearchDurationCheckResultFormat extends RootJsonFormat[DurationCheckResult] {
+ import ElasticSearchDurationChecker._
+ import spray.json.DefaultJsonProtocol._
+
+ /**
+ * Expected sample data
+ {
+ "_shards": {
+ "failed": 0,
+ "skipped": 0,
+ "successful": 5,
+ "total": 5
+ },
+ "aggregations": {
+ "agg": {
+ "value": 14
+ }
+ },
+ "hits": {
+ "hits": [],
+ "max_score": 0,
+ "total": 3
+ },
+ "timed_out": false,
+ "took": 0
+ }
+ */
+ /**
+ * Expected sample data
+ {
+ "_shards": {
+ "failed": 0,
+ "skipped": 0,
+ "successful": 5,
+ "total": 5
+ },
+ "aggregations": {
+ "pathAggregation": {
+ "avg_duration": {
+ "value": 13
+ },
+ "doc_count": 3
+ }
+ },
+ "hits": {
+ "hits": [],
+ "max_score": 0,
+ "total": 6
+ },
+ "timed_out": false,
+ "took": 0
+ }
+ */
+ implicit def read(json: JsValue) = {
+ val jsObject = json.asJsObject
+
+ jsObject.getFields("aggregations", "took", "hits") match {
+ case Seq(aggregations, took, hits) =>
+ val hitCount = hits.asJsObject.getFields("total").headOption
+ val filterAggregations = aggregations.asJsObject.getFields(FilterAggregationName)
+ val averageAggregations = aggregations.asJsObject.getFields(AverageAggregationName)
+
+ (filterAggregations, averageAggregations, hitCount) match {
+ case (filterAggregations, _, Some(count)) if filterAggregations.nonEmpty =>
+ val averageDuration =
+ filterAggregations.headOption.flatMap(
+ _.asJsObject
+ .getFields(AverageAggregationName)
+ .headOption
+ .flatMap(_.asJsObject.getFields("value").headOption))
+
+ averageDuration match {
+ case Some(JsNull) =>
+ DurationCheckResult(None, count.convertTo[Long], took.convertTo[Long])
+
+ case Some(duration) =>
+ DurationCheckResult(Some(duration.convertTo[Double]), count.convertTo[Long], took.convertTo[Long])
+
+ case _ => deserializationError("Cannot deserialize ProductItem: invalid input. Raw input: ")
+ }
+
+ case (_, averageAggregations, Some(count)) if averageAggregations.nonEmpty =>
+ val averageDuration = averageAggregations.headOption.flatMap(_.asJsObject.getFields("value").headOption)
+
+ averageDuration match {
+ case Some(JsNull) =>
+ DurationCheckResult(None, count.convertTo[Long], took.convertTo[Long])
+
+ case Some(duration) =>
+ DurationCheckResult(Some(duration.convertTo[Double]), count.convertTo[Long], took.convertTo[Long])
+
+ case t => deserializationError(s"Cannot deserialize DurationCheckResult: invalid input. Raw input: $t")
+ }
+
+ case t => deserializationError(s"Cannot deserialize DurationCheckResult: invalid input. Raw input: $t")
+ }
+
+ case other => deserializationError(s"Cannot deserialize DurationCheckResult: invalid input. Raw input: $other")
+ }
+
+ }
+
+ // This method would not be used.
+ override def write(obj: DurationCheckResult): JsValue = {
+ JsArray(JsNumber(obj.averageDuration.get), JsNumber(obj.hitCount), JsNumber(obj.took))
+ }
+}
+
+case class DurationCheckerConfig(timeWindow: FiniteDuration)
diff --git a/core/scheduler/src/main/scala/org/apache/openwhisk/core/scheduler/queue/NoopDurationChecker.scala b/core/scheduler/src/main/scala/org/apache/openwhisk/core/scheduler/queue/NoopDurationChecker.scala
new file mode 100644
index 0000000..441bfed
--- /dev/null
+++ b/core/scheduler/src/main/scala/org/apache/openwhisk/core/scheduler/queue/NoopDurationChecker.scala
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.openwhisk.core.scheduler.queue
+
+import akka.actor.ActorSystem
+import org.apache.openwhisk.common.Logging
+import org.apache.openwhisk.core.entity.WhiskActionMetaData
+
+import scala.concurrent.Future
+
+object NoopDurationCheckerProvider extends DurationCheckerProvider {
+ override def instance(actorSystem: ActorSystem, log: Logging): NoopDurationChecker = {
+ implicit val as: ActorSystem = actorSystem
+ implicit val logging: Logging = log
+ new NoopDurationChecker()
+ }
+}
+
+object NoopDurationChecker {
+ implicit val serde = new ElasticSearchDurationCheckResultFormat()
+}
+
+class NoopDurationChecker extends DurationChecker {
+ import scala.concurrent.ExecutionContext.Implicits.global
+
+ override def checkAverageDuration(invocationNamespace: String, actionMetaData: WhiskActionMetaData)(
+ callback: DurationCheckResult => DurationCheckResult): Future[DurationCheckResult] = {
+ Future {
+ DurationCheckResult(Option.apply(0), 0, 0)
+ }
+ }
+}
diff --git a/tests/build.gradle b/tests/build.gradle
index f2fdd2a..57c3bd7 100644
--- a/tests/build.gradle
+++ b/tests/build.gradle
@@ -40,6 +40,7 @@
def projectsWithCoverage = [
':common:scala',
':core:controller',
+ ':core:scheduler',
':core:invoker',
':tools:admin',
':core:cosmosdb:cache-invalidator'
@@ -52,6 +53,7 @@
"org/apache/openwhisk/core/apigw/actions/test/**",
"org/apache/openwhisk/core/database/test/*CacheConcurrencyTests*",
"org/apache/openwhisk/core/controller/test/*ControllerApiTests*",
+ "org/apache/openwhisk/core/scheduler/queue/test/ElasticSearchDurationCheck*",
"apigw/healthtests/**",
"ha/**",
"services/**",
@@ -70,6 +72,7 @@
"org/apache/openwhisk/standalone/**",
"org/apache/openwhisk/core/cli/test/**",
"org/apache/openwhisk/core/limits/**",
+ "org/apache/openwhisk/core/scheduler/queue/test/ElasticSearchDurationCheck*",
"**/*CacheConcurrencyTests*",
"**/*ControllerApiTests*",
"org/apache/openwhisk/testEntities/**",
@@ -221,6 +224,7 @@
compile project(':common:scala')
compile project(':core:controller')
+ compile project(':core:scheduler')
compile project(':core:invoker')
compile project(':core:cosmosdb:cache-invalidator')
compile project(':core:monitoring:user-events')
diff --git a/tests/src/test/resources/application.conf.j2 b/tests/src/test/resources/application.conf.j2
index 53d1cc9..90802bc 100644
--- a/tests/src/test/resources/application.conf.j2
+++ b/tests/src/test/resources/application.conf.j2
@@ -97,13 +97,27 @@
parameter-storage {
current = "off"
}
-
+
elasticsearch {
docker-image = "{{ elasticsearch.docker_image | default('docker.elastic.co/elasticsearch/elasticsearch:' ~ elasticsearch.version ) }}"
}
helm.release = "release"
runtime.delete.timeout = "30 seconds"
+
+ duration-checker {
+ time-window = "{{ durationChecker.timeWindow }}"
+ }
+
+ activation-store {
+ elasticsearch {
+ protocol = "{{ db.elasticsearch.protocol }}"
+ hosts = "{{ elasticsearch_connect_string }}"
+ index-pattern = "{{ db.elasticsearch.index_pattern }}"
+ username = "{{ db.elasticsearch.auth.admin.username }}"
+ password = "{{ db.elasticsearch.auth.admin.password }}"
+ }
+ }
}
#test-only overrides so that tests can override defaults in application.conf (todo: move all defaults to reference.conf)
diff --git a/tests/src/test/scala/org/apache/openwhisk/core/scheduler/queue/test/ElasticSearchDurationCheckResultFormatTest.scala b/tests/src/test/scala/org/apache/openwhisk/core/scheduler/queue/test/ElasticSearchDurationCheckResultFormatTest.scala
new file mode 100644
index 0000000..fe28e73
--- /dev/null
+++ b/tests/src/test/scala/org/apache/openwhisk/core/scheduler/queue/test/ElasticSearchDurationCheckResultFormatTest.scala
@@ -0,0 +1,124 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.openwhisk.core.scheduler.queue.test
+
+import org.apache.openwhisk.core.scheduler.queue.{DurationCheckResult, ElasticSearchDurationCheckResultFormat}
+import org.junit.runner.RunWith
+import org.scalatest.concurrent.ScalaFutures
+import org.scalatest.{FlatSpec, Matchers}
+import org.scalatest.junit.JUnitRunner
+import spray.json._
+
+@RunWith(classOf[JUnitRunner])
+class ElasticSearchDurationCheckResultFormatTest extends FlatSpec with Matchers with ScalaFutures {
+ behavior of "ElasticSearchDurationCheckResultFormatTest"
+
+ val serde = new ElasticSearchDurationCheckResultFormat()
+
+ it should "serialize the data correctly" in {
+ val normalData = """{
+ "_shards": {
+ "failed": 0,
+ "skipped": 0,
+ "successful": 5,
+ "total": 5
+ },
+ "aggregations": {
+ "filterAggregation": {
+ "averageAggregation": {
+ "value": 14
+ },
+ "doc_count": 3
+ }
+ },
+ "hits": {
+ "hits": [],
+ "max_score": 0,
+ "total": 3
+ },
+ "timed_out": false,
+ "took": 2
+ }"""
+
+ val bindingData = """{
+ "_shards": {
+ "failed": 0,
+ "skipped": 0,
+ "successful": 5,
+ "total": 5
+ },
+ "aggregations": {
+ "averageAggregation": {
+ "value": 12
+ }
+ },
+ "hits": {
+ "hits": [],
+ "max_score": 0,
+ "total": 2
+ },
+ "timed_out": false,
+ "took": 0
+ }"""
+
+ val expected1 = DurationCheckResult(Some(14), 3, 2)
+ val expected2 = DurationCheckResult(Some(12), 2, 0)
+ val result1 = serde.read(normalData.parseJson)
+ val result2 = serde.read(bindingData.parseJson)
+
+ result1 shouldBe expected1
+ result2 shouldBe expected2
+ }
+
+ // Since the write method is not being used, this test is meaningless but added just for the duality.
+ it should "deserialize the data correctly" in {
+ val data = DurationCheckResult(Some(14), 3, 2)
+ val expected =
+ """[14, 3, 2]
+ |
+ |""".stripMargin
+ val result = serde.write(data)
+
+ result shouldBe expected.parseJson
+ }
+
+ it should "throw an exception when data is not in the expected format" in {
+ val malformedData = """{
+ "_shards": {
+ "failed": 0,
+ "skipped": 0,
+ "successful": 5,
+ "total": 5
+ },
+ "averageAggregation": {
+ "value": 14
+ },
+ "hits": {
+ "hits": [],
+ "max_score": 0,
+ "total": 3
+ },
+ "timed_out": false,
+ "took": 2
+ }"""
+
+ assertThrows[DeserializationException] {
+ serde.read(malformedData.parseJson)
+ }
+ }
+}
diff --git a/tests/src/test/scala/org/apache/openwhisk/core/scheduler/queue/test/ElasticSearchDurationCheckerTests.scala b/tests/src/test/scala/org/apache/openwhisk/core/scheduler/queue/test/ElasticSearchDurationCheckerTests.scala
new file mode 100644
index 0000000..867c0cd
--- /dev/null
+++ b/tests/src/test/scala/org/apache/openwhisk/core/scheduler/queue/test/ElasticSearchDurationCheckerTests.scala
@@ -0,0 +1,406 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.openwhisk.core.scheduler.queue.test
+
+import akka.stream.ActorMaterializer
+import com.sksamuel.elastic4s.http.ElasticDsl._
+import com.sksamuel.elastic4s.http.{ElasticClient, ElasticProperties, NoOpRequestConfigCallback}
+import common._
+import common.rest.WskRestOperations
+import org.apache.http.auth.{AuthScope, UsernamePasswordCredentials}
+import org.apache.http.impl.client.BasicCredentialsProvider
+import org.apache.http.impl.nio.client.HttpAsyncClientBuilder
+import org.apache.openwhisk.common.TransactionId
+import org.apache.openwhisk.core.ConfigKeys
+import org.apache.openwhisk.core.database.elasticsearch.ElasticSearchActivationStoreConfig
+import org.apache.openwhisk.core.entity._
+import org.apache.openwhisk.core.entity.size._
+import org.apache.openwhisk.core.entity.test.ExecHelpers
+import org.apache.openwhisk.core.scheduler.queue.{DurationCheckResult, ElasticSearchDurationChecker}
+import org.elasticsearch.client.RestClientBuilder.HttpClientConfigCallback
+import org.junit.runner.RunWith
+import org.scalatest.concurrent.ScalaFutures
+import org.scalatest.{BeforeAndAfter, BeforeAndAfterAll, FlatSpec, Matchers}
+import org.scalatestplus.junit.JUnitRunner
+import pureconfig.generic.auto._
+import pureconfig.loadConfigOrThrow
+import java.time.Instant
+import java.time.temporal.ChronoUnit
+import scala.language.postfixOps
+
+import scala.collection.mutable
+import scala.concurrent.ExecutionContextExecutor
+import scala.concurrent.duration._
+
+/**
+ * This test will try to fetch the average duration from activation documents. This class guarantee the minimum compatibility.
+ * In case there are any updates in the activation document, it will catch the difference between the expected and the real.
+ */
+@RunWith(classOf[JUnitRunner])
+class ElasticSearchDurationCheckerTests
+ extends FlatSpec
+ with Matchers
+ with ScalaFutures
+ with WskTestHelpers
+ with StreamLogging
+ with ExecHelpers
+ with BeforeAndAfterAll
+ with BeforeAndAfter {
+
+ private val namespace = "durationCheckNamespace"
+ val wskadmin: RunCliCmd = new RunCliCmd {
+ override def baseCommand: mutable.Buffer[String] = WskAdmin.baseCommand
+ }
+ implicit val mt: ActorMaterializer = ActorMaterializer()
+ implicit val ec: ExecutionContextExecutor = actorSystem.dispatcher
+ implicit val timeoutConfig: PatienceConfig = PatienceConfig(5 seconds, 15 milliseconds)
+
+ private val auth = BasicAuthenticationAuthKey()
+ implicit val wskprops: WskProps = WskProps(authKey = auth.compact, namespace = namespace)
+ implicit val transid: TransactionId = TransactionId.testing
+
+ val wsk = new WskRestOperations
+ val elasticSearchConfig: ElasticSearchActivationStoreConfig =
+ loadConfigOrThrow[ElasticSearchActivationStoreConfig](ConfigKeys.elasticSearchActivationStore)
+
+ val testIndex: String = generateIndex(namespace)
+ val concurrency = 1
+ val actionMem: ByteSize = 256.MB
+ val defaultDurationCheckWindow = 5.seconds
+
+ private val httpClientCallback = new HttpClientConfigCallback {
+ override def customizeHttpClient(httpClientBuilder: HttpAsyncClientBuilder): HttpAsyncClientBuilder = {
+ val provider = new BasicCredentialsProvider
+ provider.setCredentials(
+ AuthScope.ANY,
+ new UsernamePasswordCredentials(elasticSearchConfig.username, elasticSearchConfig.password))
+ httpClientBuilder.setDefaultCredentialsProvider(provider)
+ }
+ }
+
+ private val client =
+ ElasticClient(
+ ElasticProperties(s"${elasticSearchConfig.protocol}://${elasticSearchConfig.hosts}"),
+ NoOpRequestConfigCallback,
+ httpClientCallback)
+
+ private val elasticSearchDurationChecker = new ElasticSearchDurationChecker(client, defaultDurationCheckWindow)
+
+ override def beforeAll(): Unit = {
+ val res = wskadmin.cli(Seq("user", "create", namespace, "-u", auth.compact))
+ res.exitCode shouldBe 0
+
+ println(s"namespace: $namespace, auth: ${auth.compact}")
+ super.beforeAll()
+ }
+
+ override def afterAll(): Unit = {
+ client.execute {
+ deleteIndex(testIndex)
+ }
+ wskadmin.cli(Seq("user", "delete", namespace))
+ logLines.foreach(println)
+ super.afterAll()
+ }
+
+ behavior of "ElasticSearchDurationChecker"
+
+ it should "fetch the proper duration from ES" in withAssetCleaner(wskprops) { (_, assetHelper) =>
+ val actionName = "avgDuration"
+ val dummyActionName = "dummyAction"
+
+ var totalDuration = 0L
+ val count = 3
+
+ assetHelper.withCleaner(wsk.action, actionName) { (action, _) =>
+ action.create(actionName, Some(TestUtils.getTestActionFilename("hello.js")))
+ }
+
+ assetHelper.withCleaner(wsk.action, dummyActionName) { (action, _) =>
+ action.create(dummyActionName, Some(TestUtils.getTestActionFilename("hello.js")))
+ }
+
+ val actionMetaData =
+ WhiskActionMetaData(
+ EntityPath(namespace),
+ EntityName(actionName),
+ js10MetaData(Some("jsMain"), binary = false),
+ limits = actionLimits(actionMem, concurrency))
+
+ val run1 = wsk.action.invoke(actionName, Map())
+ withActivation(wsk.activation, run1) { activation =>
+ activation.response.status shouldBe "success"
+ }
+ // wait for 1s
+ Thread.sleep(1000)
+
+ val start = Instant.now()
+ val run2 = wsk.action.invoke(dummyActionName, Map())
+ withActivation(wsk.activation, run2) { activation =>
+ activation.response.status shouldBe "success"
+ }
+
+ 1 to count foreach { _ =>
+ val run = wsk.action.invoke(actionName, Map())
+ withActivation(wsk.activation, run) { activation =>
+ activation.response.status shouldBe "success"
+ totalDuration += activation.duration
+ }
+ }
+ val end = Instant.now()
+ val timeWindow = math.ceil(ChronoUnit.MILLIS.between(start, end) / 1000.0).seconds
+ val durationChecker = new ElasticSearchDurationChecker(client, timeWindow)
+
+ // it should aggregate the recent activations in 5 seconds
+ val durationCheckResult: DurationCheckResult =
+ durationChecker.checkAverageDuration(namespace, actionMetaData)(res => res).futureValue
+
+ /**
+ * Expected sample data
+ {
+ "_shards": {
+ "failed": 0,
+ "skipped": 0,
+ "successful": 5,
+ "total": 5
+ },
+ "aggregations": {
+ "filterAggregation": {
+ "averageAggregation": {
+ "value": 14
+ },
+ "doc_count": 3
+ }
+ },
+ "hits": {
+ "hits": [],
+ "max_score": 0,
+ "total": 3
+ },
+ "timed_out": false,
+ "took": 2
+ }
+ */
+ truncateDouble(durationCheckResult.averageDuration.getOrElse(0.0)) shouldBe truncateDouble(
+ totalDuration.toDouble / count.toDouble)
+ durationCheckResult.hitCount shouldBe count
+ }
+
+ it should "fetch proper average duration for a package action" in withAssetCleaner(wskprops) { (_, assetHelper) =>
+ val packageName = "samplePackage"
+ val actionName = "packageAction"
+ val fqn = s"$namespace/$packageName/$actionName"
+
+ val actionMetaData =
+ WhiskActionMetaData(
+ EntityPath(s"$namespace/$packageName"),
+ EntityName(actionName),
+ js10MetaData(Some("jsMain"), binary = false),
+ limits = actionLimits(actionMem, concurrency))
+
+ var totalDuration = 0L
+ val count = 3
+
+ assetHelper.withCleaner(wsk.pkg, packageName) { (pkg, _) =>
+ pkg.create(packageName)
+ }
+
+ assetHelper.withCleaner(wsk.action, fqn) { (action, _) =>
+ action.create(fqn, Some(TestUtils.getTestActionFilename("hello.js")))
+ }
+
+ 1 to count foreach { _ =>
+ val run = wsk.action.invoke(fqn, Map())
+ withActivation(wsk.activation, run) { activation =>
+ activation.response.status shouldBe "success"
+ }
+ }
+ // wait for 1s
+ Thread.sleep(1000)
+
+ val start = Instant.now()
+ 1 to count foreach { _ =>
+ val run = wsk.action.invoke(fqn, Map())
+ withActivation(wsk.activation, run) { activation =>
+ activation.response.status shouldBe "success"
+ totalDuration += activation.duration
+ }
+ }
+ val end = Instant.now()
+ val timeWindow = math.ceil(ChronoUnit.MILLIS.between(start, end) / 1000.0).seconds
+ val durationChecker = new ElasticSearchDurationChecker(client, timeWindow)
+ val durationCheckResult: DurationCheckResult =
+ durationChecker.checkAverageDuration(namespace, actionMetaData)(res => res).futureValue
+
+ /**
+ * Expected sample data
+ {
+ "_shards": {
+ "failed": 0,
+ "skipped": 0,
+ "successful": 5,
+ "total": 5
+ },
+ "aggregations": {
+ "filterAggregation": {
+ "averageAggregation": {
+ "value": 13
+ },
+ "doc_count": 3
+ }
+ },
+ "hits": {
+ "hits": [],
+ "max_score": 0,
+ "total": 6
+ },
+ "timed_out": false,
+ "took": 0
+ }
+ */
+ truncateDouble(durationCheckResult.averageDuration.getOrElse(0.0)) shouldBe truncateDouble(
+ totalDuration.toDouble / count.toDouble)
+ durationCheckResult.hitCount shouldBe count
+ }
+
+ it should "fetch the duration for binding action" in withAssetCleaner(wskprops) { (_, assetHelper) =>
+ val packageName = "testPackage"
+ val actionName = "testAction"
+ val originalFQN = s"$namespace/$packageName/$actionName"
+ val boundPackageName = "boundPackage"
+
+ val actionMetaData =
+ WhiskActionMetaData(
+ EntityPath(s"$namespace/$boundPackageName"),
+ EntityName(actionName),
+ js10MetaData(Some("jsMain"), binary = false),
+ limits = actionLimits(actionMem, concurrency),
+ binding = Some(EntityPath(s"$namespace/$packageName")))
+
+ var totalDuration = 0L
+ val count = 3
+
+ assetHelper.withCleaner(wsk.pkg, packageName) { (pkg, _) =>
+ pkg.create(packageName, shared = Some(true))
+ }
+
+ assetHelper.withCleaner(wsk.action, originalFQN) { (action, _) =>
+ action.create(originalFQN, Some(TestUtils.getTestActionFilename("hello.js")))
+ }
+
+ assetHelper.withCleaner(wsk.pkg, boundPackageName) { (pkg, _) =>
+ pkg.bind(packageName, boundPackageName)
+ }
+
+ 1 to count foreach { _ =>
+ val run = wsk.action.invoke(s"$boundPackageName/$actionName", Map())
+ withActivation(wsk.activation, run) { activation =>
+ activation.response.status shouldBe "success"
+ }
+ }
+ // wait for 1s
+ Thread.sleep(1000)
+
+ val start = Instant.now()
+ 1 to count foreach { _ =>
+ val run = wsk.action.invoke(s"$boundPackageName/$actionName", Map())
+ withActivation(wsk.activation, run) { activation =>
+ activation.response.status shouldBe "success"
+ totalDuration += activation.duration
+ }
+ }
+ val end = Instant.now()
+ val timeWindow = math.ceil(ChronoUnit.MILLIS.between(start, end) / 1000.0).seconds
+ val durationChecker = new ElasticSearchDurationChecker(client, timeWindow)
+ val durationCheckResult: DurationCheckResult =
+ durationChecker.checkAverageDuration(namespace, actionMetaData)(res => res).futureValue
+
+ /**
+ * Expected sample data
+ {
+ "_shards": {
+ "failed": 0,
+ "skipped": 0,
+ "successful": 5,
+ "total": 5
+ },
+ "aggregations": {
+ "averageAggregation": {
+ "value": 14
+ }
+ },
+ "hits": {
+ "hits": [],
+ "max_score": 0,
+ "total": 3
+ },
+ "timed_out": false,
+ "took": 0
+ }
+ */
+ truncateDouble(durationCheckResult.averageDuration.getOrElse(0.0)) shouldBe truncateDouble(
+ totalDuration.toDouble / count.toDouble)
+ durationCheckResult.hitCount shouldBe count
+ }
+
+ it should "return nothing properly if there is no activation yet" in withAssetCleaner(wskprops) { (_, _) =>
+ val actionName = "noneAction"
+
+ val actionMetaData =
+ WhiskActionMetaData(
+ EntityPath(s"$namespace"),
+ EntityName(actionName),
+ js10MetaData(Some("jsMain"), binary = false),
+ limits = actionLimits(actionMem, concurrency))
+
+ val durationCheckResult: DurationCheckResult =
+ elasticSearchDurationChecker.checkAverageDuration(namespace, actionMetaData)(res => res).futureValue
+
+ durationCheckResult.averageDuration shouldBe None
+ durationCheckResult.hitCount shouldBe 0
+ }
+
+ it should "return nothing properly if there is no activation for binding action yet" in withAssetCleaner(wskprops) {
+ (_, _) =>
+ val packageName = "testPackage2"
+ val actionName = "noneAction"
+ val boundPackageName = "boundPackage2"
+
+ val actionMetaData =
+ WhiskActionMetaData(
+ EntityPath(s"$namespace/$boundPackageName"),
+ EntityName(actionName),
+ js10MetaData(Some("jsMain"), false),
+ limits = actionLimits(actionMem, concurrency),
+ binding = Some(EntityPath(s"${namespace}/${packageName}")))
+
+ val durationCheckResult: DurationCheckResult =
+ elasticSearchDurationChecker.checkAverageDuration(namespace, actionMetaData)(res => res).futureValue
+
+ durationCheckResult.averageDuration shouldBe None
+ durationCheckResult.hitCount shouldBe 0
+ }
+
+ private def truncateDouble(number: Double, scale: Int = 2) = {
+ BigDecimal(number).setScale(scale, BigDecimal.RoundingMode.HALF_UP).toDouble
+ }
+
+ private def generateIndex(namespace: String): String = {
+ elasticSearchConfig.indexPattern.dropWhile(_ == '/') format namespace.toLowerCase
+ }
+}
diff --git a/tools/travis/setupPrereq.sh b/tools/travis/setupPrereq.sh
index f820c28..7832938 100755
--- a/tools/travis/setupPrereq.sh
+++ b/tools/travis/setupPrereq.sh
@@ -31,6 +31,7 @@
$ANSIBLE_CMD couchdb.yml
$ANSIBLE_CMD initdb.yml
$ANSIBLE_CMD wipe.yml
+$ANSIBLE_CMD elasticsearch.yml
$ANSIBLE_CMD properties.yml -e manifest_file="$RUNTIMES_MANIFEST"
echo "Time taken for ${0##*/} is $SECONDS secs"
diff --git a/tools/travis/setupSystem.sh b/tools/travis/setupSystem.sh
index 0750ddd..9f99cf7 100755
--- a/tools/travis/setupSystem.sh
+++ b/tools/travis/setupSystem.sh
@@ -26,7 +26,7 @@
cd $ROOTDIR/ansible
-$ANSIBLE_CMD openwhisk.yml -e manifest_file="$RUNTIMES_MANIFEST"
+$ANSIBLE_CMD openwhisk.yml -e manifest_file="$RUNTIMES_MANIFEST" -e db_activation_backend=ElasticSearch
$ANSIBLE_CMD apigateway.yml
$ANSIBLE_CMD routemgmt.yml