blob: 59a2ae521acee627221ee61360eb9e4d97c39563 [file] [log] [blame]
################################################################################
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#################################################################################
from typing import Any
from typing_extensions import override
from flink_agents.api.metric_group import Counter, Gauge, Histogram, Meter, MetricGroup
class FlinkMetricGroup(MetricGroup):
"""Implementation of MetricGroup for flink execution environment."""
def __init__(self, j_metric_group: Any) -> None:
"""Initialize a flink metric group with the given java metric group.
Parameters
----------
j_metric_group : Any
Java metric group used to synchronize data with the Flink metric system.
"""
self._j_metric_group = j_metric_group
@override
def get_sub_group(self, name: str) -> "FlinkMetricGroup":
return FlinkMetricGroup(self._j_metric_group.getSubGroup(name))
@override
def get_counter(self, name: str) -> "FlinkCounter":
return FlinkCounter(self._j_metric_group.getCounter(name))
@override
def get_meter(self, name: str) -> "FlinkMeter":
return FlinkMeter(self._j_metric_group.getMeter(name))
@override
def get_histogram(self, name: str, window_size: int = 100) -> "FlinkHistogram":
return FlinkHistogram(self._j_metric_group.getHistogram(name, window_size))
@override
def get_gauge(self, name: str) -> "FlinkGauge":
return FlinkGauge(self._j_metric_group.getGauge(name))
class FlinkCounter(Counter):
"""Implementation of Counter for flink execution environment."""
def __init__(self, j_counter: Any) -> None:
"""Initialize a flink runner context with the given java runner context.
Parameters
----------
j_counter : Any
Java counter used for measuring the count of events.
"""
self._j_counter = j_counter
@override
def inc(self, n: int = 1) -> None:
"""Increment the current count by the given value."""
self._j_counter.inc(n)
@override
def dec(self, n: int = 1) -> None:
"""Decrement the current count by the given value."""
self._j_counter.dec(n)
@override
def get_count(self) -> int:
"""Return the current count."""
return self._j_counter.getCount()
class FlinkMeter(Meter):
"""Implementation of Meter for flink execution environment."""
def __init__(self, j_meter: Any) -> None:
"""Initialize a flink meter with the given java meter.
Parameters
----------
j_meter : Any
Java meter measures throughput.
"""
self._j_meter = j_meter
@override
def mark(self, n: int = 1) -> None:
"""Mark the occurrence of n events."""
self._j_meter.markEvent(n)
@override
def get_rate(self) -> float:
"""Return the current event rate per second."""
return self._j_meter.getRate()
class FlinkHistogram(Histogram):
"""Implementation of Histogram for flink execution environment."""
def __init__(self, j_histogram: Any) -> None:
"""Initialize a flink histogram with the given java histogram.
Parameters
----------
j_histogram : Any
Java histogram used for recording values and computing statistical
summaries.
"""
self._j_histogram = j_histogram
self._j_statistics = j_histogram.getStatistics()
@override
def update(self, value: int) -> None:
"""Record a new value into the histogram."""
self._j_histogram.update(value)
@override
def get_mean(self) -> float:
"""Return the average value."""
return self._j_statistics.getMean()
@override
def get_max(self) -> int:
"""Return the maximum recorded value."""
return self._j_statistics.getMax()
@override
def get_min(self) -> int:
"""Return the minimum recorded value."""
return self._j_statistics.getMin()
class FlinkGauge(Gauge):
"""Implementation of Gauge for flink execution environment."""
def __init__(self, j_gauge: Any) -> None:
"""Initialize a flink gauge with the given java gauge.
Parameters
----------
j_gauge : Any
Java gauge for recording a string value.
"""
self._j_gauge = j_gauge
@override
def update(self, value: float) -> None:
"""Update the gauge with the given value."""
self._j_gauge.update(value)
@override
def get_value(self) -> float:
"""Return the current value of the gauge."""
return self._j_gauge.getValue()