blob: 7957d52f069173368ca1b80d854caf4f9fc52888 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.beam.runners.direct;
import static org.apache.beam.sdk.metrics.MetricNameFilter.inNamespace;
import static org.apache.beam.sdk.metrics.MetricResultsMatchers.attemptedMetricsResult;
import static org.apache.beam.sdk.metrics.MetricResultsMatchers.committedMetricsResult;
import static org.hamcrest.Matchers.contains;
import static org.hamcrest.Matchers.containsInAnyOrder;
import static org.junit.Assert.assertThat;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import org.apache.beam.runners.core.metrics.DistributionData;
import org.apache.beam.runners.core.metrics.GaugeData;
import org.apache.beam.runners.core.metrics.MetricUpdates;
import org.apache.beam.runners.core.metrics.MetricUpdates.MetricUpdate;
import org.apache.beam.sdk.metrics.DistributionResult;
import org.apache.beam.sdk.metrics.GaugeResult;
import org.apache.beam.sdk.metrics.MetricKey;
import org.apache.beam.sdk.metrics.MetricName;
import org.apache.beam.sdk.metrics.MetricQueryResults;
import org.apache.beam.sdk.metrics.MetricsFilter;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableList;
import org.joda.time.Instant;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;
import org.mockito.Mock;
import org.mockito.MockitoAnnotations;
/** Tests for {@link DirectMetrics}. */
@RunWith(JUnit4.class)
public class DirectMetricsTest {
@Mock private CommittedBundle<Object> bundle1;
private static final MetricName NAME1 = MetricName.named("ns1", "name1");
private static final MetricName NAME2 = MetricName.named("ns1", "name2");
private static final MetricName NAME3 = MetricName.named("ns2", "name1");
private static final MetricName NAME4 = MetricName.named("ns2", "name2");
private DirectMetrics metrics;
private ExecutorService executor;
@Before
public void setUp() {
MockitoAnnotations.initMocks(this);
executor = Executors.newSingleThreadExecutor();
metrics = new DirectMetrics(executor);
}
@After
public void tearDown() {
executor.shutdownNow();
}
@SuppressWarnings("unchecked")
@Test
public void testApplyCommittedNoFilter() {
metrics.commitLogical(
bundle1,
MetricUpdates.create(
ImmutableList.of(
MetricUpdate.create(MetricKey.create("step1", NAME1), 5L),
MetricUpdate.create(MetricKey.create("step1", NAME2), 8L)),
ImmutableList.of(
MetricUpdate.create(
MetricKey.create("step1", NAME1), DistributionData.create(8, 2, 3, 5))),
ImmutableList.of(
MetricUpdate.create(MetricKey.create("step1", NAME4), GaugeData.create(15L)))));
metrics.commitLogical(
bundle1,
MetricUpdates.create(
ImmutableList.of(
MetricUpdate.create(MetricKey.create("step2", NAME1), 7L),
MetricUpdate.create(MetricKey.create("step1", NAME2), 4L)),
ImmutableList.of(
MetricUpdate.create(
MetricKey.create("step1", NAME1), DistributionData.create(4, 1, 4, 4))),
ImmutableList.of(
MetricUpdate.create(MetricKey.create("step1", NAME4), GaugeData.create(27L)))));
MetricQueryResults results = metrics.allMetrics();
assertThat(
results.getCounters(),
containsInAnyOrder(
attemptedMetricsResult("ns1", "name1", "step1", 0L),
attemptedMetricsResult("ns1", "name2", "step1", 0L),
attemptedMetricsResult("ns1", "name1", "step2", 0L)));
assertThat(
results.getCounters(),
containsInAnyOrder(
committedMetricsResult("ns1", "name1", "step1", 5L),
committedMetricsResult("ns1", "name2", "step1", 12L),
committedMetricsResult("ns1", "name1", "step2", 7L)));
assertThat(
results.getDistributions(),
contains(
attemptedMetricsResult("ns1", "name1", "step1", DistributionResult.IDENTITY_ELEMENT)));
assertThat(
results.getDistributions(),
contains(
committedMetricsResult(
"ns1", "name1", "step1", DistributionResult.create(12, 3, 3, 5))));
assertThat(
results.getGauges(),
contains(attemptedMetricsResult("ns2", "name2", "step1", GaugeResult.empty())));
assertThat(
results.getGauges(),
contains(
committedMetricsResult(
"ns2", "name2", "step1", GaugeResult.create(27L, Instant.now()))));
}
@SuppressWarnings("unchecked")
@Test
public void testApplyAttemptedCountersQueryOneNamespace() {
metrics.updatePhysical(
bundle1,
MetricUpdates.create(
ImmutableList.of(
MetricUpdate.create(MetricKey.create("step1", NAME1), 5L),
MetricUpdate.create(MetricKey.create("step1", NAME3), 8L)),
ImmutableList.of(),
ImmutableList.of()));
metrics.updatePhysical(
bundle1,
MetricUpdates.create(
ImmutableList.of(
MetricUpdate.create(MetricKey.create("step2", NAME1), 7L),
MetricUpdate.create(MetricKey.create("step1", NAME3), 4L)),
ImmutableList.of(),
ImmutableList.of()));
MetricQueryResults results =
metrics.queryMetrics(MetricsFilter.builder().addNameFilter(inNamespace("ns1")).build());
assertThat(
results.getCounters(),
containsInAnyOrder(
attemptedMetricsResult("ns1", "name1", "step1", 5L),
attemptedMetricsResult("ns1", "name1", "step2", 7L)));
assertThat(
results.getCounters(),
containsInAnyOrder(
committedMetricsResult("ns1", "name1", "step1", 0L),
committedMetricsResult("ns1", "name1", "step2", 0L)));
}
@SuppressWarnings("unchecked")
@Test
public void testApplyAttemptedQueryCompositeScope() {
metrics.updatePhysical(
bundle1,
MetricUpdates.create(
ImmutableList.of(
MetricUpdate.create(MetricKey.create("Outer1/Inner1", NAME1), 5L),
MetricUpdate.create(MetricKey.create("Outer1/Inner2", NAME1), 8L)),
ImmutableList.of(),
ImmutableList.of()));
metrics.updatePhysical(
bundle1,
MetricUpdates.create(
ImmutableList.of(
MetricUpdate.create(MetricKey.create("Outer1/Inner1", NAME1), 12L),
MetricUpdate.create(MetricKey.create("Outer2/Inner2", NAME1), 18L)),
ImmutableList.of(),
ImmutableList.of()));
MetricQueryResults results =
metrics.queryMetrics(MetricsFilter.builder().addStep("Outer1").build());
assertThat(
results.getCounters(),
containsInAnyOrder(
attemptedMetricsResult("ns1", "name1", "Outer1/Inner1", 12L),
attemptedMetricsResult("ns1", "name1", "Outer1/Inner2", 8L)));
assertThat(
results.getCounters(),
containsInAnyOrder(
committedMetricsResult("ns1", "name1", "Outer1/Inner1", 0L),
committedMetricsResult("ns1", "name1", "Outer1/Inner2", 0L)));
}
@SuppressWarnings("unchecked")
@Test
public void testPartialScopeMatchingInMetricsQuery() {
metrics.updatePhysical(
bundle1,
MetricUpdates.create(
ImmutableList.of(
MetricUpdate.create(MetricKey.create("Top1/Outer1/Inner1", NAME1), 5L),
MetricUpdate.create(MetricKey.create("Top1/Outer1/Inner2", NAME1), 8L)),
ImmutableList.of(),
ImmutableList.of()));
metrics.updatePhysical(
bundle1,
MetricUpdates.create(
ImmutableList.of(
MetricUpdate.create(MetricKey.create("Top2/Outer1/Inner1", NAME1), 12L),
MetricUpdate.create(MetricKey.create("Top1/Outer2/Inner2", NAME1), 18L)),
ImmutableList.of(),
ImmutableList.of()));
MetricQueryResults results =
metrics.queryMetrics(MetricsFilter.builder().addStep("Top1/Outer1").build());
assertThat(
results.getCounters(),
containsInAnyOrder(
attemptedMetricsResult("ns1", "name1", "Top1/Outer1/Inner1", 5L),
attemptedMetricsResult("ns1", "name1", "Top1/Outer1/Inner2", 8L)));
results = metrics.queryMetrics(MetricsFilter.builder().addStep("Inner2").build());
assertThat(
results.getCounters(),
containsInAnyOrder(
attemptedMetricsResult("ns1", "name1", "Top1/Outer1/Inner2", 8L),
attemptedMetricsResult("ns1", "name1", "Top1/Outer2/Inner2", 18L)));
}
}