| /* |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| package org.apache.beam.sdk.testutils.metrics; |
| |
| import java.util.Collection; |
| import java.util.NoSuchElementException; |
| import java.util.Optional; |
| import java.util.Set; |
| import java.util.function.Function; |
| import java.util.stream.Collectors; |
| import java.util.stream.StreamSupport; |
| import org.apache.beam.sdk.PipelineResult; |
| import org.apache.beam.sdk.metrics.DistributionResult; |
| import org.apache.beam.sdk.metrics.MetricNameFilter; |
| import org.apache.beam.sdk.metrics.MetricQueryResults; |
| import org.apache.beam.sdk.metrics.MetricResult; |
| import org.apache.beam.sdk.metrics.MetricsFilter; |
| import org.apache.beam.sdk.testutils.NamedTestResult; |
| import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.annotations.VisibleForTesting; |
| import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions; |
| import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Iterables; |
| import org.joda.time.Duration; |
| import org.slf4j.LoggerFactory; |
| |
| /** Provides methods for querying metrics from {@link PipelineResult} per namespace. */ |
| public class MetricsReader { |
| |
| private static final org.slf4j.Logger LOG = LoggerFactory.getLogger(MetricsReader.class); |
| |
| private static final long ERRONEOUS_METRIC_VALUE = -1; |
| |
| private final PipelineResult result; |
| |
| private final String namespace; |
| |
| private final long now; |
| |
| @VisibleForTesting |
| MetricsReader(PipelineResult result, String defaultNamespace, long now) { |
| this.result = result; |
| this.namespace = defaultNamespace; |
| this.now = now; |
| } |
| |
| public MetricsReader(PipelineResult result, String namespace) { |
| this(result, namespace, System.currentTimeMillis()); |
| } |
| |
| public MetricsReader withNamespace(String namespace) { |
| return new MetricsReader(result, namespace, now); |
| } |
| |
| /** |
| * Return the current value for a long counter, or -1 if can't be retrieved. Note this uses only |
| * attempted metrics because some runners don't support committed metrics. |
| */ |
| public long getCounterMetric(String name) { |
| MetricQueryResults metrics = |
| result |
| .metrics() |
| .queryMetrics( |
| MetricsFilter.builder() |
| .addNameFilter(MetricNameFilter.named(namespace, name)) |
| .build()); |
| Iterable<MetricResult<Long>> counters = metrics.getCounters(); |
| |
| checkIfMetricResultIsUnique(name, counters); |
| |
| try { |
| MetricResult<Long> metricResult = counters.iterator().next(); |
| return metricResult.getAttempted(); |
| } catch (NoSuchElementException e) { |
| LOG.error("Failed to get metric {}, from namespace {}", name, namespace); |
| } |
| return ERRONEOUS_METRIC_VALUE; |
| } |
| |
| /** |
| * Return start time metric by counting the difference between "now" and min value from a |
| * distribution metric. |
| */ |
| public long getStartTimeMetric(String name) { |
| Iterable<MetricResult<DistributionResult>> timeDistributions = getDistributions(name); |
| return getLowestMin(timeDistributions); |
| } |
| |
| public Collection<NamedTestResult> readAll( |
| Set<Function<MetricsReader, NamedTestResult>> suppliers) { |
| return suppliers.stream().map(supp -> supp.apply(this)).collect(Collectors.toSet()); |
| } |
| |
| private Long getLowestMin(Iterable<MetricResult<DistributionResult>> distributions) { |
| Optional<Long> lowestMin = |
| StreamSupport.stream(distributions.spliterator(), true) |
| .map(element -> element.getAttempted().getMin()) |
| .filter(this::isCredible) |
| .min(Long::compareTo); |
| |
| return lowestMin.orElse(ERRONEOUS_METRIC_VALUE); |
| } |
| |
| /** |
| * Return end time metric by counting the difference between "now" and MAX value from a |
| * distribution metric. |
| */ |
| public long getEndTimeMetric(String name) { |
| Iterable<MetricResult<DistributionResult>> timeDistributions = getDistributions(name); |
| return getGreatestMax(timeDistributions); |
| } |
| |
| private Long getGreatestMax(Iterable<MetricResult<DistributionResult>> distributions) { |
| Optional<Long> greatestMax = |
| StreamSupport.stream(distributions.spliterator(), true) |
| .map(element -> element.getAttempted().getMax()) |
| .filter(this::isCredible) |
| .max(Long::compareTo); |
| |
| return greatestMax.orElse(ERRONEOUS_METRIC_VALUE); |
| } |
| |
| private Iterable<MetricResult<DistributionResult>> getDistributions(String name) { |
| MetricQueryResults metrics = |
| result |
| .metrics() |
| .queryMetrics( |
| MetricsFilter.builder() |
| .addNameFilter(MetricNameFilter.named(namespace, name)) |
| .build()); |
| return metrics.getDistributions(); |
| } |
| |
| private <T> void checkIfMetricResultIsUnique(String name, Iterable<MetricResult<T>> metricResult) |
| throws IllegalStateException { |
| |
| int resultCount = Iterables.size(metricResult); |
| Preconditions.checkState( |
| resultCount <= 1, |
| "More than one metric result matches name: %s in namespace %s. Metric results count: %s", |
| name, |
| namespace, |
| resultCount); |
| } |
| |
| /** |
| * timestamp metrics are used to monitor time of execution of transforms. If result timestamp |
| * metric is too far from now, consider that metric is erroneous private boolean isCredible(long |
| * value) { |
| */ |
| private boolean isCredible(long value) { |
| return (Math.abs(value - now) <= Duration.standardDays(10000).getMillis()); |
| } |
| } |