/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied. See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */
package org.apache.samza.metrics.reporter
import org.apache.samza.metrics._
import org.apache.samza.serializers.Serializer
import org.apache.samza.system.OutgoingMessageEnvelope
import org.apache.samza.system.SystemProducer
import org.apache.samza.system.SystemStream
import org.apache.samza.util.Logging
import java.util.HashMap
import java.util.Map
import java.util.concurrent.Executors
import java.util.concurrent.TimeUnit
import org.apache.samza.config.ShellCommandConfig
import scala.collection.JavaConverters._
/**
 * MetricsSnapshotReporter is a generic metrics reporter that sends metrics to a stream.
 *
 * jobName // my-samza-job
 * jobId // an id that differentiates multiple executions of the same job
 * containerName // container_567890
 * host // eat1-app128.gird
 * version // 0.0.1
 * blacklist // Regex of metrics to ignore when flushing
 */
class MetricsSnapshotReporter(
producer: SystemProducer,
out: SystemStream,
reportingInterval: Int,
jobName: String,
jobId: String,
containerName: String,
version: String,
samzaVersion: String,
host: String,
serializer: Serializer[MetricsSnapshot] = null,
blacklist: Option[String],
clock: () => Long = () => { System.currentTimeMillis }) extends MetricsReporter with Runnable with Logging {
// Container id read from the environment variable named by
// ShellCommandConfig.ENV_EXECUTION_ENV_CONTAINER_ID; empty string when the variable is not set.
val execEnvironmentContainerId = Option[String](System.getenv(ShellCommandConfig.ENV_EXECUTION_ENV_CONTAINER_ID)).getOrElse("")
// Single-threaded scheduler that runs this reporter (see start/stop). Its threads are daemon
// threads named from the format "Samza MetricsSnapshotReporter Thread-%d".
val executor = Executors.newSingleThreadScheduledExecutor(
new ThreadFactoryBuilder().setNameFormat("Samza MetricsSnapshotReporter Thread-%d").setDaemon(true).build())
// Clock value captured once, at construction; passed to every MetricsHeader built by innerRun().
val resetTime = clock()
// (source, registry) pairs added by register(); every pair is flushed on each run.
var registries = List[(String, ReadableMetricsRegistry)]()
// Cache of "group.metricName" names that shouldIgnore() has already matched against the blacklist regex.
var blacklistedMetrics = Set[String]()
// Log the reporter configuration once, when the reporter is constructed.
info("got metrics snapshot reporter properties [job name: %s, job id: %s, containerName: %s, version: %s, samzaVersion: %s, host: %s, reportingInterval %s]"
format(jobName, jobId, containerName, version, samzaVersion, host, reportingInterval))
/**
 * Starts the underlying producer, then schedules this reporter to run immediately and
 * afterwards every `reportingInterval` seconds.
 */
def start: Unit = {
  info("Starting producer.")
  // The producer must be running before the first scheduled run sends through it.
  producer.start()

  info("Starting reporter timer.")
  // Fixed delay: each run is scheduled reportingInterval seconds after the previous one finishes.
  executor.scheduleWithFixedDelay(this, 0, reportingInterval, TimeUnit.SECONDS)
}
/**
 * Adds a metrics registry to the set flushed on every run, and registers its source name
 * with the producer (innerRun() sends each snapshot under this source name).
 *
 * @param source   source name the registry's snapshots are sent under
 * @param registry registry whose metrics are collected on every run
 */
def register(source: String, registry: ReadableMetricsRegistry): Unit = {
  registries = (source, registry) :: registries

  info("Registering %s with producer." format source)
  producer.register(source)
}
/**
 * Flushes metrics one last time, shuts down the reporter timer and stops the producer.
 *
 * Blocks for at most 60 seconds waiting for the timer to terminate. If the timer has not
 * terminated by then, a warning is logged; the producer is stopped either way.
 */
def stop: Unit = {
  // Scheduling an event with 0 delay to ensure flushing of metrics one last time before shutdown
  executor.schedule(this, 0, TimeUnit.SECONDS)

  info("Stopping reporter timer.")
  // shutdown() is required for awaitTermination to ever succeed. Under the default
  // ScheduledThreadPoolExecutor policies it cancels the periodic task but still runs the
  // one-shot final flush scheduled above.
  executor.shutdown()
  // Allow the scheduled task above to finish, and block for termination (for max 60 seconds)
  executor.awaitTermination(60, TimeUnit.SECONDS)

  info("Stopping producer.")
  producer.stop()

  if (!executor.isTerminated) {
    warn("Unable to shutdown reporter timer.")
  }
}
/**
 * Scheduled-task entry point: performs one flush of all registered registries via innerRun().
 *
 * Any Exception is caught and logged rather than propagated, so that the scheduled task
 * keeps running on later intervals.
 */
def run(): Unit = {
  try {
    innerRun()
  } catch {
    case e: Exception =>
      // Ignore all exceptions - because subsequent executions of this scheduled task will be suppressed
      // by the executor if the current task throws an unhandled exception.
      warn("Error while reporting metrics. Will retry in " + reportingInterval + " seconds.", e)
  }
}
/**
 * Builds a MetricsSnapshot for every registered (source, registry) pair and sends it to `out`.
 *
 * Building the snapshot:
 *  - Every metric not excluded by shouldIgnore() is converted to a plain value: a counter to
 *    its count, a gauge to its value, and a timer to the average of its snapshot.
 *  - Values are grouped by metric group, and groups with no remaining metrics are dropped.
 *  - Nothing is sent for a source that ends up with no metrics at all.
 *
 * Sending the snapshot:
 *  - When a `serializer` was supplied, the snapshot is sent as the bytes it produces;
 *    otherwise the MetricsSnapshot object itself is sent.
 *  - The producer is flushed after each send.
 *  - A send or flush failure for one source is logged and does not stop the remaining
 *    sources from being processed.
 */
def innerRun(): Unit = {
  debug("Begin flushing metrics.")

  for ((source, registry) <- registries) {
    debug("Flushing metrics for %s." format source)

    val metricsMsg = new HashMap[String, Map[String, Object]]

    // metrics
    registry.getGroups.asScala.foreach(group => {
      val groupMsg = new HashMap[String, Object]

      registry.getGroup(group).asScala.foreach {
        case (name, metric) =>
          if (!shouldIgnore(group, name)) {
            metric.visit(new MetricsVisitor {
              def counter(counter: Counter) = groupMsg.put(name, counter.getCount: java.lang.Long)
              def gauge[T](gauge: Gauge[T]) = groupMsg.put(name, gauge.getValue.asInstanceOf[Object])
              def timer(timer: Timer) = groupMsg.put(name, timer.getSnapshot().getAverage(): java.lang.Double)
            })
          }
      }

      // don't emit empty groups
      if (!groupMsg.isEmpty) {
        metricsMsg.put(group, groupMsg)
      }
    })

    // publish to the output stream only if the metricsMsg carries any metrics
    if (!metricsMsg.isEmpty) {
      val header = new MetricsHeader(jobName, jobId, containerName, execEnvironmentContainerId, source, version, samzaVersion, host, clock(), resetTime)
      val metrics = new Metrics(metricsMsg)

      debug("Flushing metrics for %s to %s with header and map: header=%s, map=%s." format(source, out, header.getAsMap, metrics.getAsMap()))

      val metricsSnapshot = new MetricsSnapshot(header, metrics)
      // Send bytes when a serializer was configured; otherwise hand the snapshot object to the producer.
      val maybeSerialized: Object = if (serializer != null) {
        serializer.toBytes(metricsSnapshot)
      } else {
        metricsSnapshot
      }

      try {
        producer.send(source, new OutgoingMessageEnvelope(out, host, null, maybeSerialized))

        // Always flush, since we don't want metrics to get batched up.
        producer.flush(source)
      } catch {
        case e: Exception => error("Exception when flushing metrics for source %s " format (source), e)
      }
    }
  }

  debug("Finished flushing metrics.")
}
def shouldIgnore(group: String, metricName: String) = {
var isBlacklisted = blacklist.isDefined
val fullMetricName = group + "." + metricName
if (isBlacklisted && !blacklistedMetrics.contains(fullMetricName)) {
if (fullMetricName.matches(blacklist.get)) {
blacklistedMetrics += fullMetricName
debug("Blacklisted metric %s because it matched blacklist regex: %s" format(fullMetricName, blacklist.get))
} else {
isBlacklisted = false