blob: aa1016a892ac92747ef2502bb774c1b10ba7b391 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.beam.runners.core.metrics;
import static org.hamcrest.Matchers.containsInAnyOrder;
import static org.hamcrest.Matchers.equalTo;
import static org.junit.Assert.assertThat;
import java.util.Collection;
import java.util.Map;
import org.apache.beam.model.fnexecution.v1.BeamFnApi;
import org.apache.beam.vendor.guava.v20_0.com.google.common.collect.ImmutableList;
import org.apache.beam.vendor.guava.v20_0.com.google.common.collect.ImmutableMap;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
/** Tests for {@link MetricsTranslation}. */
@RunWith(Parameterized.class)
public class MetricsTranslationTest {
// Transform names are arbitrary user-meaningful steps in processing
private static final String TRANSFORM1 = "transform1";
private static final String TRANSFORM2 = "transform2";
private static final String TRANSFORM3 = "transform3";
// Namespaces correspond to different contexts for a metric
private static final String NAMESPACE1 = "fakeNamespace1";
private static final String NAMESPACE2 = "fakeNamespace2";
// Names are what is being measured
private static final String COUNTER_NAME1 = "elements";
private static final String COUNTER_NAME2 = "dropped";
private static final String DISTRIBUTION_NAME1 = "someMillis";
private static final String DISTRIBUTION_NAME2 = "otherMillis";
private static final String GAUGE_NAME1 = "load";
private static final String GAUGE_NAME2 = "memory";
private static final BeamFnApi.Metrics.User.MetricName COUNTER_METRIC1 =
BeamFnApi.Metrics.User.MetricName.newBuilder()
.setNamespace(NAMESPACE1)
.setName(COUNTER_NAME1)
.build();
private static final BeamFnApi.Metrics.User.MetricName COUNTER_METRIC2 =
BeamFnApi.Metrics.User.MetricName.newBuilder()
.setNamespace(NAMESPACE1)
.setName(COUNTER_NAME2)
.build();
private static final BeamFnApi.Metrics.User.MetricName DISTRIBUTION_METRIC1 =
BeamFnApi.Metrics.User.MetricName.newBuilder()
.setNamespace(NAMESPACE2)
.setName(DISTRIBUTION_NAME1)
.build();
private static final BeamFnApi.Metrics.User.MetricName DISTRIBUTION_METRIC2 =
BeamFnApi.Metrics.User.MetricName.newBuilder()
.setNamespace(NAMESPACE2)
.setName(DISTRIBUTION_NAME2)
.build();
private static final BeamFnApi.Metrics.User.MetricName GAUGE_METRIC1 =
BeamFnApi.Metrics.User.MetricName.newBuilder()
.setNamespace(NAMESPACE1)
.setName(GAUGE_NAME1)
.build();
private static final BeamFnApi.Metrics.User.MetricName GAUGE_METRIC2 =
BeamFnApi.Metrics.User.MetricName.newBuilder()
.setNamespace(NAMESPACE2)
.setName(GAUGE_NAME2)
.build();
private static final BeamFnApi.Metrics.User DISTRIBUTION1 =
BeamFnApi.Metrics.User.newBuilder()
.setMetricName(DISTRIBUTION_METRIC1)
.setDistributionData(
BeamFnApi.Metrics.User.DistributionData.newBuilder()
.setCount(42)
.setSum(4839L)
.setMax(348L)
.setMin(12L))
.build();
private static final BeamFnApi.Metrics.User DISTRIBUTION2 =
BeamFnApi.Metrics.User.newBuilder()
.setMetricName(DISTRIBUTION_METRIC2)
.setDistributionData(
BeamFnApi.Metrics.User.DistributionData.newBuilder()
.setCount(3)
.setSum(49L)
.setMax(43L)
.setMin(1L))
.build();
private static final BeamFnApi.Metrics.User COUNTER1 =
BeamFnApi.Metrics.User.newBuilder()
.setMetricName(COUNTER_METRIC1)
.setCounterData(BeamFnApi.Metrics.User.CounterData.newBuilder().setValue(92L))
.build();
private static final BeamFnApi.Metrics.User COUNTER2 =
BeamFnApi.Metrics.User.newBuilder()
.setMetricName(COUNTER_METRIC2)
.setCounterData(BeamFnApi.Metrics.User.CounterData.newBuilder().setValue(0L))
.build();
private static final BeamFnApi.Metrics.User GAUGE1 =
BeamFnApi.Metrics.User.newBuilder()
.setMetricName(GAUGE_METRIC2)
.setCounterData(BeamFnApi.Metrics.User.CounterData.newBuilder().setValue(56L))
.build();
private static final BeamFnApi.Metrics.User GAUGE2 =
BeamFnApi.Metrics.User.newBuilder()
.setMetricName(GAUGE_METRIC2)
.setCounterData(BeamFnApi.Metrics.User.CounterData.newBuilder().setValue(3L))
.build();
@Parameterized.Parameters
public static Iterable<Object[]> testInstances() {
return ImmutableList.<Object[]>builder()
.add(
new Object[] {
ImmutableMap.builder().put(TRANSFORM1, ImmutableList.of(DISTRIBUTION1)).build()
})
.add(
new Object[] {
ImmutableMap.builder()
.put(TRANSFORM1, ImmutableList.of(DISTRIBUTION1, COUNTER1))
.build()
})
.add(
new Object[] {
ImmutableMap.builder()
.put(TRANSFORM1, ImmutableList.of(DISTRIBUTION1, COUNTER1))
.put(TRANSFORM2, ImmutableList.of(COUNTER2))
.put(TRANSFORM3, ImmutableList.of(GAUGE1))
.build()
})
.add(
new Object[] {
ImmutableMap.builder().put(TRANSFORM1, ImmutableList.of(GAUGE1, GAUGE2)).build()
})
.build();
}
@Parameterized.Parameter(0)
public Map<String, Collection<BeamFnApi.Metrics.User>> fnMetrics;
@Test
public void testToFromProtoMetricUpdates() {
ImmutableMap.Builder<String, Collection<BeamFnApi.Metrics.User>> result =
ImmutableMap.builder();
for (Map.Entry<String, Collection<BeamFnApi.Metrics.User>> entry : fnMetrics.entrySet()) {
MetricUpdates updates =
MetricsTranslation.metricUpdatesFromProto(entry.getKey(), entry.getValue());
result.putAll(MetricsTranslation.metricUpdatesToProto(updates));
}
Map<String, Collection<BeamFnApi.Metrics.User>> backToProto = result.build();
assertThat(backToProto.keySet(), equalTo(fnMetrics.keySet()));
for (String ptransformName : backToProto.keySet()) {
assertThat(
backToProto.get(ptransformName),
containsInAnyOrder(fnMetrics.get(ptransformName).toArray()));
}
}
}