/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.beam.runners.dataflow;

import static org.apache.beam.runners.dataflow.DataflowRunner.getContainerImageForJob;
import static org.hamcrest.Matchers.both;
import static org.hamcrest.Matchers.containsString;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.hasEntry;
import static org.hamcrest.Matchers.hasItem;
import static org.hamcrest.Matchers.hasKey;
import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.not;
import static org.hamcrest.Matchers.startsWith;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertThat;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import static org.mockito.Matchers.any;
import static org.mockito.Matchers.anyInt;
import static org.mockito.Matchers.anyListOf;
import static org.mockito.Matchers.anyString;
import static org.mockito.Matchers.eq;
import static org.mockito.Matchers.isA;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;

import com.fasterxml.jackson.core.JsonGenerator;
import com.fasterxml.jackson.core.JsonParser;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.DeserializationContext;
import com.fasterxml.jackson.databind.JsonDeserializer;
import com.fasterxml.jackson.databind.JsonSerializer;
import com.fasterxml.jackson.databind.Module;
import com.fasterxml.jackson.databind.SerializerProvider;
import com.fasterxml.jackson.databind.annotation.JsonDeserialize;
import com.fasterxml.jackson.databind.annotation.JsonSerialize;
import com.fasterxml.jackson.databind.module.SimpleModule;
import com.google.api.services.dataflow.Dataflow;
import com.google.api.services.dataflow.model.DataflowPackage;
import com.google.api.services.dataflow.model.Job;
import com.google.api.services.dataflow.model.ListJobsResponse;
import com.google.api.services.storage.model.StorageObject;
import com.google.auto.service.AutoService;
import java.io.File;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.Serializable;
import java.nio.channels.FileChannel;
import java.nio.file.Files;
import java.nio.file.StandardOpenOption;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.regex.Pattern;
import javax.annotation.Nullable;
import org.apache.beam.runners.dataflow.DataflowRunner.StreamingShardedWriteFactory;
import org.apache.beam.runners.dataflow.options.DataflowPipelineDebugOptions;
import org.apache.beam.runners.dataflow.options.DataflowPipelineOptions;
import org.apache.beam.runners.dataflow.options.DataflowPipelineWorkerPoolOptions;
import org.apache.beam.runners.dataflow.util.PropertyNames;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.Pipeline.PipelineVisitor;
import org.apache.beam.sdk.coders.BigEndianIntegerCoder;
import org.apache.beam.sdk.coders.VoidCoder;
import org.apache.beam.sdk.extensions.gcp.auth.NoopCredentialFactory;
import org.apache.beam.sdk.extensions.gcp.auth.TestCredential;
import org.apache.beam.sdk.extensions.gcp.storage.NoopPathValidator;
import org.apache.beam.sdk.extensions.gcp.util.GcsUtil;
import org.apache.beam.sdk.extensions.gcp.util.gcsfs.GcsPath;
import org.apache.beam.sdk.io.DynamicFileDestinations;
import org.apache.beam.sdk.io.FileBasedSink;
import org.apache.beam.sdk.io.FileSystems;
import org.apache.beam.sdk.io.TextIO;
import org.apache.beam.sdk.io.WriteFiles;
import org.apache.beam.sdk.io.WriteFilesResult;
import org.apache.beam.sdk.io.fs.ResourceId;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptions.CheckEnabled;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.options.StreamingOptions;
import org.apache.beam.sdk.options.ValueProvider;
import org.apache.beam.sdk.options.ValueProvider.StaticValueProvider;
import org.apache.beam.sdk.runners.AppliedPTransform;
import org.apache.beam.sdk.runners.PTransformOverrideFactory.ReplacementOutput;
import org.apache.beam.sdk.runners.TransformHierarchy;
import org.apache.beam.sdk.runners.TransformHierarchy.Node;
import org.apache.beam.sdk.state.MapState;
import org.apache.beam.sdk.state.SetState;
import org.apache.beam.sdk.state.StateSpec;
import org.apache.beam.sdk.state.StateSpecs;
import org.apache.beam.sdk.state.ValueState;
import org.apache.beam.sdk.testing.ExpectedLogs;
import org.apache.beam.sdk.testing.TestPipeline;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.SerializableFunctions;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.transforms.windowing.PaneInfo;
import org.apache.beam.sdk.transforms.windowing.Sessions;
import org.apache.beam.sdk.transforms.windowing.Window;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PValue;
import org.apache.beam.sdk.values.TimestampedValue;
import org.apache.beam.sdk.values.WindowingStrategy;
import org.apache.beam.vendor.guava.v20_0.com.google.common.base.Throwables;
import org.apache.beam.vendor.guava.v20_0.com.google.common.collect.ImmutableList;
import org.hamcrest.Description;
import org.hamcrest.Matchers;
import org.hamcrest.TypeSafeMatcher;
import org.joda.time.Duration;
import org.joda.time.Instant;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.ExpectedException;
import org.junit.rules.TemporaryFolder;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;
import org.mockito.ArgumentCaptor;
import org.mockito.Mockito;

/**
* Tests for the {@link DataflowRunner}.
*
* <p>Implements {@link Serializable} because it is captured in closures.
*/
@RunWith(JUnit4.class)
public class DataflowRunnerTest implements Serializable {
private static final String VALID_BUCKET = "valid-bucket";
private static final String VALID_STAGING_BUCKET = "gs://valid-bucket/staging";
private static final String VALID_TEMP_BUCKET = "gs://valid-bucket/temp";
private static final String VALID_PROFILE_BUCKET = "gs://valid-bucket/profiles";
private static final String NON_EXISTENT_BUCKET = "gs://non-existent-bucket/location";
private static final String PROJECT_ID = "some-project";
private static final String REGION_ID = "some-region-1";
@Rule public transient TemporaryFolder tmpFolder = new TemporaryFolder();
@Rule public transient ExpectedException thrown = ExpectedException.none();
@Rule public transient ExpectedLogs expectedLogs = ExpectedLogs.none(DataflowRunner.class);
private transient Dataflow.Projects.Locations.Jobs mockJobs;
private transient GcsUtil mockGcsUtil;
// Asserts that the given Job has all expected fields set.
private static void assertValidJob(Job job) {
assertNull(job.getId());
assertNull(job.getCurrentState());
assertTrue(Pattern.matches("[a-z]([-a-z0-9]*[a-z0-9])?", job.getName()));
assertThat(
(Map<String, Object>) job.getEnvironment().getSdkPipelineOptions().get("options"),
hasKey("pipelineUrl"));
}
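// Stubs a GcsUtil whose write channels go to throwaway temp files and whose
// bucketAccessible() checks pass only for the valid test buckets declared above.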
@Before
public void setUp() throws IOException {
this.mockGcsUtil = mock(GcsUtil.class);
when(mockGcsUtil.create(any(GcsPath.class), anyString()))
.then(
invocation ->
FileChannel.open(
Files.createTempFile("channel-", ".tmp"),
StandardOpenOption.CREATE,
StandardOpenOption.WRITE,
StandardOpenOption.DELETE_ON_CLOSE));
when(mockGcsUtil.create(any(GcsPath.class), anyString(), anyInt()))
.then(
invocation ->
FileChannel.open(
Files.createTempFile("channel-", ".tmp"),
StandardOpenOption.CREATE,
StandardOpenOption.WRITE,
StandardOpenOption.DELETE_ON_CLOSE));
when(mockGcsUtil.expand(any(GcsPath.class)))
.then(invocation -> ImmutableList.of((GcsPath) invocation.getArguments()[0]));
when(mockGcsUtil.bucketAccessible(GcsPath.fromUri(VALID_STAGING_BUCKET))).thenReturn(true);
when(mockGcsUtil.bucketAccessible(GcsPath.fromUri(VALID_TEMP_BUCKET))).thenReturn(true);
when(mockGcsUtil.bucketAccessible(GcsPath.fromUri(VALID_TEMP_BUCKET + "/staging/")))
.thenReturn(true);
when(mockGcsUtil.bucketAccessible(GcsPath.fromUri(VALID_PROFILE_BUCKET))).thenReturn(true);
when(mockGcsUtil.bucketAccessible(GcsPath.fromUri(NON_EXISTENT_BUCKET))).thenReturn(false);
// Let every valid path be matched
when(mockGcsUtil.getObjects(anyListOf(GcsPath.class)))
.thenAnswer(
invocationOnMock -> {
List<GcsPath> gcsPaths = (List<GcsPath>) invocationOnMock.getArguments()[0];
List<GcsUtil.StorageObjectOrIOException> results = new ArrayList<>();
for (GcsPath gcsPath : gcsPaths) {
if (gcsPath.getBucket().equals(VALID_BUCKET)) {
StorageObject resultObject = new StorageObject();
resultObject.setBucket(gcsPath.getBucket());
resultObject.setName(gcsPath.getObject());
results.add(GcsUtil.StorageObjectOrIOException.create(resultObject));
}
}
return results;
});
// The Dataflow pipeline attempts to output to this location.
when(mockGcsUtil.bucketAccessible(GcsPath.fromUri("gs://bucket/object"))).thenReturn(true);
mockJobs = mock(Dataflow.Projects.Locations.Jobs.class);
}
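// Builds a minimal read/write pipeline bound to the DataflowRunner, with stable
// unique names enforced so accidental name collisions fail the test.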
private Pipeline buildDataflowPipeline(DataflowPipelineOptions options) {
options.setStableUniqueNames(CheckEnabled.ERROR);
options.setRunner(DataflowRunner.class);
Pipeline p = Pipeline.create(options);
p.apply("ReadMyFile", TextIO.read().from("gs://bucket/object"))
.apply("WriteMyFile", TextIO.write().to("gs://bucket/object"));
// Enable the FileSystems API to know about gs:// URIs in this test.
FileSystems.setDefaultPipelineOptions(options);
return p;
}
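// Wires up a mock Dataflow client: jobs().create() returns a job with id "newid",
// and jobs().list() reports a single running job named "oldjobname" (relied on by
// the update tests).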
private Dataflow buildMockDataflow() throws IOException {
Dataflow mockDataflowClient = mock(Dataflow.class);
Dataflow.Projects mockProjects = mock(Dataflow.Projects.class);
Dataflow.Projects.Locations mockLocations = mock(Dataflow.Projects.Locations.class);
Dataflow.Projects.Locations.Jobs.Create mockRequest =
mock(Dataflow.Projects.Locations.Jobs.Create.class);
Dataflow.Projects.Locations.Jobs.List mockList =
mock(Dataflow.Projects.Locations.Jobs.List.class);
when(mockDataflowClient.projects()).thenReturn(mockProjects);
when(mockProjects.locations()).thenReturn(mockLocations);
when(mockLocations.jobs()).thenReturn(mockJobs);
when(mockJobs.create(eq(PROJECT_ID), eq(REGION_ID), isA(Job.class))).thenReturn(mockRequest);
when(mockJobs.list(eq(PROJECT_ID), eq(REGION_ID))).thenReturn(mockList);
when(mockList.setPageToken(anyString())).thenReturn(mockList);
when(mockList.execute())
.thenReturn(
new ListJobsResponse()
.setJobs(
Arrays.asList(
new Job()
.setName("oldjobname")
.setId("oldJobId")
.setCurrentState("JOB_STATE_RUNNING"))));
Job resultJob = new Job();
resultJob.setId("newid");
when(mockRequest.execute()).thenReturn(resultJob);
return mockDataflowClient;
}
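// Minimal standalone GcsUtil mock: create() opens a throwaway temp-file channel and
// expand() echoes the given path back.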
private GcsUtil buildMockGcsUtil() throws IOException {
GcsUtil mockGcsUtil = mock(GcsUtil.class);
when(mockGcsUtil.create(any(GcsPath.class), anyString()))
.then(
invocation ->
FileChannel.open(
Files.createTempFile("channel-", ".tmp"),
StandardOpenOption.CREATE,
StandardOpenOption.DELETE_ON_CLOSE));
when(mockGcsUtil.expand(any(GcsPath.class)))
.then(invocation -> ImmutableList.of((GcsPath) invocation.getArguments()[0]));
return mockGcsUtil;
}
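// Baseline options shared by most tests: mock Dataflow client, mock GcsUtil, a test
// credential, and an explicitly empty files-to-stage list.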
private DataflowPipelineOptions buildPipelineOptions() throws IOException {
DataflowPipelineOptions options = PipelineOptionsFactory.as(DataflowPipelineOptions.class);
options.setRunner(DataflowRunner.class);
options.setProject(PROJECT_ID);
options.setTempLocation(VALID_TEMP_BUCKET);
options.setRegion(REGION_ID);
// Set FILES_PROPERTY to empty to prevent a default value from being computed from the classpath.
options.setFilesToStage(new ArrayList<>());
options.setDataflowClient(buildMockDataflow());
options.setGcsUtil(mockGcsUtil);
options.setGcpCredential(new TestCredential());
// Configure the FileSystem registrar to use these options.
FileSystems.setDefaultPipelineOptions(options);
return options;
}
@Test
public void testPathValidation() {
String[] args =
new String[] {
"--runner=DataflowRunner",
"--tempLocation=/tmp/not/a/gs/path",
"--project=test-project",
"--credentialFactoryClass=" + NoopCredentialFactory.class.getName(),
};
try {
Pipeline.create(PipelineOptionsFactory.fromArgs(args).create()).run();
fail();
} catch (RuntimeException e) {
assertThat(
Throwables.getStackTraceAsString(e),
containsString("DataflowRunner requires gcpTempLocation"));
}
}
@Test
public void testPathExistsValidation() {
String[] args =
new String[] {
"--runner=DataflowRunner",
"--tempLocation=gs://does/not/exist",
"--project=test-project",
"--credentialFactoryClass=" + NoopCredentialFactory.class.getName(),
};
try {
Pipeline.create(PipelineOptionsFactory.fromArgs(args).create()).run();
fail();
} catch (RuntimeException e) {
assertThat(
Throwables.getStackTraceAsString(e),
both(containsString("gs://does/not/exist"))
.and(containsString("Unable to verify that GCS bucket gs://does exists")));
}
}
@Test
public void testPathValidatorOverride() {
String[] args =
new String[] {
"--runner=DataflowRunner",
"--tempLocation=/tmp/testing",
"--project=test-project",
"--credentialFactoryClass=" + NoopCredentialFactory.class.getName(),
"--pathValidatorClass=" + NoopPathValidator.class.getName(),
};
// Should not crash, because gcpTempLocation should get set from tempLocation
TestPipeline.fromOptions(PipelineOptionsFactory.fromArgs(args).create());
}
@Test
public void testFromOptionsWithUppercaseConvertsToLowercase() throws Exception {
String mixedCase = "ThisJobNameHasMixedCase";
DataflowPipelineOptions options = buildPipelineOptions();
options.setJobName(mixedCase);
DataflowRunner.fromOptions(options);
assertThat(options.getJobName(), equalTo(mixedCase.toLowerCase()));
}
@Test
public void testFromOptionsUserAgentFromPipelineInfo() throws Exception {
DataflowPipelineOptions options = buildPipelineOptions();
DataflowRunner.fromOptions(options);
String expectedName = DataflowRunnerInfo.getDataflowRunnerInfo().getName().replace(" ", "_");
assertThat(options.getUserAgent(), containsString(expectedName));
String expectedVersion = DataflowRunnerInfo.getDataflowRunnerInfo().getVersion();
assertThat(options.getUserAgent(), containsString(expectedVersion));
}
/**
* Invasive mock-based test for checking that the JSON generated for the pipeline options has not
* had vital fields pruned.
*/
@Test
public void testSettingOfSdkPipelineOptions() throws IOException {
DataflowPipelineOptions options = buildPipelineOptions();
// These options are important only for this test, and need not be global to the test class
options.setAppName(DataflowRunnerTest.class.getSimpleName());
options.setJobName("some-job-name");
Pipeline p = Pipeline.create(options);
p.run();
ArgumentCaptor<Job> jobCaptor = ArgumentCaptor.forClass(Job.class);
Mockito.verify(mockJobs).create(eq(PROJECT_ID), eq(REGION_ID), jobCaptor.capture());
Map<String, Object> sdkPipelineOptions =
jobCaptor.getValue().getEnvironment().getSdkPipelineOptions();
assertThat(sdkPipelineOptions, hasKey("options"));
Map<String, Object> optionsMap = (Map<String, Object>) sdkPipelineOptions.get("options");
assertThat(optionsMap, hasEntry("appName", (Object) options.getAppName()));
assertThat(optionsMap, hasEntry("project", (Object) options.getProject()));
assertThat(
optionsMap,
hasEntry("pathValidatorClass", (Object) options.getPathValidatorClass().getName()));
assertThat(optionsMap, hasEntry("runner", (Object) options.getRunner().getName()));
assertThat(optionsMap, hasEntry("jobName", (Object) options.getJobName()));
assertThat(optionsMap, hasEntry("tempLocation", (Object) options.getTempLocation()));
assertThat(optionsMap, hasEntry("stagingLocation", (Object) options.getStagingLocation()));
assertThat(
optionsMap,
hasEntry("stableUniqueNames", (Object) options.getStableUniqueNames().toString()));
assertThat(optionsMap, hasEntry("streaming", (Object) options.isStreaming()));
assertThat(
optionsMap,
hasEntry(
"numberOfWorkerHarnessThreads", (Object) options.getNumberOfWorkerHarnessThreads()));
}
@Test
public void testSettingFlexRS() throws IOException {
DataflowPipelineOptions options = buildPipelineOptions();
options.setFlexRSGoal(DataflowPipelineOptions.FlexResourceSchedulingGoal.COST_OPTIMIZED);
Pipeline p = Pipeline.create(options);
p.run();
ArgumentCaptor<Job> jobCaptor = ArgumentCaptor.forClass(Job.class);
Mockito.verify(mockJobs).create(eq(PROJECT_ID), eq(REGION_ID), jobCaptor.capture());
assertEquals(
"FLEXRS_COST_OPTIMIZED",
jobCaptor.getValue().getEnvironment().getFlexResourceSchedulingGoal());
}
/** PipelineOptions used to test auto registration of Jackson modules. */
public interface JacksonIncompatibleOptions extends PipelineOptions {
JacksonIncompatible getJacksonIncompatible();
void setJacksonIncompatible(JacksonIncompatible value);
}
/** A Jackson {@link Module} to test auto-registration of modules. */
@AutoService(Module.class)
public static class RegisteredTestModule extends SimpleModule {
public RegisteredTestModule() {
super("RegisteredTestModule");
setMixInAnnotation(JacksonIncompatible.class, JacksonIncompatibleMixin.class);
}
}
/** A class which Jackson does not know how to serialize/deserialize. */
public static class JacksonIncompatible {
private final String value;
public JacksonIncompatible(String value) {
this.value = value;
}
}
/** A Jackson mixin used to add annotations to other classes. */
@JsonDeserialize(using = JacksonIncompatibleDeserializer.class)
@JsonSerialize(using = JacksonIncompatibleSerializer.class)
public static final class JacksonIncompatibleMixin {}
/** A Jackson deserializer for {@link JacksonIncompatible}. */
public static class JacksonIncompatibleDeserializer
extends JsonDeserializer<JacksonIncompatible> {
@Override
public JacksonIncompatible deserialize(
JsonParser jsonParser, DeserializationContext deserializationContext)
throws IOException, JsonProcessingException {
return new JacksonIncompatible(jsonParser.readValueAs(String.class));
}
}
/** A Jackson serializer for {@link JacksonIncompatible}. */
public static class JacksonIncompatibleSerializer extends JsonSerializer<JacksonIncompatible> {
@Override
public void serialize(
JacksonIncompatible jacksonIncompatible,
JsonGenerator jsonGenerator,
SerializerProvider serializerProvider)
throws IOException, JsonProcessingException {
jsonGenerator.writeString(jacksonIncompatible.value);
}
}
@Test
public void testSettingOfPipelineOptionsWithCustomUserType() throws IOException {
DataflowPipelineOptions options = buildPipelineOptions();
options
.as(JacksonIncompatibleOptions.class)
.setJacksonIncompatible(new JacksonIncompatible("userCustomTypeTest"));
Pipeline p = Pipeline.create(options);
p.run();
ArgumentCaptor<Job> jobCaptor = ArgumentCaptor.forClass(Job.class);
Mockito.verify(mockJobs).create(eq(PROJECT_ID), eq(REGION_ID), jobCaptor.capture());
Map<String, Object> sdkPipelineOptions =
jobCaptor.getValue().getEnvironment().getSdkPipelineOptions();
assertThat(sdkPipelineOptions, hasKey("options"));
Map<String, Object> optionsMap = (Map<String, Object>) sdkPipelineOptions.get("options");
assertThat(optionsMap, hasEntry("jacksonIncompatible", (Object) "userCustomTypeTest"));
}
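// Happy path: run() submits the job and returns the id from the mocked create() call.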
@Test
public void testRun() throws IOException {
DataflowPipelineOptions options = buildPipelineOptions();
Pipeline p = buildDataflowPipeline(options);
DataflowPipelineJob job = (DataflowPipelineJob) p.run();
assertEquals("newid", job.getJobId());
ArgumentCaptor<Job> jobCaptor = ArgumentCaptor.forClass(Job.class);
Mockito.verify(mockJobs).create(eq(PROJECT_ID), eq(REGION_ID), jobCaptor.capture());
assertValidJob(jobCaptor.getValue());
}
/** Options for testing. */
public interface RuntimeTestOptions extends PipelineOptions {
ValueProvider<String> getInput();
void setInput(ValueProvider<String> value);
ValueProvider<String> getOutput();
void setOutput(ValueProvider<String> value);
}
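// Pipeline construction must succeed even when TextIO source and sink locations are
// only available at runtime via ValueProviders.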
@Test
public void testTextIOWithRuntimeParameters() throws IOException {
DataflowPipelineOptions dataflowOptions = buildPipelineOptions();
RuntimeTestOptions options = dataflowOptions.as(RuntimeTestOptions.class);
Pipeline p = buildDataflowPipeline(dataflowOptions);
p.apply(TextIO.read().from(options.getInput())).apply(TextIO.write().to(options.getOutput()));
}
/** Tests that all reads are consumed by at least one {@link PTransform}. */
@Test
public void testUnconsumedReads() throws IOException {
DataflowPipelineOptions dataflowOptions = buildPipelineOptions();
RuntimeTestOptions options = dataflowOptions.as(RuntimeTestOptions.class);
Pipeline p = buildDataflowPipeline(dataflowOptions);
p.apply(TextIO.read().from(options.getInput()));
DataflowRunner.fromOptions(dataflowOptions).replaceTransforms(p);
final AtomicBoolean unconsumedSeenAsInput = new AtomicBoolean();
p.traverseTopologically(
new PipelineVisitor.Defaults() {
@Override
public void visitPrimitiveTransform(Node node) {
unconsumedSeenAsInput.set(true);
}
});
assertThat(unconsumedSeenAsInput.get(), is(true));
}
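// If the service echoes back a different client request id, run() must surface a
// DataflowJobAlreadyExistsException instead of silently returning the other job.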
@Test
public void testRunReturnDifferentRequestId() throws IOException {
DataflowPipelineOptions options = buildPipelineOptions();
Dataflow mockDataflowClient = options.getDataflowClient();
Dataflow.Projects.Locations.Jobs.Create mockRequest =
mock(Dataflow.Projects.Locations.Jobs.Create.class);
when(mockDataflowClient
.projects()
.locations()
.jobs()
.create(eq(PROJECT_ID), eq(REGION_ID), any(Job.class)))
.thenReturn(mockRequest);
Job resultJob = new Job();
resultJob.setId("newid");
// Return a different request id.
resultJob.setClientRequestId("different_request_id");
when(mockRequest.execute()).thenReturn(resultJob);
Pipeline p = buildDataflowPipeline(options);
try {
p.run();
fail("Expected DataflowJobAlreadyExistsException");
} catch (DataflowJobAlreadyExistsException expected) {
assertThat(
expected.getMessage(),
containsString(
"If you want to submit a second job, try again by setting a "
+ "different name using --jobName."));
assertEquals(expected.getJob().getJobId(), resultJob.getId());
}
}
@Test
public void testUpdate() throws IOException {
DataflowPipelineOptions options = buildPipelineOptions();
options.setUpdate(true);
options.setJobName("oldJobName");
Pipeline p = buildDataflowPipeline(options);
DataflowPipelineJob job = (DataflowPipelineJob) p.run();
assertEquals("newid", job.getJobId());
ArgumentCaptor<Job> jobCaptor = ArgumentCaptor.forClass(Job.class);
Mockito.verify(mockJobs).create(eq(PROJECT_ID), eq(REGION_ID), jobCaptor.capture());
assertValidJob(jobCaptor.getValue());
}
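// With the upload_graph experiment enabled, the submitted job carries no inline
// steps; instead stepsLocation points at the graph staged under the temp bucket.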
@Test
public void testUploadGraph() throws IOException {
DataflowPipelineOptions options = buildPipelineOptions();
options.setExperiments(Arrays.asList("upload_graph"));
Pipeline p = buildDataflowPipeline(options);
DataflowPipelineJob job = (DataflowPipelineJob) p.run();
ArgumentCaptor<Job> jobCaptor = ArgumentCaptor.forClass(Job.class);
Mockito.verify(mockJobs).create(eq(PROJECT_ID), eq(REGION_ID), jobCaptor.capture());
assertValidJob(jobCaptor.getValue());
assertTrue(jobCaptor.getValue().getSteps().isEmpty());
assertTrue(
jobCaptor
.getValue()
.getStepsLocation()
.startsWith("gs://valid-bucket/temp/staging/dataflow_graph"));
}
@Test
public void testUpdateNonExistentPipeline() throws IOException {
thrown.expect(IllegalArgumentException.class);
thrown.expectMessage("Could not find running job named badjobname");
DataflowPipelineOptions options = buildPipelineOptions();
options.setUpdate(true);
options.setJobName("badJobName");
Pipeline p = buildDataflowPipeline(options);
p.run();
}
@Test
public void testUpdateAlreadyUpdatedPipeline() throws IOException {
DataflowPipelineOptions options = buildPipelineOptions();
options.setUpdate(true);
options.setJobName("oldJobName");
Dataflow mockDataflowClient = options.getDataflowClient();
Dataflow.Projects.Locations.Jobs.Create mockRequest =
mock(Dataflow.Projects.Locations.Jobs.Create.class);
when(mockDataflowClient
.projects()
.locations()
.jobs()
.create(eq(PROJECT_ID), eq(REGION_ID), any(Job.class)))
.thenReturn(mockRequest);
final Job resultJob = new Job();
resultJob.setId("newid");
// Return a different request id.
resultJob.setClientRequestId("different_request_id");
when(mockRequest.execute()).thenReturn(resultJob);
Pipeline p = buildDataflowPipeline(options);
thrown.expect(DataflowJobAlreadyUpdatedException.class);
thrown.expect(
new TypeSafeMatcher<DataflowJobAlreadyUpdatedException>() {
@Override
public void describeTo(Description description) {
description.appendText("Expected job ID: " + resultJob.getId());
}
@Override
protected boolean matchesSafely(DataflowJobAlreadyUpdatedException item) {
return resultJob.getId().equals(item.getJob().getJobId());
}
});
thrown.expectMessage(
"The job named oldjobname with id: oldJobId has already been updated "
+ "into job id: newid and cannot be updated again.");
p.run();
}
@Test
public void testRunWithFiles() throws IOException {
// Test that the function DataflowRunner.stageFiles works as expected.
final String cloudDataflowDataset = "somedataset";
// Create some temporary files.
File temp1 = File.createTempFile("DataflowRunnerTest", "txt");
temp1.deleteOnExit();
File temp2 = File.createTempFile("DataflowRunnerTest2", "txt");
temp2.deleteOnExit();
String overridePackageName = "alias.txt";
when(mockGcsUtil.getObjects(anyListOf(GcsPath.class)))
.thenReturn(
ImmutableList.of(
GcsUtil.StorageObjectOrIOException.create(new FileNotFoundException("some/path"))));
DataflowPipelineOptions options = PipelineOptionsFactory.as(DataflowPipelineOptions.class);
options.setFilesToStage(
ImmutableList.of(
temp1.getAbsolutePath(), overridePackageName + "=" + temp2.getAbsolutePath()));
options.setStagingLocation(VALID_STAGING_BUCKET);
options.setTempLocation(VALID_TEMP_BUCKET);
options.setTempDatasetId(cloudDataflowDataset);
options.setProject(PROJECT_ID);
options.setRegion(REGION_ID);
options.setJobName("job");
options.setDataflowClient(buildMockDataflow());
options.setGcsUtil(mockGcsUtil);
options.setGcpCredential(new TestCredential());
when(mockGcsUtil.create(any(GcsPath.class), anyString(), anyInt()))
.then(
invocation ->
FileChannel.open(
Files.createTempFile("channel-", ".tmp"),
StandardOpenOption.CREATE,
StandardOpenOption.WRITE,
StandardOpenOption.DELETE_ON_CLOSE));
Pipeline p = buildDataflowPipeline(options);
DataflowPipelineJob job = (DataflowPipelineJob) p.run();
assertEquals("newid", job.getJobId());
ArgumentCaptor<Job> jobCaptor = ArgumentCaptor.forClass(Job.class);
Mockito.verify(mockJobs).create(eq(PROJECT_ID), eq(REGION_ID), jobCaptor.capture());
Job workflowJob = jobCaptor.getValue();
assertValidJob(workflowJob);
assertEquals(2, workflowJob.getEnvironment().getWorkerPools().get(0).getPackages().size());
DataflowPackage workflowPackage1 =
workflowJob.getEnvironment().getWorkerPools().get(0).getPackages().get(0);
assertThat(workflowPackage1.getName(), startsWith(temp1.getName()));
DataflowPackage workflowPackage2 =
workflowJob.getEnvironment().getWorkerPools().get(0).getPackages().get(1);
assertEquals(overridePackageName, workflowPackage2.getName());
assertEquals(
GcsPath.fromUri(VALID_TEMP_BUCKET).toResourceName(),
workflowJob.getEnvironment().getTempStoragePrefix());
assertEquals(cloudDataflowDataset, workflowJob.getEnvironment().getDataset());
assertEquals(
DataflowRunnerInfo.getDataflowRunnerInfo().getName(),
workflowJob.getEnvironment().getUserAgent().get("name"));
assertEquals(
DataflowRunnerInfo.getDataflowRunnerInfo().getVersion(),
workflowJob.getEnvironment().getUserAgent().get("version"));
}
@Test
public void runWithDefaultFilesToStage() throws Exception {
DataflowPipelineOptions options = buildPipelineOptions();
options.setFilesToStage(null);
DataflowRunner.fromOptions(options);
assertFalse(options.getFilesToStage().isEmpty());
}
@Test
public void testGcsStagingLocationInitialization() throws Exception {
// Set temp location (required), and check that staging location is set.
DataflowPipelineOptions options = PipelineOptionsFactory.as(DataflowPipelineOptions.class);
options.setTempLocation(VALID_TEMP_BUCKET);
options.setProject(PROJECT_ID);
options.setGcpCredential(new TestCredential());
options.setGcsUtil(mockGcsUtil);
options.setRunner(DataflowRunner.class);
DataflowRunner.fromOptions(options);
assertNotNull(options.getStagingLocation());
}
@Test
public void testInvalidGcpTempLocation() throws IOException {
DataflowPipelineOptions options = buildPipelineOptions();
options.setGcpTempLocation("file://temp/location");
thrown.expect(IllegalArgumentException.class);
thrown.expectMessage(containsString("Expected a valid 'gs://' path but was given"));
DataflowRunner.fromOptions(options);
ArgumentCaptor<Job> jobCaptor = ArgumentCaptor.forClass(Job.class);
Mockito.verify(mockJobs).create(eq(PROJECT_ID), eq(REGION_ID), jobCaptor.capture());
assertValidJob(jobCaptor.getValue());
}
@Test
public void testNonGcsTempLocation() throws IOException {
DataflowPipelineOptions options = buildPipelineOptions();
options.setTempLocation("file://temp/location");
thrown.expect(IllegalArgumentException.class);
thrown.expectMessage(
"DataflowRunner requires gcpTempLocation, "
+ "but failed to retrieve a value from PipelineOptions");
DataflowRunner.fromOptions(options);
}
@Test
public void testInvalidStagingLocation() throws IOException {
DataflowPipelineOptions options = buildPipelineOptions();
options.setStagingLocation("file://my/staging/location");
try {
DataflowRunner.fromOptions(options);
fail("fromOptions should have failed");
} catch (IllegalArgumentException e) {
assertThat(e.getMessage(), containsString("Expected a valid 'gs://' path but was given"));
}
options.setStagingLocation("my/staging/location");
try {
DataflowRunner.fromOptions(options);
fail("fromOptions should have failed");
} catch (IllegalArgumentException e) {
assertThat(e.getMessage(), containsString("Expected a valid 'gs://' path but was given"));
}
}
@Test
public void testInvalidProfileLocation() throws IOException {
DataflowPipelineOptions options = buildPipelineOptions();
options.setSaveProfilesToGcs("file://my/staging/location");
try {
DataflowRunner.fromOptions(options);
fail("fromOptions should have failed");
} catch (IllegalArgumentException e) {
assertThat(e.getMessage(), containsString("Expected a valid 'gs://' path but was given"));
}
options.setSaveProfilesToGcs("my/staging/location");
try {
DataflowRunner.fromOptions(options);
fail("fromOptions should have failed");
} catch (IllegalArgumentException e) {
assertThat(e.getMessage(), containsString("Expected a valid 'gs://' path but was given"));
}
}
@Test
public void testNonExistentTempLocation() throws IOException {
DataflowPipelineOptions options = buildPipelineOptions();
options.setGcpTempLocation(NON_EXISTENT_BUCKET);
thrown.expect(IllegalArgumentException.class);
thrown.expectMessage(
containsString("Output path does not exist or is not writeable: " + NON_EXISTENT_BUCKET));
DataflowRunner.fromOptions(options);
ArgumentCaptor<Job> jobCaptor = ArgumentCaptor.forClass(Job.class);
Mockito.verify(mockJobs).create(eq(PROJECT_ID), eq(REGION_ID), jobCaptor.capture());
assertValidJob(jobCaptor.getValue());
}
@Test
public void testNonExistentStagingLocation() throws IOException {
DataflowPipelineOptions options = buildPipelineOptions();
options.setStagingLocation(NON_EXISTENT_BUCKET);
thrown.expect(IllegalArgumentException.class);
thrown.expectMessage(
containsString("Output path does not exist or is not writeable: " + NON_EXISTENT_BUCKET));
DataflowRunner.fromOptions(options);
ArgumentCaptor<Job> jobCaptor = ArgumentCaptor.forClass(Job.class);
Mockito.verify(mockJobs).create(eq(PROJECT_ID), eq(REGION_ID), jobCaptor.capture());
assertValidJob(jobCaptor.getValue());
}
@Test
public void testNonExistentProfileLocation() throws IOException {
DataflowPipelineOptions options = buildPipelineOptions();
options.setSaveProfilesToGcs(NON_EXISTENT_BUCKET);
thrown.expect(IllegalArgumentException.class);
thrown.expectMessage(
containsString("Output path does not exist or is not writeable: " + NON_EXISTENT_BUCKET));
DataflowRunner.fromOptions(options);
ArgumentCaptor<Job> jobCaptor = ArgumentCaptor.forClass(Job.class);
Mockito.verify(mockJobs).create(eq(PROJECT_ID), eq(REGION_ID), jobCaptor.capture());
assertValidJob(jobCaptor.getValue());
}
@Test
public void testNoProjectFails() {
DataflowPipelineOptions options = PipelineOptionsFactory.as(DataflowPipelineOptions.class);
options.setRunner(DataflowRunner.class);
// Explicitly set to null to prevent the default instance factory from reading credentials
// from a user's environment, causing this test to fail.
options.setProject(null);
thrown.expect(IllegalArgumentException.class);
thrown.expectMessage("Project id");
thrown.expectMessage("when running a Dataflow in the cloud");
DataflowRunner.fromOptions(options);
}
@Test
public void testProjectId() throws IOException {
DataflowPipelineOptions options = PipelineOptionsFactory.as(DataflowPipelineOptions.class);
options.setRunner(DataflowRunner.class);
options.setProject("foo-12345");
options.setGcpTempLocation(VALID_TEMP_BUCKET);
options.setGcsUtil(mockGcsUtil);
options.setGcpCredential(new TestCredential());
DataflowRunner.fromOptions(options);
}
@Test
public void testProjectPrefix() throws IOException {
DataflowPipelineOptions options = PipelineOptionsFactory.as(DataflowPipelineOptions.class);
options.setRunner(DataflowRunner.class);
options.setProject("google.com:some-project-12345");
options.setGcpTempLocation(VALID_TEMP_BUCKET);
options.setGcsUtil(mockGcsUtil);
options.setGcpCredential(new TestCredential());
DataflowRunner.fromOptions(options);
}
@Test
public void testProjectNumber() throws IOException {
DataflowPipelineOptions options = PipelineOptionsFactory.as(DataflowPipelineOptions.class);
options.setRunner(DataflowRunner.class);
options.setProject("12345");
options.setGcpTempLocation(VALID_TEMP_BUCKET);
options.setGcsUtil(mockGcsUtil);
thrown.expect(IllegalArgumentException.class);
thrown.expectMessage("Project ID");
thrown.expectMessage("project number");
DataflowRunner.fromOptions(options);
}
@Test
public void testProjectDescription() throws IOException {
DataflowPipelineOptions options = PipelineOptionsFactory.as(DataflowPipelineOptions.class);
options.setRunner(DataflowRunner.class);
options.setProject("some project");
options.setGcpTempLocation(VALID_TEMP_BUCKET);
options.setGcsUtil(mockGcsUtil);
thrown.expect(IllegalArgumentException.class);
thrown.expectMessage("Project ID");
thrown.expectMessage("project description");
DataflowRunner.fromOptions(options);
}
@Test
public void testInvalidNumberOfWorkerHarnessThreads() throws IOException {
DataflowPipelineOptions options = PipelineOptionsFactory.as(DataflowPipelineOptions.class);
FileSystems.setDefaultPipelineOptions(options);
options.setRunner(DataflowRunner.class);
options.setProject("foo-12345");
options.setGcpTempLocation(VALID_TEMP_BUCKET);
options.setGcsUtil(mockGcsUtil);
options.as(DataflowPipelineDebugOptions.class).setNumberOfWorkerHarnessThreads(-1);
thrown.expect(IllegalArgumentException.class);
thrown.expectMessage("Number of worker harness threads");
thrown.expectMessage("Please make sure the value is non-negative.");
DataflowRunner.fromOptions(options);
}
@Test
public void testNoStagingLocationAndNoTempLocationFails() {
DataflowPipelineOptions options = PipelineOptionsFactory.as(DataflowPipelineOptions.class);
options.setRunner(DataflowRunner.class);
options.setProject("foo-project");
thrown.expect(IllegalArgumentException.class);
thrown.expectMessage(
"DataflowRunner requires gcpTempLocation, "
+ "but failed to retrieve a value from PipelineOption");
DataflowRunner.fromOptions(options);
}
@Test
public void testGcpTempAndNoTempLocationSucceeds() throws Exception {
DataflowPipelineOptions options = PipelineOptionsFactory.as(DataflowPipelineOptions.class);
options.setRunner(DataflowRunner.class);
options.setGcpCredential(new TestCredential());
options.setProject("foo-project");
options.setGcpTempLocation(VALID_TEMP_BUCKET);
options.setGcsUtil(mockGcsUtil);
DataflowRunner.fromOptions(options);
}
@Test
public void testTempLocationAndNoGcpTempLocationSucceeds() throws Exception {
DataflowPipelineOptions options = PipelineOptionsFactory.as(DataflowPipelineOptions.class);
options.setRunner(DataflowRunner.class);
options.setGcpCredential(new TestCredential());
options.setProject("foo-project");
options.setTempLocation(VALID_TEMP_BUCKET);
options.setGcsUtil(mockGcsUtil);
DataflowRunner.fromOptions(options);
}
@Test
public void testValidProfileLocation() throws IOException {
DataflowPipelineOptions options = buildPipelineOptions();
options.setSaveProfilesToGcs(VALID_PROFILE_BUCKET);
DataflowRunner.fromOptions(options);
}
@Test
public void testInvalidJobName() throws IOException {
List<String> invalidNames = Arrays.asList("invalid_name", "0invalid", "invalid-");
List<String> expectedReason =
Arrays.asList("JobName invalid", "JobName invalid", "JobName invalid");
for (int i = 0; i < invalidNames.size(); ++i) {
DataflowPipelineOptions options = buildPipelineOptions();
options.setJobName(invalidNames.get(i));
try {
DataflowRunner.fromOptions(options);
fail("Expected IllegalArgumentException for jobName " + options.getJobName());
} catch (IllegalArgumentException e) {
assertThat(e.getMessage(), containsString(expectedReason.get(i)));
}
}
}
@Test
public void testValidJobName() throws IOException {
List<String> names =
Arrays.asList("ok", "Ok", "A-Ok", "ok-123", "this-one-is-fairly-long-01234567890123456789");
for (String name : names) {
DataflowPipelineOptions options = buildPipelineOptions();
options.setJobName(name);
DataflowRunner runner = DataflowRunner.fromOptions(options);
assertNotNull(runner);
}
}
@Test
public void testGcsUploadBufferSizeIsUnsetForBatchWhenDefault() throws IOException {
DataflowPipelineOptions batchOptions = buildPipelineOptions();
batchOptions.setRunner(DataflowRunner.class);
Pipeline.create(batchOptions);
assertNull(batchOptions.getGcsUploadBufferSizeBytes());
}
@Test
public void testGcsUploadBufferSizeIsSetForStreamingWhenDefault() throws IOException {
DataflowPipelineOptions streamingOptions = buildPipelineOptions();
streamingOptions.setStreaming(true);
streamingOptions.setRunner(DataflowRunner.class);
Pipeline p = Pipeline.create(streamingOptions);
// Instantiation of a runner prior to run() currently has a side effect of mutating the options.
// This could be tested by DataflowRunner.fromOptions(streamingOptions) but would not ensure
// that the pipeline itself had the expected options set.
p.run();
assertEquals(
DataflowRunner.GCS_UPLOAD_BUFFER_SIZE_BYTES_DEFAULT,
streamingOptions.getGcsUploadBufferSizeBytes().intValue());
}
@Test
public void testGcsUploadBufferSizeUnchangedWhenNotDefault() throws IOException {
int gcsUploadBufferSizeBytes = 12345678;
DataflowPipelineOptions batchOptions = buildPipelineOptions();
batchOptions.setGcsUploadBufferSizeBytes(gcsUploadBufferSizeBytes);
batchOptions.setRunner(DataflowRunner.class);
Pipeline.create(batchOptions);
assertEquals(gcsUploadBufferSizeBytes, batchOptions.getGcsUploadBufferSizeBytes().intValue());
DataflowPipelineOptions streamingOptions = buildPipelineOptions();
streamingOptions.setStreaming(true);
streamingOptions.setGcsUploadBufferSizeBytes(gcsUploadBufferSizeBytes);
streamingOptions.setRunner(DataflowRunner.class);
Pipeline.create(streamingOptions);
assertEquals(
gcsUploadBufferSizeBytes, streamingOptions.getGcsUploadBufferSizeBytes().intValue());
}
/** A fake PTransform for testing. */
public static class TestTransform extends PTransform<PCollection<Integer>, PCollection<Integer>> {
public boolean translated = false;
@Override
public PCollection<Integer> expand(PCollection<Integer> input) {
return PCollection.createPrimitiveOutputInternal(
input.getPipeline(),
WindowingStrategy.globalDefault(),
input.isBounded(),
input.getCoder());
}
}
@Test
public void testTransformTranslatorMissing() throws IOException {
DataflowPipelineOptions options = buildPipelineOptions();
Pipeline p = Pipeline.create(options);
p.apply(Create.of(Arrays.asList(1, 2, 3))).apply(new TestTransform());
thrown.expect(IllegalStateException.class);
thrown.expectMessage(containsString("no translator registered"));
DataflowPipelineTranslator.fromOptions(options)
.translate(p, DataflowRunner.fromOptions(options), Collections.emptyList());
ArgumentCaptor<Job> jobCaptor = ArgumentCaptor.forClass(Job.class);
Mockito.verify(mockJobs).create(eq(PROJECT_ID), eq(REGION_ID), jobCaptor.capture());
assertValidJob(jobCaptor.getValue());
}
@Test
public void testTransformTranslator() throws IOException {
// Test that we can provide a custom translation
DataflowPipelineOptions options = buildPipelineOptions();
Pipeline p = Pipeline.create(options);
TestTransform transform = new TestTransform();
p.apply(Create.of(Arrays.asList(1, 2, 3)).withCoder(BigEndianIntegerCoder.of()))
.apply(transform);
DataflowPipelineTranslator translator = DataflowRunner.fromOptions(options).getTranslator();
DataflowPipelineTranslator.registerTransformTranslator(
TestTransform.class,
(transform1, context) -> {
transform1.translated = true;
// Note: This is about the minimum needed to fake out a
// translation. This obviously isn't a real translation.
TransformTranslator.StepTranslationContext stepContext =
context.addStep(transform1, "TestTranslate");
stepContext.addOutput(PropertyNames.OUTPUT, context.getOutput(transform1));
});
translator.translate(p, DataflowRunner.fromOptions(options), Collections.emptyList());
assertTrue(transform.translated);
}
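// Applying a stateful ParDo that declares MapState must be rejected by the runner;
// exercised below in both batch and streaming modes.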
private void verifyMapStateUnsupported(PipelineOptions options) throws Exception {
Pipeline p = Pipeline.create(options);
p.apply(Create.of(KV.of(13, 42)))
.apply(
ParDo.of(
new DoFn<KV<Integer, Integer>, Void>() {
@StateId("fizzle")
private final StateSpec<MapState<Void, Void>> voidState = StateSpecs.map();
@ProcessElement
public void process() {}
}));
thrown.expectMessage("MapState");
thrown.expect(UnsupportedOperationException.class);
p.run();
}
@Test
public void testMapStateUnsupportedInBatch() throws Exception {
PipelineOptions options = buildPipelineOptions();
options.as(StreamingOptions.class).setStreaming(false);
verifyMapStateUnsupported(options);
}
@Test
public void testMapStateUnsupportedInStreaming() throws Exception {
PipelineOptions options = buildPipelineOptions();
options.as(StreamingOptions.class).setStreaming(true);
verifyMapStateUnsupported(options);
}
private void verifySetStateUnsupported(PipelineOptions options) throws Exception {
Pipeline p = Pipeline.create(options);
p.apply(Create.of(KV.of(13, 42)))
.apply(
ParDo.of(
new DoFn<KV<Integer, Integer>, Void>() {
@StateId("fizzle")
private final StateSpec<SetState<Void>> voidState = StateSpecs.set();
@ProcessElement
public void process() {}
}));
thrown.expectMessage("SetState");
thrown.expect(UnsupportedOperationException.class);
p.run();
}
@Test
public void testSetStateUnsupportedInBatch() throws Exception {
PipelineOptions options = buildPipelineOptions();
options.as(StreamingOptions.class).setStreaming(false);
Pipeline.create(options);
verifySetStateUnsupported(options);
}
@Test
public void testSetStateUnsupportedInStreaming() throws Exception {
PipelineOptions options = buildPipelineOptions();
options.as(StreamingOptions.class).setStreaming(true);
verifySetStateUnsupported(options);
}
/** Records all the composite transforms visited within the Pipeline. */
private static class CompositeTransformRecorder extends PipelineVisitor.Defaults {
private List<PTransform<?, ?>> transforms = new ArrayList<>();
@Override
public CompositeBehavior enterCompositeTransform(TransformHierarchy.Node node) {
if (node.getTransform() != null) {
transforms.add(node.getTransform());
}
return CompositeBehavior.ENTER_TRANSFORM;
}
public List<PTransform<?, ?>> getCompositeTransforms() {
return transforms;
}
}
@Test
public void testApplyIsScopedToExactClass() throws IOException {
DataflowPipelineOptions options = buildPipelineOptions();
Pipeline p = Pipeline.create(options);
Create.TimestampedValues<String> transform =
Create.timestamped(Arrays.asList(TimestampedValue.of("TestString", Instant.now())));
p.apply(transform);
CompositeTransformRecorder recorder = new CompositeTransformRecorder();
p.traverseTopologically(recorder);
// The recorder will also have seen a Create.Values composite, but we can't obtain that
// transform.
assertThat(
"Expected to have seen CreateTimestamped composite transform.",
recorder.getCompositeTransforms(),
hasItem(transform));
assertThat(
"Expected to have two composites, CreateTimestamped and Create.Values",
recorder.getCompositeTransforms(),
hasItem(Matchers.<PTransform<?, ?>>isA((Class) Create.Values.class)));
}
@Test
public void testToString() {
DataflowPipelineOptions options = PipelineOptionsFactory.as(DataflowPipelineOptions.class);
options.setJobName("TestJobName");
options.setProject("test-project");
options.setTempLocation("gs://test/temp/location");
options.setGcpCredential(new TestCredential());
options.setPathValidatorClass(NoopPathValidator.class);
options.setRunner(DataflowRunner.class);
assertEquals("DataflowRunner#testjobname", DataflowRunner.fromOptions(options).toString());
}
/**
* Tests that the {@link DataflowRunner} with {@code --templateLocation} completes normally when
* the template is successfully created.
*/
@Test
public void testTemplateRunnerFullCompletion() throws Exception {
File existingFile = tmpFolder.newFile();
DataflowPipelineOptions options = PipelineOptionsFactory.as(DataflowPipelineOptions.class);
options.setJobName("TestJobName");
options.setGcpCredential(new TestCredential());
options.setPathValidatorClass(NoopPathValidator.class);
options.setProject("test-project");
options.setRunner(DataflowRunner.class);
options.setTemplateLocation(existingFile.getPath());
options.setTempLocation(tmpFolder.getRoot().getPath());
Pipeline p = Pipeline.create(options);
p.run();
expectedLogs.verifyInfo("Template successfully created");
}
/**
* Tests that the {@link DataflowRunner} with {@code --templateLocation} throws the appropriate
* exception when an output file is not writable.
*/
@Test
public void testTemplateRunnerLoggedErrorForFile() throws Exception {
DataflowPipelineOptions options = PipelineOptionsFactory.as(DataflowPipelineOptions.class);
options.setJobName("TestJobName");
options.setRunner(DataflowRunner.class);
options.setTemplateLocation("//bad/path");
options.setProject("test-project");
options.setTempLocation(tmpFolder.getRoot().getPath());
options.setGcpCredential(new TestCredential());
options.setPathValidatorClass(NoopPathValidator.class);
Pipeline p = Pipeline.create(options);
thrown.expectMessage("Cannot create output file at");
thrown.expect(RuntimeException.class);
p.run();
}
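// getContainerImageForJob substitutes the IMAGE placeholder in gcr.io/IMAGE/...
// based on streaming mode, the JVM's java.specification.version, and whether the
// beam_fn_api experiment is set.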
@Test
public void testWorkerHarnessContainerImage() {
DataflowPipelineOptions options = PipelineOptionsFactory.as(DataflowPipelineOptions.class);
// an image without the IMAGE placeholder is passed through unchanged
options.setWorkerHarnessContainerImage("some-container");
assertThat(getContainerImageForJob(options), equalTo("some-container"));
// batch, legacy
options.setWorkerHarnessContainerImage("gcr.io/IMAGE/foo");
options.setExperiments(null);
options.setStreaming(false);
System.setProperty("java.specification.version", "1.8");
assertThat(getContainerImageForJob(options), equalTo("gcr.io/beam-java-batch/foo"));
// batch, legacy, jdk11
options.setStreaming(false);
System.setProperty("java.specification.version", "11");
assertThat(getContainerImageForJob(options), equalTo("gcr.io/beam-java11-batch/foo"));
// streaming, legacy
System.setProperty("java.specification.version", "1.8");
options.setStreaming(true);
assertThat(getContainerImageForJob(options), equalTo("gcr.io/beam-java-streaming/foo"));
// streaming, legacy, jdk11
System.setProperty("java.specification.version", "11");
assertThat(getContainerImageForJob(options), equalTo("gcr.io/beam-java11-streaming/foo"));
// streaming, fnapi
options.setExperiments(ImmutableList.of("experiment1", "beam_fn_api"));
assertThat(getContainerImageForJob(options), equalTo("gcr.io/java/foo"));
}
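// With maxNumWorkers set to 10, the sharded-write override picks twice the maximum
// worker count, i.e. 20 shards.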
@Test
public void testStreamingWriteWithNoShardingReturnsNewTransform() {
PipelineOptions options = TestPipeline.testingPipelineOptions();
options.as(DataflowPipelineWorkerPoolOptions.class).setMaxNumWorkers(10);
testStreamingWriteOverride(options, 20);
}
@Test
public void testStreamingWriteWithNoShardingReturnsNewTransformMaxWorkersUnset() {
PipelineOptions options = TestPipeline.testingPipelineOptions();
testStreamingWriteOverride(options, StreamingShardedWriteFactory.DEFAULT_NUM_SHARDS);
}
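// A stateful ParDo over a merging WindowFn (here Sessions) is unsupported and must
// be rejected by the runner in both streaming and batch modes.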
private void verifyMergingStatefulParDoRejected(PipelineOptions options) throws Exception {
Pipeline p = Pipeline.create(options);
p.apply(Create.of(KV.of(13, 42)))
.apply(Window.into(Sessions.withGapDuration(Duration.millis(1))))
.apply(
ParDo.of(
new DoFn<KV<Integer, Integer>, Void>() {
@StateId("fizzle")
private final StateSpec<ValueState<Void>> voidState = StateSpecs.value();
@ProcessElement
public void process() {}
}));
thrown.expectMessage("merging");
thrown.expect(UnsupportedOperationException.class);
p.run();
}
@Test
public void testMergingStatefulRejectedInStreaming() throws Exception {
PipelineOptions options = buildPipelineOptions();
options.as(StreamingOptions.class).setStreaming(true);
verifyMergingStatefulParDoRejected(options);
}
@Test
public void testMergingStatefulRejectedInBatch() throws Exception {
PipelineOptions options = buildPipelineOptions();
options.as(StreamingOptions.class).setStreaming(false);
verifyMergingStatefulParDoRejected(options);
}
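// Applies StreamingShardedWriteFactory to an unsharded WriteFiles, then checks that
// the replacement transform uses the expected shard count and that its outputs map
// back to the originals.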
private void testStreamingWriteOverride(PipelineOptions options, int expectedNumShards) {
TestPipeline p = TestPipeline.fromOptions(options);
StreamingShardedWriteFactory<Object, Void, Object> factory =
new StreamingShardedWriteFactory<>(p.getOptions());
WriteFiles<Object, Void, Object> original = WriteFiles.to(new TestSink(tmpFolder.toString()));
PCollection<Object> objs = (PCollection) p.apply(Create.empty(VoidCoder.of()));
AppliedPTransform<PCollection<Object>, WriteFilesResult<Void>, WriteFiles<Object, Void, Object>>
originalApplication =
AppliedPTransform.of("writefiles", objs.expand(), Collections.emptyMap(), original, p);
WriteFiles<Object, Void, Object> replacement =
(WriteFiles<Object, Void, Object>)
factory.getReplacementTransform(originalApplication).getTransform();
assertThat(replacement, not(equalTo((Object) original)));
assertThat(replacement.getNumShardsProvider().get(), equalTo(expectedNumShards));
WriteFilesResult<Void> originalResult = objs.apply(original);
WriteFilesResult<Void> replacementResult = objs.apply(replacement);
Map<PValue, ReplacementOutput> res =
factory.mapOutputs(originalResult.expand(), replacementResult);
assertEquals(1, res.size());
assertEquals(
originalResult.getPerDestinationOutputFilenames(),
res.get(replacementResult.getPerDestinationOutputFilenames()).getOriginal().getValue());
}
private static class TestSink extends FileBasedSink<Object, Void, Object> {
@Override
public void validate(PipelineOptions options) {}
TestSink(String tmpFolder) {
super(
StaticValueProvider.of(FileSystems.matchNewResource(tmpFolder, true)),
DynamicFileDestinations.constant(
new FilenamePolicy() {
@Override
public ResourceId windowedFilename(
int shardNumber,
int numShards,
BoundedWindow window,
PaneInfo paneInfo,
OutputFileHints outputFileHints) {
throw new UnsupportedOperationException("should not be called");
}
@Nullable
@Override
public ResourceId unwindowedFilename(
int shardNumber, int numShards, OutputFileHints outputFileHints) {
throw new UnsupportedOperationException("should not be called");
}
},
SerializableFunctions.identity()));
}
@Override
public WriteOperation<Void, Object> createWriteOperation() {
return new WriteOperation<Void, Object>(this) {
@Override
public Writer<Void, Object> createWriter() {
throw new UnsupportedOperationException();
}
};
}
}
}