blob: db1ce6f5842915b55e83641d710a0a377542c721 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.beam.runners.dataflow.worker;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.notNullValue;
import static org.hamcrest.Matchers.nullValue;
import static org.junit.Assert.assertThat;
import java.io.IOException;
import java.util.List;
import javax.annotation.Nullable;
import org.apache.beam.runners.dataflow.options.DataflowPipelineDebugOptions;
import org.apache.beam.runners.dataflow.worker.counters.Counter;
import org.apache.beam.runners.dataflow.worker.counters.CounterFactory.CounterDistribution;
import org.apache.beam.runners.dataflow.worker.counters.CounterName;
import org.apache.beam.runners.dataflow.worker.counters.CounterSet;
import org.apache.beam.runners.dataflow.worker.counters.NameContext;
import org.apache.beam.runners.dataflow.worker.util.common.worker.ElementExecutionTracker;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.vendor.guava.v20_0.com.google.common.collect.Lists;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.ExpectedException;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;
/** Tests for {@link DataflowElementExecutionTracker}. */
@RunWith(JUnit4.class)
public class DataflowElementExecutionTrackerTest {
@Rule public final ExpectedException thrown = ExpectedException.none();
private CounterSet counters;
private DataflowPipelineDebugOptions options;
private ElementExecutionTracker tracker;
@Before
public void setUp() {
counters = new CounterSet();
options = PipelineOptionsFactory.as(DataflowPipelineDebugOptions.class);
options.setExperiments(
Lists.newArrayList(DataflowElementExecutionTracker.TIME_PER_ELEMENT_EXPERIMENT));
tracker = DataflowElementExecutionTracker.create(counters, options);
}
/** Typical usage scenario. */
@Test
public void testTypicalUsage() throws IOException {
NameContext stepA = createStep("A");
NameContext stepB = createStep("B");
NameContext stepC = createStep("C");
NameContext stepD = createStep("D");
// Comments track journal of executions for next sample, and partial timings not yet reported
tracker.enter(stepA); // IDLE A1 | {}
tracker.enter(stepB); // IDLE A1 B1 | {}
tracker.exit(); // IDLE A1 B1 A1 | {}
tracker.takeSample(40); // A1 | {A1:2}
assertThat(getCounterValue(stepB), equalTo(distribution(10)));
tracker.enter(stepB); // A1 B2 | {A1:2}
tracker.exit(); // A1 B2 A1 | {A1:2}
tracker.enter(stepC); // A1 B2 A1 C1 | {A1:2}
tracker.enter(stepD); // A1 B2 A1 C1 D1 | {A1:2}
tracker.takeSample(50); // D1 | {A1:4 C1:1 D1:1}
assertThat(getCounterValue(stepB), equalTo(distribution(10, 10)));
tracker.exit(); // D1 C1 | {A1:4 C1:1 D1:1}
tracker.exit(); // D1 C1 A1 | {A1:4 C1:1 D1:1}
tracker.enter(stepC); // D1 C1 A1 C2 | {A1:4 C1:1 D1:1}
tracker.takeSample(40); // C2 | {A1:5 C2:1}
assertThat(getCounterValue(stepC), equalTo(distribution(20)));
assertThat(getCounterValue(stepD), equalTo(distribution(20)));
tracker.exit(); // C2 A1 | {A1:5 C2:1}
tracker.exit(); // C2 A1 IDLE | {A1:5 C2:1}
tracker.takeSample(30); // done
assertThat(getCounterValue(stepA), equalTo(distribution(60)));
assertThat(getCounterValue(stepB), equalTo(distribution(10, 10)));
assertThat(getCounterValue(stepC), equalTo(distribution(20, 20)));
assertThat(getCounterValue(stepD), equalTo(distribution(20)));
}
/** Test that counter values are reported when a processing operation finishes. */
@Test
public void testCounterReportedOnClose() throws IOException {
NameContext step = createStep("A");
tracker.enter(step);
tracker.takeSample(10); // half of time attributed to initial IDLE execution
assertThat(getCounter(step), nullValue());
tracker.exit();
tracker.takeSample(10); // half of time attributed to final IDLE execution
assertThat(getCounterValue(step), equalTo(distribution(10)));
}
/** Ensure functionality is correctly disabled when the experiment is not set. */
@Test
public void testDisabledByExperiment() throws IOException {
List<String> experiments = options.getExperiments();
experiments.remove(DataflowElementExecutionTracker.TIME_PER_ELEMENT_EXPERIMENT);
options.setExperiments(experiments);
tracker = DataflowElementExecutionTracker.create(counters, options);
NameContext step = createStep("A");
tracker.enter(step);
tracker.exit();
tracker.takeSample(10);
assertThat(getCounter(step), nullValue());
}
/**
* Test that the sampling time is distributed evenly between all execution fragments since last
* sampling.
*/
@Test
public void testSampledTimeDistributedBetweenExecutionFragments() throws IOException {
NameContext stepA = createStep("A");
NameContext stepB = createStep("B");
tracker.enter(stepA);
tracker.exit();
tracker.enter(stepB);
tracker.exit();
// Expected journal: IDLE A1 IDLE B1 IDLE
tracker.takeSample(50);
assertThat(getCounterValue(stepA), equalTo(distribution(10)));
assertThat(getCounterValue(stepB), equalTo(distribution(10)));
}
/** Verify that each entry into a step tracks a separate element execution. */
@Test
public void testElementsTrackedIndividuallyForAStep() throws IOException {
NameContext stepA = createStep("A");
NameContext stepB = createStep("B");
tracker.enter(stepA);
tracker.enter(stepB);
tracker.exit();
tracker.enter(stepB);
tracker.exit();
tracker.exit();
// Expected journal: IDLE A1 B1 A1 B2 A1 IDLE
tracker.takeSample(70);
assertThat(getCounterValue(stepA), equalTo(distribution(30)));
assertThat(getCounterValue(stepB), equalTo(distribution(10, 10)));
}
/**
* Test that the currently processing element at time of sampling is also counted in the next
* sampling period.
*/
@Test
public void testCurrentOperationCountedInNextSample() throws IOException {
NameContext step = createStep("A");
tracker.enter(step);
tracker.takeSample(20); // Journal: IDLE A1
tracker.takeSample(10); // Journal: A1
tracker.exit();
tracker.takeSample(20); // Journal: A1 IDLE
assertThat(getCounterValue(step), equalTo(distribution(30)));
}
/**
* Ensure that sampling is properly handled when there are no new executions since the last
* sampling period.
*/
@Test
public void testNoExecutionsSinceLastSample() throws IOException {
tracker.takeSample(10); // Journal: IDLE
NameContext step = createStep("A");
tracker.enter(step);
tracker.takeSample(10); // Journal: IDLE A1
tracker.takeSample(10); // Journal: A1
tracker.exit();
tracker.takeSample(10); // Journal: A1 IDLE
assertThat(getCounterValue(step), equalTo(distribution(20)));
tracker.takeSample(10); // Journal: IDLE
assertThat(getCounterValue(step), equalTo(distribution(20)));
}
@Test
public void testThrowsOnExitBeforeEnter() {
thrown.expect(IllegalStateException.class);
tracker.exit();
}
private NameContext createStep(String stepName) {
return NameContext.create("anyStage", stepName, "anySystem", "AnyUserName");
}
/** Retrieve the per-element-processing-time counter aggregate for the given step. */
private CounterDistribution getCounterValue(NameContext step) {
Counter<Long, CounterDistribution> counter = getCounter(step);
assertThat(
"per-element-processing-time counter should not be null for step: " + step,
counter,
notNullValue());
return counter.getAggregate();
}
/**
* Retrieve the per-element-processing-time counter for the given step, or null if the counter has
* not been written.
*/
@Nullable
private Counter<Long, CounterDistribution> getCounter(NameContext step) {
CounterName counterName =
CounterName.named("per-element-processing-time").withOriginalName(step);
return (Counter<Long, CounterDistribution>) counters.getExistingCounter(counterName);
}
/** Build a distribution from the specified values. */
private CounterDistribution distribution(long... values) {
CounterDistribution dist = CounterDistribution.empty();
for (long value : values) {
dist = dist.addValue(value);
}
return dist;
}
}