blob: 7b5709e2fc3f9d092100351a1539ef6c917a222b [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.openwhisk.core.monitoring.metrics
import java.lang.management.ManagementFactory
import akka.Done
import akka.actor.ActorSystem
import akka.kafka.ConsumerMessage.CommittableOffsetBatch
import akka.kafka.scaladsl.Consumer
import akka.kafka.scaladsl.Consumer.DrainingControl
import akka.kafka.{ConsumerSettings, Subscriptions}
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.{Keep, Sink}
import javax.management.ObjectName
import org.apache.kafka.clients.consumer.ConsumerConfig
import kamon.Kamon
import kamon.metric.MeasurementUnit
import org.apache.kafka.common
import org.apache.kafka.common.MetricName
import org.apache.openwhisk.connector.kafka.{KafkaMetricsProvider, KamonMetricsReporter}
import scala.concurrent.{ExecutionContext, Future}
import scala.concurrent.duration._
import org.apache.openwhisk.core.connector.{Activation, EventMessage, Metric}
import org.apache.openwhisk.core.entity.ActivationResponse
import org.apache.openwhisk.core.monitoring.metrics.OpenWhiskEvents.MetricConfig
/**
 * Contract for sinks that record per-namespace metrics derived from user-event messages.
 *
 * Implementations receive every successfully parsed event consumed by [[EventConsumer]]
 * and are expected to record it (e.g. to Kamon or Prometheus). Implementations should be
 * side-effecting and non-blocking, as they are invoked on the consumer stream's thread.
 */
trait MetricRecorder {
/**
 * Record metrics for a single activation event.
 *
 * @param activation the parsed activation payload
 * @param initiatorNamespace namespace that initiated the activation (taken from the event envelope)
 * @param metricConfig configuration controlling which metrics/tags are emitted
 */
def processActivation(activation: Activation, initiatorNamespace: String, metricConfig: MetricConfig): Unit
/**
 * Record a single generic metric event.
 *
 * @param metric the parsed metric payload
 * @param initiatorNamespace namespace that emitted the metric (taken from the event envelope)
 */
def processMetric(metric: Metric, initiatorNamespace: String): Unit
}
/**
 * Kafka consumer for the OpenWhisk user-event topic (`events`).
 *
 * Consumes event messages, updates a set of global Kamon metrics (counters/histograms below),
 * and forwards each parsed event to every configured [[MetricRecorder]]. Offsets are committed
 * back to Kafka in batches. Also exposes the underlying Kafka client metrics via
 * [[KafkaMetricsProvider]] and periodically publishes the consumer lag as a gauge.
 *
 * @param settings base Alpakka consumer settings; client-id, metric reporter and stop timeout
 *                 are overridden in `updatedSettings` below
 * @param recorders sinks that receive every parsed event
 * @param metricConfig configuration passed through to recorders for activation events
 */
case class EventConsumer(settings: ConsumerSettings[String, String],
recorders: Seq[MetricRecorder],
metricConfig: MetricConfig)(implicit system: ActorSystem, materializer: ActorMaterializer)
extends KafkaMetricsProvider {
import EventConsumer._
// Execution context for the commit stage and the drain-and-shutdown future.
private implicit val ec: ExecutionContext = system.dispatcher
//Record the rate of events received
private val activationCounter = Kamon.counter("openwhisk.userevents.global.activations").withoutTags()
private val metricCounter = Kamon.counter("openwhisk.userevents.global.metric").withoutTags()
private val statusCounter = Kamon.counter("openwhisk.userevents.global.status")
private val coldStartCounter = Kamon.counter("openwhisk.userevents.global.coldStarts").withoutTags()
// Pre-tagged views of the status counter, one per activation response status.
private val statusSuccess = statusCounter.withTag("status", ActivationResponse.statusSuccess)
// "failure" aggregates every non-success status (see updateGlobalMetrics).
private val statusFailure = statusCounter.withTag("status", "failure")
private val statusApplicationError = statusCounter.withTag("status", ActivationResponse.statusApplicationError)
private val statusDeveloperError = statusCounter.withTag("status", ActivationResponse.statusDeveloperError)
private val statusInternalError = statusCounter.withTag("status", ActivationResponse.statusWhiskError)
// Global timing histograms, all recorded in milliseconds.
private val waitTime =
Kamon.histogram("openwhisk.userevents.global.waitTime", MeasurementUnit.time.milliseconds).withoutTags()
private val initTime =
Kamon.histogram("openwhisk.userevents.global.initTime", MeasurementUnit.time.milliseconds).withoutTags()
private val duration =
Kamon.histogram("openwhisk.userevents.global.duration", MeasurementUnit.time.milliseconds).withoutTags()
// Updated periodically by lagRecorder from the JMX-reported Kafka consumer lag.
private val lagGauge = Kamon.gauge("openwhisk.userevents.consumer.lag").withoutTags()
/**
 * Stops the lag-polling scheduler, then drains in-flight messages and shuts the
 * consumer down. The returned future completes when the stream has fully stopped.
 */
def shutdown(): Future[Done] = {
lagRecorder.cancel()
control.drainAndShutdown()(system.dispatcher)
}
// The stream is considered running until the control's shutdown future completes.
def isRunning: Boolean = !control.isShutdown.isCompleted
// Kafka client metrics of the underlying consumer, surfaced for KafkaMetricsProvider.
override def metrics(): Future[Map[MetricName, common.Metric]] = control.metrics
//TODO Use RestartSource
// Stream wiring: consume committable messages -> process each record's value as a
// side effect -> batch up to 20 offsets -> commit with parallelism 3 -> ignore results.
// DrainingControl combines the consumer control with the sink's completion so shutdown
// can drain cleanly. NOTE: must be materialized before lagRecorder/shutdown reference it.
private val control: DrainingControl[Done] = Consumer
.committableSource(updatedSettings, Subscriptions.topics(userEventTopic))
.map { msg =>
processEvent(msg.record.value())
msg.committableOffset
}
.batch(max = 20, CommittableOffsetBatch(_))(_.updated(_))
.mapAsync(3)(_.commitScaladsl())
.toMat(Sink.ignore)(Keep.both)
.mapMaterializedValue(DrainingControl.apply)
.run()
// Every 10s publish the JMX-reported max record lag (companion's consumerLag) to Kamon.
private val lagRecorder =
system.scheduler.schedule(10.seconds, 10.seconds)(lagGauge.update(consumerLag))
/**
 * Parses a raw Kafka record value into an EventMessage, bumps the per-type counter,
 * and dispatches the payload to all recorders.
 *
 * NOTE(review): EventMessage.parse presumably returns a Try (it supports map/foreach);
 * parse failures — and a MatchError from an unknown eventType captured by that Try —
 * are silently dropped by the foreach. TODO confirm this is the intended behavior.
 */
private def processEvent(value: String): Unit = {
EventMessage
.parse(value)
.map { e =>
e.eventType match {
case Activation.typeName => activationCounter.increment()
case Metric.typeName => metricCounter.increment()
}
e
}
.foreach { e =>
e.body match {
case a: Activation =>
recorders.foreach(_.processActivation(a, e.namespace, metricConfig))
updateGlobalMetrics(a)
case m: Metric =>
recorders.foreach(_.processMetric(m, e.namespace))
}
}
}
/**
 * Updates the global (namespace-independent) Kamon metrics for one activation:
 * status counters, cold-start counter/initTime, and waitTime/duration histograms.
 */
private def updateGlobalMetrics(a: Activation): Unit = {
a.status match {
case ActivationResponse.statusSuccess => statusSuccess.increment()
case ActivationResponse.statusApplicationError => statusApplicationError.increment()
case ActivationResponse.statusDeveloperError => statusDeveloperError.increment()
case ActivationResponse.statusWhiskError => statusInternalError.increment()
case _ => //Ignore for now
}
// Any non-success status also counts toward the aggregate "failure" counter.
if (a.status != ActivationResponse.statusSuccess) statusFailure.increment()
// initTime is only meaningful (and only recorded) for cold starts.
if (a.isColdStart) {
coldStartCounter.increment()
initTime.record(a.initTime.toMillis)
}
waitTime.record(a.waitTime.toMillis)
duration.record(a.duration.toMillis)
}
// Base settings plus: fixed client-id (also used for the JMX lag lookup), the Kamon
// metrics reporter for Kafka client metrics, and a zero stop timeout as required
// when using DrainingControl.
private def updatedSettings =
settings
.withProperty(ConsumerConfig.CLIENT_ID_CONFIG, id)
.withProperty(ConsumerConfig.METRIC_REPORTER_CLASSES_CONFIG, KamonMetricsReporter.name)
.withStopTimeout(Duration.Zero) // https://doc.akka.io/docs/alpakka-kafka/current/consumer.html#draining-control
}
object EventConsumer {

  /** Kafka topic from which user events are consumed. */
  val userEventTopic = "events"

  /** Kafka client id of the consumer; also embedded in the JMX object name below. */
  val id = "event-consumer"

  // Platform MBean server, used to read the Kafka client's self-registered metrics.
  private val mbeanServer = ManagementFactory.getPlatformMBeanServer

  // JMX name under which the Kafka consumer publishes its fetch-manager metrics.
  private val fetchManagerMBean =
    new ObjectName(s"kafka.consumer:type=consumer-fetch-manager-metrics,client-id=$id")

  /**
   * Maximum record lag currently reported by the Kafka consumer via JMX,
   * clamped so it is never negative. The `records-lag-max` attribute is a Double.
   */
  def consumerLag: Long = {
    val reportedLag = mbeanServer.getAttribute(fetchManagerMBean, "records-lag-max")
    val lagAsLong = reportedLag.asInstanceOf[Double].toLong
    math.max(lagAsLong, 0L)
  }
}