Cosmosdb query metrics for activation poll query (#4688)
* Update cosmosdb to v2.6.2
* Collect query stats for activation poll/list query
diff --git a/common/scala/build.gradle b/common/scala/build.gradle
index f446072..415b9e6 100644
--- a/common/scala/build.gradle
+++ b/common/scala/build.gradle
@@ -83,7 +83,7 @@
compile 'io.reactivex:rxscala_2.12:0.26.5'
compile 'io.reactivex:rxjava-reactive-streams:1.2.1'
- compile ('com.microsoft.azure:azure-cosmosdb:2.6.1')
+ compile ('com.microsoft.azure:azure-cosmosdb:2.6.2')
compile ('com.lightbend.akka:akka-stream-alpakka-s3_2.12:1.0.1') {
exclude group: 'org.apache.httpcomponents' //Not used as alpakka uses akka-http
diff --git a/common/scala/src/main/scala/org/apache/openwhisk/core/database/cosmosdb/CosmosDBArtifactStore.scala b/common/scala/src/main/scala/org/apache/openwhisk/core/database/cosmosdb/CosmosDBArtifactStore.scala
index 1ff638f..57cb6ea 100644
--- a/common/scala/src/main/scala/org/apache/openwhisk/core/database/cosmosdb/CosmosDBArtifactStore.scala
+++ b/common/scala/src/main/scala/org/apache/openwhisk/core/database/cosmosdb/CosmosDBArtifactStore.scala
@@ -363,14 +363,20 @@
.map(l => if (limit > 0) l.take(limit) else l)
val g = f.andThen {
- case Success(out) =>
+ case Success(queryResult) =>
if (queryMetrics.nonEmpty) {
val combinedMetrics = QueryMetrics.ZERO.add(queryMetrics: _*)
logging.debug(
this,
s"[QueryMetricsEnabled] Collection [$collName] - Query [${querySpec.getQueryText}].\nQueryMetrics\n[$combinedMetrics]")
}
- transid.finished(this, start, s"[QUERY] '$collName' completed: matched ${out.size}", InfoLevel)
+ val stats = viewMapper.recordQueryStats(ddoc, viewName, descending, querySpec.getParameters, queryResult)
+ val statsToLog = stats.map(s => " " + s).getOrElse("")
+ transid.finished(
+ this,
+ start,
+ s"[QUERY] '$collName' completed: matched ${queryResult.size}$statsToLog",
+ InfoLevel)
}
reportFailure(g, start, failure => s"[QUERY] '$collName' internal error, failure: '${failure.getMessage}'")
}
diff --git a/common/scala/src/main/scala/org/apache/openwhisk/core/database/cosmosdb/CosmosDBViewMapper.scala b/common/scala/src/main/scala/org/apache/openwhisk/core/database/cosmosdb/CosmosDBViewMapper.scala
index 6707e343..22aa98e 100644
--- a/common/scala/src/main/scala/org/apache/openwhisk/core/database/cosmosdb/CosmosDBViewMapper.scala
+++ b/common/scala/src/main/scala/org/apache/openwhisk/core/database/cosmosdb/CosmosDBViewMapper.scala
@@ -22,6 +22,8 @@
import com.microsoft.azure.cosmosdb.DataType.{Number, String}
import com.microsoft.azure.cosmosdb.IndexKind.Range
import com.microsoft.azure.cosmosdb.{PartitionKeyDefinition, SqlParameter, SqlParameterCollection, SqlQuerySpec}
+import kamon.metric.MeasurementUnit
+import org.apache.openwhisk.common.{LogMarkerToken, TransactionId, WhiskInstants}
import org.apache.openwhisk.core.database.ActivationHandler.NS_PATH
import org.apache.openwhisk.core.database.WhisksHandler.ROOT_NS
import org.apache.openwhisk.core.database.cosmosdb.CosmosDBConstants.{alias, computed, deleted}
@@ -34,6 +36,10 @@
WhisksHandler
}
import org.apache.openwhisk.core.entity.WhiskQueries.TOP
+import org.apache.openwhisk.utils.JsHelpers
+import spray.json.{JsNumber, JsObject}
+
+import scala.collection.JavaConverters._
private[cosmosdb] trait CosmosDBViewMapper {
protected val NOTHING = ""
@@ -71,6 +77,17 @@
new SqlQuerySpec(query, paramColl)
}
+
+ /**
+ * Records query related stats based on result returned and arguments passed
+ *
+ * @return an optional string representation of stats for logging purpose
+ */
+ def recordQueryStats(ddoc: String,
+ viewName: String,
+ descending: Boolean,
+ queryParams: SqlParameterCollection,
+ result: List[JsObject]): Option[String] = None
}
private[cosmosdb] abstract class SimpleMapper extends CosmosDBViewMapper {
@@ -183,7 +200,8 @@
}
}
-private[cosmosdb] object ActivationViewMapper extends SimpleMapper {
+private[cosmosdb] object ActivationViewMapper extends SimpleMapper with WhiskInstants {
+ import CosmosDBViewMapper._
private val NS = "namespace"
private val NS_WITH_PATH = s"$computed.$NS_PATH"
private val START = "start"
@@ -233,6 +251,41 @@
case "activations" if ddoc.startsWith("whisks") => s"r.$START"
case _ => throw UnsupportedView(s"$ddoc/$view")
}
+
+ private val resultDeltaToken = createStatsToken("activations", "resultDelta", "activations")
+ private val sinceDeltaToken = createStatsToken("activations", "sinceDelta", "activations")
+
+ override def recordQueryStats(ddoc: String,
+ viewName: String,
+ descending: Boolean,
+ queryParams: SqlParameterCollection,
+ result: List[JsObject]): Option[String] = {
+ val stat = if (viewName == "activations" && descending) {
+ // Collect stats for the delta between
+ // 1. now and start time of last activation
+ // 2. now and start time as specific in query for `since` parameter
+ // These stats would help in determining how much old activations are being queried for list query (used in activation poll)
+ val uptoOpt = paramValue(queryParams, "upto", classOf[Number])
+ val startOpt = paramValue(queryParams, "start", classOf[Number])
+
+ // Result json has structure { id: "", "key": [], "value": {activation}}
+ // So fetch value of start via `value.start` path
+ val lastOpt = result.lastOption.flatMap(js => JsHelpers.getFieldPath(js, "value", "start"))
+
+ (uptoOpt, startOpt, lastOpt) match {
+ //Go for case which does not specify upto as that would be the case with poll based query
+ case (None, Some(startFromQuery), Some(JsNumber(start))) =>
+ val now = nowInMillis().toEpochMilli
+ val resultStartDelta = (now - start.longValue()).max(0)
+ val queryStartDelta = (now - startFromQuery.longValue()).max(0)
+ resultDeltaToken.histogram.record(resultStartDelta)
+ sinceDeltaToken.histogram.record(queryStartDelta)
+ Some(s"resultDelta=$resultStartDelta, sinceDelta=$queryStartDelta")
+ case _ => None
+ }
+ } else None
+ stat
+ }
}
private[cosmosdb] object SubjectViewMapper extends CosmosDBViewMapper {
private val UUID = "uuid"
@@ -318,3 +371,18 @@
private def selectClause(count: Boolean) = if (count) "TOP 1 VALUE COUNT(r)" else "r"
}
+
+object CosmosDBViewMapper {
+
+ def paramValue[T](params: SqlParameterCollection, key: String, clazz: Class[T]): Option[T] = {
+ val name = "@" + key
+ params.iterator().asScala.find(_.getName == name).map(_.getValue(clazz).asInstanceOf[T])
+ }
+
+ def createStatsToken(viewName: String, statName: String, collName: String): LogMarkerToken = {
+ val unit = MeasurementUnit.time.milliseconds
+ val tags = Map("view" -> viewName, "collection" -> collName)
+ if (TransactionId.metricsKamonTags) LogMarkerToken("cosmosdb", "query", statName, tags = tags)(unit)
+ else LogMarkerToken("cosmosdb", "query", collName, Some(statName))(unit)
+ }
+}