| /* |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| package org.apache.beam.runners.dataflow.worker; |
| |
| import static org.hamcrest.Matchers.equalTo; |
| import static org.hamcrest.Matchers.notNullValue; |
| import static org.hamcrest.Matchers.nullValue; |
| import static org.junit.Assert.assertThat; |
| |
| import java.io.IOException; |
| import java.util.List; |
| import javax.annotation.Nullable; |
| import org.apache.beam.runners.dataflow.options.DataflowPipelineDebugOptions; |
| import org.apache.beam.runners.dataflow.worker.counters.Counter; |
| import org.apache.beam.runners.dataflow.worker.counters.CounterFactory.CounterDistribution; |
| import org.apache.beam.runners.dataflow.worker.counters.CounterName; |
| import org.apache.beam.runners.dataflow.worker.counters.CounterSet; |
| import org.apache.beam.runners.dataflow.worker.counters.NameContext; |
| import org.apache.beam.runners.dataflow.worker.util.common.worker.ElementExecutionTracker; |
| import org.apache.beam.sdk.options.PipelineOptionsFactory; |
| import org.apache.beam.vendor.guava.v20_0.com.google.common.collect.Lists; |
| import org.junit.Before; |
| import org.junit.Rule; |
| import org.junit.Test; |
| import org.junit.rules.ExpectedException; |
| import org.junit.runner.RunWith; |
| import org.junit.runners.JUnit4; |
| |
| /** Tests for {@link DataflowElementExecutionTracker}. */ |
| @RunWith(JUnit4.class) |
| public class DataflowElementExecutionTrackerTest { |
| @Rule public final ExpectedException thrown = ExpectedException.none(); |
| |
| private CounterSet counters; |
| private DataflowPipelineDebugOptions options; |
| private ElementExecutionTracker tracker; |
| |
| @Before |
| public void setUp() { |
| counters = new CounterSet(); |
| options = PipelineOptionsFactory.as(DataflowPipelineDebugOptions.class); |
| options.setExperiments( |
| Lists.newArrayList(DataflowElementExecutionTracker.TIME_PER_ELEMENT_EXPERIMENT)); |
| tracker = DataflowElementExecutionTracker.create(counters, options); |
| } |
| |
| /** Typical usage scenario. */ |
| @Test |
| public void testTypicalUsage() throws IOException { |
| NameContext stepA = createStep("A"); |
| NameContext stepB = createStep("B"); |
| NameContext stepC = createStep("C"); |
| NameContext stepD = createStep("D"); |
| |
| // Comments track journal of executions for next sample, and partial timings not yet reported |
| tracker.enter(stepA); // IDLE A1 | {} |
| tracker.enter(stepB); // IDLE A1 B1 | {} |
| tracker.exit(); // IDLE A1 B1 A1 | {} |
| |
| tracker.takeSample(40); // A1 | {A1:2} |
| assertThat(getCounterValue(stepB), equalTo(distribution(10))); |
| |
| tracker.enter(stepB); // A1 B2 | {A1:2} |
| tracker.exit(); // A1 B2 A1 | {A1:2} |
| tracker.enter(stepC); // A1 B2 A1 C1 | {A1:2} |
| tracker.enter(stepD); // A1 B2 A1 C1 D1 | {A1:2} |
| |
| tracker.takeSample(50); // D1 | {A1:4 C1:1 D1:1} |
| assertThat(getCounterValue(stepB), equalTo(distribution(10, 10))); |
| |
| tracker.exit(); // D1 C1 | {A1:4 C1:1 D1:1} |
| tracker.exit(); // D1 C1 A1 | {A1:4 C1:1 D1:1} |
| tracker.enter(stepC); // D1 C1 A1 C2 | {A1:4 C1:1 D1:1} |
| |
| tracker.takeSample(40); // C2 | {A1:5 C2:1} |
| assertThat(getCounterValue(stepC), equalTo(distribution(20))); |
| assertThat(getCounterValue(stepD), equalTo(distribution(20))); |
| |
| tracker.exit(); // C2 A1 | {A1:5 C2:1} |
| tracker.exit(); // C2 A1 IDLE | {A1:5 C2:1} |
| |
| tracker.takeSample(30); // done |
| assertThat(getCounterValue(stepA), equalTo(distribution(60))); |
| assertThat(getCounterValue(stepB), equalTo(distribution(10, 10))); |
| assertThat(getCounterValue(stepC), equalTo(distribution(20, 20))); |
| assertThat(getCounterValue(stepD), equalTo(distribution(20))); |
| } |
| |
| /** Test that counter values are reported when a processing operation finishes. */ |
| @Test |
| public void testCounterReportedOnClose() throws IOException { |
| NameContext step = createStep("A"); |
| tracker.enter(step); |
| tracker.takeSample(10); // half of time attributed to initial IDLE execution |
| assertThat(getCounter(step), nullValue()); |
| |
| tracker.exit(); |
| tracker.takeSample(10); // half of time attributed to final IDLE execution |
| assertThat(getCounterValue(step), equalTo(distribution(10))); |
| } |
| |
| /** Ensure functionality is correctly disabled when the experiment is not set. */ |
| @Test |
| public void testDisabledByExperiment() throws IOException { |
| List<String> experiments = options.getExperiments(); |
| experiments.remove(DataflowElementExecutionTracker.TIME_PER_ELEMENT_EXPERIMENT); |
| options.setExperiments(experiments); |
| tracker = DataflowElementExecutionTracker.create(counters, options); |
| |
| NameContext step = createStep("A"); |
| tracker.enter(step); |
| tracker.exit(); |
| tracker.takeSample(10); |
| assertThat(getCounter(step), nullValue()); |
| } |
| |
| /** |
| * Test that the sampling time is distributed evenly between all execution fragments since last |
| * sampling. |
| */ |
| @Test |
| public void testSampledTimeDistributedBetweenExecutionFragments() throws IOException { |
| NameContext stepA = createStep("A"); |
| NameContext stepB = createStep("B"); |
| |
| tracker.enter(stepA); |
| tracker.exit(); |
| tracker.enter(stepB); |
| tracker.exit(); |
| // Expected journal: IDLE A1 IDLE B1 IDLE |
| |
| tracker.takeSample(50); |
| assertThat(getCounterValue(stepA), equalTo(distribution(10))); |
| assertThat(getCounterValue(stepB), equalTo(distribution(10))); |
| } |
| |
| /** Verify that each entry into a step tracks a separate element execution. */ |
| @Test |
| public void testElementsTrackedIndividuallyForAStep() throws IOException { |
| NameContext stepA = createStep("A"); |
| NameContext stepB = createStep("B"); |
| |
| tracker.enter(stepA); |
| tracker.enter(stepB); |
| tracker.exit(); |
| tracker.enter(stepB); |
| tracker.exit(); |
| tracker.exit(); |
| // Expected journal: IDLE A1 B1 A1 B2 A1 IDLE |
| |
| tracker.takeSample(70); |
| assertThat(getCounterValue(stepA), equalTo(distribution(30))); |
| assertThat(getCounterValue(stepB), equalTo(distribution(10, 10))); |
| } |
| |
| /** |
| * Test that the currently processing element at time of sampling is also counted in the next |
| * sampling period. |
| */ |
| @Test |
| public void testCurrentOperationCountedInNextSample() throws IOException { |
| NameContext step = createStep("A"); |
| tracker.enter(step); |
| tracker.takeSample(20); // Journal: IDLE A1 |
| tracker.takeSample(10); // Journal: A1 |
| tracker.exit(); |
| tracker.takeSample(20); // Journal: A1 IDLE |
| |
| assertThat(getCounterValue(step), equalTo(distribution(30))); |
| } |
| |
| /** |
| * Ensure that sampling is properly handled when there are no new executions since the last |
| * sampling period. |
| */ |
| @Test |
| public void testNoExecutionsSinceLastSample() throws IOException { |
| tracker.takeSample(10); // Journal: IDLE |
| |
| NameContext step = createStep("A"); |
| tracker.enter(step); |
| tracker.takeSample(10); // Journal: IDLE A1 |
| tracker.takeSample(10); // Journal: A1 |
| |
| tracker.exit(); |
| tracker.takeSample(10); // Journal: A1 IDLE |
| assertThat(getCounterValue(step), equalTo(distribution(20))); |
| |
| tracker.takeSample(10); // Journal: IDLE |
| assertThat(getCounterValue(step), equalTo(distribution(20))); |
| } |
| |
| @Test |
| public void testThrowsOnExitBeforeEnter() { |
| thrown.expect(IllegalStateException.class); |
| tracker.exit(); |
| } |
| |
| private NameContext createStep(String stepName) { |
| return NameContext.create("anyStage", stepName, "anySystem", "AnyUserName"); |
| } |
| |
| /** Retrieve the per-element-processing-time counter aggregate for the given step. */ |
| private CounterDistribution getCounterValue(NameContext step) { |
| Counter<Long, CounterDistribution> counter = getCounter(step); |
| assertThat( |
| "per-element-processing-time counter should not be null for step: " + step, |
| counter, |
| notNullValue()); |
| return counter.getAggregate(); |
| } |
| |
| /** |
| * Retrieve the per-element-processing-time counter for the given step, or null if the counter has |
| * not been written. |
| */ |
| @Nullable |
| private Counter<Long, CounterDistribution> getCounter(NameContext step) { |
| CounterName counterName = |
| CounterName.named("per-element-processing-time").withOriginalName(step); |
| return (Counter<Long, CounterDistribution>) counters.getExistingCounter(counterName); |
| } |
| |
| /** Build a distribution from the specified values. */ |
| private CounterDistribution distribution(long... values) { |
| CounterDistribution dist = CounterDistribution.empty(); |
| for (long value : values) { |
| dist = dist.addValue(value); |
| } |
| |
| return dist; |
| } |
| } |