Each Heron topology has its own centralized Metrics Manager (MM), which collects metrics from all instances in the topology. You can define how the MM processes metrics by implementing a metrics sink, which specifies how the MM handles incoming `MetricsRecord` objects.
Java is currently the only supported language for custom metrics sinks. This may change in the future.
Heron comes equipped out of the box with three metrics sinks that you can apply to a specific topology. The code for these sinks may prove helpful for implementing your own.

* `GraphiteSink` --- Sends each `MetricsRecord` object to a Graphite instance according to a Graphite prefix.
* `ScribeSink` --- Sends each `MetricsRecord` object to a Scribe instance according to a Scribe category and namespace.
* `FileSink` --- Writes each `MetricsRecord` object to a JSON file at a specified path.

More on using those sinks in a Heron cluster can be found in Metrics Manager.
In order to create a custom metrics sink, you need to import the `metricsmgr-api` library into your project.

For Maven:

```xml
<dependency>
  <groupId>com.twitter.heron</groupId>
  <artifactId>metricsmgr-api</artifactId>
  <version>{{.Site.Params.versions.metricsapi}}</version>
</dependency>
```

For Gradle:

```groovy
dependencies {
  compile group: "com.twitter.heron", name: "metricsmgr-api", version: "{{.Site.Params.versions.metricsapi}}"
}
```
`IMetricsSink` Interface

Each metrics sink must implement the `IMetricsSink` interface, which requires you to implement the following methods:

* `void init(Map<String, Object> conf, SinkContext context)` --- Defines the initialization behavior of the sink. The `conf` map is the configuration passed to the sink from the YAML configuration file at `heron/config/metrics_sinks.yaml`; the `SinkContext` object gives you access to values from the sink's runtime context (the ID of the Metrics Manager, the ID of the sink, and the name of the topology).
* `void processRecord(MetricsRecord record)` --- Defines how each `MetricsRecord` that passes through the sink is processed.
* `void flush()` --- Flushes any buffered metrics; this method is called at the interval specified by the `flush-frequency-ms` parameter. More info can be found in the Stream Manager document.
* `void close()` --- Closes the sink and releases any system resources associated with it; if the sink is already closed, invoking `close()` has no effect.

Your implementation of this interface will need to be packaged into a JAR file and distributed to the `metrics-mgr-classpath` folder of your Heron release.
Below is an example implementation that simply prints the contents of each metrics record as it passes through:
```java
package biz.acme.heron.metrics;

import java.util.Map;

import com.twitter.heron.metricsmgr.api.metrics.MetricsRecord;
import com.twitter.heron.metricsmgr.api.sink.IMetricsSink;
import com.twitter.heron.metricsmgr.api.sink.SinkContext;

public class PrintSink implements IMetricsSink {
  @Override
  public void init(Map<String, Object> conf, SinkContext context) {
    System.out.println("Sink configuration:");
    // This will print out each config in the supplied configuration
    for (Map.Entry<String, Object> config : conf.entrySet()) {
      System.out.println(String.format("%s: %s", config.getKey(), config.getValue()));
    }
    System.out.println(String.format("Topology name: %s", context.getTopologyName()));
    System.out.println(String.format("Sink ID: %s", context.getSinkId()));
  }

  @Override
  public void processRecord(MetricsRecord record) {
    String recordString = String.format("Record received: %s", record.toString());
    System.out.println(recordString);
  }

  @Override
  public void flush() {
    // Since we're just printing to stdout in this sink, we don't need to
    // specify any flush() behavior
  }

  @Override
  public void close() {
    // Since we're just printing to stdout in this sink, we don't need to
    // specify any close() behavior
  }
}
```
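`PrintSink` can leave `flush()` and `close()` empty because it writes each record straight to stdout. A sink that batches records needs both: `processRecord()` only buffers, `flush()` writes out and clears the buffer, and `close()` flushes whatever remains before releasing resources. The following is a minimal, hypothetical sketch of that pattern. So that it compiles without `metricsmgr-api`, it does not implement `IMetricsSink`: plain `String` records stand in for `MetricsRecord`, an in-memory `List` stands in for the real destination, `SinkContext` is omitted, and the `prefix` config key is invented for illustration. It also assumes the methods are never called concurrently.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

/**
 * Hypothetical sketch of a buffering sink's lifecycle. It mirrors the four
 * IMetricsSink methods but uses plain Java types so it compiles on its own.
 */
public class BufferingSinkSketch {
  private final List<String> buffer = new ArrayList<>();
  // Stands in for an external destination (file, socket, remote service).
  private final List<String> destination = new ArrayList<>();
  private String prefix = "";
  private boolean closed = false;

  // Mirrors init(): read settings from the sink's config map.
  // "prefix" is a hypothetical custom key, not a documented Heron option.
  public void init(Map<String, Object> conf) {
    Object value = conf.get("prefix");
    prefix = value == null ? "" : value.toString();
  }

  // Mirrors processRecord(): cheap per-record work only, no I/O.
  public void processRecord(String record) {
    if (closed) {
      throw new IllegalStateException("sink is closed");
    }
    buffer.add(prefix + record);
  }

  // Mirrors flush(): write the buffered records out, then clear the buffer.
  public void flush() {
    destination.addAll(buffer);
    buffer.clear();
  }

  // Mirrors close(): flush what is left; repeated calls have no effect.
  public void close() {
    if (closed) {
      return;
    }
    flush();
    closed = true;
  }

  public List<String> written() {
    return destination;
  }

  public int buffered() {
    return buffer.size();
  }

  public static void main(String[] args) {
    BufferingSinkSketch sink = new BufferingSinkSketch();
    sink.init(Map.of("prefix", "heron."));
    sink.processRecord("cpu=0.5");
    sink.processRecord("mem=128");
    System.out.println(sink.written().size()); // 0: nothing written before flush()
    sink.flush();
    System.out.println(sink.written());        // [heron.cpu=0.5, heron.mem=128]
    sink.close();
    sink.close();                              // second close() is a no-op
  }
}
```

In a real sink, the same structure would map onto the `IMetricsSink` methods, with the in-memory `destination` replaced by I/O against your metrics backend.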
The configuration for your sink needs to be provided in the YAML file at `heron/config/metrics_sinks.yaml`.
At the top of that file there's a `sinks` parameter that lists each available sink by name. You should add your sink to that list. Here's an example:
```yaml
sinks:
  - file-sink
  - scribe-sink
  - tmaster-sink
  - print-sink
```
For each sink you need to specify the following:

* `class` --- The Java class name of your custom implementation of the `IMetricsSink` interface, e.g. `biz.acme.heron.metrics.PrintSink`.
* `flush-frequency-ms` --- The interval (in milliseconds) at which the `flush()` method is called in your implementation of `IMetricsSink`.
* `sink-restart-attempts` --- The number of times that a sink will attempt to restart if it throws exceptions and dies. If you do not set this, the default is 0; if you set it to -1, the sink will attempt to restart forever.

Below is an example `metrics_sinks.yaml` configuration:
```yaml
sinks:
  - print-sink

print-sink:
  class: "biz.acme.heron.metrics.PrintSink"
  flush-frequency-ms: 60000 # One minute
  sink-restart-attempts: -1 # Attempt to restart forever
```
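To make the semantics of `flush-frequency-ms` and `sink-restart-attempts` concrete, here is a hypothetical helper that interprets one sink's section of `metrics_sinks.yaml` after it has been parsed into a `Map`. It only restates the rules described above (a missing `sink-restart-attempts` means 0 restarts, and `-1` means unlimited restarts); it is not the Metrics Manager's actual code, and treating `flush-frequency-ms` as required is an assumption of this sketch.

```java
import java.util.Map;

/**
 * Hypothetical interpretation of a single sink's YAML section. Illustrates the
 * documented semantics only; not Heron's own implementation.
 */
public class SinkSettingsSketch {
  final long flushFrequencyMs;
  final int restartAttempts; // -1 means "restart forever"

  SinkSettingsSketch(Map<String, Object> sinkConf) {
    Object freq = sinkConf.get("flush-frequency-ms");
    if (freq == null) {
      // Assumption of this sketch: the flush interval must be present.
      throw new IllegalArgumentException("flush-frequency-ms is required");
    }
    // YAML parsers may yield Integer or Long for numbers, so go through Number.
    this.flushFrequencyMs = ((Number) freq).longValue();

    Object attempts = sinkConf.get("sink-restart-attempts");
    // Documented default: 0 restart attempts when the key is absent.
    this.restartAttempts = attempts == null ? 0 : ((Number) attempts).intValue();
  }

  /** Whether a sink that has already been restarted restartsSoFar times may restart again. */
  boolean shouldRestart(int restartsSoFar) {
    return restartAttempts == -1 || restartsSoFar < restartAttempts;
  }

  public static void main(String[] args) {
    SinkSettingsSketch forever = new SinkSettingsSketch(
        Map.of("class", "biz.acme.heron.metrics.PrintSink",
               "flush-frequency-ms", 60000,
               "sink-restart-attempts", -1));
    System.out.println(forever.shouldRestart(1000)); // true

    SinkSettingsSketch defaults = new SinkSettingsSketch(
        Map.of("flush-frequency-ms", 60000));
    System.out.println(defaults.shouldRestart(0));   // false: default is 0 attempts
  }
}
```

Reading numbers through `Number` rather than casting directly to `Integer` keeps the helper working whether the YAML parser produces `Integer` or `Long` values.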
Once you've made a JAR for your custom Java sink, distributed that JAR to the `metrics-mgr-classpath` folder, and changed the configuration in `heron/config/metrics_sinks.yaml`, you'll need to recompile Heron. Any topology submitted using that compiled version of `heron-cli` will include the custom sink.