blob: f371cc5fc3086ce699b5890b47318c40e4399891 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.openwhisk.core.monitoring.metrics
import akka.event.slf4j.SLF4JLogging
import org.apache.openwhisk.core.connector.{Activation, Metric}
import kamon.Kamon
import kamon.metric.MeasurementUnit
import kamon.tag.TagSet
import org.apache.openwhisk.core.monitoring.metrics.OpenWhiskEvents.MetricConfig
import scala.collection.concurrent.TrieMap
trait KamonMetricNames extends MetricNames {
val namespaceActivationMetric = "openwhisk.namespace.activations"
val activationMetric = "openwhisk.action.activations"
val coldStartMetric = "openwhisk.action.coldStarts"
val waitTimeMetric = "openwhisk.action.waitTime"
val initTimeMetric = "openwhisk.action.initTime"
val durationMetric = "openwhisk.action.duration"
val responseSizeMetric = "openwhisk.action.responseSize"
val statusMetric = "openwhisk.action.status"
val concurrentLimitMetric = "openwhisk.action.limit.concurrent"
val timedLimitMetric = "openwhisk.action.limit.timed"
}
object KamonRecorder extends MetricRecorder with KamonMetricNames with SLF4JLogging {
private val activationMetrics = new TrieMap[String, ActivationKamonMetrics]
private val limitMetrics = new TrieMap[String, LimitKamonMetrics]
override def processActivation(activation: Activation, initiator: String, metricConfig: MetricConfig): Unit = {
lookup(activation, initiator).record(activation, metricConfig)
}
override def processMetric(metric: Metric, initiator: String): Unit = {
val limitMetric = limitMetrics.getOrElseUpdate(initiator, LimitKamonMetrics(initiator))
limitMetric.record(metric)
}
def lookup(activation: Activation, initiator: String): ActivationKamonMetrics = {
val name = activation.name
val kind = activation.kind
val memory = activation.memory.toString
val namespace = activation.namespace
val action = activation.action
activationMetrics.getOrElseUpdate(name, {
ActivationKamonMetrics(namespace, action, kind, memory, initiator)
})
}
case class LimitKamonMetrics(namespace: String) {
private val concurrentLimit = Kamon.counter(concurrentLimitMetric).withTag(`actionNamespace`, namespace)
private val timedLimit = Kamon.counter(timedLimitMetric).withTag(`actionNamespace`, namespace)
def record(m: Metric): Unit = {
m.metricName match {
case "ConcurrentRateLimit" => concurrentLimit.increment()
case "TimedRateLimit" => timedLimit.increment()
case "ConcurrentInvocations" => //TODO Handle ConcurrentInvocations
case x => log.warn(s"Unknown limit $x")
}
}
}
case class ActivationKamonMetrics(namespace: String,
action: String,
kind: String,
memory: String,
initiator: String) {
private val activationTags =
TagSet.from(
Map(
`actionNamespace` -> namespace,
`initiatorNamespace` -> initiator,
`actionName` -> action,
`actionKind` -> kind,
`actionMemory` -> memory))
private val namespaceActivationsTags =
TagSet.from(Map(`actionNamespace` -> namespace, `initiatorNamespace` -> initiator))
private val tags =
TagSet.from(Map(`actionNamespace` -> namespace, `initiatorNamespace` -> initiator, `actionName` -> action))
private val namespaceActivations = Kamon.counter(namespaceActivationMetric).withTags(namespaceActivationsTags)
private val activations = Kamon.counter(activationMetric).withTags(activationTags)
private val coldStarts = Kamon.counter(coldStartMetric).withTags(tags)
private val waitTime = Kamon.histogram(waitTimeMetric, MeasurementUnit.time.milliseconds).withTags(tags)
private val initTime = Kamon.histogram(initTimeMetric, MeasurementUnit.time.milliseconds).withTags(tags)
private val duration = Kamon.histogram(durationMetric, MeasurementUnit.time.milliseconds).withTags(tags)
private val responseSize = Kamon.histogram(responseSizeMetric, MeasurementUnit.information.bytes).withTags(tags)
def record(a: Activation, metricConfig: MetricConfig): Unit = {
namespaceActivations.increment()
// only record activation if not executed in an ignored namespace
if (!metricConfig.ignoredNamespaces.contains(a.namespace)) {
recordActivation(a)
}
}
def recordActivation(a: Activation): Unit = {
activations.increment()
if (a.isColdStart) {
coldStarts.increment()
initTime.record(a.initTime.toMillis)
}
//waitTime may be zero for activations which are part of sequence
waitTime.record(a.waitTime.toMillis)
duration.record(a.duration.toMillis)
Kamon.counter(statusMetric).withTags(tags.withTag("status", a.status)).increment()
a.size.foreach(responseSize.record(_))
}
}
}