title: Implementing a Custom Metrics Sink

Each Heron container has its own centralized Metrics Manager (MM), which collects metrics from all Heron Instances in the container. You can define how the MM processes metrics by implementing a metrics sink, which specifies how the MM handles incoming MetricsRecord objects.

Java is currently the only supported language for custom metrics sinks. This may change in the future.

Currently supported Sinks

Heron ships with four metrics sinks that you can apply to a specific topology. The code for these sinks may prove helpful when implementing your own.

| Sink | How it works |
| --- | --- |
| Prometheus | PrometheusSink sends each MetricsRecord object to a specified path in the Prometheus instance. |
| Graphite | GraphiteSink sends each MetricsRecord object to a Graphite instance according to a Graphite prefix. |
| Scribe | ScribeSink sends each MetricsRecord object to a Scribe instance according to a Scribe category and namespace. |
| Local filesystem | FileSink writes each MetricsRecord object to a JSON file at a specified path. |

Java Setup

In order to create a custom metrics sink, you need to import the heron-spi library into your project.

Maven

<dependency>
  <groupId>org.apache.heron</groupId>
  <artifactId>heron-spi</artifactId>
  <version>{{% heronVersion %}}</version>
</dependency>

Gradle

dependencies {
  implementation group: "org.apache.heron", name: "heron-spi", version: "{{% heronVersion %}}"
}

The IMetricsSink Interface

Each metrics sink must implement the IMetricsSink interface, which requires you to implement the following methods:

| Method | Description |
| --- | --- |
| init | Defines the initialization behavior of the sink. The conf map is the configuration that is passed to the sink by the .yaml configuration file at heron/config/metrics_sinks.yaml; the SinkContext object enables you to access values from the sink's runtime context (the ID of the metrics manager, the ID of the sink, and the name of the topology). |
| processRecord | Defines how each MetricsRecord that passes through the sink is processed. |
| flush | Flushes any buffered metrics; this method is called at the interval specified by the flush-frequency-ms parameter. More info can be found in the Stream Manager documentation. |
| close | Closes the stream and releases any system resources associated with it; if the stream is already closed, invoking close() has no effect. |

Your implementation of this interface needs to be packaged into a JAR file and distributed to the heron-core/lib/metricsmgr folder of your Heron release.

Example Implementation

Below is an example implementation that simply prints the contents of each metrics record as it passes through:

import java.util.Map;

import org.apache.heron.metricsmgr.api.metrics.MetricsInfo;
import org.apache.heron.metricsmgr.api.metrics.MetricsRecord;
import org.apache.heron.metricsmgr.api.sink.IMetricsSink;
import org.apache.heron.metricsmgr.api.sink.SinkContext;

public class PrintSink implements IMetricsSink {
    @Override
    public void init(Map<String, Object> conf, SinkContext context) {
        System.out.println("Sink configuration:");
        // This will print out each config in the supplied configuration
        for (Map.Entry<String, Object> config : conf.entrySet()) {
            System.out.println(String.format("%s: %s", config.getKey(), config.getValue()));
        }
        System.out.println(String.format("Topology name: %s", context.getTopologyName()));
        System.out.println(String.format("Sink ID: %s", context.getSinkId()));
    }

    @Override
    public void processRecord(MetricsRecord record) {
        String recordString = String.format("Record received: %s", record.toString());
        System.out.println(recordString);
    }

    @Override
    public void flush() {
        // Since we're just printing to stdout in this sink, we don't need to
        // specify any flush() behavior
    }

    @Override
    public void close() {
        // Since we're just printing to stdout in this sink, we don't need to
        // specify any close() behavior
    }
}
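
PrintSink leaves flush() empty because it writes every record immediately. A sink that batches its output would instead store records in processRecord() and write them out in flush(). The sketch below isolates that buffering pattern in a hypothetical helper, RecordBuffer, which is not part of heron-spi. It is generic over the record type so that it compiles without the Heron libraries; in a real sink the type parameter would be MetricsRecord. The methods are synchronized because this sketch makes no assumption about which thread the Metrics Manager uses to call flush().

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

/** Hypothetical helper (not part of heron-spi) that batches records between flushes. */
final class RecordBuffer<T> {
    private final List<T> pending = new ArrayList<>();
    private final Consumer<List<T>> writer;

    /** The writer receives one batch per non-empty flush, e.g. to send it over the network. */
    RecordBuffer(Consumer<List<T>> writer) {
        this.writer = writer;
    }

    /** Intended to be called from processRecord(): stores the record without writing it. */
    synchronized void add(T record) {
        pending.add(record);
    }

    /** Intended to be called from flush(): hands a copy of the batch to the writer, then clears it. */
    synchronized void flush() {
        if (pending.isEmpty()) {
            return; // nothing arrived since the last flush
        }
        writer.accept(new ArrayList<>(pending));
        pending.clear();
    }

    /** Number of records waiting for the next flush. */
    synchronized int size() {
        return pending.size();
    }

    public static void main(String[] args) {
        RecordBuffer<String> buffer =
            new RecordBuffer<>(batch -> System.out.println("Writing " + batch.size() + " records"));
        buffer.add("record-1");
        buffer.add("record-2");
        buffer.flush(); // writes the two buffered records
        buffer.flush(); // buffer is empty, so the writer is not called
    }
}
```

A sink built this way would typically also call the buffer's flush() from close(), so that records received after the last scheduled flush are not lost.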

Configuring Your Custom Sink

The configuration for your sink needs to be provided in the metrics_sinks.yaml configuration file in your scheduler's base configuration template.

At the top of that file there's a sinks parameter that lists each available sink by name. You should add the sink you want to use to that list. Here's an example:

sinks:
  - file-sink
  - scribe-sink
  - tmaster-sink
  - print-sink
  - prometheus-sink

For each sink you are required to specify the following:

| Parameter | Description |
| --- | --- |
| class | The Java class name of your custom implementation of the IMetricsSink interface, e.g. biz.acme.heron.metrics.PrintSink. |
| flush-frequency-ms | The frequency (in milliseconds) at which the flush() method is called in your implementation of IMetricsSink. |
| sink-restart-attempts | The number of times that a sink will attempt to restart if it throws exceptions and dies. If you do not set this, the default is 0; if you set it to -1, the sink will attempt to restart forever. |
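
The three possible meanings of sink-restart-attempts (unset, a positive count, and -1) can be easy to mix up. The following is a minimal model of the rule as described above. RestartPolicy is a hypothetical class written for illustration only; it is not Heron's implementation and does not appear in heron-spi.

```java
/** Hypothetical model of the sink-restart-attempts rule; not Heron's actual code. */
final class RestartPolicy {
    private static final int RESTART_FOREVER = -1;

    private final int configuredAttempts;
    private int restartsUsed = 0;

    /** Pass null when sink-restart-attempts is not set in the configuration. */
    RestartPolicy(Integer configuredAttempts) {
        // An unset value falls back to the documented default of 0 (never restart)
        this.configuredAttempts = configuredAttempts == null ? 0 : configuredAttempts;
    }

    /** Returns true if the sink may restart again, consuming one attempt when attempts are limited. */
    boolean tryRestart() {
        if (configuredAttempts == RESTART_FOREVER) {
            return true;
        }
        if (restartsUsed < configuredAttempts) {
            restartsUsed++;
            return true;
        }
        return false;
    }

    public static void main(String[] args) {
        RestartPolicy unset = new RestartPolicy(null);
        System.out.println("unset, first failure: " + unset.tryRestart());

        RestartPolicy twice = new RestartPolicy(2);
        System.out.println("2 attempts, first failure: " + twice.tryRestart());
        System.out.println("2 attempts, second failure: " + twice.tryRestart());
        System.out.println("2 attempts, third failure: " + twice.tryRestart());

        RestartPolicy forever = new RestartPolicy(-1);
        System.out.println("-1, any failure: " + forever.tryRestart());
    }
}
```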

Here is an example metrics_sinks.yaml configuration:

sinks:
  - print-sink

print-sink:
  class: "biz.acme.heron.metrics.PrintSink"
  flush-frequency-ms: 60000 # One minute
  sink-restart-attempts: -1 # Attempt to restart forever
  some-other-config: false

You can optionally add other configuration keys for the sink, such as some-other-config above. All of the sink's configuration keys are collected into an unmodifiable Map<String, Object> conf and passed to the sink through the init method.
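
Because the values in conf are typed as Object, a sink usually has to convert them before use. The sketch below shows one way to do that with defaults for missing keys. SinkConfig is a hypothetical helper, not part of heron-spi, and the exact runtime type of each value (Integer, Long, Boolean, or String) depends on how the YAML file is parsed, which this sketch treats as unknown.

```java
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.Map;

/** Hypothetical helper (not part of heron-spi) for reading typed values from a sink's conf map. */
final class SinkConfig {
    private SinkConfig() { }

    /** Returns the value for key as a long, or defaultValue if the key is absent. */
    static long getLong(Map<String, Object> conf, String key, long defaultValue) {
        Object value = conf.get(key);
        if (value == null) {
            return defaultValue;
        }
        if (value instanceof Number) {
            return ((Number) value).longValue();
        }
        // Fall back to parsing the text form, e.g. when the YAML value was quoted
        return Long.parseLong(value.toString().trim());
    }

    /** Returns the value for key as a boolean, or defaultValue if the key is absent. */
    static boolean getBoolean(Map<String, Object> conf, String key, boolean defaultValue) {
        Object value = conf.get(key);
        if (value == null) {
            return defaultValue;
        }
        if (value instanceof Boolean) {
            return (Boolean) value;
        }
        return Boolean.parseBoolean(value.toString().trim());
    }

    public static void main(String[] args) {
        // Stand-in for the map that would be passed to init(); built by hand here
        Map<String, Object> raw = new LinkedHashMap<>();
        raw.put("class", "biz.acme.heron.metrics.PrintSink");
        raw.put("flush-frequency-ms", 60000);
        raw.put("sink-restart-attempts", -1);
        raw.put("some-other-config", false);
        Map<String, Object> conf = Collections.unmodifiableMap(raw);

        System.out.println(getLong(conf, "flush-frequency-ms", 0L));
        System.out.println(getBoolean(conf, "some-other-config", true));
        System.out.println(getLong(conf, "missing-key", 42L));
    }
}
```

Since the map is unmodifiable, calling put() or remove() on it throws UnsupportedOperationException; a sink that needs to adjust values should copy them into its own fields during init.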

Using Your Custom Sink

Once you've built a JAR for your custom Java sink, distributed that JAR to the heron-core/lib/metricsmgr folder, and updated the metrics_sinks.yaml file in the base configuration template, any topology submitted using that configuration will include the custom sink.

You must re-compile Heron if you want to include the configuration in a new distribution of Heron CLI.