| .. ################################################################################ |
| Licensed to the Apache Software Foundation (ASF) under one |
| or more contributor license agreements. See the NOTICE file |
| distributed with this work for additional information |
| regarding copyright ownership. The ASF licenses this file |
| to you under the Apache License, Version 2.0 (the |
| "License"); you may not use this file except in compliance |
| with the License. You may obtain a copy of the License at |
| |
| http://www.apache.org/licenses/LICENSE-2.0 |
| |
| Unless required by applicable law or agreed to in writing, software |
| distributed under the License is distributed on an "AS IS" BASIS, |
| WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| See the License for the specific language governing permissions and |
| limitations under the License. |
| ################################################################################ |
| |
| ======= |
| Metrics |
| ======= |
| |
| PyFlink exposes a metric system that allows gathering and exposing metrics to external systems. |
| |
| Registering metrics |
| =================== |
| |
| You can access the metric system from a :flinkdoc:`Python user-defined function <docs/dev/table/functions/python-udfs/>` |
| by calling ``function_context.get_metric_group()`` in the ``open`` method. |
| The ``get_metric_group()`` method returns a ``MetricGroup`` object on which you can create |
| and register new metrics. |
| |
| Metric types |
| ------------ |
| |
| PyFlink supports ``Counters``, ``Gauges``, ``Distribution`` and ``Meters``. |
| |
| Counter |
| ~~~~~~~ |
| |
| A ``Counter`` is used to count something. The current value can be in- or decremented using ``inc()/inc(n: int)`` or ``dec()/dec(n: int)``. |
| You can create and register a ``Counter`` by calling ``counter(name: str)`` on a ``MetricGroup``. |
| |
| .. code-block:: python |
| |
| from pyflink.table.udf import ScalarFunction |
| |
| class MyUDF(ScalarFunction): |
| |
| def __init__(self): |
| self.counter = None |
| |
| def open(self, function_context): |
| self.counter = function_context.get_metric_group().counter("my_counter") |
| |
| def eval(self, i): |
| self.counter.inc(i) |
| return i |
| |
| Gauge |
| ~~~~~ |
| |
| A ``Gauge`` provides a value on demand. You can register a gauge by calling |
| ``gauge(name: str, obj: Callable[[], int])`` on a MetricGroup. The Callable object will be used to |
| report the values. Gauge metrics are restricted to integer-only values. |
| |
| .. code-block:: python |
| |
| from pyflink.table.udf import ScalarFunction |
| |
| class MyUDF(ScalarFunction): |
| |
| def __init__(self): |
| self.length = 0 |
| |
| def open(self, function_context): |
| function_context.get_metric_group().gauge("my_gauge", lambda : self.length) |
| |
| def eval(self, i): |
| self.length = i |
| return i - 1 |
| |
| Distribution |
| ~~~~~~~~~~~~ |
| |
| A metric that reports information (sum, count, min, max and mean) about the distribution of |
| reported values. The value can be updated using ``update(n: int)``. You can register a distribution |
| by calling ``distribution(name: str)`` on a MetricGroup. Distribution metrics are restricted to |
| integer-only distributions. |
| |
| .. code-block:: python |
| |
| from pyflink.table.udf import ScalarFunction |
| |
| class MyUDF(ScalarFunction): |
| |
| def __init__(self): |
| self.distribution = None |
| |
| def open(self, function_context): |
| self.distribution = function_context.get_metric_group().distribution("my_distribution") |
| |
| def eval(self, i): |
| self.distribution.update(i) |
| return i - 1 |
| |
| Meter |
| ~~~~~ |
| |
| A Meter measures an average throughput. An occurrence of an event can be registered with the |
| ``mark_event()`` method. The occurrence of multiple events at the same time can be registered with |
| ``mark_event(n: int)`` method. You can register a meter by calling |
| ``meter(self, name: str, time_span_in_seconds: int = 60)`` on a MetricGroup. |
| The default value of time_span_in_seconds is 60. |
| |
| .. code-block:: python |
| |
| from pyflink.table.udf import ScalarFunction |
| |
| class MyUDF(ScalarFunction): |
| |
| def __init__(self): |
| self.meter = None |
| |
| def open(self, function_context): |
| # an average rate of events per second over 120s, default is 60s. |
| self.meter = function_context.get_metric_group().meter("my_meter", time_span_in_seconds=120) |
| |
| def eval(self, i): |
| self.meter.mark_event(i) |
| return i - 1 |
| |
| Scope |
| ===== |
| |
| You can refer to the Java metric document for more details on :flinkdoc:`Scope definition <docs/ops/metrics/#scope>`. |
| |
| User Scope |
| ---------- |
| |
| You can define a user scope by calling ``MetricGroup.add_group(key: str, value: str = None)``. |
| If ``value`` is not ``None``, creates a new key-value MetricGroup pair. |
| The key group is added to this group's sub-groups, while the value group is added to the key |
| group's sub-groups. In this case, the value group will be returned, and a user variable will be defined. |
| |
| .. code-block:: python |
| |
| function_context \ |
| .get_metric_group() \ |
| .add_group("my_metrics") \ |
| .counter("my_counter") |
| |
| function_context \ |
| .get_metric_group() \ |
| .add_group("my_metrics_key", "my_metrics_value") \ |
| .counter("my_counter") |
| |
| System Scope |
| ------------ |
| |
| You can refer to the Java metric document for more details on :flinkdoc:`System Scope <docs/ops/metrics/#system-scope>`. |
| |
| List of all Variables |
| --------------------- |
| |
| You can refer to the Java metric document for more details on :flinkdoc:`List of all Variables <docs/ops/metrics/#list-of-all-variables>`. |
| |
| User Variables |
| -------------- |
| |
| You can define a user variable by calling ``MetricGroup.addGroup(key: str, value: str = None)`` and |
| specifying the value parameter. |
| |
| **Important:** User variables cannot be used in scope formats. |
| |
| .. code-block:: python |
| |
| function_context \ |
| .get_metric_group() \ |
| .add_group("my_metrics_key", "my_metrics_value") \ |
| .counter("my_counter") |
| |
| Common part between PyFlink and Flink |
| ===================================== |
| |
| You can refer to the Java metric document for more details on the following sections: |
| |
| - :flinkdoc:`Reporter <docs/deployment/metric_reporters/>`. |
| - :flinkdoc:`System metrics <docs/ops/metrics/#system-metrics>`. |
| - :flinkdoc:`Latency tracking <docs/ops/metrics/#latency-tracking>`. |
| - :flinkdoc:`REST API integration <docs/ops/metrics/#rest-api-integration>`. |
| - :flinkdoc:`Dashboard integration <docs/ops/metrics/#dashboard-integration>`. |