Cosmosdb query metrics for activation poll query (#4688)

* Update cosmosdb to v2.6.2
* Collect query stats for activation poll/list query
diff --git a/common/scala/build.gradle b/common/scala/build.gradle
index f446072..415b9e6 100644
--- a/common/scala/build.gradle
+++ b/common/scala/build.gradle
@@ -83,7 +83,7 @@
 
     compile 'io.reactivex:rxscala_2.12:0.26.5'
     compile 'io.reactivex:rxjava-reactive-streams:1.2.1'
-    compile ('com.microsoft.azure:azure-cosmosdb:2.6.1')
+    compile ('com.microsoft.azure:azure-cosmosdb:2.6.2')
 
     compile ('com.lightbend.akka:akka-stream-alpakka-s3_2.12:1.0.1') {
         exclude group: 'org.apache.httpcomponents' //Not used as alpakka uses akka-http
diff --git a/common/scala/src/main/scala/org/apache/openwhisk/core/database/cosmosdb/CosmosDBArtifactStore.scala b/common/scala/src/main/scala/org/apache/openwhisk/core/database/cosmosdb/CosmosDBArtifactStore.scala
index 1ff638f..57cb6ea 100644
--- a/common/scala/src/main/scala/org/apache/openwhisk/core/database/cosmosdb/CosmosDBArtifactStore.scala
+++ b/common/scala/src/main/scala/org/apache/openwhisk/core/database/cosmosdb/CosmosDBArtifactStore.scala
@@ -363,14 +363,20 @@
       .map(l => if (limit > 0) l.take(limit) else l)
 
     val g = f.andThen {
-      case Success(out) =>
+      case Success(queryResult) =>
         if (queryMetrics.nonEmpty) {
           val combinedMetrics = QueryMetrics.ZERO.add(queryMetrics: _*)
           logging.debug(
             this,
             s"[QueryMetricsEnabled] Collection [$collName] - Query [${querySpec.getQueryText}].\nQueryMetrics\n[$combinedMetrics]")
         }
-        transid.finished(this, start, s"[QUERY] '$collName' completed: matched ${out.size}", InfoLevel)
+        val stats = viewMapper.recordQueryStats(ddoc, viewName, descending, querySpec.getParameters, queryResult)
+        val statsToLog = stats.map(s => " " + s).getOrElse("")
+        transid.finished(
+          this,
+          start,
+          s"[QUERY] '$collName' completed: matched ${queryResult.size}$statsToLog",
+          InfoLevel)
     }
     reportFailure(g, start, failure => s"[QUERY] '$collName' internal error, failure: '${failure.getMessage}'")
   }
diff --git a/common/scala/src/main/scala/org/apache/openwhisk/core/database/cosmosdb/CosmosDBViewMapper.scala b/common/scala/src/main/scala/org/apache/openwhisk/core/database/cosmosdb/CosmosDBViewMapper.scala
index 6707e343..22aa98e 100644
--- a/common/scala/src/main/scala/org/apache/openwhisk/core/database/cosmosdb/CosmosDBViewMapper.scala
+++ b/common/scala/src/main/scala/org/apache/openwhisk/core/database/cosmosdb/CosmosDBViewMapper.scala
@@ -22,6 +22,8 @@
 import com.microsoft.azure.cosmosdb.DataType.{Number, String}
 import com.microsoft.azure.cosmosdb.IndexKind.Range
 import com.microsoft.azure.cosmosdb.{PartitionKeyDefinition, SqlParameter, SqlParameterCollection, SqlQuerySpec}
+import kamon.metric.MeasurementUnit
+import org.apache.openwhisk.common.{LogMarkerToken, TransactionId, WhiskInstants}
 import org.apache.openwhisk.core.database.ActivationHandler.NS_PATH
 import org.apache.openwhisk.core.database.WhisksHandler.ROOT_NS
 import org.apache.openwhisk.core.database.cosmosdb.CosmosDBConstants.{alias, computed, deleted}
@@ -34,6 +36,10 @@
   WhisksHandler
 }
 import org.apache.openwhisk.core.entity.WhiskQueries.TOP
+import org.apache.openwhisk.utils.JsHelpers
+import spray.json.{JsNumber, JsObject}
+
+import scala.collection.JavaConverters._
 
 private[cosmosdb] trait CosmosDBViewMapper {
   protected val NOTHING = ""
@@ -71,6 +77,17 @@
 
     new SqlQuerySpec(query, paramColl)
   }
+
+  /**
+   *  Records query related stats based on result returned and arguments passed
+   *
+   * @return an optional string representation of stats for logging purpose
+   */
+  def recordQueryStats(ddoc: String,
+                       viewName: String,
+                       descending: Boolean,
+                       queryParams: SqlParameterCollection,
+                       result: List[JsObject]): Option[String] = None
 }
 
 private[cosmosdb] abstract class SimpleMapper extends CosmosDBViewMapper {
@@ -183,7 +200,8 @@
   }
 
 }
-private[cosmosdb] object ActivationViewMapper extends SimpleMapper {
+private[cosmosdb] object ActivationViewMapper extends SimpleMapper with WhiskInstants {
+  import CosmosDBViewMapper._
   private val NS = "namespace"
   private val NS_WITH_PATH = s"$computed.$NS_PATH"
   private val START = "start"
@@ -233,6 +251,41 @@
     case "activations" if ddoc.startsWith("whisks") => s"r.$START"
     case _                                          => throw UnsupportedView(s"$ddoc/$view")
   }
+
+  private val resultDeltaToken = createStatsToken("activations", "resultDelta", "activations")
+  private val sinceDeltaToken = createStatsToken("activations", "sinceDelta", "activations")
+
+  override def recordQueryStats(ddoc: String,
+                                viewName: String,
+                                descending: Boolean,
+                                queryParams: SqlParameterCollection,
+                                result: List[JsObject]): Option[String] = {
+    val stat = if (viewName == "activations" && descending) {
+      // Collect stats for the delta between
+      // 1. now and start time of last activation
+      // 2. now and start time as specific in query for `since` parameter
+      // These stats would help in determining how much old activations are being queried for list query (used in activation poll)
+      val uptoOpt = paramValue(queryParams, "upto", classOf[Number])
+      val startOpt = paramValue(queryParams, "start", classOf[Number])
+
+      // Result json has structure { id: "", "key": [], "value": {activation}}
+      // So fetch value of start via `value.start` path
+      val lastOpt = result.lastOption.flatMap(js => JsHelpers.getFieldPath(js, "value", "start"))
+
+      (uptoOpt, startOpt, lastOpt) match {
+        //Go for case which does not specify upto as that would be the case with poll based query
+        case (None, Some(startFromQuery), Some(JsNumber(start))) =>
+          val now = nowInMillis().toEpochMilli
+          val resultStartDelta = (now - start.longValue()).max(0)
+          val queryStartDelta = (now - startFromQuery.longValue()).max(0)
+          resultDeltaToken.histogram.record(resultStartDelta)
+          sinceDeltaToken.histogram.record(queryStartDelta)
+          Some(s"resultDelta=$resultStartDelta, sinceDelta=$queryStartDelta")
+        case _ => None
+      }
+    } else None
+    stat
+  }
 }
 private[cosmosdb] object SubjectViewMapper extends CosmosDBViewMapper {
   private val UUID = "uuid"
@@ -318,3 +371,18 @@
 
   private def selectClause(count: Boolean) = if (count) "TOP 1 VALUE COUNT(r)" else "r"
 }
+
+object CosmosDBViewMapper {
+
+  def paramValue[T](params: SqlParameterCollection, key: String, clazz: Class[T]): Option[T] = {
+    val name = "@" + key
+    params.iterator().asScala.find(_.getName == name).map(_.getValue(clazz).asInstanceOf[T])
+  }
+
+  def createStatsToken(viewName: String, statName: String, collName: String): LogMarkerToken = {
+    val unit = MeasurementUnit.time.milliseconds
+    val tags = Map("view" -> viewName, "collection" -> collName)
+    if (TransactionId.metricsKamonTags) LogMarkerToken("cosmosdb", "query", statName, tags = tags)(unit)
+    else LogMarkerToken("cosmosdb", "query", collName, Some(statName))(unit)
+  }
+}