[New Scheduler] Add duration checker (#4984)

* Add a duration checker for Elasticsearch.

* Add configurations for the ElasticSearchDurationCheckerTests class

* Use a private helper function to execute queries.

* Add an Ansible variable for the duration checker.

* Apply scalaFmt

* Include test cases for duration checker to system tests.

* Setup ElasticSearch for system tests.

* Increase patience config to wait for response longer.

* Add postfixOps
diff --git a/ansible/group_vars/all b/ansible/group_vars/all
index 342ad1d..564a021 100644
--- a/ansible/group_vars/all
+++ b/ansible/group_vars/all
@@ -421,3 +421,6 @@
     port: "{{ metrics_kamon_statsd_port | default('8125') }}"
 
 user_events: "{{ user_events_enabled | default(false) | lower }}"
+
+durationChecker:
+    timeWindow: "{{ duration_checker_time_window | default('1 d') }}"
diff --git a/common/scala/src/main/scala/org/apache/openwhisk/core/WhiskConfig.scala b/common/scala/src/main/scala/org/apache/openwhisk/core/WhiskConfig.scala
index 7c0ec2b..84e75e8 100644
--- a/common/scala/src/main/scala/org/apache/openwhisk/core/WhiskConfig.scala
+++ b/common/scala/src/main/scala/org/apache/openwhisk/core/WhiskConfig.scala
@@ -272,6 +272,8 @@
   val metrics = "whisk.metrics"
   val featureFlags = "whisk.feature-flags"
 
+  val durationChecker = s"whisk.duration-checker"
+
   val whiskConfig = "whisk.config"
   val sharedPackageExecuteOnly = s"whisk.shared-packages-execute-only"
   val swaggerUi = "whisk.swagger-ui"
diff --git a/common/scala/src/main/scala/org/apache/openwhisk/core/database/elasticsearch/ElasticSearchActivationStore.scala b/common/scala/src/main/scala/org/apache/openwhisk/core/database/elasticsearch/ElasticSearchActivationStore.scala
index 5d110a7..8f25712 100644
--- a/common/scala/src/main/scala/org/apache/openwhisk/core/database/elasticsearch/ElasticSearchActivationStore.scala
+++ b/common/scala/src/main/scala/org/apache/openwhisk/core/database/elasticsearch/ElasticSearchActivationStore.scala
@@ -57,27 +57,17 @@
 
 class ElasticSearchActivationStore(
   httpFlow: Option[Flow[(HttpRequest, Promise[HttpResponse]), (Try[HttpResponse], Promise[HttpResponse]), Any]] = None,
-  elasticSearchConfig: ElasticSearchActivationStoreConfig =
-    loadConfigOrThrow[ElasticSearchActivationStoreConfig](ConfigKeys.elasticSearchActivationStore),
+  elasticSearchConfig: ElasticSearchActivationStoreConfig,
   useBatching: Boolean = false)(implicit actorSystem: ActorSystem,
                                 actorMaterializer: ActorMaterializer,
                                 logging: Logging)
     extends ActivationStore {
 
   import com.sksamuel.elastic4s.http.ElasticDsl._
+  import ElasticSearchActivationStore.{generateIndex, httpClientCallback}
 
   private implicit val executionContextExecutor: ExecutionContextExecutor = actorSystem.dispatcher
 
-  private val httpClientCallback = new HttpClientConfigCallback {
-    override def customizeHttpClient(httpClientBuilder: HttpAsyncClientBuilder): HttpAsyncClientBuilder = {
-      val provider = new BasicCredentialsProvider
-      provider.setCredentials(
-        AuthScope.ANY,
-        new UsernamePasswordCredentials(elasticSearchConfig.username, elasticSearchConfig.password))
-      httpClientBuilder.setDefaultCredentialsProvider(provider)
-    }
-  }
-
   private val client =
     ElasticClient(
       ElasticProperties(s"${elasticSearchConfig.protocol}://${elasticSearchConfig.hosts}"),
@@ -407,10 +397,6 @@
     activationId.toString.split("/")(0)
   }
 
-  private def generateIndex(namespace: String): String = {
-    elasticSearchConfig.indexPattern.dropWhile(_ == '/') format namespace.toLowerCase
-  }
-
   private def generateRangeQuery(key: String, since: Option[Instant], upto: Option[Instant]): RangeQuery = {
     rangeQuery(key)
       .gte(since.map(_.toEpochMilli).getOrElse(minStart))
@@ -418,7 +404,31 @@
   }
 }
 
+object ElasticSearchActivationStore {
+  val elasticSearchConfig: ElasticSearchActivationStoreConfig =
+    loadConfigOrThrow[ElasticSearchActivationStoreConfig](ConfigKeys.elasticSearchActivationStore)
+
+  val httpClientCallback = new HttpClientConfigCallback {
+    override def customizeHttpClient(httpClientBuilder: HttpAsyncClientBuilder): HttpAsyncClientBuilder = {
+      val provider = new BasicCredentialsProvider
+      provider.setCredentials(
+        AuthScope.ANY,
+        new UsernamePasswordCredentials(elasticSearchConfig.username, elasticSearchConfig.password))
+      httpClientBuilder.setDefaultCredentialsProvider(provider)
+    }
+  }
+
+  def generateIndex(namespace: String): String = {
+    elasticSearchConfig.indexPattern.dropWhile(_ == '/') format namespace.toLowerCase
+  }
+}
+
 object ElasticSearchActivationStoreProvider extends ActivationStoreProvider {
+  import ElasticSearchActivationStore.elasticSearchConfig
+
   override def instance(actorSystem: ActorSystem, actorMaterializer: ActorMaterializer, logging: Logging) =
-    new ElasticSearchActivationStore(useBatching = true)(actorSystem, actorMaterializer, logging)
+    new ElasticSearchActivationStore(elasticSearchConfig = elasticSearchConfig, useBatching = true)(
+      actorSystem,
+      actorMaterializer,
+      logging)
 }
diff --git a/core/scheduler/src/main/scala/org/apache/openwhisk/core/scheduler/queue/ElasticSearchDurationChecker.scala b/core/scheduler/src/main/scala/org/apache/openwhisk/core/scheduler/queue/ElasticSearchDurationChecker.scala
new file mode 100644
index 0000000..88e1f2f
--- /dev/null
+++ b/core/scheduler/src/main/scala/org/apache/openwhisk/core/scheduler/queue/ElasticSearchDurationChecker.scala
@@ -0,0 +1,245 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.openwhisk.core.scheduler.queue
+
+import akka.actor.ActorSystem
+import com.sksamuel.elastic4s.http.ElasticDsl._
+import com.sksamuel.elastic4s.http.{ElasticClient, ElasticProperties, NoOpRequestConfigCallback}
+import com.sksamuel.elastic4s.searches.queries.Query
+import com.sksamuel.elastic4s.{ElasticDate, ElasticDateMath, Seconds}
+import org.apache.openwhisk.common.Logging
+import org.apache.openwhisk.core.ConfigKeys
+import org.apache.openwhisk.core.entity.WhiskActionMetaData
+import org.apache.openwhisk.spi.Spi
+import pureconfig.loadConfigOrThrow
+import spray.json.{JsArray, JsNumber, JsValue, RootJsonFormat, deserializationError, _}
+
+import scala.concurrent.Future
+import scala.concurrent.duration.FiniteDuration
+import scala.language.implicitConversions
+import scala.util.{Failure, Try}
+import pureconfig.generic.auto._
+
+trait DurationChecker {
+  def checkAverageDuration(invocationNamespace: String, actionMetaData: WhiskActionMetaData)(
+    callback: DurationCheckResult => DurationCheckResult): Future[DurationCheckResult]
+}
+
+case class DurationCheckResult(averageDuration: Option[Double], hitCount: Long, took: Long)
+
+object ElasticSearchDurationChecker {
+  val FilterAggregationName = "filterAggregation"
+  val AverageAggregationName = "averageAggregation"
+
+  implicit val serde = new ElasticSearchDurationCheckResultFormat()
+
+  def getFromDate(timeWindow: FiniteDuration): ElasticDateMath =
+    ElasticDate.now minus (timeWindow.toSeconds.toInt, Seconds)
+}
+
+class ElasticSearchDurationChecker(private val client: ElasticClient, val timeWindow: FiniteDuration)(
+  implicit val actorSystem: ActorSystem,
+  implicit val logging: Logging)
+    extends DurationChecker {
+  import ElasticSearchDurationChecker._
+  import org.apache.openwhisk.core.database.elasticsearch.ElasticSearchActivationStore.generateIndex
+
+  implicit val ec = actorSystem.getDispatcher
+
+  override def checkAverageDuration(invocationNamespace: String, actionMetaData: WhiskActionMetaData)(
+    callback: DurationCheckResult => DurationCheckResult): Future[DurationCheckResult] = {
+    val index = generateIndex(invocationNamespace)
+    val fqn = actionMetaData.fullyQualifiedName(false)
+    val fromDate = getFromDate(timeWindow)
+
+    logging.info(this, s"check average duration for $fqn in $index for last $timeWindow")
+
+    actionMetaData.binding match {
+      case Some(binding) =>
+        val boolQueryResult = List(
+          matchQuery("annotations.binding", s"$binding"),
+          matchQuery("name", actionMetaData.name),
+          rangeQuery("@timestamp").gte(fromDate))
+
+        executeQuery(boolQueryResult, callback, index)
+
+      case None =>
+        val queryResult = List(matchQuery("path.keyword", fqn.toString), rangeQuery("@timestamp").gte(fromDate))
+
+        executeQuery(queryResult, callback, index)
+    }
+  }
+
+  private def executeQuery(boolQueryResult: List[Query],
+                           callback: DurationCheckResult => DurationCheckResult,
+                           index: String) = {
+    client
+      .execute {
+        (search(index) query {
+          boolQuery must {
+            boolQueryResult
+          }
+        } aggregations
+          avgAgg(AverageAggregationName, "duration")).size(0)
+      }
+      .map { res =>
+        logging.debug(this, s"ElasticSearch query results: $res")
+        Try(serde.read(res.body.getOrElse("").parseJson))
+      }
+      .flatMap(Future.fromTry)
+      .map(callback(_))
+      .andThen {
+        case Failure(t) =>
+          logging.error(this, s"failed to check the average duration: ${t}")
+      }
+  }
+}
+
+object ElasticSearchDurationCheckerProvider extends DurationCheckerProvider {
+  import org.apache.openwhisk.core.database.elasticsearch.ElasticSearchActivationStore._
+
+  override def instance(actorSystem: ActorSystem, log: Logging): ElasticSearchDurationChecker = {
+    implicit val as: ActorSystem = actorSystem
+    implicit val logging: Logging = log
+
+    val elasticClient =
+      ElasticClient(
+        ElasticProperties(s"${elasticSearchConfig.protocol}://${elasticSearchConfig.hosts}"),
+        NoOpRequestConfigCallback,
+        httpClientCallback)
+
+    new ElasticSearchDurationChecker(elasticClient, durationCheckerConfig.timeWindow)
+  }
+}
+
+trait DurationCheckerProvider extends Spi {
+
+  val durationCheckerConfig: DurationCheckerConfig =
+    loadConfigOrThrow[DurationCheckerConfig](ConfigKeys.durationChecker)
+
+  def instance(actorSystem: ActorSystem, logging: Logging): DurationChecker
+}
+
+class ElasticSearchDurationCheckResultFormat extends RootJsonFormat[DurationCheckResult] {
+  import ElasticSearchDurationChecker._
+  import spray.json.DefaultJsonProtocol._
+
+  /**
+   * Expected sample data
+      {
+          "_shards": {
+              "failed": 0,
+              "skipped": 0,
+              "successful": 5,
+              "total": 5
+          },
+          "aggregations": {
+              "agg": {
+                  "value": 14
+              }
+          },
+          "hits": {
+              "hits": [],
+              "max_score": 0,
+              "total": 3
+          },
+          "timed_out": false,
+          "took": 0
+      }
+   */
+  /**
+   * Expected sample data
+      {
+          "_shards": {
+              "failed": 0,
+              "skipped": 0,
+              "successful": 5,
+              "total": 5
+          },
+          "aggregations": {
+              "pathAggregation": {
+                  "avg_duration": {
+                      "value": 13
+                  },
+                  "doc_count": 3
+              }
+          },
+          "hits": {
+              "hits": [],
+              "max_score": 0,
+              "total": 6
+          },
+          "timed_out": false,
+          "took": 0
+      }
+   */
+  implicit def read(json: JsValue) = {
+    val jsObject = json.asJsObject
+
+    jsObject.getFields("aggregations", "took", "hits") match {
+      case Seq(aggregations, took, hits) =>
+        val hitCount = hits.asJsObject.getFields("total").headOption
+        val filterAggregations = aggregations.asJsObject.getFields(FilterAggregationName)
+        val averageAggregations = aggregations.asJsObject.getFields(AverageAggregationName)
+
+        (filterAggregations, averageAggregations, hitCount) match {
+          case (filterAggregations, _, Some(count)) if filterAggregations.nonEmpty =>
+            val averageDuration =
+              filterAggregations.headOption.flatMap(
+                _.asJsObject
+                  .getFields(AverageAggregationName)
+                  .headOption
+                  .flatMap(_.asJsObject.getFields("value").headOption))
+
+            averageDuration match {
+              case Some(JsNull) =>
+                DurationCheckResult(None, count.convertTo[Long], took.convertTo[Long])
+
+              case Some(duration) =>
+                DurationCheckResult(Some(duration.convertTo[Double]), count.convertTo[Long], took.convertTo[Long])
+
+              case _ => deserializationError("Cannot deserialize ProductItem: invalid input. Raw input: ")
+            }
+
+          case (_, averageAggregations, Some(count)) if averageAggregations.nonEmpty =>
+            val averageDuration = averageAggregations.headOption.flatMap(_.asJsObject.getFields("value").headOption)
+
+            averageDuration match {
+              case Some(JsNull) =>
+                DurationCheckResult(None, count.convertTo[Long], took.convertTo[Long])
+
+              case Some(duration) =>
+                DurationCheckResult(Some(duration.convertTo[Double]), count.convertTo[Long], took.convertTo[Long])
+
+              case t => deserializationError(s"Cannot deserialize DurationCheckResult: invalid input. Raw input: $t")
+            }
+
+          case t => deserializationError(s"Cannot deserialize DurationCheckResult: invalid input. Raw input: $t")
+        }
+
+      case other => deserializationError(s"Cannot deserialize DurationCheckResult: invalid input. Raw input: $other")
+    }
+
+  }
+
+  // This method would not be used.
+  override def write(obj: DurationCheckResult): JsValue = {
+    JsArray(JsNumber(obj.averageDuration.get), JsNumber(obj.hitCount), JsNumber(obj.took))
+  }
+}
+
+case class DurationCheckerConfig(timeWindow: FiniteDuration)
diff --git a/core/scheduler/src/main/scala/org/apache/openwhisk/core/scheduler/queue/NoopDurationChecker.scala b/core/scheduler/src/main/scala/org/apache/openwhisk/core/scheduler/queue/NoopDurationChecker.scala
new file mode 100644
index 0000000..441bfed
--- /dev/null
+++ b/core/scheduler/src/main/scala/org/apache/openwhisk/core/scheduler/queue/NoopDurationChecker.scala
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.openwhisk.core.scheduler.queue
+
+import akka.actor.ActorSystem
+import org.apache.openwhisk.common.Logging
+import org.apache.openwhisk.core.entity.WhiskActionMetaData
+
+import scala.concurrent.Future
+
+object NoopDurationCheckerProvider extends DurationCheckerProvider {
+  override def instance(actorSystem: ActorSystem, log: Logging): NoopDurationChecker = {
+    implicit val as: ActorSystem = actorSystem
+    implicit val logging: Logging = log
+    new NoopDurationChecker()
+  }
+}
+
+object NoopDurationChecker {
+  implicit val serde = new ElasticSearchDurationCheckResultFormat()
+}
+
+class NoopDurationChecker extends DurationChecker {
+  import scala.concurrent.ExecutionContext.Implicits.global
+
+  override def checkAverageDuration(invocationNamespace: String, actionMetaData: WhiskActionMetaData)(
+    callback: DurationCheckResult => DurationCheckResult): Future[DurationCheckResult] = {
+    Future {
+      DurationCheckResult(Option.apply(0), 0, 0)
+    }
+  }
+}
diff --git a/tests/build.gradle b/tests/build.gradle
index f2fdd2a..57c3bd7 100644
--- a/tests/build.gradle
+++ b/tests/build.gradle
@@ -40,6 +40,7 @@
 def projectsWithCoverage = [
     ':common:scala',
     ':core:controller',
+    ':core:scheduler',
     ':core:invoker',
     ':tools:admin',
     ':core:cosmosdb:cache-invalidator'
@@ -52,6 +53,7 @@
             "org/apache/openwhisk/core/apigw/actions/test/**",
             "org/apache/openwhisk/core/database/test/*CacheConcurrencyTests*",
             "org/apache/openwhisk/core/controller/test/*ControllerApiTests*",
+            "org/apache/openwhisk/core/scheduler/queue/test/ElasticSearchDurationCheck*",
             "apigw/healthtests/**",
             "ha/**",
             "services/**",
@@ -70,6 +72,7 @@
             "org/apache/openwhisk/standalone/**",
             "org/apache/openwhisk/core/cli/test/**",
             "org/apache/openwhisk/core/limits/**",
+            "org/apache/openwhisk/core/scheduler/queue/test/ElasticSearchDurationCheck*",
             "**/*CacheConcurrencyTests*",
             "**/*ControllerApiTests*",
             "org/apache/openwhisk/testEntities/**",
@@ -221,6 +224,7 @@
 
     compile project(':common:scala')
     compile project(':core:controller')
+    compile project(':core:scheduler')
     compile project(':core:invoker')
     compile project(':core:cosmosdb:cache-invalidator')
     compile project(':core:monitoring:user-events')
diff --git a/tests/src/test/resources/application.conf.j2 b/tests/src/test/resources/application.conf.j2
index 53d1cc9..90802bc 100644
--- a/tests/src/test/resources/application.conf.j2
+++ b/tests/src/test/resources/application.conf.j2
@@ -97,13 +97,27 @@
     parameter-storage {
         current = "off"
     }
-    
+
     elasticsearch {
         docker-image = "{{ elasticsearch.docker_image | default('docker.elastic.co/elasticsearch/elasticsearch:' ~ elasticsearch.version ) }}"
     }
 
     helm.release = "release"
     runtime.delete.timeout = "30 seconds"
+
+    duration-checker {
+        time-window = "{{ durationChecker.timeWindow }}"
+    }
+
+    activation-store {
+        elasticsearch {
+            protocol      = "{{ db.elasticsearch.protocol }}"
+            hosts         = "{{ elasticsearch_connect_string }}"
+            index-pattern = "{{ db.elasticsearch.index_pattern }}"
+            username      = "{{ db.elasticsearch.auth.admin.username }}"
+            password      = "{{ db.elasticsearch.auth.admin.password }}"
+        }
+    }
 }
 
 #test-only overrides so that tests can override defaults in application.conf (todo: move all defaults to reference.conf)
diff --git a/tests/src/test/scala/org/apache/openwhisk/core/scheduler/queue/test/ElasticSearchDurationCheckResultFormatTest.scala b/tests/src/test/scala/org/apache/openwhisk/core/scheduler/queue/test/ElasticSearchDurationCheckResultFormatTest.scala
new file mode 100644
index 0000000..fe28e73
--- /dev/null
+++ b/tests/src/test/scala/org/apache/openwhisk/core/scheduler/queue/test/ElasticSearchDurationCheckResultFormatTest.scala
@@ -0,0 +1,124 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.openwhisk.core.scheduler.queue.test
+
+import org.apache.openwhisk.core.scheduler.queue.{DurationCheckResult, ElasticSearchDurationCheckResultFormat}
+import org.junit.runner.RunWith
+import org.scalatest.concurrent.ScalaFutures
+import org.scalatest.{FlatSpec, Matchers}
+import org.scalatest.junit.JUnitRunner
+import spray.json._
+
+@RunWith(classOf[JUnitRunner])
+class ElasticSearchDurationCheckResultFormatTest extends FlatSpec with Matchers with ScalaFutures {
+  behavior of "ElasticSearchDurationCheckResultFormatTest"
+
+  val serde = new ElasticSearchDurationCheckResultFormat()
+
+  it should "serialize the data correctly" in {
+    val normalData = """{
+          "_shards": {
+              "failed": 0,
+              "skipped": 0,
+              "successful": 5,
+              "total": 5
+          },
+          "aggregations": {
+              "filterAggregation": {
+                  "averageAggregation": {
+                      "value": 14
+                  },
+                  "doc_count": 3
+              }
+          },
+          "hits": {
+              "hits": [],
+              "max_score": 0,
+              "total": 3
+          },
+          "timed_out": false,
+          "took": 2
+      }"""
+
+    val bindingData = """{
+          "_shards": {
+              "failed": 0,
+              "skipped": 0,
+              "successful": 5,
+              "total": 5
+          },
+          "aggregations": {
+              "averageAggregation": {
+                  "value": 12
+              }
+          },
+          "hits": {
+              "hits": [],
+              "max_score": 0,
+              "total": 2
+          },
+          "timed_out": false,
+          "took": 0
+      }"""
+
+    val expected1 = DurationCheckResult(Some(14), 3, 2)
+    val expected2 = DurationCheckResult(Some(12), 2, 0)
+    val result1 = serde.read(normalData.parseJson)
+    val result2 = serde.read(bindingData.parseJson)
+
+    result1 shouldBe expected1
+    result2 shouldBe expected2
+  }
+
+  // Since the write method is not being used, this test is meaningless but added just for the duality.
+  it should "deserialize the data correctly" in {
+    val data = DurationCheckResult(Some(14), 3, 2)
+    val expected =
+      """[14, 3, 2]
+        |
+        |""".stripMargin
+    val result = serde.write(data)
+
+    result shouldBe expected.parseJson
+  }
+
+  it should "throw an exception when data is not in the expected format" in {
+    val malformedData = """{
+          "_shards": {
+              "failed": 0,
+              "skipped": 0,
+              "successful": 5,
+              "total": 5
+          },
+          "averageAggregation": {
+              "value": 14
+          },
+          "hits": {
+              "hits": [],
+              "max_score": 0,
+              "total": 3
+          },
+          "timed_out": false,
+          "took": 2
+      }"""
+
+    assertThrows[DeserializationException] {
+      serde.read(malformedData.parseJson)
+    }
+  }
+}
diff --git a/tests/src/test/scala/org/apache/openwhisk/core/scheduler/queue/test/ElasticSearchDurationCheckerTests.scala b/tests/src/test/scala/org/apache/openwhisk/core/scheduler/queue/test/ElasticSearchDurationCheckerTests.scala
new file mode 100644
index 0000000..867c0cd
--- /dev/null
+++ b/tests/src/test/scala/org/apache/openwhisk/core/scheduler/queue/test/ElasticSearchDurationCheckerTests.scala
@@ -0,0 +1,406 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.openwhisk.core.scheduler.queue.test
+
+import akka.stream.ActorMaterializer
+import com.sksamuel.elastic4s.http.ElasticDsl._
+import com.sksamuel.elastic4s.http.{ElasticClient, ElasticProperties, NoOpRequestConfigCallback}
+import common._
+import common.rest.WskRestOperations
+import org.apache.http.auth.{AuthScope, UsernamePasswordCredentials}
+import org.apache.http.impl.client.BasicCredentialsProvider
+import org.apache.http.impl.nio.client.HttpAsyncClientBuilder
+import org.apache.openwhisk.common.TransactionId
+import org.apache.openwhisk.core.ConfigKeys
+import org.apache.openwhisk.core.database.elasticsearch.ElasticSearchActivationStoreConfig
+import org.apache.openwhisk.core.entity._
+import org.apache.openwhisk.core.entity.size._
+import org.apache.openwhisk.core.entity.test.ExecHelpers
+import org.apache.openwhisk.core.scheduler.queue.{DurationCheckResult, ElasticSearchDurationChecker}
+import org.elasticsearch.client.RestClientBuilder.HttpClientConfigCallback
+import org.junit.runner.RunWith
+import org.scalatest.concurrent.ScalaFutures
+import org.scalatest.{BeforeAndAfter, BeforeAndAfterAll, FlatSpec, Matchers}
+import org.scalatestplus.junit.JUnitRunner
+import pureconfig.generic.auto._
+import pureconfig.loadConfigOrThrow
+import java.time.Instant
+import java.time.temporal.ChronoUnit
+import scala.language.postfixOps
+
+import scala.collection.mutable
+import scala.concurrent.ExecutionContextExecutor
+import scala.concurrent.duration._
+
+/**
+ * This test will try to fetch the average duration from activation documents. This class guarantee the minimum compatibility.
+ * In case there are any updates in the activation document, it will catch the difference between the expected and the real.
+ */
+@RunWith(classOf[JUnitRunner])
+class ElasticSearchDurationCheckerTests
+    extends FlatSpec
+    with Matchers
+    with ScalaFutures
+    with WskTestHelpers
+    with StreamLogging
+    with ExecHelpers
+    with BeforeAndAfterAll
+    with BeforeAndAfter {
+
+  private val namespace = "durationCheckNamespace"
+  val wskadmin: RunCliCmd = new RunCliCmd {
+    override def baseCommand: mutable.Buffer[String] = WskAdmin.baseCommand
+  }
+  implicit val mt: ActorMaterializer = ActorMaterializer()
+  implicit val ec: ExecutionContextExecutor = actorSystem.dispatcher
+  implicit val timeoutConfig: PatienceConfig = PatienceConfig(5 seconds, 15 milliseconds)
+
+  private val auth = BasicAuthenticationAuthKey()
+  implicit val wskprops: WskProps = WskProps(authKey = auth.compact, namespace = namespace)
+  implicit val transid: TransactionId = TransactionId.testing
+
+  val wsk = new WskRestOperations
+  val elasticSearchConfig: ElasticSearchActivationStoreConfig =
+    loadConfigOrThrow[ElasticSearchActivationStoreConfig](ConfigKeys.elasticSearchActivationStore)
+
+  val testIndex: String = generateIndex(namespace)
+  val concurrency = 1
+  val actionMem: ByteSize = 256.MB
+  val defaultDurationCheckWindow = 5.seconds
+
+  private val httpClientCallback = new HttpClientConfigCallback {
+    override def customizeHttpClient(httpClientBuilder: HttpAsyncClientBuilder): HttpAsyncClientBuilder = {
+      val provider = new BasicCredentialsProvider
+      provider.setCredentials(
+        AuthScope.ANY,
+        new UsernamePasswordCredentials(elasticSearchConfig.username, elasticSearchConfig.password))
+      httpClientBuilder.setDefaultCredentialsProvider(provider)
+    }
+  }
+
+  private val client =
+    ElasticClient(
+      ElasticProperties(s"${elasticSearchConfig.protocol}://${elasticSearchConfig.hosts}"),
+      NoOpRequestConfigCallback,
+      httpClientCallback)
+
+  private val elasticSearchDurationChecker = new ElasticSearchDurationChecker(client, defaultDurationCheckWindow)
+
+  override def beforeAll(): Unit = {
+    val res = wskadmin.cli(Seq("user", "create", namespace, "-u", auth.compact))
+    res.exitCode shouldBe 0
+
+    println(s"namespace: $namespace, auth: ${auth.compact}")
+    super.beforeAll()
+  }
+
+  override def afterAll(): Unit = {
+    client.execute {
+      deleteIndex(testIndex)
+    }
+    wskadmin.cli(Seq("user", "delete", namespace))
+    logLines.foreach(println)
+    super.afterAll()
+  }
+
+  behavior of "ElasticSearchDurationChecker"
+
+  it should "fetch the proper duration from ES" in withAssetCleaner(wskprops) { (_, assetHelper) =>
+    val actionName = "avgDuration"
+    val dummyActionName = "dummyAction"
+
+    var totalDuration = 0L
+    val count = 3
+
+    assetHelper.withCleaner(wsk.action, actionName) { (action, _) =>
+      action.create(actionName, Some(TestUtils.getTestActionFilename("hello.js")))
+    }
+
+    assetHelper.withCleaner(wsk.action, dummyActionName) { (action, _) =>
+      action.create(dummyActionName, Some(TestUtils.getTestActionFilename("hello.js")))
+    }
+
+    val actionMetaData =
+      WhiskActionMetaData(
+        EntityPath(namespace),
+        EntityName(actionName),
+        js10MetaData(Some("jsMain"), binary = false),
+        limits = actionLimits(actionMem, concurrency))
+
+    val run1 = wsk.action.invoke(actionName, Map())
+    withActivation(wsk.activation, run1) { activation =>
+      activation.response.status shouldBe "success"
+    }
+    // wait for 1s
+    Thread.sleep(1000)
+
+    val start = Instant.now()
+    val run2 = wsk.action.invoke(dummyActionName, Map())
+    withActivation(wsk.activation, run2) { activation =>
+      activation.response.status shouldBe "success"
+    }
+
+    1 to count foreach { _ =>
+      val run = wsk.action.invoke(actionName, Map())
+      withActivation(wsk.activation, run) { activation =>
+        activation.response.status shouldBe "success"
+        totalDuration += activation.duration
+      }
+    }
+    val end = Instant.now()
+    val timeWindow = math.ceil(ChronoUnit.MILLIS.between(start, end) / 1000.0).seconds
+    val durationChecker = new ElasticSearchDurationChecker(client, timeWindow)
+
+    // it should aggregate the recent activations in 5 seconds
+    val durationCheckResult: DurationCheckResult =
+      durationChecker.checkAverageDuration(namespace, actionMetaData)(res => res).futureValue
+
+    /**
+     * Expected sample data
+      {
+          "_shards": {
+              "failed": 0,
+              "skipped": 0,
+              "successful": 5,
+              "total": 5
+          },
+          "aggregations": {
+              "filterAggregation": {
+                  "averageAggregation": {
+                      "value": 14
+                  },
+                  "doc_count": 3
+              }
+          },
+          "hits": {
+              "hits": [],
+              "max_score": 0,
+              "total": 3
+          },
+          "timed_out": false,
+          "took": 2
+      }
+     */
+    truncateDouble(durationCheckResult.averageDuration.getOrElse(0.0)) shouldBe truncateDouble(
+      totalDuration.toDouble / count.toDouble)
+    durationCheckResult.hitCount shouldBe count
+  }
+
+  it should "fetch proper average duration for a package action" in withAssetCleaner(wskprops) { (_, assetHelper) =>
+    val packageName = "samplePackage"
+    val actionName = "packageAction"
+    val fqn = s"$namespace/$packageName/$actionName"
+
+    val actionMetaData =
+      WhiskActionMetaData(
+        EntityPath(s"$namespace/$packageName"),
+        EntityName(actionName),
+        js10MetaData(Some("jsMain"), binary = false),
+        limits = actionLimits(actionMem, concurrency))
+
+    var totalDuration = 0L
+    val count = 3
+
+    assetHelper.withCleaner(wsk.pkg, packageName) { (pkg, _) =>
+      pkg.create(packageName)
+    }
+
+    assetHelper.withCleaner(wsk.action, fqn) { (action, _) =>
+      action.create(fqn, Some(TestUtils.getTestActionFilename("hello.js")))
+    }
+
+    1 to count foreach { _ =>
+      val run = wsk.action.invoke(fqn, Map())
+      withActivation(wsk.activation, run) { activation =>
+        activation.response.status shouldBe "success"
+      }
+    }
+    // wait for 1s
+    Thread.sleep(1000)
+
+    val start = Instant.now()
+    1 to count foreach { _ =>
+      val run = wsk.action.invoke(fqn, Map())
+      withActivation(wsk.activation, run) { activation =>
+        activation.response.status shouldBe "success"
+        totalDuration += activation.duration
+      }
+    }
+    val end = Instant.now()
+    val timeWindow = math.ceil(ChronoUnit.MILLIS.between(start, end) / 1000.0).seconds
+    val durationChecker = new ElasticSearchDurationChecker(client, timeWindow)
+    val durationCheckResult: DurationCheckResult =
+      durationChecker.checkAverageDuration(namespace, actionMetaData)(res => res).futureValue
+
+    /**
+     * Expected sample data
+      {
+          "_shards": {
+              "failed": 0,
+              "skipped": 0,
+              "successful": 5,
+              "total": 5
+          },
+          "aggregations": {
+              "filterAggregation": {
+                  "averageAggregation": {
+                      "value": 13
+                  },
+                  "doc_count": 3
+              }
+          },
+          "hits": {
+              "hits": [],
+              "max_score": 0,
+              "total": 6
+          },
+          "timed_out": false,
+          "took": 0
+      }
+     */
+    truncateDouble(durationCheckResult.averageDuration.getOrElse(0.0)) shouldBe truncateDouble(
+      totalDuration.toDouble / count.toDouble)
+    durationCheckResult.hitCount shouldBe count
+  }
+
+  it should "fetch the duration for binding action" in withAssetCleaner(wskprops) { (_, assetHelper) =>
+    val packageName = "testPackage"
+    val actionName = "testAction"
+    val originalFQN = s"$namespace/$packageName/$actionName"
+    val boundPackageName = "boundPackage"
+
+    val actionMetaData =
+      WhiskActionMetaData(
+        EntityPath(s"$namespace/$boundPackageName"),
+        EntityName(actionName),
+        js10MetaData(Some("jsMain"), binary = false),
+        limits = actionLimits(actionMem, concurrency),
+        binding = Some(EntityPath(s"$namespace/$packageName")))
+
+    var totalDuration = 0L
+    val count = 3
+
+    assetHelper.withCleaner(wsk.pkg, packageName) { (pkg, _) =>
+      pkg.create(packageName, shared = Some(true))
+    }
+
+    assetHelper.withCleaner(wsk.action, originalFQN) { (action, _) =>
+      action.create(originalFQN, Some(TestUtils.getTestActionFilename("hello.js")))
+    }
+
+    assetHelper.withCleaner(wsk.pkg, boundPackageName) { (pkg, _) =>
+      pkg.bind(packageName, boundPackageName)
+    }
+
+    1 to count foreach { _ =>
+      val run = wsk.action.invoke(s"$boundPackageName/$actionName", Map())
+      withActivation(wsk.activation, run) { activation =>
+        activation.response.status shouldBe "success"
+      }
+    }
+    // wait for 1s
+    Thread.sleep(1000)
+
+    val start = Instant.now()
+    1 to count foreach { _ =>
+      val run = wsk.action.invoke(s"$boundPackageName/$actionName", Map())
+      withActivation(wsk.activation, run) { activation =>
+        activation.response.status shouldBe "success"
+        totalDuration += activation.duration
+      }
+    }
+    val end = Instant.now()
+    val timeWindow = math.ceil(ChronoUnit.MILLIS.between(start, end) / 1000.0).seconds
+    val durationChecker = new ElasticSearchDurationChecker(client, timeWindow)
+    val durationCheckResult: DurationCheckResult =
+      durationChecker.checkAverageDuration(namespace, actionMetaData)(res => res).futureValue
+
+    /**
+     * Expected sample data
+      {
+          "_shards": {
+              "failed": 0,
+              "skipped": 0,
+              "successful": 5,
+              "total": 5
+          },
+          "aggregations": {
+              "averageAggregation": {
+                  "value": 14
+              }
+          },
+          "hits": {
+              "hits": [],
+              "max_score": 0,
+              "total": 3
+          },
+          "timed_out": false,
+          "took": 0
+      }
+     */
+    truncateDouble(durationCheckResult.averageDuration.getOrElse(0.0)) shouldBe truncateDouble(
+      totalDuration.toDouble / count.toDouble)
+    durationCheckResult.hitCount shouldBe count
+  }
+
+  it should "return nothing properly if there is no activation yet" in withAssetCleaner(wskprops) { (_, _) =>
+    val actionName = "noneAction"
+
+    val actionMetaData =
+      WhiskActionMetaData(
+        EntityPath(s"$namespace"),
+        EntityName(actionName),
+        js10MetaData(Some("jsMain"), binary = false),
+        limits = actionLimits(actionMem, concurrency))
+
+    val durationCheckResult: DurationCheckResult =
+      elasticSearchDurationChecker.checkAverageDuration(namespace, actionMetaData)(res => res).futureValue
+
+    durationCheckResult.averageDuration shouldBe None
+    durationCheckResult.hitCount shouldBe 0
+  }
+
+  it should "return nothing properly if there is no activation for binding action yet" in withAssetCleaner(wskprops) {
+    (_, _) =>
+      val packageName = "testPackage2"
+      val actionName = "noneAction"
+      val boundPackageName = "boundPackage2"
+
+      val actionMetaData =
+        WhiskActionMetaData(
+          EntityPath(s"$namespace/$boundPackageName"),
+          EntityName(actionName),
+          js10MetaData(Some("jsMain"), false),
+          limits = actionLimits(actionMem, concurrency),
+          binding = Some(EntityPath(s"${namespace}/${packageName}")))
+
+      val durationCheckResult: DurationCheckResult =
+        elasticSearchDurationChecker.checkAverageDuration(namespace, actionMetaData)(res => res).futureValue
+
+      durationCheckResult.averageDuration shouldBe None
+      durationCheckResult.hitCount shouldBe 0
+  }
+
+  private def truncateDouble(number: Double, scale: Int = 2) = {
+    BigDecimal(number).setScale(scale, BigDecimal.RoundingMode.HALF_UP).toDouble
+  }
+
+  private def generateIndex(namespace: String): String = {
+    elasticSearchConfig.indexPattern.dropWhile(_ == '/') format namespace.toLowerCase
+  }
+}
diff --git a/tools/travis/setupPrereq.sh b/tools/travis/setupPrereq.sh
index f820c28..7832938 100755
--- a/tools/travis/setupPrereq.sh
+++ b/tools/travis/setupPrereq.sh
@@ -31,6 +31,7 @@
 $ANSIBLE_CMD couchdb.yml
 $ANSIBLE_CMD initdb.yml
 $ANSIBLE_CMD wipe.yml
+$ANSIBLE_CMD elasticsearch.yml
 
 $ANSIBLE_CMD properties.yml -e manifest_file="$RUNTIMES_MANIFEST"
 echo "Time taken for ${0##*/} is $SECONDS secs"
diff --git a/tools/travis/setupSystem.sh b/tools/travis/setupSystem.sh
index 0750ddd..9f99cf7 100755
--- a/tools/travis/setupSystem.sh
+++ b/tools/travis/setupSystem.sh
@@ -26,7 +26,7 @@
 
 cd $ROOTDIR/ansible
 
-$ANSIBLE_CMD openwhisk.yml -e manifest_file="$RUNTIMES_MANIFEST"
+$ANSIBLE_CMD openwhisk.yml -e manifest_file="$RUNTIMES_MANIFEST" -e db_activation_backend=ElasticSearch
 $ANSIBLE_CMD apigateway.yml
 $ANSIBLE_CMD routemgmt.yml