title: "Metrics" weight: 6 type: docs aliases:
Flink exposes a metric system that allows gathering and exposing metrics to external systems.
You can access the metric system from any user function that extends [RichFunction]({{< ref "docs/dev/datastream/user_defined_functions" >}}#rich-functions) by calling `getRuntimeContext().getMetricGroup()`. This method returns a `MetricGroup` object on which you can create and register new metrics.
Flink supports `Counters`, `Gauges`, `Histograms` and `Meters`.
A `Counter` is used to count something. The current value can be incremented or decremented using `inc()`/`inc(long n)` or `dec()`/`dec(long n)`. You can create and register a `Counter` by calling `counter(String name)` on a `MetricGroup`.
{{< tabs "9612d275-bdda-4322-a01f-ae6da805e917" >}}
{{< tab "Java" >}}
```java
public class MyMapper extends RichMapFunction<String, String> {
  private transient Counter counter;

  @Override
  public void open(Configuration config) {
    this.counter = getRuntimeContext()
      .getMetricGroup()
      .counter("myCounter");
  }

  @Override
  public String map(String value) throws Exception {
    this.counter.inc();
    return value;
  }
}
```
{{< /tab >}}
{{< tab "Scala" >}}
```scala
class MyMapper extends RichMapFunction[String,String] {
  @transient private var counter: Counter = _

  override def open(parameters: Configuration): Unit = {
    counter = getRuntimeContext()
      .getMetricGroup()
      .counter("myCounter")
  }

  override def map(value: String): String = {
    counter.inc()
    value
  }
}
```
{{< /tab >}}
{{< /tabs >}}
Alternatively you can also use your own `Counter` implementation:
{{< tabs "e2de1ea4-fad3-4619-b4ba-fe41af1bd25f" >}}
{{< tab "Java" >}}
```java
public class MyMapper extends RichMapFunction<String, String> {
  private transient Counter counter;

  @Override
  public void open(Configuration config) {
    this.counter = getRuntimeContext()
      .getMetricGroup()
      .counter("myCustomCounter", new CustomCounter());
  }

  @Override
  public String map(String value) throws Exception {
    this.counter.inc();
    return value;
  }
}
```
{{< /tab >}}
{{< tab "Scala" >}}
```scala
class MyMapper extends RichMapFunction[String,String] {
  @transient private var counter: Counter = _

  override def open(parameters: Configuration): Unit = {
    counter = getRuntimeContext()
      .getMetricGroup()
      .counter("myCustomCounter", new CustomCounter())
  }

  override def map(value: String): String = {
    counter.inc()
    value
  }
}
```
{{< /tab >}}
{{< /tabs >}}
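The `CustomCounter` class used above is not shown. The following is a minimal sketch of what such a class might look like, not a definitive implementation. It mirrors the `inc`/`dec` methods described above plus a `getCount()` accessor. To keep the sketch compilable without Flink on the classpath it does not declare `implements org.apache.flink.metrics.Counter`, which a real job would need to add, and the use of `LongAdder` is an assumption made here for thread safety.

```java
import java.util.concurrent.atomic.LongAdder;

// Hypothetical stand-in for the CustomCounter referenced in the example above.
// In a Flink job this class would additionally implement
// org.apache.flink.metrics.Counter; that is omitted so the sketch has no dependencies.
final class CustomCounter {
    // LongAdder keeps updates cheap when several threads touch the counter (an assumption).
    private final LongAdder count = new LongAdder();

    public void inc() { count.increment(); }

    public void inc(long n) { count.add(n); }

    public void dec() { count.decrement(); }

    public void dec(long n) { count.add(-n); }

    public long getCount() { return count.sum(); }

    public static void main(String[] args) {
        CustomCounter c = new CustomCounter();
        c.inc();
        c.inc(5);
        c.dec(2);
        System.out.println(c.getCount()); // prints 4
    }
}
```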
A `Gauge` provides a value of any type on demand. In order to use a `Gauge` you must first create a class that implements the `org.apache.flink.metrics.Gauge` interface. There is no restriction on the type of the returned value. You can register a gauge by calling `gauge(String name, Gauge gauge)` on a `MetricGroup`.
{{< tabs "1457e63d-28c4-4dbd-b742-582fe88706bf" >}}
{{< tab "Java" >}}
```java
public class MyMapper extends RichMapFunction<String, String> {
  private transient int valueToExpose = 0;

  @Override
  public void open(Configuration config) {
    getRuntimeContext()
      .getMetricGroup()
      .gauge("MyGauge", new Gauge<Integer>() {
        @Override
        public Integer getValue() {
          return valueToExpose;
        }
      });
  }

  @Override
  public String map(String value) throws Exception {
    valueToExpose++;
    return value;
  }
}
```
{{< /tab >}}
{{< tab "Scala" >}}
```scala
class MyMapper extends RichMapFunction[String,String] {
  @transient private var valueToExpose = 0

  override def open(parameters: Configuration): Unit = {
    getRuntimeContext()
      .getMetricGroup()
      .gauge[Int, ScalaGauge[Int]]("MyGauge", ScalaGauge[Int]( () => valueToExpose ) )
  }

  override def map(value: String): String = {
    valueToExpose += 1
    value
  }
}
```
{{< /tab >}}
{{< /tabs >}}
Note that reporters will turn the exposed object into a `String`, which means that a meaningful `toString()` implementation is required.
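To illustrate the note on `toString()`, here is a small dependency-free sketch. The `QueueStats` type and the `render` helper are hypothetical names introduced for this example, and a plain `Supplier` stands in for a gauge. It only shows why an exposed object needs a readable string form.

```java
import java.util.function.Supplier;

final class GaugeToStringSketch {
    // Hypothetical value type that a gauge might expose.
    static final class QueueStats {
        private final int size;
        private final int capacity;

        QueueStats(int size, int capacity) {
            this.size = size;
            this.capacity = capacity;
        }

        // Without this override the rendered value would be the default
        // Object.toString() form (class name plus hash code).
        @Override
        public String toString() {
            return size + "/" + capacity;
        }
    }

    // Stand-in for the string conversion a reporter applies to a gauge value.
    static String render(Supplier<Object> gauge) {
        return String.valueOf(gauge.get());
    }

    public static void main(String[] args) {
        System.out.println(render(() -> new QueueStats(3, 10))); // prints 3/10
    }
}
```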
A `Histogram` measures the distribution of long values. You can register one by calling `histogram(String name, Histogram histogram)` on a `MetricGroup`.
{{< tabs "f00bd80e-ce30-497c-aa1f-89f3b5f653a0" >}}
{{< tab "Java" >}}
```java
public class MyMapper extends RichMapFunction<Long, Long> {
  private transient Histogram histogram;

  @Override
  public void open(Configuration config) {
    this.histogram = getRuntimeContext()
      .getMetricGroup()
      .histogram("myHistogram", new MyHistogram());
  }

  @Override
  public Long map(Long value) throws Exception {
    this.histogram.update(value);
    return value;
  }
}
```
{{< /tab >}}
{{< tab "Scala" >}}
```scala
class MyMapper extends RichMapFunction[Long,Long] {
  @transient private var histogram: Histogram = _

  override def open(parameters: Configuration): Unit = {
    histogram = getRuntimeContext()
      .getMetricGroup()
      .histogram("myHistogram", new MyHistogram())
  }

  override def map(value: Long): Long = {
    histogram.update(value)
    value
  }
}
```
{{< /tab >}}
{{< /tabs >}}
Flink does not provide a default implementation for `Histogram`, but offers a {{< gh_link file="flink-metrics/flink-metrics-dropwizard/src/main/java/org/apache/flink/dropwizard/metrics/DropwizardHistogramWrapper.java" name="Wrapper" >}} that allows usage of Codahale/DropWizard histograms. To use this wrapper add the following dependency in your `pom.xml`:

```xml
<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-metrics-dropwizard</artifactId>
  <version>{{< version >}}</version>
</dependency>
```
You can then register a Codahale/DropWizard histogram like this:
{{< tabs "bb87937e-afd3-40c3-9ef2-95bce0cbaeb7" >}}
{{< tab "Java" >}}
```java
public class MyMapper extends RichMapFunction<Long, Long> {
  private transient Histogram histogram;

  @Override
  public void open(Configuration config) {
    com.codahale.metrics.Histogram dropwizardHistogram =
      new com.codahale.metrics.Histogram(new SlidingWindowReservoir(500));

    this.histogram = getRuntimeContext()
      .getMetricGroup()
      .histogram("myHistogram", new DropwizardHistogramWrapper(dropwizardHistogram));
  }

  @Override
  public Long map(Long value) throws Exception {
    this.histogram.update(value);
    return value;
  }
}
```
{{< /tab >}}
{{< tab "Scala" >}}
```scala
class MyMapper extends RichMapFunction[Long, Long] {
  @transient private var histogram: Histogram = _

  override def open(config: Configuration): Unit = {
    val dropwizardHistogram =
      new com.codahale.metrics.Histogram(new SlidingWindowReservoir(500))

    histogram = getRuntimeContext()
      .getMetricGroup()
      .histogram("myHistogram", new DropwizardHistogramWrapper(dropwizardHistogram))
  }

  override def map(value: Long): Long = {
    histogram.update(value)
    value
  }
}
```
{{< /tab >}}
{{< /tabs >}}
A `Meter` measures an average throughput. An occurrence of an event can be registered with the `markEvent()` method. The occurrence of multiple events at the same time can be registered with the `markEvent(long n)` method. You can register a meter by calling `meter(String name, Meter meter)` on a `MetricGroup`.
{{< tabs "39036212-06d1-4efe-bab3-d821aa11f6fe" >}}
{{< tab "Java" >}}
```java
public class MyMapper extends RichMapFunction<Long, Long> {
  private transient Meter meter;

  @Override
  public void open(Configuration config) {
    this.meter = getRuntimeContext()
      .getMetricGroup()
      .meter("myMeter", new MyMeter());
  }

  @Override
  public Long map(Long value) throws Exception {
    this.meter.markEvent();
    return value;
  }
}
```
{{< /tab >}}
{{< tab "Scala" >}}
```scala
class MyMapper extends RichMapFunction[Long,Long] {
  @transient private var meter: Meter = _

  override def open(config: Configuration): Unit = {
    meter = getRuntimeContext()
      .getMetricGroup()
      .meter("myMeter", new MyMeter())
  }

  override def map(value: Long): Long = {
    meter.markEvent()
    value
  }
}
```
{{< /tab >}}
{{< /tabs >}}
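The `MyMeter` class in the example above is left undefined. As a rough sketch of what it could look like, the class below counts events and reports the average number of events per second since it was created. This averaging strategy and the injectable clock are assumptions made for illustration; other meters may use a sliding window instead. A real job would also declare `implements org.apache.flink.metrics.Meter`, which is omitted here so the sketch compiles on its own.

```java
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.LongSupplier;

// Hypothetical stand-in for the MyMeter class referenced in the example above.
final class MyMeter {
    private final AtomicLong count = new AtomicLong();
    private final LongSupplier nanoClock; // injectable so the rate can be tested deterministically
    private final long startNanos;

    MyMeter() {
        this(System::nanoTime);
    }

    MyMeter(LongSupplier nanoClock) {
        this.nanoClock = nanoClock;
        this.startNanos = nanoClock.getAsLong();
    }

    // Registers a single event.
    public void markEvent() {
        count.incrementAndGet();
    }

    // Registers n events that occurred at the same time.
    public void markEvent(long n) {
        count.addAndGet(n);
    }

    public long getCount() {
        return count.get();
    }

    // Average events per second since construction; 0.0 if no time has elapsed yet.
    public double getRate() {
        long elapsedNanos = nanoClock.getAsLong() - startNanos;
        if (elapsedNanos <= 0) {
            return 0.0;
        }
        return count.get() / (elapsedNanos / 1_000_000_000.0);
    }
}
```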
Flink offers a {{< gh_link file="flink-metrics/flink-metrics-dropwizard/src/main/java/org/apache/flink/dropwizard/metrics/DropwizardMeterWrapper.java" name="Wrapper" >}} that allows usage of Codahale/DropWizard meters. To use this wrapper add the following dependency in your `pom.xml`:

```xml
<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-metrics-dropwizard</artifactId>
  <version>{{< version >}}</version>
</dependency>
```
You can then register a Codahale/DropWizard meter like this:
{{< tabs "9cc57972-cf86-401e-a394-ee97efd816f2" >}}
{{< tab "Java" >}}
```java
public class MyMapper extends RichMapFunction<Long, Long> {
  private transient Meter meter;

  @Override
  public void open(Configuration config) {
    com.codahale.metrics.Meter dropwizardMeter = new com.codahale.metrics.Meter();

    this.meter = getRuntimeContext()
      .getMetricGroup()
      .meter("myMeter", new DropwizardMeterWrapper(dropwizardMeter));
  }

  @Override
  public Long map(Long value) throws Exception {
    this.meter.markEvent();
    return value;
  }
}
```
{{< /tab >}}
{{< tab "Scala" >}}
```scala
class MyMapper extends RichMapFunction[Long,Long] {
  @transient private var meter: Meter = _

  override def open(config: Configuration): Unit = {
    val dropwizardMeter: com.codahale.metrics.Meter = new com.codahale.metrics.Meter()

    meter = getRuntimeContext()
      .getMetricGroup()
      .meter("myMeter", new DropwizardMeterWrapper(dropwizardMeter))
  }

  override def map(value: Long): Long = {
    meter.markEvent()
    value
  }
}
```
{{< /tab >}}
{{< /tabs >}}
Every metric is assigned an identifier and a set of key-value pairs under which the metric will be reported.
The identifier is based on 3 components: a user-defined name when registering the metric, an optional user-defined scope and a system-provided scope. For example, if `A.B` is the system scope, `C.D` the user scope and `E` the name, then the identifier for the metric will be `A.B.C.D.E`.
You can configure which delimiter to use for the identifier (default: `.`) by setting the `metrics.scope.delimiter` key in `conf/flink-conf.yaml`.
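The composition rule described above can be sketched in a few lines. The `identifier` helper below is a hypothetical illustration, not Flink's internal implementation: it simply joins the system scope, the user scope and the metric name with the configured delimiter.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

final class MetricIdentifierSketch {
    // Joins system scope, user scope and metric name, in that order, with the delimiter.
    static String identifier(List<String> systemScope,
                             List<String> userScope,
                             String name,
                             String delimiter) {
        List<String> parts = new ArrayList<>(systemScope);
        parts.addAll(userScope);
        parts.add(name);
        return String.join(delimiter, parts);
    }

    public static void main(String[] args) {
        List<String> system = Arrays.asList("A", "B");
        List<String> user = Arrays.asList("C", "D");
        System.out.println(identifier(system, user, "E", ".")); // prints A.B.C.D.E
        System.out.println(identifier(system, user, "E", "_")); // prints A_B_C_D_E
    }
}
```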
You can define a user scope by calling `MetricGroup#addGroup(String name)`, `MetricGroup#addGroup(int name)` or `MetricGroup#addGroup(String key, String value)`. These methods affect what `MetricGroup#getMetricIdentifier` and `MetricGroup#getScopeComponents` return.
{{< tabs "8ba6943e-ab5d-45ce-8a73-091a01370eaf" >}}
{{< tab "Java" >}}
```java
counter = getRuntimeContext()
  .getMetricGroup()
  .addGroup("MyMetrics")
  .counter("myCounter");

counter = getRuntimeContext()
  .getMetricGroup()
  .addGroup("MyMetricsKey", "MyMetricsValue")
  .counter("myCounter");
```
{{< /tab >}}
{{< tab "Scala" >}}
```scala
counter = getRuntimeContext()
  .getMetricGroup()
  .addGroup("MyMetrics")
  .counter("myCounter")

counter = getRuntimeContext()
  .getMetricGroup()
  .addGroup("MyMetricsKey", "MyMetricsValue")
  .counter("myCounter")
```
{{< /tab >}}
{{< /tabs >}}
The system scope contains context information about the metric, for example in which task it was registered or what job that task belongs to.
Which context information should be included can be configured by setting the following keys in `conf/flink-conf.yaml`. Each of these keys expects a format string that may contain constants (e.g. `taskmanager`) and variables (e.g. `<task_id>`) which will be replaced at runtime.
- `metrics.scope.jm`
- `metrics.scope.jm.job`
- `metrics.scope.tm`
- `metrics.scope.tm.job`
- `metrics.scope.task`
- `metrics.scope.operator`
There are no restrictions on the number or order of variables. Variables are case sensitive.
The default scope for operator metrics will result in an identifier akin to `localhost.taskmanager.1234.MyJob.MyOperator.0.MyMetric`.

If you also want to include the task name but omit the task manager information you can specify the following format:

`metrics.scope.operator: <host>.<job_name>.<task_name>.<operator_name>.<subtask_index>`

This could create the identifier `localhost.MyJob.MySource_->_MyOperator.MyOperator.0.MyMetric`.
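As a rough illustration of how a scope format string turns into a concrete system scope, the sketch below replaces each `<variable>` with a runtime value. The `resolve` helper and the variable map are hypothetical; Flink's own resolution logic is not shown in this document and may differ, for example in how it filters characters.

```java
import java.util.LinkedHashMap;
import java.util.Map;

final class ScopeFormatSketch {
    // Replaces every "<key>" in the format string with its runtime value.
    // Constants such as "taskmanager" are left untouched. Variables are case sensitive.
    static String resolve(String format, Map<String, String> variables) {
        String result = format;
        for (Map.Entry<String, String> entry : variables.entrySet()) {
            result = result.replace("<" + entry.getKey() + ">", entry.getValue());
        }
        return result;
    }

    public static void main(String[] args) {
        // Hypothetical runtime values matching the example identifier above.
        Map<String, String> vars = new LinkedHashMap<>();
        vars.put("host", "localhost");
        vars.put("job_name", "MyJob");
        vars.put("task_name", "MySource_->_MyOperator");
        vars.put("operator_name", "MyOperator");
        vars.put("subtask_index", "0");

        String scope = resolve(
            "<host>.<job_name>.<task_name>.<operator_name>.<subtask_index>", vars);
        System.out.println(scope + ".MyMetric");
        // prints localhost.MyJob.MySource_->_MyOperator.MyOperator.0.MyMetric
    }
}
```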
Note that for this format string an identifier clash can occur should the same job be run multiple times concurrently, which can lead to inconsistent metric data. As such it is advised to either use format strings that provide a certain degree of uniqueness by including IDs (e.g. `<job_id>`) or to assign unique names to jobs and operators.
Important: For the Batch API, `<operator_id>` is always equal to `<task_id>`.
You can define a user variable by calling `MetricGroup#addGroup(String key, String value)`. This method affects what `MetricGroup#getMetricIdentifier`, `MetricGroup#getScopeComponents` and `MetricGroup#getAllVariables()` return.
Important: User variables cannot be used in scope formats.
{{< tabs "66c0ba7f-adc3-4a8b-831f-b0126ea2de81" >}}
{{< tab "Java" >}}
```java
counter = getRuntimeContext()
  .getMetricGroup()
  .addGroup("MyMetricsKey", "MyMetricsValue")
  .counter("myCounter");
```
{{< /tab >}}
{{< tab "Scala" >}}
```scala
counter = getRuntimeContext()
  .getMetricGroup()
  .addGroup("MyMetricsKey", "MyMetricsValue")
  .counter("myCounter")
```
{{< /tab >}}
{{< /tabs >}}
For information on how to set up Flink's metric reporters please take a look at the [metric reporters documentation]({{< ref "docs/deployment/metric_reporters" >}}).
By default Flink gathers several metrics that provide deep insights into the current state. This section is a reference of all these metrics.
The tables below generally feature 5 columns:
- The "Scope" column describes which scope format is used to generate the system scope. For example, if the cell contains "Operator" then the scope format for `metrics.scope.operator` is used. If the cell contains multiple values, separated by a slash, then the metrics are reported multiple times for different entities, such as for both job- and taskmanagers.
- The (optional) "Infix" column describes which infix is appended to the system scope.
- The "Metrics" column lists the names of all metrics that are registered for the given scope and infix.
- The "Description" column provides information as to what a given metric is measuring.
- The "Type" column describes which metric type is used for the measurement.

Note that all dots in the infix/metric name columns are still subject to the `metrics.scope.delimiter` setting.
Thus, in order to infer the metric identifier:
The memory-related metrics require Oracle's memory management (also included in OpenJDK's Hotspot implementation) to be in place. Some metrics might not be exposed when using other JVM implementations (e.g. IBM's J9).
{{< hint warning >}} Deprecated: use Default shuffle service metrics {{< /hint >}}
Metrics related to data exchange between task executors using netty network communication.
{{< hint warning >}} If [Reactive Mode]({{< ref "docs/deployment/elastic_scaling" >}}#reactive-mode) is enabled then these metrics, except `numRestarts`, do not work correctly. {{< /hint >}}
{{< hint warning >}} If [Reactive Mode]({{< ref "docs/deployment/elastic_scaling" >}}#reactive-mode) is enabled then the checkpointing metrics with the `Job` scope do not work correctly. {{< /hint >}}
Note that for failed checkpoints, metrics are updated on a best-effort basis and may not be accurate.
Certain RocksDB native metrics are available but disabled by default. You can find the full documentation [here]({{< ref "docs/deployment/config" >}}#rocksdb-native-metrics).
System resources reporting is disabled by default. When `metrics.system-resource` is enabled, the additional metrics listed below will be available on the Job- and TaskManager. System resources metrics are updated periodically and present average values for a configured interval (`metrics.system-resource-probing-interval`).
System resources reporting requires an optional dependency to be present on the classpath (for example placed in Flink's `lib` directory):

- `com.github.oshi:oshi-core:3.4.0` (licensed under the EPL 1.0 license)

Including its transitive dependencies:

- `net.java.dev.jna:jna-platform:jar:4.2.2`
- `net.java.dev.jna:jna:jar:4.2.2`

Failures in this regard will be reported as warning messages like `NoClassDefFoundError` logged by `SystemResourcesMetricsInitializer` during startup.
Flink allows tracking the latency of records travelling through the system. This feature is disabled by default. To enable latency tracking you must set the `latencyTrackingInterval` to a positive number in either the [Flink configuration]({{< ref "docs/deployment/config" >}}#metrics-latency-interval) or `ExecutionConfig`.
At the `latencyTrackingInterval`, the sources will periodically emit a special record, called a `LatencyMarker`. The marker contains a timestamp from the time when the record was emitted at the sources. Latency markers cannot overtake regular user records, so if records are queuing up in front of an operator, this adds to the latency tracked by the marker.
Note that the latency markers do not account for the time user records spend in operators, as the markers bypass them. In particular, the markers do not account for the time records spend, for example, in window buffers. Only if operators are unable to accept new records, so that records queue up, will the latency measured using the markers reflect that.
The `LatencyMarker`s are used to derive a distribution of the latency between the sources of the topology and each downstream operator. These distributions are reported as histogram metrics. The granularity of these distributions can be controlled in the [Flink configuration]({{< ref "docs/deployment/config" >}}#metrics-latency-interval). For the highest granularity, `subtask`, Flink will derive the latency distribution between every source subtask and every downstream subtask, which results in a quadratic (in terms of the parallelism) number of histograms.
Currently, Flink assumes that the clocks of all machines in the cluster are in sync. We recommend setting up an automated clock synchronisation service (like NTP) to avoid false latency results.
Warning: Enabling latency metrics can significantly impact the performance of the cluster (in particular for `subtask` granularity). It is highly recommended to only use them for debugging purposes.
Flink also allows tracking the keyed state access latency for standard Flink state backends or for customized state backends that extend `AbstractStateBackend`. This feature is disabled by default. To enable it you must set `state.backend.latency-track.keyed-state-enabled` to true in the [Flink configuration]({{< ref "docs/deployment/config" >}}#state-backends-latency-tracking-options).
Once tracking keyed state access latency is enabled, Flink will sample the state access latency every `N` accesses, where `N` is defined by `state.backend.latency-track.sample-interval`. This configuration has a default value of 100. A smaller value yields more accurate results but has a higher performance impact, since latency is sampled more frequently.
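The "sample every `N` accesses" idea can be sketched as a simple counter that fires once per `N` calls. The class and method names below are hypothetical and this is not Flink's internal code; it only illustrates why a smaller interval means more measurements and therefore more overhead.

```java
// Hypothetical sketch of interval-based sampling; not Flink's implementation.
final class SampledLatencyTracker {
    private final int sampleInterval;
    private int accessesSinceLastSample;

    SampledLatencyTracker(int sampleInterval) {
        if (sampleInterval <= 0) {
            throw new IllegalArgumentException("sampleInterval must be positive");
        }
        this.sampleInterval = sampleInterval;
    }

    // Call once per state access. Returns true for one in every sampleInterval accesses,
    // which is when the caller would time the access and record the latency.
    boolean shouldSample() {
        accessesSinceLastSample++;
        if (accessesSinceLastSample >= sampleInterval) {
            accessesSinceLastSample = 0;
            return true;
        }
        return false;
    }

    public static void main(String[] args) {
        SampledLatencyTracker tracker = new SampledLatencyTracker(100);
        int samples = 0;
        for (int i = 0; i < 1000; i++) {
            if (tracker.shouldSample()) {
                samples++;
            }
        }
        System.out.println(samples); // prints 10
    }
}
```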
As the type of these latency metrics is histogram, `state.backend.latency-track.history-size` controls the maximum number of recorded values in the history, with a default value of 128. A larger value requires more memory but provides a more accurate result.
Metrics can be queried through the [Monitoring REST API]({{< ref "docs/ops/rest_api" >}}).
Below is a list of available endpoints, with a sample JSON response. All endpoints are of the sample form `http://hostname:8081/jobmanager/metrics`; below we list only the path part of the URLs.
Values in angle brackets are variables. For example, `http://hostname:8081/jobs/<jobid>/metrics` has to be requested as, for example, `http://hostname:8081/jobs/7684be6004e4e955c2a558a9bc463f65/metrics`.
Request metrics for a specific entity:

- `/jobmanager/metrics`
- `/taskmanagers/<taskmanagerid>/metrics`
- `/jobs/<jobid>/metrics`
- `/jobs/<jobid>/vertices/<vertexid>/subtasks/<subtaskindex>/metrics`
Request metrics aggregated across all entities of the respective type:

- `/taskmanagers/metrics`
- `/jobs/metrics`
- `/jobs/<jobid>/vertices/<vertexid>/subtasks/metrics`
Request metrics aggregated over a subset of all entities of the respective type:

- `/taskmanagers/metrics?taskmanagers=A,B,C`
- `/jobs/metrics?jobs=D,E,F`
- `/jobs/<jobid>/vertices/<vertexid>/subtasks/metrics?subtask=1,2,3`
Warning: Metric names can contain special characters that you need to escape when querying metrics. For example, "`a_+_b`" would be escaped to "`a_%2B_b`".
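One way to produce the escaped form is the JDK's `URLEncoder`, sketched below (the `Charset` overload requires Java 10 or later). This is an assumption about a convenient client-side tool, not something the REST API mandates. Be aware that `URLEncoder` performs form encoding, so a space becomes `+` rather than `%20`; metric names containing spaces may need different handling.

```java
import java.net.URLEncoder;
import java.nio.charset.StandardCharsets;

final class MetricNameEscaping {
    // Percent-encodes special characters in a metric name for use in a query string.
    static String escape(String metricName) {
        return URLEncoder.encode(metricName, StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        System.out.println(escape("a_+_b")); // prints a_%2B_b
        // Hypothetical request path built from the escaped name:
        System.out.println("/jobmanager/metrics?get=" + escape("a_+_b"));
    }
}
```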
List of characters that should be escaped:
Request a list of available metrics:

`GET /jobmanager/metrics`

```json
[
  {
    "id": "metric1"
  },
  {
    "id": "metric2"
  }
]
```
Request the values for specific (unaggregated) metrics:

`GET /taskmanagers/ABCDE/metrics?get=metric1,metric2`

```json
[
  {
    "id": "metric1",
    "value": "34"
  },
  {
    "id": "metric2",
    "value": "2"
  }
]
```
Request aggregated values for specific metrics:

`GET /taskmanagers/metrics?get=metric1,metric2`

```json
[
  {
    "id": "metric1",
    "min": 1,
    "max": 34,
    "avg": 15,
    "sum": 45
  },
  {
    "id": "metric2",
    "min": 2,
    "max": 14,
    "avg": 7,
    "sum": 16
  }
]
```
Request specific aggregated values for specific metrics:

`GET /taskmanagers/metrics?get=metric1,metric2&agg=min,max`

```json
[
  {
    "id": "metric1",
    "min": 1,
    "max": 34
  },
  {
    "id": "metric2",
    "min": 2,
    "max": 14
  }
]
```
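The `min`, `max`, `avg` and `sum` fields in the aggregated responses read as ordinary summary statistics over the per-entity values. The sketch below illustrates that reading with three hypothetical TaskManager values (1, 10 and 34), chosen here only because they reproduce the `metric1` numbers in the sample response. How Flink computes the aggregates internally is not described in this document.

```java
import java.util.LongSummaryStatistics;
import java.util.stream.LongStream;

final class AggregationSketch {
    // Computes min, max, average and sum over the per-entity metric values.
    static LongSummaryStatistics aggregate(long... values) {
        return LongStream.of(values).summaryStatistics();
    }

    public static void main(String[] args) {
        // Hypothetical values of "metric1" on three TaskManagers.
        LongSummaryStatistics stats = aggregate(1, 10, 34);
        System.out.println("min=" + stats.getMin());     // prints min=1
        System.out.println("max=" + stats.getMax());     // prints max=34
        System.out.println("avg=" + stats.getAverage()); // prints avg=15.0
        System.out.println("sum=" + stats.getSum());     // prints sum=45
    }
}
```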
Metrics that were gathered for each task or operator can also be visualized in the Dashboard. On the main page for a job, select the `Metrics` tab. After selecting one of the tasks in the top graph you can select metrics to display using the `Add Metric` drop-down menu.

- Task metrics are listed as `<subtask_index>.<metric_name>`.
- Operator metrics are listed as `<subtask_index>.<operator_name>.<metric_name>`.

Each metric will be visualized as a separate graph, with the x-axis representing time and the y-axis the measured value. All graphs are automatically updated every 10 seconds, and continue to do so when navigating to another page.
There is no limit as to the number of visualized metrics; however only numeric metrics can be visualized.
{{< top >}}