Each Heron container has its own centralized Metrics Manager (MM), which collects metrics from all Heron Instances in the container. You can define how the MM processes metrics by implementing a metrics sink, which specifies how the MM handles incoming `MetricsRecord` objects.
Java is currently the only supported language for custom metrics sinks. This may change in the future.
Heron comes with four metrics sinks out of the box that you can apply to a specific topology. The code for these sinks may prove helpful when implementing your own.
Sink | How it works |
---|---|
Prometheus | PrometheusSink sends each MetricsRecord object to a specified path in the Prometheus instance. |
Graphite | GraphiteSink sends each MetricsRecord object to a Graphite instance according to a Graphite prefix. |
Scribe | ScribeSink sends each MetricsRecord object to a Scribe instance according to a Scribe category and namespace. |
Local filesystem | FileSink writes each MetricsRecord object to a JSON file at a specified path. |
To create a custom metrics sink, you need to import the `heron-spi` library into your project.

With Maven:

```xml
<dependency>
  <groupId>org.apache.heron</groupId>
  <artifactId>heron-spi</artifactId>
  <version>{{% heronVersion %}}</version>
</dependency>
```

With Gradle:

```groovy
dependencies {
  implementation group: "org.apache.heron", name: "heron-spi", version: "{{% heronVersion %}}"
}
```
## The `IMetricsSink` Interface

Each metrics sink must implement the `IMetricsSink` interface, which requires you to implement the following methods:
Method | Description |
---|---|
`init` | Defines the initialization behavior of the sink. The `conf` map is the configuration passed to the sink from the `heron/config/metrics_sinks.yaml` configuration file; the `SinkContext` object gives you access to values from the sink's runtime context (the ID of the Metrics Manager, the ID of the sink, and the name of the topology). |
`processRecord` | Defines how each `MetricsRecord` that passes through the sink is processed. |
`flush` | Flushes any buffered metrics; this method is called at the interval specified by the `flush-frequency-ms` parameter. More info can be found in the Stream Manager documentation. |
`close` | Closes the sink and releases any system resources associated with it; if the sink is already closed, invoking `close()` has no effect. |
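To make the call order concrete, here is a self-contained sketch of a sink that buffers records in `processRecord` and emits them only in `flush`. `SimpleSink`, `BufferingSink`, and `SinkLifecycleDemo` are hypothetical names: `SimpleSink` is a simplified stand-in for `IMetricsSink` (records and the sink context are reduced to strings) so the example runs without `heron-spi`, and the driver in `main` only imitates the order in which the Metrics Manager might invoke the methods.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;

// Hypothetical stand-in mirroring the shape of IMetricsSink; the record and
// context types are simplified to String so the sketch runs on its own.
interface SimpleSink {
  void init(Map<String, Object> conf, String sinkId);
  void processRecord(String record);
  void flush();
  void close();
}

// Buffers records in processRecord() and only emits them when flush() is called.
class BufferingSink implements SimpleSink {
  private final List<String> buffer = new ArrayList<>();
  private final List<String> emitted = new ArrayList<>();
  private boolean closed = false;

  @Override
  public void init(Map<String, Object> conf, String sinkId) {
    System.out.println("Initializing sink " + sinkId);
  }

  @Override
  public void processRecord(String record) {
    if (!closed) {
      buffer.add(record); // ignore records that arrive after close()
    }
  }

  @Override
  public void flush() {
    // Emit everything buffered since the previous flush
    emitted.addAll(buffer);
    buffer.clear();
  }

  @Override
  public void close() {
    if (closed) {
      return; // closing an already-closed sink has no effect
    }
    flush(); // don't lose records buffered after the last scheduled flush
    closed = true;
  }

  List<String> emitted() {
    return Collections.unmodifiableList(emitted);
  }

  int buffered() {
    return buffer.size();
  }
}

public class SinkLifecycleDemo {
  public static void main(String[] args) {
    BufferingSink sink = new BufferingSink();
    sink.init(Map.of("flush-frequency-ms", 60000), "buffering-sink");
    sink.processRecord("record-1");
    sink.processRecord("record-2");
    sink.flush(); // in Heron, flush is driven by flush-frequency-ms
    sink.processRecord("record-3");
    sink.close();
    System.out.println(sink.emitted()); // prints [record-1, record-2, record-3]
  }
}
```

Flushing once more inside `close()` is a design choice: it keeps records that arrived after the last scheduled flush from being dropped at shutdown.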
Your implementation of this interface needs to be packaged into a JAR file and distributed to the `heron-core/lib/metricsmgr` folder of your Heron release.
Below is an example implementation that simply prints the contents of each metrics record as it passes through:
```java
import java.util.Map;

import org.apache.heron.metricsmgr.api.metrics.MetricsInfo;
import org.apache.heron.metricsmgr.api.metrics.MetricsRecord;
import org.apache.heron.metricsmgr.api.sink.IMetricsSink;
import org.apache.heron.metricsmgr.api.sink.SinkContext;

public class PrintSink implements IMetricsSink {
  @Override
  public void init(Map<String, Object> conf, SinkContext context) {
    System.out.println("Sink configuration:");
    // This will print out each config in the supplied configuration
    for (Map.Entry<String, Object> config : conf.entrySet()) {
      System.out.println(String.format("%s: %s", config.getKey(), config.getValue()));
    }
    System.out.println(String.format("Topology name: %s", context.getTopologyName()));
    System.out.println(String.format("Sink ID: %s", context.getSinkId()));
  }

  @Override
  public void processRecord(MetricsRecord record) {
    String recordString = String.format("Record received: %s", record.toString());
    System.out.println(recordString);
  }

  @Override
  public void flush() {
    // Since we're just printing to stdout in this sink, we don't need to
    // specify any flush() behavior
  }

  @Override
  public void close() {
    // Since we're just printing to stdout in this sink, we don't need to
    // specify any close() behavior
  }
}
```
The configuration for your sink needs to be provided in the `metrics_sinks.yaml` configuration file in your scheduler's base configuration template.
At the top of that file there's a `sinks` parameter that lists each available sink by name. Add the sink you want to use to that list. Here's an example:
```yaml
sinks:
  - file-sink
  - scribe-sink
  - tmaster-sink
  - print-sink
  - prometheus-sink
```
For each sink you are required to specify the following:
Parameter | Description |
---|---|
`class` | The Java class name of your custom implementation of the `IMetricsSink` interface, e.g. `biz.acme.heron.metrics.PrintSink`. |
`flush-frequency-ms` | The frequency (in milliseconds) at which the `flush()` method is called in your implementation of `IMetricsSink`. |
`sink-restart-attempts` | The number of times that a sink will attempt to restart if it throws exceptions and dies. If you do not set this, the default is 0; if you set it to -1, the sink will attempt to restart forever. |
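The `sink-restart-attempts` rules can be expressed as a small decision function. `RestartPolicy` is a hypothetical helper written only to illustrate the documented semantics (unset means 0, `-1` means unlimited); it is not how the Metrics Manager itself implements restarts.

```java
// Hypothetical helper illustrating the documented semantics of
// sink-restart-attempts; this is not the Metrics Manager's actual code.
public class RestartPolicy {
  static final int DEFAULT_RESTART_ATTEMPTS = 0; // used when the parameter is unset
  static final int RESTART_FOREVER = -1;

  private final int maxAttempts;

  RestartPolicy(Integer configuredAttempts) {
    this.maxAttempts = configuredAttempts == null ? DEFAULT_RESTART_ATTEMPTS : configuredAttempts;
  }

  /** Returns true if a sink that has already restarted {@code restartsSoFar} times may restart again. */
  boolean shouldRestart(int restartsSoFar) {
    if (maxAttempts == RESTART_FOREVER) {
      return true;
    }
    return restartsSoFar < maxAttempts;
  }

  public static void main(String[] args) {
    RestartPolicy unset = new RestartPolicy(null);
    RestartPolicy three = new RestartPolicy(3);
    RestartPolicy forever = new RestartPolicy(-1);
    System.out.println(unset.shouldRestart(0));      // false: the default of 0 means no restarts
    System.out.println(three.shouldRestart(2));      // true: the third restart is allowed
    System.out.println(three.shouldRestart(3));      // false: the restart budget is exhausted
    System.out.println(forever.shouldRestart(1000)); // true: -1 never runs out
  }
}
```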
Here is an example `metrics_sinks.yaml` configuration:
```yaml
sinks:
  - custom-sink

custom-sink:
  class: "biz.acme.heron.metrics.CustomSink"
  flush-frequency-ms: 60000 # One minute
  sink-restart-attempts: -1 # Attempt to restart forever
  some-other-config: false
```
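Because each name in the `sinks` list needs a matching configuration block, a quick sanity check before deploying can catch typos. This sketch assumes the YAML has already been parsed into nested `Map`/`List` objects and treats `class` and `flush-frequency-ms` as required (since `sink-restart-attempts` has a default); `SinkConfigValidator` is a hypothetical helper, not part of Heron. Its `main` method deliberately keys the block as `print-sink` to show the check flagging a listed sink that has no matching block.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

// Hypothetical pre-deployment check for a parsed metrics_sinks.yaml; the nested
// Map/List layout mirrors the YAML structure and is an assumption of this sketch.
public class SinkConfigValidator {
  static final List<String> REQUIRED_KEYS = List.of("class", "flush-frequency-ms");

  /** Returns a list of problems; an empty list means every listed sink is configured. */
  static List<String> validate(Map<String, Object> yaml) {
    List<String> problems = new ArrayList<>();
    Object sinks = yaml.get("sinks");
    if (!(sinks instanceof List)) {
      problems.add("missing top-level 'sinks' list");
      return problems;
    }
    for (Object name : (List<?>) sinks) {
      Object block = yaml.get(String.valueOf(name));
      if (!(block instanceof Map)) {
        problems.add("sink '" + name + "' is listed but has no configuration block");
        continue;
      }
      Map<?, ?> sinkConf = (Map<?, ?>) block;
      for (String key : REQUIRED_KEYS) {
        if (!sinkConf.containsKey(key)) {
          problems.add("sink '" + name + "' is missing '" + key + "'");
        }
      }
    }
    return problems;
  }

  public static void main(String[] args) {
    // The block key does not match the name in the sinks list
    Map<String, Object> yaml = Map.of(
        "sinks", List.of("custom-sink"),
        "print-sink", Map.of(
            "class", "biz.acme.heron.metrics.CustomSink",
            "flush-frequency-ms", 60000));
    System.out.println(validate(yaml)); // [sink 'custom-sink' is listed but has no configuration block]
  }
}
```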
Adding other configuration parameters for the sink (such as `some-other-config` above) is optional. All parameters are collected into an unmodifiable `Map<String, Object> conf` and passed to the sink through the `init` method.
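Since the values in `conf` arrive as plain `Object`s, a sink typically converts them to the types it expects and falls back to defaults when a key is absent. This sketch uses hypothetical helpers `getBoolean` and `getLong`, plus a hypothetical `batch-size` key alongside `some-other-config`; it also shows that writing to an unmodifiable map throws `UnsupportedOperationException`.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

// Sketch of reading optional, typed values from the conf map handed to init().
// getBoolean/getLong and the "batch-size" key are hypothetical.
public class SinkConfReader {
  static boolean getBoolean(Map<String, Object> conf, String key, boolean defaultValue) {
    Object value = conf.get(key);
    if (value == null) {
      return defaultValue;
    }
    if (value instanceof Boolean) {
      return (Boolean) value;
    }
    return Boolean.parseBoolean(value.toString()); // tolerate quoted "true"/"false"
  }

  static long getLong(Map<String, Object> conf, String key, long defaultValue) {
    Object value = conf.get(key);
    if (value instanceof Number) {
      return ((Number) value).longValue(); // YAML integers may arrive as Integer or Long
    }
    return value == null ? defaultValue : Long.parseLong(value.toString());
  }

  public static void main(String[] args) {
    Map<String, Object> raw = new HashMap<>();
    raw.put("some-other-config", false);
    raw.put("batch-size", 500);
    // The conf passed to init() is unmodifiable, so treat it as read-only
    Map<String, Object> conf = Collections.unmodifiableMap(raw);

    System.out.println(getBoolean(conf, "some-other-config", true)); // false
    System.out.println(getLong(conf, "batch-size", 0));              // 500
    System.out.println(getBoolean(conf, "missing-key", true));       // true

    try {
      conf.put("some-other-config", true);
    } catch (UnsupportedOperationException e) {
      System.out.println("conf is read-only");
    }
  }
}
```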
Once you've built a JAR for your custom Java sink, distributed that JAR to the `heron-core/lib/metricsmgr` folder, and updated the `metrics_sinks.yaml` file in the base configuration template, any topology submitted using that configuration will include the custom sink.
You must recompile Heron if you want to include the configuration in a new distribution of the Heron CLI.