blob: 493d4cc324b1aa938ad5c83e2e1f1baec9cbcbd1 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.beam.sdk.testing;
import com.fasterxml.jackson.core.JsonParser;
import com.fasterxml.jackson.core.TreeNode;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.node.ObjectNode;
import com.google.common.base.Optional;
import com.google.common.base.Strings;
import com.google.common.collect.Iterators;
import java.io.IOException;
import java.lang.reflect.Method;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.Map.Entry;
import javax.annotation.Nullable;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.PipelineResult;
import org.apache.beam.sdk.options.ApplicationNameOptions;
import org.apache.beam.sdk.options.GcpOptions;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptions.CheckEnabled;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.runners.PipelineRunner;
import org.apache.beam.sdk.util.IOChannelUtils;
import org.apache.beam.sdk.util.TestCredential;
import org.junit.experimental.categories.Category;
/**
* A creator of test pipelines that can be used inside of tests that can be
* configured to run locally or against a remote pipeline runner.
*
* <p>It is recommended to tag hand-selected tests for this purpose using the
* {@link RunnableOnService} {@link Category} annotation, as each test run against a pipeline runner
* will utilize resources of that pipeline runner.
*
* <p>In order to run tests on a pipeline runner, the following conditions must be met:
* <ul>
* <li>System property "beamTestPipelineOptions" must contain a JSON array of pipeline
* option strings. For example:
* <pre>{@code [
* "--runner=org.apache.beam.runners.dataflow.testing.TestDataflowRunner",
* "--project=mygcpproject",
* "--stagingLocation=gs://mygcsbucket/path"
* ]}</pre>
* Note that the set of pipeline options required is pipeline runner specific.
* </li>
* <li>Jars containing the SDK and test classes must be available on the classpath.</li>
* </ul>
*
* <p>Use {@link PAssert} for tests, as it integrates with this test harness in both direct and
* remote execution modes. For example:
* <pre>{@code
* Pipeline p = TestPipeline.create();
* PCollection<Integer> output = ...
*
* PAssert.that(output)
* .containsInAnyOrder(1, 2, 3, 4);
* p.run();
* }</pre>
*
* <p>Pipeline runners are required to throw an {@link AssertionError}
* containing the message from the {@link PAssert} that failed.
*/
public class TestPipeline extends Pipeline {
  /** Name of the system property holding the JSON array of pipeline options used for tests. */
  static final String PROPERTY_BEAM_TEST_PIPELINE_OPTIONS = "beamTestPipelineOptions";

  /**
   * Name of the system property that, when it parses as {@code true} and no test pipeline
   * options were provided, makes {@link #testingPipelineOptions()} select {@link CrashingRunner}.
   */
  static final String PROPERTY_USE_DEFAULT_DUMMY_RUNNER = "beamUseDummyRunner";

  /** Shared mapper used to parse the options property and to serialize options to JSON. */
  private static final ObjectMapper MAPPER = new ObjectMapper();

  /**
   * Creates and returns a new test pipeline.
   *
   * <p>Use {@link PAssert} to add tests, then call
   * {@link Pipeline#run} to execute the pipeline and check the tests.
   */
  public static TestPipeline create() {
    return fromOptions(testingPipelineOptions());
  }

  /**
   * Creates a test pipeline from the given options.
   *
   * @param options the options used both to obtain the runner (via
   *     {@link PipelineRunner#fromOptions}) and to configure the pipeline
   * @return a new {@link TestPipeline}
   */
  public static TestPipeline fromOptions(PipelineOptions options) {
    return new TestPipeline(PipelineRunner.fromOptions(options), options);
  }

  /** Private: instances are obtained through {@link #create()} or {@link #fromOptions}. */
  private TestPipeline(PipelineRunner<? extends PipelineResult> runner, PipelineOptions options) {
    super(runner, options);
  }

  /**
   * Runs this {@link TestPipeline}, unwrapping any {@code AssertionError}
   * that is raised during testing.
   *
   * <p>Only the direct cause of a {@link RuntimeException} is inspected; any other
   * exception is rethrown unchanged.
   */
  @Override
  public PipelineResult run() {
    try {
      return super.run();
    } catch (RuntimeException exc) {
      Throwable cause = exc.getCause();
      if (cause instanceof AssertionError) {
        throw (AssertionError) cause;
      } else {
        throw exc;
      }
    }
  }

  /** Returns {@code "TestPipeline#"} followed by the application name from the options. */
  @Override
  public String toString() {
    return "TestPipeline#" + getOptions().as(ApplicationNameOptions.class).getAppName();
  }

  /**
   * Creates {@link PipelineOptions} for testing.
   *
   * <p>If the {@value #PROPERTY_BEAM_TEST_PIPELINE_OPTIONS} system property is set and non-empty
   * it is parsed as a JSON array of argument strings and used to build the options. Otherwise
   * default options are created, a {@link TestCredential} is installed, and
   * {@link CrashingRunner} is selected when {@value #PROPERTY_USE_DEFAULT_DUMMY_RUNNER} parses
   * as {@code true}.
   *
   * @return the options to use for a test pipeline
   * @throws RuntimeException if the system property cannot be parsed
   */
  public static PipelineOptions testingPipelineOptions() {
    try {
      @Nullable String beamTestPipelineOptions =
          System.getProperty(PROPERTY_BEAM_TEST_PIPELINE_OPTIONS);
      boolean optionsProvided = !Strings.isNullOrEmpty(beamTestPipelineOptions);

      PipelineOptions options =
          optionsProvided
              ? PipelineOptionsFactory.fromArgs(
                      MAPPER.readValue(beamTestPipelineOptions, String[].class))
                  .as(TestPipelineOptions.class)
              : PipelineOptionsFactory.create();

      options.as(ApplicationNameOptions.class).setAppName(getAppName());

      // If no options were specified, set some reasonable defaults
      if (!optionsProvided) {
        // If there are no provided options, check to see if a dummy runner should be used.
        // parseBoolean returns false for null or empty input, so no separate guard is needed.
        String useDefaultDummy = System.getProperty(PROPERTY_USE_DEFAULT_DUMMY_RUNNER);
        if (Boolean.parseBoolean(useDefaultDummy)) {
          options.setRunner(CrashingRunner.class);
        }
        options.as(GcpOptions.class).setGcpCredential(new TestCredential());
      }

      options.setStableUniqueNames(CheckEnabled.ERROR);
      IOChannelUtils.registerIOFactoriesAllowOverride(options);
      return options;
    } catch (IOException e) {
      throw new RuntimeException("Unable to instantiate test options from system property "
          + PROPERTY_BEAM_TEST_PIPELINE_OPTIONS + ":"
          + System.getProperty(PROPERTY_BEAM_TEST_PIPELINE_OPTIONS), e);
    }
  }

  /**
   * Converts the given options into command-line style arguments.
   *
   * <p>The options are serialized to JSON with Jackson, and every field of the resulting
   * {@code "options"} object becomes one {@code --key=value} string. Textual values are emitted
   * as their raw text; all other values are rendered with the node's {@code toString()}.
   * Fields whose value is JSON {@code null} are skipped, because emitting {@code --key=null}
   * would pass the literal text "null" as the option's value.
   *
   * @param options the options to convert
   * @return one argument per non-null option, in the iteration order of the JSON fields
   * @throws IllegalStateException if the options cannot be serialized or re-read as JSON
   */
  public static String[] convertToArgs(PipelineOptions options) {
    try {
      byte[] opts = MAPPER.writeValueAsBytes(options);

      // JsonParser is Closeable; release it deterministically once the tree has been read.
      try (JsonParser jsonParser = MAPPER.getFactory().createParser(opts)) {
        TreeNode node = jsonParser.readValueAsTree();
        ObjectNode optsNode = (ObjectNode) node.get("options");

        ArrayList<String> optArrayList = new ArrayList<>();
        Iterator<Entry<String, JsonNode>> entries = optsNode.fields();
        while (entries.hasNext()) {
          Entry<String, JsonNode> entry = entries.next();
          JsonNode value = entry.getValue();
          if (value.isNull()) {
            // An unset option must not be turned into the string "null".
            continue;
          }
          if (value.isTextual()) {
            optArrayList.add("--" + entry.getKey() + "=" + value.asText());
          } else {
            optArrayList.add("--" + entry.getKey() + "=" + value);
          }
        }
        return optArrayList.toArray(new String[optArrayList.size()]);
      }
    } catch (IOException e) {
      throw new IllegalStateException(e);
    }
  }

  /** Returns the class + method name of the test, or a default name. */
  private static String getAppName() {
    Optional<StackTraceElement> stackTraceElement = findCallersStackTrace();
    if (stackTraceElement.isPresent()) {
      String methodName = stackTraceElement.get().getMethodName();
      String className = stackTraceElement.get().getClassName();
      // Keep only the simple name; lastIndexOf yields -1 when there is no package prefix,
      // in which case substring(0) leaves the name untouched.
      className = className.substring(className.lastIndexOf('.') + 1);
      return className + "-" + methodName;
    }
    return "UnitTest";
  }

  /**
   * Returns the {@link StackTraceElement} of the calling class.
   *
   * <p>The current thread's stack is scanned past the first {@link TestPipeline} frame. Among
   * the following non-{@link TestPipeline} frames, the first one whose method name matches a
   * public method annotated with {@link org.junit.Test} is returned. If no such frame is found,
   * the first non-{@link TestPipeline} frame seen after the {@link TestPipeline} frames is
   * returned, or an absent value if there is none.
   */
  private static Optional<StackTraceElement> findCallersStackTrace() {
    Iterator<StackTraceElement> elements =
        Iterators.forArray(Thread.currentThread().getStackTrace());
    String testPipelineClassName = TestPipeline.class.getName();

    // First find the TestPipeline class in the stack trace.
    while (elements.hasNext()) {
      StackTraceElement next = elements.next();
      if (testPipelineClassName.equals(next.getClassName())) {
        break;
      }
    }

    // Then find the first instance after that is not the TestPipeline
    Optional<StackTraceElement> firstInstanceAfterTestPipeline = Optional.absent();
    while (elements.hasNext()) {
      StackTraceElement next = elements.next();
      if (!testPipelineClassName.equals(next.getClassName())) {
        if (!firstInstanceAfterTestPipeline.isPresent()) {
          firstInstanceAfterTestPipeline = Optional.of(next);
        }
        try {
          Class<?> nextClass = Class.forName(next.getClassName());
          for (Method method : nextClass.getMethods()) {
            if (method.getName().equals(next.getMethodName())) {
              if (method.isAnnotationPresent(org.junit.Test.class)) {
                return Optional.of(next);
              } else if (method.isAnnotationPresent(org.junit.Before.class)) {
                // Stops scanning this class's methods only; the outer loop moves on
                // to the next stack frame.
                break;
              }
            }
          }
        } catch (Throwable ignored) {
          // Deliberately best-effort: if the frame's class cannot be loaded or inspected,
          // stop searching and fall back to the first frame found after TestPipeline.
          break;
        }
      }
    }
    return firstInstanceAfterTestPipeline;
  }
}