blob: febba8f95ecece0cf9b124f822c2cb60eba3f056 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.beam.runners.core.metrics;
import static java.util.Collections.singleton;
import static java.util.stream.Collectors.toList;
import static org.apache.beam.vendor.guava.v20_0.com.google.common.collect.Iterables.concat;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.BiFunction;
import org.apache.beam.model.pipeline.v1.MetricsApi.MonitoringInfo;
import org.apache.beam.runners.core.metrics.MetricUpdates.MetricUpdate;
import org.apache.beam.sdk.metrics.MetricKey;
import org.apache.beam.sdk.metrics.MetricResult;
import org.apache.beam.sdk.metrics.MetricResults;
/**
* Metrics containers by step.
*
* <p>This class is not thread-safe.
*/
public class MetricsContainerStepMap implements Serializable {
private Map<String, MetricsContainerImpl> metricsContainers;
private MetricsContainerImpl unboundContainer = new MetricsContainerImpl(null);
public MetricsContainerStepMap() {
this.metricsContainers = new ConcurrentHashMap<>();
}
/* Returns the container that is not bound to any step name. */
public MetricsContainerImpl getUnboundContainer() {
return this.unboundContainer;
}
/** Returns the container for the given step name. */
public MetricsContainerImpl getContainer(String stepName) {
if (stepName == null) {
// TODO(BEAM-6538): Disallow this in the future, some tests rely on an empty step name today.
return getUnboundContainer();
}
if (!metricsContainers.containsKey(stepName)) {
metricsContainers.put(stepName, new MetricsContainerImpl(stepName));
}
return metricsContainers.get(stepName);
}
/**
* Update this {@link MetricsContainerStepMap} with all values from given {@link
* MetricsContainerStepMap}.
*/
public void updateAll(MetricsContainerStepMap other) {
for (Map.Entry<String, MetricsContainerImpl> container : other.metricsContainers.entrySet()) {
getContainer(container.getKey()).update(container.getValue());
}
getUnboundContainer().update(other.getUnboundContainer());
}
/**
* Update {@link MetricsContainerImpl} for given step in this map with all values from given
* {@link MetricsContainerImpl}.
*/
public void update(String step, MetricsContainerImpl container) {
getContainer(step).update(container);
}
@Override
public boolean equals(Object object) {
if (object instanceof MetricsContainerStepMap) {
MetricsContainerStepMap metricsContainerStepMap = (MetricsContainerStepMap) object;
return Objects.equals(metricsContainers, metricsContainerStepMap.metricsContainers)
&& Objects.equals(unboundContainer, metricsContainerStepMap.unboundContainer);
}
return false;
}
@Override
public int hashCode() {
return Objects.hash(metricsContainers, unboundContainer);
}
/**
* Returns {@link MetricResults} based on given {@link MetricsContainerStepMap} of attempted
* metrics.
*
* <p>This constructor is intended for runners which only support `attempted` metrics. Accessing
* {@link MetricResult#getCommitted()} in the resulting {@link MetricResults} will result in an
* {@link UnsupportedOperationException}.
*/
public static MetricResults asAttemptedOnlyMetricResults(
MetricsContainerStepMap attemptedMetricsContainers) {
return asMetricResults(attemptedMetricsContainers, new MetricsContainerStepMap());
}
/**
* Returns {@link MetricResults} based on given {@link MetricsContainerStepMap
* MetricsContainerStepMaps} of attempted and committed metrics.
*
* <p>This constructor is intended for runners which support both attempted and committed metrics.
*/
public static MetricResults asMetricResults(
MetricsContainerStepMap attemptedMetricsContainers,
MetricsContainerStepMap committedMetricsContainers) {
Map<MetricKey, MetricResult<Long>> counters = new HashMap<>();
Map<MetricKey, MetricResult<DistributionData>> distributions = new HashMap<>();
Map<MetricKey, MetricResult<GaugeData>> gauges = new HashMap<>();
for (MetricsContainerImpl container : attemptedMetricsContainers.getMetricsContainers()) {
MetricUpdates cumulative = container.getCumulative();
mergeAttemptedResults(counters, cumulative.counterUpdates(), (l, r) -> l + r);
mergeAttemptedResults(
distributions, cumulative.distributionUpdates(), DistributionData::combine);
mergeAttemptedResults(gauges, cumulative.gaugeUpdates(), GaugeData::combine);
}
for (MetricsContainerImpl container : committedMetricsContainers.getMetricsContainers()) {
MetricUpdates cumulative = container.getCumulative();
mergeCommittedResults(counters, cumulative.counterUpdates(), (l, r) -> l + r);
mergeCommittedResults(
distributions, cumulative.distributionUpdates(), DistributionData::combine);
mergeCommittedResults(gauges, cumulative.gaugeUpdates(), GaugeData::combine);
}
return new DefaultMetricResults(
counters.values(),
distributions.values().stream()
.map(result -> result.transform(DistributionData::extractResult))
.collect(toList()),
gauges.values().stream()
.map(result -> result.transform(GaugeData::extractResult))
.collect(toList()));
}
/** Return the cumulative values for any metrics in this container as MonitoringInfos. */
public Iterable<MonitoringInfo> getMonitoringInfos() {
// Extract user metrics and store as MonitoringInfos.
ArrayList<MonitoringInfo> monitoringInfos = new ArrayList<>();
for (MetricsContainerImpl container : getMetricsContainers()) {
for (MonitoringInfo mi : container.getMonitoringInfos()) {
monitoringInfos.add(mi);
}
}
return monitoringInfos;
}
@Override
public String toString() {
return asAttemptedOnlyMetricResults(this).toString();
}
private Iterable<MetricsContainerImpl> getMetricsContainers() {
return concat(metricsContainers.values(), singleton(unboundContainer));
}
@SuppressWarnings("ConstantConditions")
private static <T> void mergeAttemptedResults(
Map<MetricKey, MetricResult<T>> metricResultMap,
Iterable<MetricUpdate<T>> updates,
BiFunction<T, T, T> combine) {
for (MetricUpdate<T> metricUpdate : updates) {
MetricKey key = metricUpdate.getKey();
MetricResult<T> current = metricResultMap.get(key);
if (current == null) {
metricResultMap.put(key, MetricResult.attempted(key, metricUpdate.getUpdate()));
} else {
metricResultMap.put(key, current.addAttempted(metricUpdate.getUpdate(), combine));
}
}
}
private static <T> void mergeCommittedResults(
Map<MetricKey, MetricResult<T>> metricResultMap,
Iterable<MetricUpdate<T>> updates,
BiFunction<T, T, T> combine) {
for (MetricUpdate<T> metricUpdate : updates) {
MetricKey key = metricUpdate.getKey();
MetricResult<T> current = metricResultMap.get(key);
if (current == null) {
throw new IllegalStateException(
String.format(
"%s: existing 'attempted' result not found for 'committed' value %s",
key, metricUpdate.getUpdate()));
}
metricResultMap.put(key, current.addCommitted(metricUpdate.getUpdate(), combine));
}
}
}