blob: 9cf7a2201d6cc8b106e869456a63cd9d0bd585dd [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.openwhisk.core.monitoring.metrics
import java.io.StringWriter
import java.util
import java.util.concurrent.TimeUnit
import akka.event.slf4j.SLF4JLogging
import akka.http.scaladsl.model.{HttpEntity, MessageEntity}
import akka.stream.scaladsl.{Concat, Source}
import akka.util.ByteString
import org.apache.openwhisk.core.connector.{Activation, Metric}
import io.prometheus.client.exporter.common.TextFormat
import io.prometheus.client.{CollectorRegistry, Counter, Gauge, Histogram}
import kamon.prometheus.PrometheusReporter
import org.apache.openwhisk.core.monitoring.metrics.OpenWhiskEvents.MetricConfig
import org.apache.openwhisk.core.entity.{ActivationEntityLimit, ActivationResponse}
import scala.collection.JavaConverters._
import scala.collection.concurrent.TrieMap
import scala.concurrent.duration.Duration
/**
 * Names of the Prometheus metrics exported by [[PrometheusRecorder]].
 *
 * The values are part of the scrape output, so renaming any of them breaks existing queries.
 */
trait PrometheusMetricNames extends MetricNames {
  // Activation counters (per namespace, per action, cold starts, per status)
  val namespaceMetric: String = "openwhisk_namespace_activations_total"
  val activationMetric: String = "openwhisk_action_activations_total"
  val coldStartMetric: String = "openwhisk_action_coldStarts_total"
  val statusMetric: String = "openwhisk_action_status"

  // Per-action timing histograms; observations are recorded in seconds
  val waitTimeMetric: String = "openwhisk_action_waitTime_seconds"
  val initTimeMetric: String = "openwhisk_action_initTime_seconds"
  val durationMetric: String = "openwhisk_action_duration_seconds"

  // Per-action response size histogram
  val responseSizeMetric: String = "openwhisk_action_response_size_bytes"

  // Per-action gauge set from the activation's memory value
  val memoryMetric: String = "openwhisk_action_memory"

  // Per-namespace counters incremented on concurrent / timed rate-limit events
  val concurrentLimitMetric: String = "openwhisk_action_limit_concurrent_total"
  val timedLimitMetric: String = "openwhisk_action_limit_timed_total"
}
/**
 * [[MetricRecorder]] that publishes OpenWhisk activation and limit events as Prometheus metrics.
 *
 * Events are recorded into the Prometheus java-client collectors defined in the companion object.
 * [[getReport]] streams the samples of `CollectorRegistry.defaultRegistry` followed by the scrape
 * data of the given Kamon reporter.
 *
 * @param kamon Kamon Prometheus reporter whose `scrapeData()` is appended to every report
 */
case class PrometheusRecorder(kamon: PrometheusReporter)
    extends MetricRecorder
    with PrometheusExporter
    with SLF4JLogging {
  import PrometheusRecorder._

  /**
   * Label children cached per distinct label combination:
   * (namespace, action, initiator, kind, memory).
   *
   * Every one of these values is baked into the cached [[ActivationPromMetrics]]. Keying by fewer
   * fields would pin the remaining labels to whatever the first activation carried, and so would
   * mis-attribute later activations (e.g. the same action invoked by a different initiator).
   */
  private val activationMetrics =
    new TrieMap[(String, String, String, String, String), ActivationPromMetrics]

  /** Rate-limit counters cached per initiator namespace. */
  private val limitMetrics = new TrieMap[String, LimitPromMetrics]

  /**
   * Records one activation event.
   *
   * The namespace activation counter is always incremented. Per-action metrics are skipped for
   * namespaces listed in `metricConfig.ignoredNamespaces`.
   */
  override def processActivation(activation: Activation, initiator: String, metricConfig: MetricConfig): Unit =
    lookup(activation, initiator).record(activation, initiator, metricConfig)

  /** Increments the limit counter matching `metric.metricName` for the `initiator` namespace. */
  override def processMetric(metric: Metric, initiator: String): Unit =
    limitMetrics.getOrElseUpdate(initiator, LimitPromMetrics(initiator)).record(metric)

  /** Builds the Prometheus text report as a streamed entity (content type `PrometheusExporter.textV4`). */
  override def getReport(): MessageEntity =
    HttpEntity(PrometheusExporter.textV4, createSource())

  /** Returns the cached metrics for the activation's label combination, creating them on first use. */
  private def lookup(activation: Activation, initiator: String): ActivationPromMetrics = {
    //TODO Unregister unused actions
    val kind = activation.kind
    val memory = activation.memory.toString
    val namespace = activation.namespace
    val action = activation.action
    activationMetrics.getOrElseUpdate((namespace, action, initiator, kind, memory), {
      ActivationPromMetrics(namespace, action, kind, memory, initiator)
    })
  }

  /**
   * Rate-limit counters for a single namespace.
   *
   * @param namespace value of the namespace label on both counters
   */
  case class LimitPromMetrics(namespace: String) {
    private val concurrentLimit = concurrentLimitCounter.labels(namespace)
    private val timedLimit = timedLimitCounter.labels(namespace)

    /** Increments the counter matching the metric name; unknown names are logged and ignored. */
    def record(m: Metric): Unit = {
      m.metricName match {
        case "ConcurrentRateLimit"   => concurrentLimit.inc()
        case "TimedRateLimit"        => timedLimit.inc()
        case "ConcurrentInvocations" => //TODO Handle ConcurrentInvocations
        case x                       => log.warn(s"Unknown limit $x")
      }
    }
  }

  /**
   * Pre-resolved label children for one (namespace, action, kind, memory, initiator) combination.
   *
   * `labels(...)` on a Prometheus collector is resolved once here, so recording an activation only
   * touches the resolved children.
   */
  case class ActivationPromMetrics(namespace: String,
                                   action: String,
                                   kind: String,
                                   memory: String,
                                   initiatorNamespace: String) {
    private val namespaceActivations = namespaceActivationCounter.labels(namespace, initiatorNamespace)
    private val activations = activationCounter.labels(namespace, initiatorNamespace, action, kind, memory)
    private val coldStarts = coldStartCounter.labels(namespace, initiatorNamespace, action)
    private val waitTime = waitTimeHisto.labels(namespace, initiatorNamespace, action)
    private val initTime = initTimeHisto.labels(namespace, initiatorNamespace, action)
    private val duration = durationHisto.labels(namespace, initiatorNamespace, action)
    private val responseSize = responseSizeHisto.labels(namespace, initiatorNamespace, action)
    private val gauge = memoryGauge.labels(namespace, initiatorNamespace, action)

    private val statusSuccess =
      statusCounter.labels(namespace, initiatorNamespace, action, ActivationResponse.statusSuccess)
    private val statusApplicationError =
      statusCounter.labels(namespace, initiatorNamespace, action, ActivationResponse.statusApplicationError)
    private val statusDeveloperError =
      statusCounter.labels(namespace, initiatorNamespace, action, ActivationResponse.statusDeveloperError)
    private val statusInternalError =
      statusCounter.labels(namespace, initiatorNamespace, action, ActivationResponse.statusWhiskError)

    /**
     * Counts the activation for its namespace. Per-action metrics are recorded only when the
     * activation's namespace is not in `metricConfig.ignoredNamespaces`.
     */
    def record(a: Activation, initiator: String, metricConfig: MetricConfig): Unit = {
      namespaceActivations.inc()
      // only record activation if not executed in an ignored namespace
      if (!metricConfig.ignoredNamespaces.contains(a.namespace)) {
        recordActivation(a, initiator)
      }
    }

    /** Records memory, counts, timings, status and (when present) response size of `a`. */
    def recordActivation(a: Activation, initiator: String): Unit = {
      gauge.set(a.memory)
      activations.inc()
      if (a.isColdStart) {
        coldStarts.inc()
        initTime.observe(seconds(a.initTime))
      }
      //waitTime may be zero for activations which are part of sequence
      waitTime.observe(seconds(a.waitTime))
      duration.observe(seconds(a.duration))
      a.status match {
        case ActivationResponse.statusSuccess          => statusSuccess.inc()
        case ActivationResponse.statusApplicationError => statusApplicationError.inc()
        case ActivationResponse.statusDeveloperError   => statusDeveloperError.inc()
        case ActivationResponse.statusWhiskError       => statusInternalError.inc()
        // Use the same initiator label value as every other child of this instance
        case x => statusCounter.labels(namespace, initiatorNamespace, action, x).inc()
      }
      a.size.foreach(responseSize.observe(_))
    }
  }

  //Returns a floating point number
  private def seconds(time: Duration): Double = time.toUnit(TimeUnit.SECONDS)

  /** Java-client samples first, then the Kamon scrape output, each element encoded as a ByteString. */
  private def createSource() =
    Source.combine(createJavaClientSource(), createKamonSource())(Concat(_)).map(ByteString(_))

  /**
   * Enables streaming the prometheus metric data without building the whole report in memory
   */
  private def createJavaClientSource() =
    Source
      .fromIterator(() => CollectorRegistry.defaultRegistry.metricFamilySamples().asScala)
      .map { sample =>
        //Stream string representation of one sample at a time
        val writer = new StringWriter()
        TextFormat.write004(writer, singletonEnumeration(sample))
        writer.toString
      }

  private def createKamonSource() = Source.single(kamon.scrapeData())

  /** A `java.util.Enumeration` yielding `value` exactly once (the shape `TextFormat.write004` expects). */
  private def singletonEnumeration[A](value: A) = new util.Enumeration[A] {
    private var done = false
    override def hasMoreElements: Boolean = !done
    override def nextElement(): A = {
      if (done) throw new NoSuchElementException
      done = true
      value
    }
  }
}
/**
 * Prometheus java-client collectors shared by every [[PrometheusRecorder]].
 *
 * Each collector is built and registered (via `register()`, i.e. in the default registry) when this
 * object is initialised. Recorders resolve per-label children from these collectors.
 */
object PrometheusRecorder extends PrometheusMetricNames {
  private val namespaceActivationCounter =
    counter(namespaceMetric, "Namespace activations Count", actionNamespace, initiatorNamespace)
  private val activationCounter =
    counter(
      activationMetric,
      "Activation Count",
      actionNamespace,
      initiatorNamespace,
      actionName,
      actionKind,
      actionMemory)
  private val coldStartCounter =
    counter(coldStartMetric, "Cold start counts", actionNamespace, initiatorNamespace, actionName)
  private val statusCounter =
    counter(
      statusMetric,
      "Activation failure status type",
      actionNamespace,
      initiatorNamespace,
      actionName,
      actionStatus)
  private val waitTimeHisto =
    histogram(waitTimeMetric, "Internal system hold time", actionNamespace, initiatorNamespace, actionName)
  private val initTimeHisto =
    histogram(
      initTimeMetric,
      "Time it took to initialize an action, e.g. docker init",
      actionNamespace,
      initiatorNamespace,
      actionName)
  private val durationHisto =
    histogram(
      durationMetric,
      "Actual time the action code was running",
      actionNamespace,
      initiatorNamespace,
      actionName)

  /**
   * Response size histogram, in bytes.
   *
   * Buckets are ten equal steps from 0 up to the maximum activation entity size (11 bounds).
   * A bucket width of the whole limit would put every non-empty response that fits the limit into a
   * single bucket, making the distribution invisible.
   */
  private val responseSizeHisto = {
    val maxResponseBytes = ActivationEntityLimit.MAX_ACTIVATION_ENTITY_LIMIT.toBytes.toDouble
    Histogram
      .build()
      .name(responseSizeMetric)
      .help("Activation Response size")
      .labelNames(actionNamespace, initiatorNamespace, actionName)
      .linearBuckets(0, maxResponseBytes / 10, 11)
      .register()
  }
  private val memoryGauge =
    gauge(memoryMetric, "Memory consumption of the action containers", actionNamespace, initiatorNamespace, actionName)
  private val concurrentLimitCounter =
    counter(concurrentLimitMetric, "a user has exceeded its limit for concurrent invocations", actionNamespace)
  private val timedLimitCounter =
    counter(
      timedLimitMetric,
      "the user has reached its per minute limit for the number of invocations",
      actionNamespace)

  /** Builds and registers a labelled counter. */
  private def counter(name: String, help: String, tags: String*): Counter =
    Counter
      .build()
      .name(name)
      .help(help)
      .labelNames(tags: _*)
      .register()

  /** Builds and registers a labelled gauge. */
  private def gauge(name: String, help: String, tags: String*): Gauge =
    Gauge
      .build()
      .name(name)
      .help(help)
      .labelNames(tags: _*)
      .register()

  /** Builds and registers a labelled histogram with the client library's default buckets. */
  private def histogram(name: String, help: String, tags: String*): Histogram =
    Histogram
      .build()
      .name(name)
      .help(help)
      .labelNames(tags: _*)
      .register()
}