blob: a2cd28666f386cbae82c998d2c5009fd48591f80 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.beam.runners.core.metrics;
import static org.apache.beam.vendor.guava.v20_0.com.google.common.base.Preconditions.checkNotNull;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Objects;
import javax.annotation.Nullable;
import org.apache.beam.model.pipeline.v1.MetricsApi;
import org.apache.beam.model.pipeline.v1.MetricsApi.CounterData;
import org.apache.beam.model.pipeline.v1.MetricsApi.ExtremaData;
import org.apache.beam.model.pipeline.v1.MetricsApi.IntDistributionData;
import org.apache.beam.model.pipeline.v1.MetricsApi.MonitoringInfo;
import org.apache.beam.runners.core.metrics.MetricUpdates.MetricUpdate;
import org.apache.beam.sdk.annotations.Experimental;
import org.apache.beam.sdk.annotations.Experimental.Kind;
import org.apache.beam.sdk.metrics.Counter;
import org.apache.beam.sdk.metrics.Distribution;
import org.apache.beam.sdk.metrics.Metric;
import org.apache.beam.sdk.metrics.MetricKey;
import org.apache.beam.sdk.metrics.MetricName;
import org.apache.beam.sdk.metrics.MetricsContainer;
import org.apache.beam.vendor.guava.v20_0.com.google.common.collect.ImmutableList;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
 * Holds the metrics for a single step and uses metric cells that allow extracting the cumulative
 * value. Generally, this implementation should be used for a specific unit of commitment (bundle)
 * that wishes to report the values since the start of the bundle (e.g., for committed metrics).
 *
 * <p>This class is thread-safe. It is intended to be used while one or more threads are updating
 * metrics and at most one thread is extracting updates by calling {@link #getUpdates} and {@link
 * #commitUpdates}. Outside of this it is still safe. Although races in the update extraction may
 * cause updates that don't actually have any changes, it will never lose an update.
 *
 * <p>For consistency, all threads that update metrics should finish before getting the final
 * cumulative values/updates.
 */
@Experimental(Kind.METRICS)
public class MetricsContainerImpl implements Serializable, MetricsContainer {
  private static final Logger LOG = LoggerFactory.getLogger(MetricsContainerImpl.class);

  /** Name of the step these metrics belong to; {@code null} when not bound to a step. */
  @Nullable private final String stepName;

  // The maps are created once and never reassigned; only their contents change.
  private final MetricsMap<MetricName, CounterCell> counters = new MetricsMap<>(CounterCell::new);
  private final MetricsMap<MetricName, DistributionCell> distributions =
      new MetricsMap<>(DistributionCell::new);
  private final MetricsMap<MetricName, GaugeCell> gauges = new MetricsMap<>(GaugeCell::new);

  /** Create a new {@link MetricsContainerImpl} associated with the given {@code stepName}. */
  public MetricsContainerImpl(@Nullable String stepName) {
    this.stepName = stepName;
  }

  /**
   * Return a {@code CounterCell} named {@code metricName}. If it doesn't exist, create a {@code
   * Metric} with the specified name.
   */
  @Override
  public CounterCell getCounter(MetricName metricName) {
    return counters.get(metricName);
  }

  /**
   * Return a {@code CounterCell} named {@code metricName}. If it doesn't exist, return {@code
   * null}.
   */
  @Nullable
  public CounterCell tryGetCounter(MetricName metricName) {
    return counters.tryGet(metricName);
  }

  /**
   * Return a {@code DistributionCell} named {@code metricName}. If it doesn't exist, create a
   * {@code Metric} with the specified name.
   */
  @Override
  public DistributionCell getDistribution(MetricName metricName) {
    return distributions.get(metricName);
  }

  /**
   * Return a {@code DistributionCell} named {@code metricName}. If it doesn't exist, return {@code
   * null}.
   */
  @Nullable
  public DistributionCell tryGetDistribution(MetricName metricName) {
    return distributions.tryGet(metricName);
  }

  /**
   * Return a {@code GaugeCell} named {@code metricName}. If it doesn't exist, create a {@code
   * Metric} with the specified name.
   */
  @Override
  public GaugeCell getGauge(MetricName metricName) {
    return gauges.get(metricName);
  }

  /**
   * Return a {@code GaugeCell} named {@code metricName}. If it doesn't exist, return {@code null}.
   */
  @Nullable
  public GaugeCell tryGetGauge(MetricName metricName) {
    return gauges.tryGet(metricName);
  }

  /**
   * Collect the cumulative value of every cell in {@code cells} whose dirty state answers {@code
   * true} to {@code beforeCommit()}, keyed by this container's step name and the cell's metric
   * name.
   */
  private <UpdateT, CellT extends MetricCell<UpdateT>>
      ImmutableList<MetricUpdate<UpdateT>> extractUpdates(MetricsMap<MetricName, CellT> cells) {
    ImmutableList.Builder<MetricUpdate<UpdateT>> updates = ImmutableList.builder();
    for (Map.Entry<MetricName, CellT> entry : cells.entries()) {
      CellT cell = entry.getValue();
      // The dirty state is consulted before the value is read, exactly once per cell.
      if (cell.getDirty().beforeCommit()) {
        updates.add(
            MetricUpdate.create(MetricKey.create(stepName, entry.getKey()), cell.getCumulative()));
      }
    }
    return updates.build();
  }

  /**
   * Return the cumulative values for any metrics that have changed since the last time updates were
   * committed.
   */
  public MetricUpdates getUpdates() {
    return MetricUpdates.create(
        extractUpdates(counters), extractUpdates(distributions), extractUpdates(gauges));
  }

  /**
   * Create a {@link SimpleMonitoringInfoBuilder} whose URN and labels identify the metric carried
   * by {@code metricUpdate}. The value and timestamp are left for the caller to set.
   *
   * <p>A {@link MonitoringInfoMetricName} supplies its own URN and labels. Any other metric name is
   * treated as a user metric: it is reported under {@code userMetricUrn} with namespace, name and
   * PTransform labels.
   *
   * @param metricUpdate the update whose key names the metric
   * @param userMetricUrn the URN to use when the metric is a user metric
   * @return the prepared builder, or {@code null} for a user metric when this container has no
   *     step name
   */
  @Nullable
  private SimpleMonitoringInfoBuilder monitoringInfoBuilderFor(
      MetricUpdate<?> metricUpdate, String userMetricUrn) {
    SimpleMonitoringInfoBuilder builder = new SimpleMonitoringInfoBuilder(true);
    MetricName metricName = metricUpdate.getKey().metricName();

    if (metricName instanceof MonitoringInfoMetricName) {
      // Represents a specific MonitoringInfo for a specific URN.
      MonitoringInfoMetricName monitoringInfoName = (MonitoringInfoMetricName) metricName;
      builder.setUrn(monitoringInfoName.getUrn());
      for (Entry<String, String> label : monitoringInfoName.getLabels().entrySet()) {
        builder.setLabel(label.getKey(), label.getValue());
      }
      return builder;
    }

    // Represents a user metric.
    // Drop if the stepname is not set. All user metrics must be
    // defined for a PTransform. They must be defined on a container bound to a step.
    if (this.stepName == null) {
      // TODO(BEAM-7191): Consider logging a warning with a quiet logging API.
      return null;
    }
    builder
        .setUrn(userMetricUrn)
        .setLabel(MonitoringInfoConstants.Labels.NAMESPACE, metricName.getNamespace())
        .setLabel(MonitoringInfoConstants.Labels.NAME, metricName.getName())
        .setLabel(MonitoringInfoConstants.Labels.PTRANSFORM, metricUpdate.getKey().stepName());
    return builder;
  }

  /**
   * Convert a counter update into a {@link MonitoringInfo} carrying an int64 value.
   *
   * @param metricUpdate the counter update to convert
   * @return The MonitoringInfo generated from the metricUpdate, or {@code null} if the update is
   *     dropped (a user counter on a container without a step name).
   */
  @Nullable
  private MonitoringInfo counterUpdateToMonitoringInfo(MetricUpdate<Long> metricUpdate) {
    SimpleMonitoringInfoBuilder builder =
        monitoringInfoBuilderFor(metricUpdate, MonitoringInfoConstants.Urns.USER_COUNTER);
    if (builder == null) {
      return null;
    }
    builder.setInt64Value(metricUpdate.getUpdate());
    builder.setTimestampToNow();
    return builder.build();
  }

  /**
   * Convert a distribution update into a {@link MonitoringInfo} carrying an int64 distribution.
   *
   * @param metricUpdate the distribution update to convert
   * @return The MonitoringInfo generated from the metricUpdate, or {@code null} if the update is
   *     dropped (a user distribution on a container without a step name).
   */
  @Nullable
  private MonitoringInfo distributionUpdateToMonitoringInfo(
      MetricUpdate<org.apache.beam.runners.core.metrics.DistributionData> metricUpdate) {
    SimpleMonitoringInfoBuilder builder =
        monitoringInfoBuilderFor(
            metricUpdate, MonitoringInfoConstants.Urns.USER_DISTRIBUTION_COUNTER);
    if (builder == null) {
      return null;
    }
    builder.setInt64DistributionValue(metricUpdate.getUpdate());
    builder.setTimestampToNow();
    return builder.build();
  }

  /**
   * Return, as MonitoringInfos, the cumulative values of the counters and distributions reported
   * by {@link #getUpdates()}.
   *
   * <p>Gauge updates are not converted, and updates for which no MonitoringInfo could be produced
   * (for example user metrics on a container without a step name) are skipped.
   */
  public Iterable<MonitoringInfo> getMonitoringInfos() {
    // Extract user metrics and store as MonitoringInfos.
    ArrayList<MonitoringInfo> monitoringInfos = new ArrayList<>();
    MetricUpdates metricUpdates = this.getUpdates();

    for (MetricUpdate<Long> counterUpdate : metricUpdates.counterUpdates()) {
      MonitoringInfo converted = counterUpdateToMonitoringInfo(counterUpdate);
      if (converted != null) {
        monitoringInfos.add(converted);
      }
    }

    for (MetricUpdate<org.apache.beam.runners.core.metrics.DistributionData> distributionUpdate :
        metricUpdates.distributionUpdates()) {
      MonitoringInfo converted = distributionUpdateToMonitoringInfo(distributionUpdate);
      if (converted != null) {
        monitoringInfos.add(converted);
      }
    }
    return monitoringInfos;
  }

  /** Invoke {@code afterCommit()} on the dirty state of every cell in {@code cells}. */
  private void commitUpdates(MetricsMap<MetricName, ? extends MetricCell<?>> cells) {
    for (MetricCell<?> cell : cells.values()) {
      cell.getDirty().afterCommit();
    }
  }

  /**
   * Mark all of the updates that were retrieved with the latest call to {@link #getUpdates()} as
   * committed.
   */
  public void commitUpdates() {
    commitUpdates(counters);
    commitUpdates(distributions);
    commitUpdates(gauges);
  }

  /**
   * Collect the cumulative value of every cell in {@code cells}, regardless of its dirty state.
   * Fails via {@code checkNotNull} if a cell reports a {@code null} cumulative value.
   */
  private <UserT extends Metric, UpdateT, CellT extends MetricCell<UpdateT>>
      ImmutableList<MetricUpdate<UpdateT>> extractCumulatives(MetricsMap<MetricName, CellT> cells) {
    ImmutableList.Builder<MetricUpdate<UpdateT>> cumulatives = ImmutableList.builder();
    for (Map.Entry<MetricName, CellT> entry : cells.entries()) {
      UpdateT cumulative = checkNotNull(entry.getValue().getCumulative());
      cumulatives.add(MetricUpdate.create(MetricKey.create(stepName, entry.getKey()), cumulative));
    }
    return cumulatives.build();
  }

  /**
   * Return the {@link MetricUpdates} representing the cumulative values of all metrics in this
   * container.
   */
  public MetricUpdates getCumulative() {
    return MetricUpdates.create(
        extractCumulatives(counters),
        extractCumulatives(distributions),
        extractCumulatives(gauges));
  }

  /**
   * Update values of this {@link MetricsContainerImpl} by merging in the cumulative values of the
   * counters, distributions and gauges held by another container.
   */
  public void update(MetricsContainerImpl other) {
    updateCounters(counters, other.counters);
    updateDistributions(distributions, other.distributions);
    updateGauges(gauges, other.gauges);
  }

  /** Update values of this {@link MetricsContainerImpl} by reading from {@code monitoringInfos}. */
  public void update(Iterable<MonitoringInfo> monitoringInfos) {
    monitoringInfos.forEach(this::mergeMonitoringInfo);
  }

  /**
   * Fold a single {@link MonitoringInfo} into this container.
   *
   * <p>Entries without a metric are ignored. Int64 counters are added to the matching counter and
   * int distributions are merged into the matching distribution; the cell is looked up under the
   * {@link MonitoringInfoMetricName} derived from the entry. Other counter or distribution
   * encodings, and extrema metrics, are only logged as unsupported.
   */
  private void mergeMonitoringInfo(MonitoringInfo monitoringInfo) {
    if (!monitoringInfo.hasMetric()) {
      return;
    }
    MetricName metricName = MonitoringInfoMetricName.of(monitoringInfo);
    MetricsApi.Metric metric = monitoringInfo.getMetric();

    if (metric.hasCounterData()) {
      CounterData counterData = metric.getCounterData();
      if (counterData.getValueCase() == CounterData.ValueCase.INT64_VALUE) {
        Counter counter = getCounter(metricName);
        counter.inc(counterData.getInt64Value());
      } else {
        LOG.warn("Unsupported CounterData type: {}", counterData);
      }
      return;
    }

    if (metric.hasDistributionData()) {
      MetricsApi.DistributionData distributionData = metric.getDistributionData();
      if (distributionData.hasIntDistributionData()) {
        Distribution distribution = getDistribution(metricName);
        IntDistributionData intDistributionData = distributionData.getIntDistributionData();
        distribution.update(
            intDistributionData.getSum(),
            intDistributionData.getCount(),
            intDistributionData.getMin(),
            intDistributionData.getMax());
      } else {
        LOG.warn("Unsupported DistributionData type: {}", distributionData);
      }
      return;
    }

    if (metric.hasExtremaData()) {
      ExtremaData extremaData = metric.getExtremaData();
      LOG.warn("Extrema metric unsupported: {}", extremaData);
    }
  }

  /** Increment each counter in {@code current} by the cumulative value of its peer in {@code updates}. */
  private void updateCounters(
      MetricsMap<MetricName, CounterCell> current, MetricsMap<MetricName, CounterCell> updates) {
    for (Map.Entry<MetricName, CounterCell> counter : updates.entries()) {
      current.get(counter.getKey()).inc(counter.getValue().getCumulative());
    }
  }

  /** Update each distribution in {@code current} with the cumulative value of its peer in {@code updates}. */
  private void updateDistributions(
      MetricsMap<MetricName, DistributionCell> current,
      MetricsMap<MetricName, DistributionCell> updates) {
    for (Map.Entry<MetricName, DistributionCell> distribution : updates.entries()) {
      current.get(distribution.getKey()).update(distribution.getValue().getCumulative());
    }
  }

  /** Update each gauge in {@code current} with the cumulative value of its peer in {@code updates}. */
  private void updateGauges(
      MetricsMap<MetricName, GaugeCell> current, MetricsMap<MetricName, GaugeCell> updates) {
    for (Map.Entry<MetricName, GaugeCell> gauge : updates.entries()) {
      current.get(gauge.getKey()).update(gauge.getValue().getCumulative());
    }
  }

  /**
   * Two containers are equal when they have equal step names and equal counter, distribution and
   * gauge maps.
   */
  @Override
  public boolean equals(Object object) {
    if (!(object instanceof MetricsContainerImpl)) {
      return false;
    }
    MetricsContainerImpl that = (MetricsContainerImpl) object;
    return Objects.equals(stepName, that.stepName)
        && Objects.equals(counters, that.counters)
        && Objects.equals(distributions, that.distributions)
        && Objects.equals(gauges, that.gauges);
  }

  /** Hash over the same fields that {@link #equals(Object)} compares. */
  @Override
  public int hashCode() {
    return Objects.hash(stepName, counters, distributions, gauges);
  }
}