/*
* Copyright (C) 2015 Google Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not
* use this file except in compliance with the License. You may obtain a copy of
* the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*/
package com.google.cloud.dataflow.sdk.runners;
import static com.google.cloud.dataflow.sdk.util.StringUtils.approximatePTransformName;
import static com.google.cloud.dataflow.sdk.util.StringUtils.approximateSimpleName;
import static com.google.cloud.dataflow.sdk.util.WindowedValue.valueInEmptyWindows;
import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.base.Preconditions.checkState;
import com.google.api.client.googleapis.json.GoogleJsonResponseException;
import com.google.api.client.json.JsonFactory;
import com.google.api.services.bigquery.model.TableReference;
import com.google.api.services.bigquery.model.TableRow;
import com.google.api.services.clouddebugger.v2.Clouddebugger;
import com.google.api.services.clouddebugger.v2.model.Debuggee;
import com.google.api.services.clouddebugger.v2.model.RegisterDebuggeeRequest;
import com.google.api.services.clouddebugger.v2.model.RegisterDebuggeeResponse;
import com.google.api.services.dataflow.Dataflow;
import com.google.api.services.dataflow.model.DataflowPackage;
import com.google.api.services.dataflow.model.Job;
import com.google.api.services.dataflow.model.ListJobsResponse;
import com.google.api.services.dataflow.model.WorkerPool;
import com.google.cloud.dataflow.sdk.Pipeline;
import com.google.cloud.dataflow.sdk.Pipeline.PipelineVisitor;
import com.google.cloud.dataflow.sdk.PipelineResult.State;
import com.google.cloud.dataflow.sdk.annotations.Experimental;
import com.google.cloud.dataflow.sdk.coders.BigEndianLongCoder;
import com.google.cloud.dataflow.sdk.coders.CannotProvideCoderException;
import com.google.cloud.dataflow.sdk.coders.Coder;
import com.google.cloud.dataflow.sdk.coders.Coder.NonDeterministicException;
import com.google.cloud.dataflow.sdk.coders.CoderException;
import com.google.cloud.dataflow.sdk.coders.CoderRegistry;
import com.google.cloud.dataflow.sdk.coders.IterableCoder;
import com.google.cloud.dataflow.sdk.coders.KvCoder;
import com.google.cloud.dataflow.sdk.coders.ListCoder;
import com.google.cloud.dataflow.sdk.coders.MapCoder;
import com.google.cloud.dataflow.sdk.coders.SerializableCoder;
import com.google.cloud.dataflow.sdk.coders.StandardCoder;
import com.google.cloud.dataflow.sdk.coders.TableRowJsonCoder;
import com.google.cloud.dataflow.sdk.coders.VarIntCoder;
import com.google.cloud.dataflow.sdk.coders.VarLongCoder;
import com.google.cloud.dataflow.sdk.io.AvroIO;
import com.google.cloud.dataflow.sdk.io.BigQueryIO;
import com.google.cloud.dataflow.sdk.io.BoundedSource;
import com.google.cloud.dataflow.sdk.io.FileBasedSink;
import com.google.cloud.dataflow.sdk.io.PubsubIO;
import com.google.cloud.dataflow.sdk.io.PubsubIO.Read.Bound.PubsubReader;
import com.google.cloud.dataflow.sdk.io.PubsubIO.Write.Bound.PubsubWriter;
import com.google.cloud.dataflow.sdk.io.PubsubUnboundedSink;
import com.google.cloud.dataflow.sdk.io.PubsubUnboundedSource;
import com.google.cloud.dataflow.sdk.io.Read;
import com.google.cloud.dataflow.sdk.io.TextIO;
import com.google.cloud.dataflow.sdk.io.UnboundedSource;
import com.google.cloud.dataflow.sdk.io.Write;
import com.google.cloud.dataflow.sdk.options.DataflowPipelineDebugOptions;
import com.google.cloud.dataflow.sdk.options.DataflowPipelineOptions;
import com.google.cloud.dataflow.sdk.options.DataflowPipelineWorkerPoolOptions;
import com.google.cloud.dataflow.sdk.options.PipelineOptions;
import com.google.cloud.dataflow.sdk.options.PipelineOptionsValidator;
import com.google.cloud.dataflow.sdk.options.StreamingOptions;
import com.google.cloud.dataflow.sdk.runners.DataflowPipelineTranslator.JobSpecification;
import com.google.cloud.dataflow.sdk.runners.DataflowPipelineTranslator.TransformTranslator;
import com.google.cloud.dataflow.sdk.runners.DataflowPipelineTranslator.TranslationContext;
import com.google.cloud.dataflow.sdk.runners.dataflow.AssignWindows;
import com.google.cloud.dataflow.sdk.runners.dataflow.DataflowAggregatorTransforms;
import com.google.cloud.dataflow.sdk.runners.dataflow.DataflowUnboundedReadFromBoundedSource;
import com.google.cloud.dataflow.sdk.runners.dataflow.ReadTranslator;
import com.google.cloud.dataflow.sdk.runners.worker.IsmFormat;
import com.google.cloud.dataflow.sdk.runners.worker.IsmFormat.IsmRecord;
import com.google.cloud.dataflow.sdk.runners.worker.IsmFormat.IsmRecordCoder;
import com.google.cloud.dataflow.sdk.runners.worker.IsmFormat.MetadataKeyCoder;
import com.google.cloud.dataflow.sdk.transforms.Aggregator;
import com.google.cloud.dataflow.sdk.transforms.Combine;
import com.google.cloud.dataflow.sdk.transforms.Combine.CombineFn;
import com.google.cloud.dataflow.sdk.transforms.Create;
import com.google.cloud.dataflow.sdk.transforms.DoFn;
import com.google.cloud.dataflow.sdk.transforms.Flatten;
import com.google.cloud.dataflow.sdk.transforms.GroupByKey;
import com.google.cloud.dataflow.sdk.transforms.PTransform;
import com.google.cloud.dataflow.sdk.transforms.ParDo;
import com.google.cloud.dataflow.sdk.transforms.SerializableFunction;
import com.google.cloud.dataflow.sdk.transforms.View;
import com.google.cloud.dataflow.sdk.transforms.View.CreatePCollectionView;
import com.google.cloud.dataflow.sdk.transforms.WithKeys;
import com.google.cloud.dataflow.sdk.transforms.display.DisplayData;
import com.google.cloud.dataflow.sdk.transforms.windowing.AfterPane;
import com.google.cloud.dataflow.sdk.transforms.windowing.BoundedWindow;
import com.google.cloud.dataflow.sdk.transforms.windowing.GlobalWindow;
import com.google.cloud.dataflow.sdk.transforms.windowing.GlobalWindows;
import com.google.cloud.dataflow.sdk.transforms.windowing.Window;
import com.google.cloud.dataflow.sdk.util.CoderUtils;
import com.google.cloud.dataflow.sdk.util.DataflowReleaseInfo;
import com.google.cloud.dataflow.sdk.util.IOChannelUtils;
import com.google.cloud.dataflow.sdk.util.InstanceBuilder;
import com.google.cloud.dataflow.sdk.util.MimeTypes;
import com.google.cloud.dataflow.sdk.util.MonitoringUtil;
import com.google.cloud.dataflow.sdk.util.PCollectionViews;
import com.google.cloud.dataflow.sdk.util.PathValidator;
import com.google.cloud.dataflow.sdk.util.PropertyNames;
import com.google.cloud.dataflow.sdk.util.Reshuffle;
import com.google.cloud.dataflow.sdk.util.SystemDoFnInternal;
import com.google.cloud.dataflow.sdk.util.Transport;
import com.google.cloud.dataflow.sdk.util.ValueWithRecordId;
import com.google.cloud.dataflow.sdk.util.WindowedValue;
import com.google.cloud.dataflow.sdk.util.WindowedValue.FullWindowedValueCoder;
import com.google.cloud.dataflow.sdk.util.WindowingStrategy;
import com.google.cloud.dataflow.sdk.values.KV;
import com.google.cloud.dataflow.sdk.values.PCollection;
import com.google.cloud.dataflow.sdk.values.PCollection.IsBounded;
import com.google.cloud.dataflow.sdk.values.PCollectionList;
import com.google.cloud.dataflow.sdk.values.PCollectionTuple;
import com.google.cloud.dataflow.sdk.values.PCollectionView;
import com.google.cloud.dataflow.sdk.values.PDone;
import com.google.cloud.dataflow.sdk.values.PInput;
import com.google.cloud.dataflow.sdk.values.POutput;
import com.google.cloud.dataflow.sdk.values.PValue;
import com.google.cloud.dataflow.sdk.values.TupleTag;
import com.google.cloud.dataflow.sdk.values.TupleTagList;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Function;
import com.google.common.base.Joiner;
import com.google.common.base.Optional;
import com.google.common.base.Strings;
import com.google.common.base.Utf8;
import com.google.common.collect.ForwardingMap;
import com.google.common.collect.HashMultimap;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Iterables;
import com.google.common.collect.Maps;
import com.google.common.collect.Multimap;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import org.joda.time.DateTimeUtils;
import org.joda.time.DateTimeZone;
import org.joda.time.Duration;
import org.joda.time.format.DateTimeFormat;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.File;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.io.PrintWriter;
import java.io.Serializable;
import java.net.URISyntaxException;
import java.net.URL;
import java.net.URLClassLoader;
import java.nio.channels.Channels;
import java.nio.channels.WritableByteChannel;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Random;
import java.util.Set;
import java.util.SortedSet;
import java.util.TreeSet;
import javax.annotation.Nullable;
/**
* A {@link PipelineRunner} that executes the operations in the
* pipeline by first translating them to the Dataflow representation
* using the {@link DataflowPipelineTranslator} and then submitting
* them to a Dataflow service for execution.
*
* <p><h3>Permissions</h3>
* When reading from a Dataflow source or writing to a Dataflow sink using
* {@code DataflowPipelineRunner}, the Google Cloud services account and the Google Compute Engine
* service account of the GCP project running the Dataflow job will need access to the
* corresponding source/sink.
*
* <p>Please see <a href="https://cloud.google.com/dataflow/security-and-permissions">Google Cloud
* Dataflow Security and Permissions</a> for more details.
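*
* <p><h3>Example usage</h3>
* A minimal sketch of submitting a pipeline with this runner; the project ID and staging
* location below are placeholders:
* <pre>{@code
* DataflowPipelineOptions options = PipelineOptionsFactory.fromArgs(args)
*     .withValidation().as(DataflowPipelineOptions.class);
* options.setRunner(DataflowPipelineRunner.class);
* options.setProject("my-project-id");                   // placeholder
* options.setStagingLocation("gs://my-bucket/staging");  // placeholder
* Pipeline p = Pipeline.create(options);
* // ... apply transforms to p ...
* p.run();
* }</pre>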
*/
public class DataflowPipelineRunner extends PipelineRunner<DataflowPipelineJob> {
private static final Logger LOG = LoggerFactory.getLogger(DataflowPipelineRunner.class);
/** Provided configuration options. */
private final DataflowPipelineOptions options;
/** Client for the Dataflow service. This is used to actually submit jobs. */
private final Dataflow dataflowClient;
/** Translator for this DataflowPipelineRunner, based on options. */
private final DataflowPipelineTranslator translator;
/** Custom transforms implementations. */
private final Map<Class<?>, Class<?>> overrides;
/** A set of user defined functions to invoke at different points in execution. */
private DataflowPipelineRunnerHooks hooks;
// Environment version information.
private static final String ENVIRONMENT_MAJOR_VERSION = "6";
// Default Docker container images that execute Dataflow worker harness, residing in Google
// Container Registry, separately for Batch and Streaming.
public static final String BATCH_WORKER_HARNESS_CONTAINER_IMAGE
= "dataflow.gcr.io/v1beta3/java-batch:1.8.1";
public static final String STREAMING_WORKER_HARNESS_CONTAINER_IMAGE
= "dataflow.gcr.io/v1beta3/java-streaming:1.8.1";
// The limit of CreateJob request size.
private static final int CREATE_JOB_REQUEST_LIMIT_BYTES = 10 * 1024 * 1024;
@VisibleForTesting
static final int GCS_UPLOAD_BUFFER_SIZE_BYTES_DEFAULT = 1 * 1024 * 1024;
private final Set<PCollection<?>> pcollectionsRequiringIndexedFormat;
/**
* Project IDs must contain lowercase letters, digits, or dashes; the pattern below also
* permits colons and periods, which appear in domain-scoped project IDs.
* IDs must start with a letter and may not end with a dash.
* This regex isn't exact - it allows some patterns that would be rejected by
* the service, but it is sufficient for basic validation of project IDs.
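*
* <p>For example, {@code "my-project-123"} matches this pattern, while {@code "123project"}
* (starting with a digit) does not.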
*/
public static final String PROJECT_ID_REGEXP = "[a-z][-a-z0-9:.]+[a-z0-9]";
private static final JsonFactory JSON_FACTORY = Transport.getJsonFactory();
/**
* Construct a runner from the provided options.
*
* @param options Properties that configure the runner.
* @return The newly created runner.
*/
public static DataflowPipelineRunner fromOptions(PipelineOptions options) {
// (Re-)register standard IO factories. Clobbers any prior credentials.
IOChannelUtils.registerStandardIOFactories(options);
DataflowPipelineOptions dataflowOptions =
PipelineOptionsValidator.validate(DataflowPipelineOptions.class, options);
ArrayList<String> missing = new ArrayList<>();
if (dataflowOptions.getAppName() == null) {
missing.add("appName");
}
if (missing.size() > 0) {
throw new IllegalArgumentException(
"Missing required values: " + Joiner.on(',').join(missing));
}
PathValidator validator = dataflowOptions.getPathValidator();
checkArgument(!(Strings.isNullOrEmpty(dataflowOptions.getTempLocation())
&& Strings.isNullOrEmpty(dataflowOptions.getStagingLocation())),
"Missing required value: at least one of tempLocation or stagingLocation must be set.");
if (dataflowOptions.getStagingLocation() != null) {
validator.validateOutputFilePrefixSupported(dataflowOptions.getStagingLocation());
}
if (dataflowOptions.getTempLocation() != null) {
validator.validateOutputFilePrefixSupported(dataflowOptions.getTempLocation());
}
if (Strings.isNullOrEmpty(dataflowOptions.getTempLocation())) {
dataflowOptions.setTempLocation(dataflowOptions.getStagingLocation());
} else if (Strings.isNullOrEmpty(dataflowOptions.getStagingLocation())) {
try {
dataflowOptions.setStagingLocation(
IOChannelUtils.resolve(dataflowOptions.getTempLocation(), "staging"));
} catch (IOException e) {
throw new IllegalArgumentException("Unable to resolve PipelineOptions.stagingLocation "
+ "from PipelineOptions.tempLocation. Please set the staging location explicitly.", e);
}
}
if (dataflowOptions.getFilesToStage() == null) {
dataflowOptions.setFilesToStage(detectClassPathResourcesToStage(
DataflowPipelineRunner.class.getClassLoader()));
LOG.info("PipelineOptions.filesToStage was not specified. "
+ "Defaulting to files from the classpath: will stage {} files. "
+ "Enable logging at DEBUG level to see which files will be staged.",
dataflowOptions.getFilesToStage().size());
LOG.debug("Classpath elements: {}", dataflowOptions.getFilesToStage());
}
// Verify jobName according to service requirements, converting to lowercase if necessary.
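// For example, a jobName of "WordCountJob" would be submitted as "wordcountjob".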
String jobName =
dataflowOptions
.getJobName()
.toLowerCase();
checkArgument(
jobName.matches("[a-z]([-a-z0-9]*[a-z0-9])?"),
"JobName invalid; the name must consist of only the characters "
+ "[-a-z0-9], starting with a letter and ending with a letter "
+ "or number");
if (!jobName.equals(dataflowOptions.getJobName())) {
LOG.info(
"PipelineOptions.jobName did not match the service requirements. "
+ "Using {} instead of {}.",
jobName,
dataflowOptions.getJobName());
}
dataflowOptions.setJobName(jobName);
// Verify project
String project = dataflowOptions.getProject();
if (project.matches("[0-9]*")) {
throw new IllegalArgumentException("Project ID '" + project
+ "' invalid. Please make sure you specified the Project ID, not project number.");
} else if (!project.matches(PROJECT_ID_REGEXP)) {
throw new IllegalArgumentException("Project ID '" + project
+ "' invalid. Please make sure you specified the Project ID, not project description.");
}
DataflowPipelineDebugOptions debugOptions =
dataflowOptions.as(DataflowPipelineDebugOptions.class);
// Verify the number of worker threads is a valid value
if (debugOptions.getNumberOfWorkerHarnessThreads() < 0) {
throw new IllegalArgumentException("Number of worker harness threads '"
+ debugOptions.getNumberOfWorkerHarnessThreads()
+ "' invalid. Please make sure the value is non-negative.");
}
if (dataflowOptions.isStreaming() && dataflowOptions.getGcsUploadBufferSizeBytes() == null) {
dataflowOptions.setGcsUploadBufferSizeBytes(GCS_UPLOAD_BUFFER_SIZE_BYTES_DEFAULT);
}
return new DataflowPipelineRunner(dataflowOptions);
}
@VisibleForTesting protected DataflowPipelineRunner(DataflowPipelineOptions options) {
this.options = options;
this.dataflowClient = options.getDataflowClient();
this.translator = DataflowPipelineTranslator.fromOptions(options);
this.pcollectionsRequiringIndexedFormat = new HashSet<>();
this.ptransformViewsWithNonDeterministicKeyCoders = new HashSet<>();
ImmutableMap.Builder<Class<?>, Class<?>> builder = ImmutableMap.<Class<?>, Class<?>>builder();
if (options.isStreaming()) {
builder.put(Combine.GloballyAsSingletonView.class,
StreamingCombineGloballyAsSingletonView.class);
builder.put(Create.Values.class, StreamingCreate.class);
builder.put(View.AsMap.class, StreamingViewAsMap.class);
builder.put(View.AsMultimap.class, StreamingViewAsMultimap.class);
builder.put(View.AsSingleton.class, StreamingViewAsSingleton.class);
builder.put(View.AsList.class, StreamingViewAsList.class);
builder.put(View.AsIterable.class, StreamingViewAsIterable.class);
builder.put(Read.Unbounded.class, StreamingUnboundedRead.class);
builder.put(Read.Bounded.class, StreamingBoundedRead.class);
builder.put(AvroIO.Write.Bound.class, UnsupportedIO.class);
builder.put(Window.Bound.class, AssignWindows.class);
// In streaming mode must use either the custom Pubsub unbounded source/sink or
// defer to Windmill's built-in implementation.
builder.put(PubsubReader.class, UnsupportedIO.class);
builder.put(PubsubWriter.class, UnsupportedIO.class);
if (options.getExperiments() == null
|| !options.getExperiments().contains("enable_custom_pubsub_sink")) {
builder.put(PubsubIO.Write.Bound.class, StreamingPubsubIOWrite.class);
}
} else {
builder.put(Read.Unbounded.class, UnsupportedIO.class);
builder.put(Window.Bound.class, AssignWindows.class);
builder.put(Write.Bound.class, BatchWrite.class);
// In batch mode must use the custom Pubsub bounded source/sink.
builder.put(PubsubUnboundedSource.class, UnsupportedIO.class);
builder.put(PubsubUnboundedSink.class, UnsupportedIO.class);
if (options.getExperiments() == null
|| !options.getExperiments().contains("disable_ism_side_input")) {
builder.put(View.AsMap.class, BatchViewAsMap.class);
builder.put(View.AsMultimap.class, BatchViewAsMultimap.class);
builder.put(View.AsSingleton.class, BatchViewAsSingleton.class);
builder.put(View.AsList.class, BatchViewAsList.class);
builder.put(View.AsIterable.class, BatchViewAsIterable.class);
}
if (options.getExperiments() == null
|| !options.getExperiments().contains("enable_custom_bigquery_source")) {
builder.put(BigQueryIO.Read.Bound.class, BatchBigQueryIONativeRead.class);
}
if (options.getExperiments() == null
|| !options.getExperiments().contains("enable_custom_bigquery_sink")) {
builder.put(BigQueryIO.Write.Bound.class, BatchBigQueryIOWrite.class);
}
}
overrides = builder.build();
}
/**
* Applies the given transform to the input. For transforms with customized definitions
* for the Dataflow pipeline runner, the application is intercepted and modified here.
*/
@Override
public <OutputT extends POutput, InputT extends PInput> OutputT apply(
PTransform<InputT, OutputT> transform, InputT input) {
if (Combine.GroupedValues.class.equals(transform.getClass())
|| GroupByKey.class.equals(transform.getClass())) {
// For both Dataflow runners (streaming and batch), GroupByKey and GroupedValues are
// primitives. Returning a primitive output instead of the expanded definition
// signals to the translator that translation is necessary.
@SuppressWarnings("unchecked")
PCollection<?> pc = (PCollection<?>) input;
@SuppressWarnings("unchecked")
OutputT outputT = (OutputT) PCollection.createPrimitiveOutputInternal(
pc.getPipeline(),
transform instanceof GroupByKey
? ((GroupByKey<?, ?>) transform).updateWindowingStrategy(pc.getWindowingStrategy())
: pc.getWindowingStrategy(),
pc.isBounded());
return outputT;
} else if (PubsubIO.Read.Bound.class.equals(transform.getClass())
&& options.isStreaming()
&& (options.getExperiments() == null
|| !options.getExperiments().contains("enable_custom_pubsub_source"))) {
// casting to wildcard
@SuppressWarnings("unchecked")
OutputT pubsub = (OutputT) applyPubsubStreamingRead((PubsubIO.Read.Bound<?>) transform,
input);
return pubsub;
} else if (Window.Bound.class.equals(transform.getClass())) {
/*
* TODO: make this the generic way overrides are applied (using super.apply() rather than
* Pipeline.applyTransform()); this allows the apply method to be replaced without inserting
* additional nodes into the graph.
*/
// casting to wildcard
@SuppressWarnings("unchecked")
OutputT windowed = (OutputT) applyWindow((Window.Bound<?>) transform, (PCollection<?>) input);
return windowed;
} else if (Flatten.FlattenPCollectionList.class.equals(transform.getClass())
&& ((PCollectionList<?>) input).size() == 0) {
return (OutputT) Pipeline.applyTransform(input, Create.of());
} else if (overrides.containsKey(transform.getClass())) {
// It is the responsibility of whoever constructs overrides to ensure this is type safe.
@SuppressWarnings("unchecked")
Class<PTransform<InputT, OutputT>> transformClass =
(Class<PTransform<InputT, OutputT>>) transform.getClass();
@SuppressWarnings("unchecked")
Class<PTransform<InputT, OutputT>> customTransformClass =
(Class<PTransform<InputT, OutputT>>) overrides.get(transform.getClass());
PTransform<InputT, OutputT> customTransform =
InstanceBuilder.ofType(customTransformClass)
.withArg(DataflowPipelineRunner.class, this)
.withArg(transformClass, transform)
.build();
return Pipeline.applyTransform(input, customTransform);
} else {
return super.apply(transform, input);
}
}
private <T> PCollection<T>
applyPubsubStreamingRead(PubsubIO.Read.Bound<?> initialTransform, PInput input) {
// types are matched at compile time
@SuppressWarnings("unchecked")
PubsubIO.Read.Bound<T> transform = (PubsubIO.Read.Bound<T>) initialTransform;
return PCollection.<T>createPrimitiveOutputInternal(
input.getPipeline(), WindowingStrategy.globalDefault(), IsBounded.UNBOUNDED)
.setCoder(transform.getCoder());
}
private <T> PCollection<T> applyWindow(
Window.Bound<?> initialTransform, PCollection<?> initialInput) {
// types are matched at compile time
@SuppressWarnings("unchecked")
Window.Bound<T> transform = (Window.Bound<T>) initialTransform;
@SuppressWarnings("unchecked")
PCollection<T> input = (PCollection<T>) initialInput;
return super.apply(new AssignWindows<>(transform), input);
}
private String debuggerMessage(String projectId, String uniquifier) {
return String.format("To debug your job, visit Google Cloud Debugger at: "
+ "https://console.developers.google.com/debug?project=%s&dbgee=%s",
projectId, uniquifier);
}
private void maybeRegisterDebuggee(DataflowPipelineOptions options, String uniquifier) {
if (!options.getEnableCloudDebugger()) {
return;
}
if (options.getDebuggee() != null) {
throw new RuntimeException("Should not specify the debuggee");
}
Clouddebugger debuggerClient = Transport.newClouddebuggerClient(options).build();
Debuggee debuggee = registerDebuggee(debuggerClient, uniquifier);
options.setDebuggee(debuggee);
System.out.println(debuggerMessage(options.getProject(), debuggee.getUniquifier()));
}
private Debuggee registerDebuggee(Clouddebugger debuggerClient, String uniquifier) {
RegisterDebuggeeRequest registerReq = new RegisterDebuggeeRequest();
registerReq.setDebuggee(new Debuggee()
.setProject(options.getProject())
.setUniquifier(uniquifier)
.setDescription(uniquifier)
.setAgentVersion("google.com/cloud-dataflow-java/v1"));
try {
RegisterDebuggeeResponse registerResponse =
debuggerClient.controller().debuggees().register(registerReq).execute();
Debuggee debuggee = registerResponse.getDebuggee();
if (debuggee.getStatus() != null && debuggee.getStatus().getIsError()) {
throw new RuntimeException("Unable to register with the debugger: "
+ debuggee.getStatus().getDescription().getFormat());
}
return debuggee;
} catch (IOException e) {
throw new RuntimeException("Unable to register with the debugger: ", e);
}
}
@Override
public DataflowPipelineJob run(Pipeline pipeline) {
logWarningIfPCollectionViewHasNonDeterministicKeyCoder(pipeline);
LOG.info("Executing pipeline on the Dataflow Service, which will have billing implications "
+ "related to Google Compute Engine usage and other Google Cloud Services.");
List<DataflowPackage> packages = options.getStager().stageFiles();
// Set a unique client_request_id in the CreateJob request.
// This is used to ensure idempotence of job creation across retried
// attempts to create a job. Specifically, if the service returns a job with
// a different client_request_id, it means the returned one is a different
// job previously created with the same job name, and that the job creation
// has been effectively rejected. The SDK should return
// Error::Already_Exists to the user in that case.
int randomNum = new Random().nextInt(9000) + 1000;
String requestId = DateTimeFormat.forPattern("YYYYMMddHHmmssmmm").withZone(DateTimeZone.UTC)
.print(DateTimeUtils.currentTimeMillis()) + "_" + randomNum;
// Try to create a debuggee ID. This must happen before the job is translated since it may
// update the options.
DataflowPipelineOptions dataflowOptions = options.as(DataflowPipelineOptions.class);
maybeRegisterDebuggee(dataflowOptions, requestId);
JobSpecification jobSpecification =
translator.translate(pipeline, this, packages);
Job newJob = jobSpecification.getJob();
newJob.setClientRequestId(requestId);
String version = DataflowReleaseInfo.getReleaseInfo().getVersion();
System.out.println("Dataflow SDK version: " + version);
newJob.getEnvironment().setUserAgent(DataflowReleaseInfo.getReleaseInfo());
// The Dataflow Service may write to the temporary directory directly, so
// it must be verified.
if (!Strings.isNullOrEmpty(options.getTempLocation())) {
newJob.getEnvironment().setTempStoragePrefix(
dataflowOptions.getPathValidator().verifyPath(options.getTempLocation()));
}
newJob.getEnvironment().setDataset(options.getTempDatasetId());
newJob.getEnvironment().setExperiments(options.getExperiments());
// Set the Docker container image that executes Dataflow worker harness, residing in Google
// Container Registry. Translator is guaranteed to create a worker pool prior to this point.
String workerHarnessContainerImage =
options.as(DataflowPipelineWorkerPoolOptions.class)
.getWorkerHarnessContainerImage();
for (WorkerPool workerPool : newJob.getEnvironment().getWorkerPools()) {
workerPool.setWorkerHarnessContainerImage(workerHarnessContainerImage);
}
// Requirements about the service.
Map<String, Object> environmentVersion = new HashMap<>();
environmentVersion.put(PropertyNames.ENVIRONMENT_VERSION_MAJOR_KEY, ENVIRONMENT_MAJOR_VERSION);
newJob.getEnvironment().setVersion(environmentVersion);
// The default jobType is JAVA_BATCH_AUTOSCALING: a Java batch job whose workers can be
// autoscaled if so configured.
String jobType = "JAVA_BATCH_AUTOSCALING";
if (options.isStreaming()) {
jobType = "STREAMING";
}
environmentVersion.put(PropertyNames.ENVIRONMENT_VERSION_JOB_TYPE_KEY, jobType);
if (hooks != null) {
hooks.modifyEnvironmentBeforeSubmission(newJob.getEnvironment());
}
if (!Strings.isNullOrEmpty(options.getDataflowJobFile())) {
runJobFileHooks(newJob);
}
if (hooks != null && !hooks.shouldActuallyRunJob()) {
return null;
}
String jobIdToUpdate = null;
if (options.getUpdate()) {
jobIdToUpdate = getJobIdFromName(options.getJobName());
newJob.setTransformNameMapping(options.getTransformNameMapping());
newJob.setReplaceJobId(jobIdToUpdate);
}
Job jobResult;
try {
jobResult = dataflowClient
.projects()
.jobs()
.create(options.getProject(), newJob)
.execute();
} catch (GoogleJsonResponseException e) {
String errorMessages = "Unexpected errors";
if (e.getDetails() != null) {
if (Utf8.encodedLength(newJob.toString()) >= CREATE_JOB_REQUEST_LIMIT_BYTES) {
errorMessages = "The size of the serialized JSON representation of the pipeline "
+ "exceeds the allowable limit. "
+ "For more information, please check the FAQ link below:\n"
+ "https://cloud.google.com/dataflow/faq";
} else {
errorMessages = e.getDetails().getMessage();
}
}
throw new RuntimeException("Failed to create a workflow job: " + errorMessages, e);
} catch (IOException e) {
throw new RuntimeException("Failed to create a workflow job", e);
}
// Obtain all of the extractors from the PTransforms used in the pipeline so the
// DataflowPipelineJob has access to them.
AggregatorPipelineExtractor aggregatorExtractor = new AggregatorPipelineExtractor(pipeline);
Map<Aggregator<?, ?>, Collection<PTransform<?, ?>>> aggregatorSteps =
aggregatorExtractor.getAggregatorSteps();
DataflowAggregatorTransforms aggregatorTransforms =
new DataflowAggregatorTransforms(aggregatorSteps, jobSpecification.getStepNames());
// Use a raw client for post-launch monitoring, as status calls may fail
// regularly and need not be retried automatically.
DataflowPipelineJob dataflowPipelineJob =
new DataflowPipelineJob(options.getProject(), jobResult.getId(),
Transport.newRawDataflowClient(options).build(), aggregatorTransforms);
// If the service returned a client request id, the SDK needs to compare it
// with the original id generated in the request. If they are not the same
// (i.e., the returned job was not created by this request), throw
// DataflowJobAlreadyExistsException or DataflowJobAlreadyUpdatedException
// depending on whether this is an update or not.
if (jobResult.getClientRequestId() != null && !jobResult.getClientRequestId().isEmpty()
&& !jobResult.getClientRequestId().equals(requestId)) {
// If updating a job.
if (options.getUpdate()) {
throw new DataflowJobAlreadyUpdatedException(dataflowPipelineJob,
String.format("The job named %s with id: %s has already been updated into job id: %s "
+ "and cannot be updated again.",
newJob.getName(), jobIdToUpdate, jobResult.getId()));
} else {
throw new DataflowJobAlreadyExistsException(dataflowPipelineJob,
String.format("There is already an active job named %s with id: %s. If you want "
+ "to submit a second job, try again by setting a different name using --jobName.",
newJob.getName(), jobResult.getId()));
}
}
LOG.info("To access the Dataflow monitoring console, please navigate to {}",
MonitoringUtil.getJobMonitoringPageURL(options.getProject(), jobResult.getId()));
System.out.println("Submitted job: " + jobResult.getId());
LOG.info("To cancel the job using the 'gcloud' tool, run:\n> {}",
MonitoringUtil.getGcloudCancelCommand(options, jobResult.getId()));
return dataflowPipelineJob;
}
/**
* Returns the DataflowPipelineTranslator associated with this object.
*/
public DataflowPipelineTranslator getTranslator() {
return translator;
}
/**
* Sets callbacks to invoke during execution; see {@code DataflowPipelineRunnerHooks}.
*/
@Experimental
public void setHooks(DataflowPipelineRunnerHooks hooks) {
this.hooks = hooks;
}
/////////////////////////////////////////////////////////////////////////////
/** Outputs a warning about PCollection views without deterministic key coders. */
private void logWarningIfPCollectionViewHasNonDeterministicKeyCoder(Pipeline pipeline) {
// We need to wait until this point to determine the names of the transforms, since only
// at this time do we know the transform hierarchy; otherwise we could
// have just recorded the full names during apply time.
if (!ptransformViewsWithNonDeterministicKeyCoders.isEmpty()) {
final SortedSet<String> ptransformViewNamesWithNonDeterministicKeyCoders = new TreeSet<>();
pipeline.traverseTopologically(new PipelineVisitor() {
@Override
public void visitValue(PValue value, TransformTreeNode producer) {
}
@Override
public void visitTransform(TransformTreeNode node) {
if (ptransformViewsWithNonDeterministicKeyCoders.contains(node.getTransform())) {
ptransformViewNamesWithNonDeterministicKeyCoders.add(node.getFullName());
}
}
@Override
public void enterCompositeTransform(TransformTreeNode node) {
if (ptransformViewsWithNonDeterministicKeyCoders.contains(node.getTransform())) {
ptransformViewNamesWithNonDeterministicKeyCoders.add(node.getFullName());
}
}
@Override
public void leaveCompositeTransform(TransformTreeNode node) {
}
});
LOG.warn("Unable to use indexed implementation for View.AsMap and View.AsMultimap for {} "
+ "because the key coder is not deterministic. Falling back to singleton implementation "
+ "which may cause memory and/or performance problems. Future major versions of "
+ "Dataflow will require deterministic key coders.",
ptransformViewNamesWithNonDeterministicKeyCoders);
}
}
private void runJobFileHooks(Job newJob) {
try {
WritableByteChannel writer =
IOChannelUtils.create(options.getDataflowJobFile(), MimeTypes.TEXT);
PrintWriter printWriter = new PrintWriter(Channels.newOutputStream(writer));
String workSpecJson = DataflowPipelineTranslator.jobToString(newJob);
printWriter.print(workSpecJson);
printWriter.flush();
printWriter.close();
LOG.info("Printed job specification to {}", options.getDataflowJobFile());
} catch (IllegalStateException ex) {
String error = "Cannot translate workflow spec to JSON.";
if (hooks != null && hooks.failOnJobFileWriteFailure()) {
throw new RuntimeException(error, ex);
} else {
LOG.warn(error, ex);
}
} catch (IOException ex) {
String error =
String.format("Cannot create output file at {}", options.getDataflowJobFile());
if (hooks != null && hooks.failOnJobFileWriteFailure()) {
throw new RuntimeException(error, ex);
} else {
LOG.warn(error, ex);
}
}
}
/**
* Returns true if the passed in {@link PCollection} needs to be materialized using
* an indexed format.
*/
boolean doesPCollectionRequireIndexedFormat(PCollection<?> pcol) {
return pcollectionsRequiringIndexedFormat.contains(pcol);
}
/**
* Marks the passed in {@link PCollection} as requiring materialization using
* an indexed format.
*/
private void addPCollectionRequiringIndexedFormat(PCollection<?> pcol) {
pcollectionsRequiringIndexedFormat.add(pcol);
}
/** A set of {@link View}s with non-deterministic key coders. */
Set<PTransform<?, ?>> ptransformViewsWithNonDeterministicKeyCoders;
/**
* Records that the {@link PTransform} requires a deterministic key coder.
*/
private void recordViewUsesNonDeterministicKeyCoder(PTransform<?, ?> ptransform) {
ptransformViewsWithNonDeterministicKeyCoders.add(ptransform);
}
/**
* A {@link GroupByKey} transform for the {@link DataflowPipelineRunner} which sorts
* values using the secondary key {@code K2}.
*
* <p>The {@link PCollection} created by this {@link PTransform} will have values in
* the empty window. Care must be taken *afterwards* to either re-window
* (using {@link Window#into}) or only use {@link PTransform}s that do not depend on the
* values being within a window.
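*
* <p>For illustration (values shown without their windows), grouping and sorting the input
* {@code (k, (2, "b")), (k, (1, "a"))} yields {@code (k, [(1, "a"), (2, "b")])}.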
*/
static class GroupByKeyAndSortValuesOnly<K1, K2, V>
extends PTransform<PCollection<KV<K1, KV<K2, V>>>, PCollection<KV<K1, Iterable<KV<K2, V>>>>> {
private GroupByKeyAndSortValuesOnly() {
}
@Override
public PCollection<KV<K1, Iterable<KV<K2, V>>>> apply(PCollection<KV<K1, KV<K2, V>>> input) {
PCollection<KV<K1, Iterable<KV<K2, V>>>> rval =
PCollection.<KV<K1, Iterable<KV<K2, V>>>>createPrimitiveOutputInternal(
input.getPipeline(),
WindowingStrategy.globalDefault(),
IsBounded.BOUNDED);
@SuppressWarnings({"unchecked", "rawtypes"})
KvCoder<K1, KV<K2, V>> inputCoder = (KvCoder) input.getCoder();
rval.setCoder(
KvCoder.of(inputCoder.getKeyCoder(),
IterableCoder.of(inputCoder.getValueCoder())));
return rval;
}
}
/**
* A {@link PTransform} that groups the values by a hash of the window's byte representation
* and sorts the values using the window's byte representation.
*/
private static class GroupByWindowHashAsKeyAndWindowAsSortKey<T, W extends BoundedWindow> extends
PTransform<PCollection<T>, PCollection<KV<Integer, Iterable<KV<W, WindowedValue<T>>>>>> {
/**
* A {@link DoFn} that for each element outputs a {@code KV} structure suitable for
* grouping by the hash of the window's byte representation and sorting the grouped values
* using the window's byte representation.
*/
@SystemDoFnInternal
private static class UseWindowHashAsKeyAndWindowAsSortKeyDoFn<T, W extends BoundedWindow>
extends DoFn<T, KV<Integer, KV<W, WindowedValue<T>>>> implements DoFn.RequiresWindowAccess {
private final IsmRecordCoder<?> ismCoderForHash;
private UseWindowHashAsKeyAndWindowAsSortKeyDoFn(IsmRecordCoder<?> ismCoderForHash) {
this.ismCoderForHash = ismCoderForHash;
}
@Override
public void processElement(ProcessContext c) throws Exception {
@SuppressWarnings("unchecked")
W window = (W) c.window();
c.output(
KV.of(ismCoderForHash.hash(ImmutableList.of(window)),
KV.of(window,
WindowedValue.of(
c.element(),
c.timestamp(),
c.window(),
c.pane()))));
}
}
private final IsmRecordCoder<?> ismCoderForHash;
private GroupByWindowHashAsKeyAndWindowAsSortKey(IsmRecordCoder<?> ismCoderForHash) {
this.ismCoderForHash = ismCoderForHash;
}
@Override
public PCollection<KV<Integer, Iterable<KV<W, WindowedValue<T>>>>> apply(PCollection<T> input) {
@SuppressWarnings("unchecked")
Coder<W> windowCoder = (Coder<W>)
input.getWindowingStrategy().getWindowFn().windowCoder();
PCollection<KV<Integer, KV<W, WindowedValue<T>>>> rval =
input.apply(ParDo.of(
new UseWindowHashAsKeyAndWindowAsSortKeyDoFn<T, W>(ismCoderForHash)));
rval.setCoder(
KvCoder.of(
VarIntCoder.of(),
KvCoder.of(windowCoder,
FullWindowedValueCoder.of(input.getCoder(), windowCoder))));
return rval.apply(new GroupByKeyAndSortValuesOnly<Integer, W, WindowedValue<T>>());
}
}
/**
* Specialized implementation for
* {@link com.google.cloud.dataflow.sdk.transforms.View.AsSingleton View.AsSingleton} for the
* Dataflow runner in batch mode.
*
* <p>Creates a set of files in the {@link IsmFormat} sharded by the hash of the window's
* byte representation and with records having:
* <ul>
* <li>Key 1: Window</li>
* <li>Value: Windowed value</li>
* </ul>
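*
* <p>For illustration (not the exact byte encoding), a singleton value {@code v} in window
* {@code W} is stored as the single record {@code [W] -> WindowedValue(v)}.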
*/
static class BatchViewAsSingleton<T>
extends PTransform<PCollection<T>, PCollectionView<T>> {
/**
* A {@link DoFn} that outputs {@link IsmRecord}s. These records are structured as follows:
* <ul>
* <li>Key 1: Window
* <li>Value: Windowed value
* </ul>
*/
static class IsmRecordForSingularValuePerWindowDoFn<T, W extends BoundedWindow>
extends DoFn<KV<Integer, Iterable<KV<W, WindowedValue<T>>>>,
IsmRecord<WindowedValue<T>>> {
private final Coder<W> windowCoder;
IsmRecordForSingularValuePerWindowDoFn(Coder<W> windowCoder) {
this.windowCoder = windowCoder;
}
@Override
public void processElement(ProcessContext c) throws Exception {
Optional<Object> previousWindowStructuralValue = Optional.absent();
T previousValue = null;
Iterator<KV<W, WindowedValue<T>>> iterator = c.element().getValue().iterator();
while (iterator.hasNext()) {
KV<W, WindowedValue<T>> next = iterator.next();
Object currentWindowStructuralValue = windowCoder.structuralValue(next.getKey());
// Verify that the user isn't trying to have more than one element per window as
// a singleton.
checkState(!previousWindowStructuralValue.isPresent()
|| !previousWindowStructuralValue.get().equals(currentWindowStructuralValue),
"Multiple values [%s, %s] found for singleton within window [%s].",
previousValue,
next.getValue().getValue(),
next.getKey());
c.output(
IsmRecord.of(
ImmutableList.of(next.getKey()), next.getValue()));
previousWindowStructuralValue = Optional.of(currentWindowStructuralValue);
previousValue = next.getValue().getValue();
}
}
}
private final DataflowPipelineRunner runner;
private final View.AsSingleton<T> transform;
/**
* Builds an instance of this class from the overridden transform.
*/
@SuppressWarnings("unused") // used via reflection in DataflowPipelineRunner#apply()
public BatchViewAsSingleton(DataflowPipelineRunner runner, View.AsSingleton<T> transform) {
this.runner = runner;
this.transform = transform;
}
@Override
public PCollectionView<T> apply(PCollection<T> input) {
@SuppressWarnings("unchecked")
Coder<BoundedWindow> windowCoder = (Coder<BoundedWindow>)
input.getWindowingStrategy().getWindowFn().windowCoder();
return BatchViewAsSingleton.<T, T, T, BoundedWindow>applyForSingleton(
runner,
input,
new IsmRecordForSingularValuePerWindowDoFn<T, BoundedWindow>(windowCoder),
transform.hasDefaultValue(),
transform.defaultValue(),
input.getCoder());
}
static <T, FinalT, ViewT, W extends BoundedWindow> PCollectionView<ViewT>
applyForSingleton(
DataflowPipelineRunner runner,
PCollection<T> input,
DoFn<KV<Integer, Iterable<KV<W, WindowedValue<T>>>>,
IsmRecord<WindowedValue<FinalT>>> doFn,
boolean hasDefault,
FinalT defaultValue,
Coder<FinalT> defaultValueCoder) {
@SuppressWarnings("unchecked")
Coder<W> windowCoder = (Coder<W>)
input.getWindowingStrategy().getWindowFn().windowCoder();
@SuppressWarnings({"rawtypes", "unchecked"})
PCollectionView<ViewT> view = PCollectionViews.singletonView(
input.getPipeline(),
(WindowingStrategy) input.getWindowingStrategy(),
hasDefault,
defaultValue,
defaultValueCoder);
IsmRecordCoder<WindowedValue<FinalT>> ismCoder =
coderForSingleton(windowCoder, defaultValueCoder);
PCollection<IsmRecord<WindowedValue<FinalT>>> reifiedPerWindowAndSorted = input
.apply(new GroupByWindowHashAsKeyAndWindowAsSortKey<T, W>(ismCoder))
.apply(ParDo.of(doFn));
reifiedPerWindowAndSorted.setCoder(ismCoder);
runner.addPCollectionRequiringIndexedFormat(reifiedPerWindowAndSorted);
return reifiedPerWindowAndSorted.apply(
CreatePCollectionView.<IsmRecord<WindowedValue<FinalT>>, ViewT>of(view));
}
@Override
protected String getKindString() {
return "BatchViewAsSingleton";
}
static <T> IsmRecordCoder<WindowedValue<T>> coderForSingleton(
Coder<? extends BoundedWindow> windowCoder, Coder<T> valueCoder) {
return IsmRecordCoder.of(
1, // We hash using only the window
0, // There are no metadata records
ImmutableList.<Coder<?>>of(windowCoder),
FullWindowedValueCoder.of(valueCoder, windowCoder));
}
}
/**
* Specialized implementation for
* {@link com.google.cloud.dataflow.sdk.transforms.View.AsIterable View.AsIterable} for the
* Dataflow runner in batch mode.
*
* <p>Creates a set of {@code Ism} files sharded by the hash of the window's byte representation
* and with records having:
* <ul>
* <li>Key 1: Window</li>
* <li>Key 2: Index offset within window</li>
* <li>Value: Windowed value</li>
* </ul>
*/
static class BatchViewAsIterable<T>
extends PTransform<PCollection<T>, PCollectionView<Iterable<T>>> {
private final DataflowPipelineRunner runner;
/**
* Builds an instance of this class from the overridden transform.
*/
@SuppressWarnings("unused") // used via reflection in DataflowPipelineRunner#apply()
public BatchViewAsIterable(DataflowPipelineRunner runner, View.AsIterable<T> transform) {
this.runner = runner;
}
@Override
public PCollectionView<Iterable<T>> apply(PCollection<T> input) {
PCollectionView<Iterable<T>> view = PCollectionViews.iterableView(
input.getPipeline(), input.getWindowingStrategy(), input.getCoder());
return BatchViewAsList.applyForIterableLike(runner, input, view);
}
}
/**
* Specialized implementation for
* {@link com.google.cloud.dataflow.sdk.transforms.View.AsList View.AsList} for the
* Dataflow runner in batch mode.
*
* <p>Creates a set of {@code Ism} files sharded by the hash of the window's byte representation
* and with records having:
* <ul>
* <li>Key 1: Window</li>
* <li>Key 2: Index offset within window</li>
* <li>Value: Windowed value</li>
* </ul>
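*
* <p>As an illustrative sketch (not the exact byte encoding), a list {@code ["x", "y"]} within a
* single window {@code W} is stored as the records {@code [W, 0L] -> WindowedValue("x")} and
* {@code [W, 1L] -> WindowedValue("y")}.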
*/
static class BatchViewAsList<T>
extends PTransform<PCollection<T>, PCollectionView<List<T>>> {
/**
* A {@link DoFn} which creates {@link IsmRecord}s assuming that each element is within the
* global window. Each {@link IsmRecord} has
* <ul>
* <li>Key 1: Global window</li>
* <li>Key 2: Index offset within window</li>
* <li>Value: Windowed value</li>
* </ul>
*/
@SystemDoFnInternal
static class ToIsmRecordForGlobalWindowDoFn<T>
extends DoFn<T, IsmRecord<WindowedValue<T>>> {
long indexInBundle;
@Override
public void startBundle(Context c) throws Exception {
indexInBundle = 0;
}
@Override
public void processElement(ProcessContext c) throws Exception {
c.output(IsmRecord.of(
ImmutableList.of(GlobalWindow.INSTANCE, indexInBundle),
WindowedValue.of(
c.element(),
c.timestamp(),
GlobalWindow.INSTANCE,
c.pane())));
indexInBundle += 1;
}
}
/**
* A {@link DoFn} which creates {@link IsmRecord}s comparing successive elements' windows
* to locate the window boundaries. The {@link IsmRecord} has:
* <ul>
* <li>Key 1: Window</li>
* <li>Key 2: Index offset within window</li>
* <li>Value: Windowed value</li>
* </ul>
*/
@SystemDoFnInternal
static class ToIsmRecordForNonGlobalWindowDoFn<T, W extends BoundedWindow>
extends DoFn<KV<Integer, Iterable<KV<W, WindowedValue<T>>>>,
IsmRecord<WindowedValue<T>>> {
private final Coder<W> windowCoder;
ToIsmRecordForNonGlobalWindowDoFn(Coder<W> windowCoder) {
this.windowCoder = windowCoder;
}
@Override
public void processElement(ProcessContext c) throws Exception {
long elementsInWindow = 0;
Optional<Object> previousWindowStructuralValue = Optional.absent();
for (KV<W, WindowedValue<T>> value : c.element().getValue()) {
Object currentWindowStructuralValue = windowCoder.structuralValue(value.getKey());
// Compare to see if this is a new window so we can reset the element index counter.
if (previousWindowStructuralValue.isPresent()
&& !previousWindowStructuralValue.get().equals(currentWindowStructuralValue)) {
// Reset the counter since we have a new window.
elementsInWindow = 0;
}
c.output(IsmRecord.of(
ImmutableList.of(value.getKey(), elementsInWindow),
value.getValue()));
previousWindowStructuralValue = Optional.of(currentWindowStructuralValue);
elementsInWindow += 1;
}
}
}
private final DataflowPipelineRunner runner;
/**
* Builds an instance of this class from the overridden transform.
*/
@SuppressWarnings("unused") // used via reflection in DataflowPipelineRunner#apply()
public BatchViewAsList(DataflowPipelineRunner runner, View.AsList<T> transform) {
this.runner = runner;
}
@Override
public PCollectionView<List<T>> apply(PCollection<T> input) {
PCollectionView<List<T>> view = PCollectionViews.listView(
input.getPipeline(), input.getWindowingStrategy(), input.getCoder());
return applyForIterableLike(runner, input, view);
}
static <T, W extends BoundedWindow, ViewT> PCollectionView<ViewT> applyForIterableLike(
DataflowPipelineRunner runner,
PCollection<T> input,
PCollectionView<ViewT> view) {
@SuppressWarnings("unchecked")
Coder<W> windowCoder = (Coder<W>)
input.getWindowingStrategy().getWindowFn().windowCoder();
IsmRecordCoder<WindowedValue<T>> ismCoder = coderForListLike(windowCoder, input.getCoder());
// If we are working in the global window, we do not need to do a GBK using the window
// as the key since all the elements of the input PCollection are already in the global window.
// We just reify the windowed values while converting them to IsmRecords and generating
// an index based upon where we are within the bundle. Each bundle
// maps to one file exactly.
if (input.getWindowingStrategy().getWindowFn() instanceof GlobalWindows) {
PCollection<IsmRecord<WindowedValue<T>>> reifiedPerWindowAndSorted =
input.apply(ParDo.of(new ToIsmRecordForGlobalWindowDoFn<T>()));
reifiedPerWindowAndSorted.setCoder(ismCoder);
runner.addPCollectionRequiringIndexedFormat(reifiedPerWindowAndSorted);
return reifiedPerWindowAndSorted.apply(
CreatePCollectionView.<IsmRecord<WindowedValue<T>>, ViewT>of(view));
}
PCollection<IsmRecord<WindowedValue<T>>> reifiedPerWindowAndSorted = input
.apply(new GroupByWindowHashAsKeyAndWindowAsSortKey<T, W>(ismCoder))
.apply(ParDo.of(new ToIsmRecordForNonGlobalWindowDoFn<T, W>(windowCoder)));
reifiedPerWindowAndSorted.setCoder(ismCoder);
runner.addPCollectionRequiringIndexedFormat(reifiedPerWindowAndSorted);
return reifiedPerWindowAndSorted.apply(
CreatePCollectionView.<IsmRecord<WindowedValue<T>>, ViewT>of(view));
}
@Override
protected String getKindString() {
return "BatchViewAsList";
}
static <T> IsmRecordCoder<WindowedValue<T>> coderForListLike(
Coder<? extends BoundedWindow> windowCoder, Coder<T> valueCoder) {
// TODO: Swap to a variable-length long coder whose encoded values compare
// lexicographically in the same order as the values themselves within the key coder.
return IsmRecordCoder.of(
1, // We hash using only the window
0, // There are no metadata records
ImmutableList.of(windowCoder, BigEndianLongCoder.of()),
FullWindowedValueCoder.of(valueCoder, windowCoder));
}
}
/**
* Specialized implementation for
* {@link com.google.cloud.dataflow.sdk.transforms.View.AsMap View.AsMap} for the
* Dataflow runner in batch mode.
*
* <p>Creates a set of {@code Ism} files sharded by the hash of the key's byte
* representation. Each record is structured as follows:
* <ul>
* <li>Key 1: User key K</li>
* <li>Key 2: Window</li>
* <li>Key 3: 0L (constant)</li>
* <li>Value: Windowed value</li>
* </ul>
*
* <p>Alongside the data records, there are the following metadata records:
* <ul>
* <li>Key 1: Metadata Key</li>
* <li>Key 2: Window</li>
* <li>Key 3: Index [0, size of map]</li>
* <li>Value: variable length long byte representation of size of map if index is 0,
* otherwise the byte representation of a key</li>
* </ul>
* The {@code [META, Window, 0]} record stores the number of unique keys per window, while
* {@code [META, Window, i]} for {@code i} in {@code [1, size of map]} stores the user's key.
* This allows one to access the size of the map by looking at {@code [META, Window, 0]}
* and to iterate over all the keys by accessing {@code [META, Window, i]} for {@code i} in
* {@code [1, size of map]}.
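*
* <p>As an illustrative sketch (not the exact byte encoding), a map {@code {a -> 1, b -> 2}}
* within a single window {@code W} produces the data records
* {@code [a, W, 0L] -> WindowedValue(1)} and {@code [b, W, 0L] -> WindowedValue(2)}, plus the
* metadata records {@code [META, W, 0] -> 2}, {@code [META, W, 1] -> a}, and
* {@code [META, W, 2] -> b}.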
*
* <p>Note that in the case of a non-deterministic key coder, we fall back to using
* {@link com.google.cloud.dataflow.sdk.transforms.View.AsSingleton View.AsSingleton}, printing
* a warning to users to specify a deterministic key coder.
*/
static class BatchViewAsMap<K, V>
extends PTransform<PCollection<KV<K, V>>, PCollectionView<Map<K, V>>> {
/**
* A {@link DoFn} which groups elements by window boundaries. For each group,
* the group of elements is transformed into a {@link TransformedMap}.
* The transformed {@code Map<K, V>} is backed by a {@code Map<K, WindowedValue<V>>}
* and contains a function {@code WindowedValue<V> -> V}.
*
* <p>Outputs {@link IsmRecord}s having:
* <ul>
* <li>Key 1: Window</li>
* <li>Value: Transformed map containing a transform that removes the encapsulation
* of the window around each value,
* {@code Map<K, WindowedValue<V>> -> Map<K, V>}.</li>
* </ul>
*/
static class ToMapDoFn<K, V, W extends BoundedWindow>
extends DoFn<KV<Integer, Iterable<KV<W, WindowedValue<KV<K, V>>>>>,
IsmRecord<WindowedValue<TransformedMap<K,
WindowedValue<V>,
V>>>> {
private final Coder<W> windowCoder;
ToMapDoFn(Coder<W> windowCoder) {
this.windowCoder = windowCoder;
}
@Override
public void processElement(ProcessContext c)
throws Exception {
Optional<Object> previousWindowStructuralValue = Optional.absent();
Optional<W> previousWindow = Optional.absent();
Map<K, WindowedValue<V>> map = new HashMap<>();
for (KV<W, WindowedValue<KV<K, V>>> kv : c.element().getValue()) {
Object currentWindowStructuralValue = windowCoder.structuralValue(kv.getKey());
if (previousWindowStructuralValue.isPresent()
&& !previousWindowStructuralValue.get().equals(currentWindowStructuralValue)) {
// Construct the transformed map containing all the elements since we
// are at a window boundary.
c.output(IsmRecord.of(
ImmutableList.of(previousWindow.get()),
valueInEmptyWindows(new TransformedMap<>(WindowedValueToValue.<V>of(), map))));
map = new HashMap<>();
}
// Verify that the user isn't trying to insert the same key multiple times.
checkState(!map.containsKey(kv.getValue().getValue().getKey()),
"Multiple values [%s, %s] found for single key [%s] within window [%s].",
map.get(kv.getValue().getValue().getKey()),
kv.getValue().getValue().getValue(),
kv.getValue().getValue().getKey(),
kv.getKey());
map.put(kv.getValue().getValue().getKey(),
kv.getValue().withValue(kv.getValue().getValue().getValue()));
previousWindowStructuralValue = Optional.of(currentWindowStructuralValue);
previousWindow = Optional.of(kv.getKey());
}
// The last value for this hash is guaranteed to be at a window boundary
// so we output a transformed map containing all the elements since the last
// window boundary.
c.output(IsmRecord.of(
ImmutableList.of(previousWindow.get()),
valueInEmptyWindows(new TransformedMap<>(WindowedValueToValue.<V>of(), map))));
}
}
private final DataflowPipelineRunner runner;
/**
* Builds an instance of this class from the overridden transform.
*/
@SuppressWarnings("unused") // used via reflection in DataflowPipelineRunner#apply()
public BatchViewAsMap(DataflowPipelineRunner runner, View.AsMap<K, V> transform) {
this.runner = runner;
}
@Override
public PCollectionView<Map<K, V>> apply(PCollection<KV<K, V>> input) {
return this.<BoundedWindow>applyInternal(input);
}
private <W extends BoundedWindow> PCollectionView<Map<K, V>>
applyInternal(PCollection<KV<K, V>> input) {
@SuppressWarnings({"rawtypes", "unchecked"})
KvCoder<K, V> inputCoder = (KvCoder) input.getCoder();
try {
PCollectionView<Map<K, V>> view = PCollectionViews.mapView(
input.getPipeline(), input.getWindowingStrategy(), inputCoder);
return BatchViewAsMultimap.applyForMapLike(runner, input, view, true /* unique keys */);
} catch (NonDeterministicException e) {
runner.recordViewUsesNonDeterministicKeyCoder(this);
// Since the key coder is not deterministic, we convert the map into a singleton
// and return a singleton view equivalent.
return applyForSingletonFallback(input);
}
}
@Override
protected String getKindString() {
return "BatchViewAsMap";
}
/** Transforms the input {@link PCollection} into a singleton {@link Map} per window. */
private <W extends BoundedWindow> PCollectionView<Map<K, V>>
applyForSingletonFallback(PCollection<KV<K, V>> input) {
@SuppressWarnings("unchecked")
Coder<W> windowCoder = (Coder<W>)
input.getWindowingStrategy().getWindowFn().windowCoder();
@SuppressWarnings({"rawtypes", "unchecked"})
KvCoder<K, V> inputCoder = (KvCoder) input.getCoder();
@SuppressWarnings({"unchecked", "rawtypes"})
Coder<Function<WindowedValue<V>, V>> transformCoder =
(Coder) SerializableCoder.of(WindowedValueToValue.class);
Coder<TransformedMap<K, WindowedValue<V>, V>> finalValueCoder =
TransformedMapCoder.of(
transformCoder,
MapCoder.of(
inputCoder.getKeyCoder(),
FullWindowedValueCoder.of(inputCoder.getValueCoder(), windowCoder)));
TransformedMap<K, WindowedValue<V>, V> defaultValue = new TransformedMap<>(
WindowedValueToValue.<V>of(),
ImmutableMap.<K, WindowedValue<V>>of());
return BatchViewAsSingleton.<KV<K, V>,
TransformedMap<K, WindowedValue<V>, V>,
Map<K, V>,
W> applyForSingleton(
runner,
input,
new ToMapDoFn<K, V, W>(windowCoder),
true,
defaultValue,
finalValueCoder);
}
}
/**
* Specialized implementation for
* {@link com.google.cloud.dataflow.sdk.transforms.View.AsMultimap View.AsMultimap} for the
* Dataflow runner in batch mode.
*
* <p>Creates a set of {@code Ism} files sharded by the hash of the key's byte
* representation. Each record is structured as follows:
* <ul>
* <li>Key 1: User key K</li>
* <li>Key 2: Window</li>
* <li>Key 3: Index offset for a given key and window.</li>
* <li>Value: Windowed value</li>
* </ul>
*
* <p>Alongside the data records, there are the following metadata records:
* <ul>
* <li>Key 1: Metadata Key</li>
* <li>Key 2: Window</li>
* <li>Key 3: Index [0, size of map]</li>
* <li>Value: variable length long byte representation of size of map if index is 0,
* otherwise the byte representation of a key</li>
* </ul>
* The {@code [META, Window, 0]} record stores the number of unique keys per window, while
* {@code [META, Window, i]} for {@code i} in {@code [1, size of map]} stores the user's key.
* This allows one to access the size of the map by looking at {@code [META, Window, 0]}
* and to iterate over all the keys by accessing {@code [META, Window, i]} for {@code i} in
* {@code [1, size of map]}.
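*
* <p>As an illustrative sketch, a multimap {@code {a -> [1, 2]}} within a single window
* {@code W} produces the data records {@code [a, W, 0L] -> WindowedValue(1)} and
* {@code [a, W, 1L] -> WindowedValue(2)}, plus the metadata records
* {@code [META, W, 0] -> 1} and {@code [META, W, 1] -> a}.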
*
* <p>Note that in the case of a non-deterministic key coder, we fall back to using
* {@link com.google.cloud.dataflow.sdk.transforms.View.AsSingleton View.AsSingleton}, printing
* a warning to users to specify a deterministic key coder.
*/
static class BatchViewAsMultimap<K, V>
extends PTransform<PCollection<KV<K, V>>, PCollectionView<Map<K, Iterable<V>>>> {
/**
* A {@link PTransform} that groups elements by the hash of the key's byte representation,
* as computed by the supplied {@link IsmRecordCoder}. This {@link PTransform} also sorts
* the values by the combination of the key and window's byte representations.
*/
private static class GroupByKeyHashAndSortByKeyAndWindow<K, V, W extends BoundedWindow>
extends PTransform<PCollection<KV<K, V>>,
PCollection<KV<Integer, Iterable<KV<KV<K, W>, WindowedValue<V>>>>>> {
@SystemDoFnInternal
private static class GroupByKeyHashAndSortByKeyAndWindowDoFn<K, V, W>
extends DoFn<KV<K, V>, KV<Integer, KV<KV<K, W>, WindowedValue<V>>>>
implements DoFn.RequiresWindowAccess {
private final IsmRecordCoder<?> coder;
private GroupByKeyHashAndSortByKeyAndWindowDoFn(IsmRecordCoder<?> coder) {
this.coder = coder;
}
@Override
public void processElement(ProcessContext c) throws Exception {
@SuppressWarnings("unchecked")
W window = (W) c.window();
c.output(
KV.of(coder.hash(ImmutableList.of(c.element().getKey())),
KV.of(KV.of(c.element().getKey(), window),
WindowedValue.of(
c.element().getValue(),
c.timestamp(),
(BoundedWindow) window,
c.pane()))));
}
}
private final IsmRecordCoder<?> coder;
public GroupByKeyHashAndSortByKeyAndWindow(IsmRecordCoder<?> coder) {
this.coder = coder;
}
@Override
public PCollection<KV<Integer, Iterable<KV<KV<K, W>, WindowedValue<V>>>>>
apply(PCollection<KV<K, V>> input) {
@SuppressWarnings("unchecked")
Coder<W> windowCoder = (Coder<W>)
input.getWindowingStrategy().getWindowFn().windowCoder();
@SuppressWarnings("unchecked")
KvCoder<K, V> inputCoder = (KvCoder<K, V>) input.getCoder();
PCollection<KV<Integer, KV<KV<K, W>, WindowedValue<V>>>> keyedByHash;
keyedByHash = input.apply(
ParDo.of(new GroupByKeyHashAndSortByKeyAndWindowDoFn<K, V, W>(coder)));
keyedByHash.setCoder(
KvCoder.of(
VarIntCoder.of(),
KvCoder.of(KvCoder.of(inputCoder.getKeyCoder(), windowCoder),
FullWindowedValueCoder.of(inputCoder.getValueCoder(), windowCoder))));
return keyedByHash.apply(
new GroupByKeyAndSortValuesOnly<Integer, KV<K, W>, WindowedValue<V>>());
}
}
/**
     * A {@link DoFn} which creates {@link IsmRecord}s by comparing successive elements' windows
     * and keys to locate window and key boundaries. The main output {@link IsmRecord}s have:
* <ul>
* <li>Key 1: Window</li>
* <li>Key 2: User key K</li>
* <li>Key 3: Index offset for a given key and window.</li>
* <li>Value: Windowed value</li>
* </ul>
*
* <p>Additionally, we output all the unique keys per window seen to {@code outputForEntrySet}
* and the unique key count per window to {@code outputForSize}.
*
     * <p>Finally, if this {@link DoFn} has been requested to perform unique key checking, it will
     * throw an {@link IllegalStateException} if any key is found more than once within a window.
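     *
     * <p>For example (keys and values here are hypothetical), the grouped and sorted input
     * {@code [((a, w1), v1), ((a, w1), v2), ((b, w1), v3)]} yields the data records
     * {@code [a, w1, 0] -> v1}, {@code [a, w1, 1] -> v2} and {@code [b, w1, 0] -> v3}, the
     * size record {@code (w1, 2)} on {@code outputForSize}, and the entry-set records
     * {@code (w1, a)} and {@code (w1, b)} on {@code outputForEntrySet} (the latter two outputs
     * are keyed by the metadata hash).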
*/
static class ToIsmRecordForMapLikeDoFn<K, V, W extends BoundedWindow>
extends DoFn<KV<Integer, Iterable<KV<KV<K, W>, WindowedValue<V>>>>,
IsmRecord<WindowedValue<V>>> {
private final TupleTag<KV<Integer, KV<W, Long>>> outputForSize;
private final TupleTag<KV<Integer, KV<W, K>>> outputForEntrySet;
private final Coder<W> windowCoder;
private final Coder<K> keyCoder;
private final IsmRecordCoder<WindowedValue<V>> ismCoder;
private final boolean uniqueKeysExpected;
ToIsmRecordForMapLikeDoFn(
TupleTag<KV<Integer, KV<W, Long>>> outputForSize,
TupleTag<KV<Integer, KV<W, K>>> outputForEntrySet,
Coder<W> windowCoder,
Coder<K> keyCoder,
IsmRecordCoder<WindowedValue<V>> ismCoder,
boolean uniqueKeysExpected) {
this.outputForSize = outputForSize;
this.outputForEntrySet = outputForEntrySet;
this.windowCoder = windowCoder;
this.keyCoder = keyCoder;
this.ismCoder = ismCoder;
this.uniqueKeysExpected = uniqueKeysExpected;
}
@Override
public void processElement(ProcessContext c) throws Exception {
long currentKeyIndex = 0;
        // We use one-based indexing while counting unique keys.
long currentUniqueKeyCounter = 1;
Iterator<KV<KV<K, W>, WindowedValue<V>>> iterator = c.element().getValue().iterator();
KV<KV<K, W>, WindowedValue<V>> currentValue = iterator.next();
Object currentKeyStructuralValue =
keyCoder.structuralValue(currentValue.getKey().getKey());
Object currentWindowStructuralValue =
windowCoder.structuralValue(currentValue.getKey().getValue());
while (iterator.hasNext()) {
KV<KV<K, W>, WindowedValue<V>> nextValue = iterator.next();
Object nextKeyStructuralValue =
keyCoder.structuralValue(nextValue.getKey().getKey());
Object nextWindowStructuralValue =
windowCoder.structuralValue(nextValue.getKey().getValue());
outputDataRecord(c, currentValue, currentKeyIndex);
final long nextKeyIndex;
final long nextUniqueKeyCounter;
          // Check to see if it's a new window.
if (!currentWindowStructuralValue.equals(nextWindowStructuralValue)) {
            // The next value is in a new window, so we output the number of unique keys seen to
            // the size output and the last key of the window to the entry-set output. We also
            // reset the next key index and the unique key counter.
outputMetadataRecordForSize(c, currentValue, currentUniqueKeyCounter);
outputMetadataRecordForEntrySet(c, currentValue);
nextKeyIndex = 0;
nextUniqueKeyCounter = 1;
          } else if (!currentKeyStructuralValue.equals(nextKeyStructuralValue)) {
// It is a new key within the same window so output the key for the entry set,
// reset the key index and increase the count of unique keys seen within this window.
outputMetadataRecordForEntrySet(c, currentValue);
nextKeyIndex = 0;
nextUniqueKeyCounter = currentUniqueKeyCounter + 1;
} else if (!uniqueKeysExpected) {
// It is not a new key so we don't have to output the number of elements in this
// window or increase the unique key counter. All we do is increase the key index.
nextKeyIndex = currentKeyIndex + 1;
nextUniqueKeyCounter = currentUniqueKeyCounter;
} else {
throw new IllegalStateException(String.format(
"Unique keys are expected but found key %s with values %s and %s in window %s.",
currentValue.getKey().getKey(),
currentValue.getValue().getValue(),
nextValue.getValue().getValue(),
currentValue.getKey().getValue()));
}
currentValue = nextValue;
currentWindowStructuralValue = nextWindowStructuralValue;
currentKeyStructuralValue = nextKeyStructuralValue;
currentKeyIndex = nextKeyIndex;
currentUniqueKeyCounter = nextUniqueKeyCounter;
}
        outputDataRecord(c, currentValue, currentKeyIndex);
        // The last value for this hash is guaranteed to be at a window boundary, so we output
        // a record with the number of unique keys seen in the window as well as the entry-set
        // record for its final key.
        outputMetadataRecordForSize(c, currentValue, currentUniqueKeyCounter);
        outputMetadataRecordForEntrySet(c, currentValue);
}
/** This outputs the data record. */
private void outputDataRecord(
ProcessContext c, KV<KV<K, W>, WindowedValue<V>> value, long keyIndex) {
IsmRecord<WindowedValue<V>> ismRecord = IsmRecord.of(
ImmutableList.of(
value.getKey().getKey(),
value.getKey().getValue(),
keyIndex),
value.getValue());
c.output(ismRecord);
}
/**
* This outputs records which will be used to compute the number of keys for a given window.
*/
private void outputMetadataRecordForSize(
ProcessContext c, KV<KV<K, W>, WindowedValue<V>> value, long uniqueKeyCount) {
c.sideOutput(outputForSize,
KV.of(ismCoder.hash(ImmutableList.of(IsmFormat.getMetadataKey(),
value.getKey().getValue())),
KV.of(value.getKey().getValue(), uniqueKeyCount)));
}
/** This outputs records which will be used to construct the entry set. */
private void outputMetadataRecordForEntrySet(
ProcessContext c, KV<KV<K, W>, WindowedValue<V>> value) {
c.sideOutput(outputForEntrySet,
KV.of(ismCoder.hash(ImmutableList.of(IsmFormat.getMetadataKey(),
value.getKey().getValue())),
KV.of(value.getKey().getValue(), value.getKey().getKey())));
}
}
/**
* A {@link DoFn} which outputs a metadata {@link IsmRecord} per window of:
* <ul>
* <li>Key 1: META key</li>
* <li>Key 2: window</li>
* <li>Key 3: 0L (constant)</li>
* <li>Value: sum of values for window</li>
* </ul>
*
* <p>This {@link DoFn} is meant to be used to compute the number of unique keys
* per window for map and multimap side inputs.
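     *
     * <p>For illustration only (values are hypothetical), the sorted values
     * {@code [(w1, 2), (w1, 3), (w2, 1)]} produce {@code [META, w1, 0] -> 5} and
     * {@code [META, w2, 0] -> 1}.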
*/
static class ToIsmMetadataRecordForSizeDoFn<K, V, W extends BoundedWindow>
extends DoFn<KV<Integer, Iterable<KV<W, Long>>>, IsmRecord<WindowedValue<V>>> {
private final Coder<W> windowCoder;
ToIsmMetadataRecordForSizeDoFn(Coder<W> windowCoder) {
this.windowCoder = windowCoder;
}
@Override
public void processElement(ProcessContext c) throws Exception {
Iterator<KV<W, Long>> iterator = c.element().getValue().iterator();
KV<W, Long> currentValue = iterator.next();
Object currentWindowStructuralValue = windowCoder.structuralValue(currentValue.getKey());
long size = 0;
while (iterator.hasNext()) {
KV<W, Long> nextValue = iterator.next();
Object nextWindowStructuralValue = windowCoder.structuralValue(nextValue.getKey());
size += currentValue.getValue();
if (!currentWindowStructuralValue.equals(nextWindowStructuralValue)) {
c.output(IsmRecord.<WindowedValue<V>>meta(
ImmutableList.of(IsmFormat.getMetadataKey(), currentValue.getKey(), 0L),
CoderUtils.encodeToByteArray(VarLongCoder.of(), size)));
size = 0;
}
currentValue = nextValue;
currentWindowStructuralValue = nextWindowStructuralValue;
}
size += currentValue.getValue();
// Output the final value since it is guaranteed to be on a window boundary.
c.output(IsmRecord.<WindowedValue<V>>meta(
ImmutableList.of(IsmFormat.getMetadataKey(), currentValue.getKey(), 0L),
CoderUtils.encodeToByteArray(VarLongCoder.of(), size)));
}
}
/**
* A {@link DoFn} which outputs a metadata {@link IsmRecord} per window and key pair of:
* <ul>
* <li>Key 1: META key</li>
* <li>Key 2: window</li>
* <li>Key 3: index offset (1-based index)</li>
* <li>Value: key</li>
* </ul>
*
     * <p>This {@link DoFn} is meant to be used to output index-to-key records
* per window for map and multimap side inputs.
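     *
     * <p>For illustration only (keys are hypothetical), the sorted values
     * {@code [(w1, a), (w1, b), (w2, c)]} produce {@code [META, w1, 1] -> a},
     * {@code [META, w1, 2] -> b} and {@code [META, w2, 1] -> c}.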
*/
static class ToIsmMetadataRecordForKeyDoFn<K, V, W extends BoundedWindow>
extends DoFn<KV<Integer, Iterable<KV<W, K>>>, IsmRecord<WindowedValue<V>>> {
private final Coder<K> keyCoder;
private final Coder<W> windowCoder;
ToIsmMetadataRecordForKeyDoFn(Coder<K> keyCoder, Coder<W> windowCoder) {
this.keyCoder = keyCoder;
this.windowCoder = windowCoder;
}
@Override
public void processElement(ProcessContext c) throws Exception {
Iterator<KV<W, K>> iterator = c.element().getValue().iterator();
KV<W, K> currentValue = iterator.next();
Object currentWindowStructuralValue = windowCoder.structuralValue(currentValue.getKey());
long elementsInWindow = 1;
while (iterator.hasNext()) {
KV<W, K> nextValue = iterator.next();
Object nextWindowStructuralValue = windowCoder.structuralValue(nextValue.getKey());
c.output(IsmRecord.<WindowedValue<V>>meta(
ImmutableList.of(IsmFormat.getMetadataKey(), currentValue.getKey(), elementsInWindow),
CoderUtils.encodeToByteArray(keyCoder, currentValue.getValue())));
elementsInWindow += 1;
if (!currentWindowStructuralValue.equals(nextWindowStructuralValue)) {
elementsInWindow = 1;
}
currentValue = nextValue;
currentWindowStructuralValue = nextWindowStructuralValue;
}
// Output the final value since it is guaranteed to be on a window boundary.
c.output(IsmRecord.<WindowedValue<V>>meta(
ImmutableList.of(IsmFormat.getMetadataKey(), currentValue.getKey(), elementsInWindow),
CoderUtils.encodeToByteArray(keyCoder, currentValue.getValue())));
}
}
/**
* A {@link DoFn} which partitions sets of elements by window boundaries. Within each
* partition, the set of elements is transformed into a {@link TransformedMap}.
* The transformed {@code Map<K, Iterable<V>>} is backed by a
* {@code Map<K, Iterable<WindowedValue<V>>>} and contains a function
* {@code Iterable<WindowedValue<V>> -> Iterable<V>}.
*
* <p>Outputs {@link IsmRecord}s having:
* <ul>
* <li>Key 1: Window</li>
* <li>Value: Transformed map containing a transform that removes the encapsulation
* of the window around each value,
* {@code Map<K, Iterable<WindowedValue<V>>> -> Map<K, Iterable<V>>}.</li>
* </ul>
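     *
     * <p>For illustration only (keys and values are hypothetical), the sorted values
     * {@code [(w1, a -> v1), (w1, a -> v2), (w1, b -> v3)]} yield a single record keyed by
     * {@code [w1]} whose value behaves as the map {@code {a: [v1, v2], b: [v3]}}.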
*/
static class ToMultimapDoFn<K, V, W extends BoundedWindow>
extends DoFn<KV<Integer, Iterable<KV<W, WindowedValue<KV<K, V>>>>>,
IsmRecord<WindowedValue<TransformedMap<K,
Iterable<WindowedValue<V>>,
Iterable<V>>>>> {
private final Coder<W> windowCoder;
ToMultimapDoFn(Coder<W> windowCoder) {
this.windowCoder = windowCoder;
}
@Override
public void processElement(ProcessContext c)
throws Exception {
Optional<Object> previousWindowStructuralValue = Optional.absent();
Optional<W> previousWindow = Optional.absent();
Multimap<K, WindowedValue<V>> multimap = HashMultimap.create();
for (KV<W, WindowedValue<KV<K, V>>> kv : c.element().getValue()) {
Object currentWindowStructuralValue = windowCoder.structuralValue(kv.getKey());
if (previousWindowStructuralValue.isPresent()
&& !previousWindowStructuralValue.get().equals(currentWindowStructuralValue)) {
// Construct the transformed map containing all the elements since we
// are at a window boundary.
@SuppressWarnings({"unchecked", "rawtypes"})
Map<K, Iterable<WindowedValue<V>>> resultMap = (Map) multimap.asMap();
c.output(IsmRecord.<WindowedValue<TransformedMap<K,
Iterable<WindowedValue<V>>,
Iterable<V>>>>of(
ImmutableList.of(previousWindow.get()),
valueInEmptyWindows(
new TransformedMap<>(
IterableWithWindowedValuesToIterable.<V>of(), resultMap))));
multimap = HashMultimap.create();
}
multimap.put(kv.getValue().getValue().getKey(),
kv.getValue().withValue(kv.getValue().getValue().getValue()));
previousWindowStructuralValue = Optional.of(currentWindowStructuralValue);
previousWindow = Optional.of(kv.getKey());
}
// The last value for this hash is guaranteed to be at a window boundary
// so we output a transformed map containing all the elements since the last
// window boundary.
@SuppressWarnings({"unchecked", "rawtypes"})
Map<K, Iterable<WindowedValue<V>>> resultMap = (Map) multimap.asMap();
c.output(IsmRecord.<WindowedValue<TransformedMap<K,
Iterable<WindowedValue<V>>,
Iterable<V>>>>of(
ImmutableList.of(previousWindow.get()),
valueInEmptyWindows(
new TransformedMap<>(IterableWithWindowedValuesToIterable.<V>of(), resultMap))));
}
}
private final DataflowPipelineRunner runner;
/**
* Builds an instance of this class from the overridden transform.
*/
@SuppressWarnings("unused") // used via reflection in DataflowPipelineRunner#apply()
public BatchViewAsMultimap(DataflowPipelineRunner runner, View.AsMultimap<K, V> transform) {
this.runner = runner;
}
@Override
public PCollectionView<Map<K, Iterable<V>>> apply(PCollection<KV<K, V>> input) {
return this.<BoundedWindow>applyInternal(input);
}
private <W extends BoundedWindow> PCollectionView<Map<K, Iterable<V>>>
applyInternal(PCollection<KV<K, V>> input) {
@SuppressWarnings({"rawtypes", "unchecked"})
KvCoder<K, V> inputCoder = (KvCoder) input.getCoder();
try {
PCollectionView<Map<K, Iterable<V>>> view = PCollectionViews.multimapView(
input.getPipeline(), input.getWindowingStrategy(), inputCoder);
return applyForMapLike(runner, input, view, false /* unique keys not expected */);
} catch (NonDeterministicException e) {
runner.recordViewUsesNonDeterministicKeyCoder(this);
// Since the key coder is not deterministic, we convert the map into a singleton
// and return a singleton view equivalent.
return applyForSingletonFallback(input);
}
}
/** Transforms the input {@link PCollection} into a singleton {@link Map} per window. */
private <W extends BoundedWindow> PCollectionView<Map<K, Iterable<V>>>
applyForSingletonFallback(PCollection<KV<K, V>> input) {
@SuppressWarnings("unchecked")
Coder<W> windowCoder = (Coder<W>)
input.getWindowingStrategy().getWindowFn().windowCoder();
@SuppressWarnings({"rawtypes", "unchecked"})
KvCoder<K, V> inputCoder = (KvCoder) input.getCoder();
@SuppressWarnings({"unchecked", "rawtypes"})
Coder<Function<Iterable<WindowedValue<V>>, Iterable<V>>> transformCoder =
(Coder) SerializableCoder.of(IterableWithWindowedValuesToIterable.class);
Coder<TransformedMap<K, Iterable<WindowedValue<V>>, Iterable<V>>> finalValueCoder =
TransformedMapCoder.of(
transformCoder,
MapCoder.of(
inputCoder.getKeyCoder(),
IterableCoder.of(
FullWindowedValueCoder.of(inputCoder.getValueCoder(), windowCoder))));
TransformedMap<K, Iterable<WindowedValue<V>>, Iterable<V>> defaultValue =
new TransformedMap<>(
IterableWithWindowedValuesToIterable.<V>of(),
ImmutableMap.<K, Iterable<WindowedValue<V>>>of());
return BatchViewAsSingleton.<KV<K, V>,
TransformedMap<K, Iterable<WindowedValue<V>>, Iterable<V>>,
Map<K, Iterable<V>>,
W> applyForSingleton(
runner,
input,
new ToMultimapDoFn<K, V, W>(windowCoder),
true,
defaultValue,
finalValueCoder);
}
private static <K, V, W extends BoundedWindow, ViewT> PCollectionView<ViewT> applyForMapLike(
DataflowPipelineRunner runner,
PCollection<KV<K, V>> input,
PCollectionView<ViewT> view,
boolean uniqueKeysExpected) throws NonDeterministicException {
@SuppressWarnings("unchecked")
Coder<W> windowCoder = (Coder<W>)
input.getWindowingStrategy().getWindowFn().windowCoder();
@SuppressWarnings({"rawtypes", "unchecked"})
KvCoder<K, V> inputCoder = (KvCoder) input.getCoder();
      // If our key coder is deterministic, we can use the key portion of each KV as part of a
      // composite key containing the window, key, and index.
inputCoder.getKeyCoder().verifyDeterministic();
IsmRecordCoder<WindowedValue<V>> ismCoder =
coderForMapLike(windowCoder, inputCoder.getKeyCoder(), inputCoder.getValueCoder());
// Create the various output tags representing the main output containing the data stream
// and the side outputs containing the metadata about the size and entry set.
TupleTag<IsmRecord<WindowedValue<V>>> mainOutputTag = new TupleTag<>();
TupleTag<KV<Integer, KV<W, Long>>> outputForSizeTag = new TupleTag<>();
TupleTag<KV<Integer, KV<W, K>>> outputForEntrySetTag = new TupleTag<>();
      // Process all the elements grouped by key hash, and sorted by key and then window,
      // outputting to all the outputs defined above.
PCollectionTuple outputTuple = input
.apply("GBKaSVForData", new GroupByKeyHashAndSortByKeyAndWindow<K, V, W>(ismCoder))
.apply(ParDo.of(new ToIsmRecordForMapLikeDoFn<K, V, W>(
outputForSizeTag, outputForEntrySetTag,
windowCoder, inputCoder.getKeyCoder(), ismCoder, uniqueKeysExpected))
.withOutputTags(mainOutputTag,
TupleTagList.of(
ImmutableList.<TupleTag<?>>of(outputForSizeTag,
outputForEntrySetTag))));
// Set the coder on the main data output.
PCollection<IsmRecord<WindowedValue<V>>> perHashWithReifiedWindows =
outputTuple.get(mainOutputTag);
perHashWithReifiedWindows.setCoder(ismCoder);
// Set the coder on the metadata output for size and process the entries
// producing a [META, Window, 0L] record per window storing the number of unique keys
// for each window.
PCollection<KV<Integer, KV<W, Long>>> outputForSize = outputTuple.get(outputForSizeTag);
outputForSize.setCoder(
KvCoder.of(VarIntCoder.of(),
KvCoder.of(windowCoder, VarLongCoder.of())));
PCollection<IsmRecord<WindowedValue<V>>> windowMapSizeMetadata = outputForSize
.apply("GBKaSVForSize", new GroupByKeyAndSortValuesOnly<Integer, W, Long>())
.apply(ParDo.of(new ToIsmMetadataRecordForSizeDoFn<K, V, W>(windowCoder)));
windowMapSizeMetadata.setCoder(ismCoder);
      // Set the coder on the metadata output destined to build the entry set and process the
      // entries, producing a [META, Window, Index] record per window and key pair storing the key.
PCollection<KV<Integer, KV<W, K>>> outputForEntrySet =
outputTuple.get(outputForEntrySetTag);
outputForEntrySet.setCoder(
KvCoder.of(VarIntCoder.of(),
KvCoder.of(windowCoder, inputCoder.getKeyCoder())));
PCollection<IsmRecord<WindowedValue<V>>> windowMapKeysMetadata = outputForEntrySet
.apply("GBKaSVForKeys", new GroupByKeyAndSortValuesOnly<Integer, W, K>())
.apply(ParDo.of(
new ToIsmMetadataRecordForKeyDoFn<K, V, W>(inputCoder.getKeyCoder(), windowCoder)));
windowMapKeysMetadata.setCoder(ismCoder);
// Set that all these outputs should be materialized using an indexed format.
runner.addPCollectionRequiringIndexedFormat(perHashWithReifiedWindows);
runner.addPCollectionRequiringIndexedFormat(windowMapSizeMetadata);
runner.addPCollectionRequiringIndexedFormat(windowMapKeysMetadata);
PCollectionList<IsmRecord<WindowedValue<V>>> outputs =
PCollectionList.of(ImmutableList.of(
perHashWithReifiedWindows, windowMapSizeMetadata, windowMapKeysMetadata));
return Pipeline.applyTransform(outputs,
Flatten.<IsmRecord<WindowedValue<V>>>pCollections())
.apply(CreatePCollectionView.<IsmRecord<WindowedValue<V>>,
ViewT>of(view));
}
@Override
protected String getKindString() {
return "BatchViewAsMultimap";
}
static <V> IsmRecordCoder<WindowedValue<V>> coderForMapLike(
Coder<? extends BoundedWindow> windowCoder, Coder<?> keyCoder, Coder<V> valueCoder) {
      // TODO: swap to use a variable-length long coder whose values compare in the same order
      // as their byte representations compare lexicographically within the key coder.
return IsmRecordCoder.of(
1, // We use only the key for hashing when producing value records
2, // Since the key is not present, we add the window to the hash when
// producing metadata records
ImmutableList.of(
MetadataKeyCoder.of(keyCoder),
windowCoder,
BigEndianLongCoder.of()),
FullWindowedValueCoder.of(valueCoder, windowCoder));
}
}
/**
* A {@code Map<K, V2>} backed by a {@code Map<K, V1>} and a function that transforms
* {@code V1 -> V2}.
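   *
   * <p>For example, a {@code Map<K, WindowedValue<V>>} paired with a
   * {@code WindowedValue<V> -> V} function (see {@link WindowedValueToValue}) behaves as a
   * {@code Map<K, V>} without copying the underlying entries.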
*/
static class TransformedMap<K, V1, V2>
extends ForwardingMap<K, V2> {
private final Function<V1, V2> transform;
private final Map<K, V1> originalMap;
private final Map<K, V2> transformedMap;
private TransformedMap(Function<V1, V2> transform, Map<K, V1> originalMap) {
this.transform = transform;
this.originalMap = Collections.unmodifiableMap(originalMap);
this.transformedMap = Maps.transformValues(originalMap, transform);
}
@Override
protected Map<K, V2> delegate() {
return transformedMap;
}
}
/**
* A {@link Coder} for {@link TransformedMap}s.
*/
static class TransformedMapCoder<K, V1, V2>
extends StandardCoder<TransformedMap<K, V1, V2>> {
private final Coder<Function<V1, V2>> transformCoder;
private final Coder<Map<K, V1>> originalMapCoder;
private TransformedMapCoder(
Coder<Function<V1, V2>> transformCoder, Coder<Map<K, V1>> originalMapCoder) {
this.transformCoder = transformCoder;
this.originalMapCoder = originalMapCoder;
}
public static <K, V1, V2> TransformedMapCoder<K, V1, V2> of(
Coder<Function<V1, V2>> transformCoder, Coder<Map<K, V1>> originalMapCoder) {
return new TransformedMapCoder<>(transformCoder, originalMapCoder);
}
@JsonCreator
public static <K, V1, V2> TransformedMapCoder<K, V1, V2> of(
@JsonProperty(PropertyNames.COMPONENT_ENCODINGS)
List<Coder<?>> components) {
checkArgument(components.size() == 2,
"Expecting 2 components, got " + components.size());
@SuppressWarnings("unchecked")
Coder<Function<V1, V2>> transformCoder = (Coder<Function<V1, V2>>) components.get(0);
@SuppressWarnings("unchecked")
Coder<Map<K, V1>> originalMapCoder = (Coder<Map<K, V1>>) components.get(1);
return of(transformCoder, originalMapCoder);
}
@Override
public void encode(TransformedMap<K, V1, V2> value, OutputStream outStream,
Coder.Context context) throws CoderException, IOException {
transformCoder.encode(value.transform, outStream, context.nested());
originalMapCoder.encode(value.originalMap, outStream, context.nested());
}
@Override
public TransformedMap<K, V1, V2> decode(
InputStream inStream, Coder.Context context) throws CoderException, IOException {
return new TransformedMap<>(
transformCoder.decode(inStream, context.nested()),
originalMapCoder.decode(inStream, context.nested()));
}
@Override
public List<? extends Coder<?>> getCoderArguments() {
return Arrays.asList(transformCoder, originalMapCoder);
}
@Override
public void verifyDeterministic()
throws com.google.cloud.dataflow.sdk.coders.Coder.NonDeterministicException {
verifyDeterministic("Expected transform coder to be deterministic.", transformCoder);
verifyDeterministic("Expected map coder to be deterministic.", originalMapCoder);
}
}
/**
* A {@link Function} which converts {@code WindowedValue<V>} to {@code V}.
*/
private static class WindowedValueToValue<V> implements
Function<WindowedValue<V>, V>, Serializable {
private static final WindowedValueToValue<?> INSTANCE = new WindowedValueToValue<>();
@SuppressWarnings({"unchecked", "rawtypes"})
private static <V> WindowedValueToValue<V> of() {
return (WindowedValueToValue) INSTANCE;
}
@Override
public V apply(WindowedValue<V> input) {
return input.getValue();
}
}
/**
* A {@link Function} which converts {@code Iterable<WindowedValue<V>>} to {@code Iterable<V>}.
*/
private static class IterableWithWindowedValuesToIterable<V> implements
Function<Iterable<WindowedValue<V>>, Iterable<V>>, Serializable {
private static final IterableWithWindowedValuesToIterable<?> INSTANCE =
new IterableWithWindowedValuesToIterable<>();
@SuppressWarnings({"unchecked", "rawtypes"})
private static <V> IterableWithWindowedValuesToIterable<V> of() {
return (IterableWithWindowedValuesToIterable) INSTANCE;
}
@Override
public Iterable<V> apply(Iterable<WindowedValue<V>> input) {
return Iterables.transform(input, WindowedValueToValue.<V>of());
}
}
/**
* Specialized implementation which overrides
* {@link com.google.cloud.dataflow.sdk.io.Write.Bound Write.Bound} to provide Google
* Cloud Dataflow specific path validation of {@link FileBasedSink}s.
*/
private static class BatchWrite<T> extends PTransform<PCollection<T>, PDone> {
private final DataflowPipelineRunner runner;
private final Write.Bound<T> transform;
/**
* Builds an instance of this class from the overridden transform.
*/
@SuppressWarnings("unused") // used via reflection in DataflowPipelineRunner#apply()
public BatchWrite(DataflowPipelineRunner runner, Write.Bound<T> transform) {
this.runner = runner;
this.transform = transform;
}
@Override
public PDone apply(PCollection<T> input) {
if (transform.getSink() instanceof FileBasedSink) {
FileBasedSink<?> sink = (FileBasedSink<?>) transform.getSink();
PathValidator validator = runner.options.getPathValidator();
validator.validateOutputFilePrefixSupported(sink.getBaseOutputFilename());
}
return transform.apply(input);
}
}
/**
* This {@link PTransform} is used by the {@link DataflowPipelineTranslator} as a way
 * to provide the native definition of the BigQuery source.
*/
private static class BatchBigQueryIONativeRead extends PTransform<PInput, PCollection<TableRow>> {
private final BigQueryIO.Read.Bound transform;
/**
* Builds an instance of this class from the overridden transform.
*/
@SuppressWarnings("unused") // used via reflection in DataflowPipelineRunner#apply()
public BatchBigQueryIONativeRead(
DataflowPipelineRunner runner, BigQueryIO.Read.Bound transform) {
this.transform = transform;
}
@Override
public PCollection<TableRow> apply(PInput input) {
return PCollection.<TableRow>createPrimitiveOutputInternal(
input.getPipeline(),
WindowingStrategy.globalDefault(),
IsBounded.BOUNDED)
// Force the output's Coder to be what the read is using, and
// unchangeable later, to ensure that we read the input in the
// format specified by the Read transform.
.setCoder(TableRowJsonCoder.of());
}
@Override
public void populateDisplayData(DisplayData.Builder builder) {
transform.populateDisplayData(builder);
}
static {
DataflowPipelineTranslator.registerTransformTranslator(
BatchBigQueryIONativeRead.class, new BatchBigQueryIONativeReadTranslator());
}
}
/**
* Implements BigQueryIO Read translation for the Dataflow backend.
*/
public static class BatchBigQueryIONativeReadTranslator
implements DataflowPipelineTranslator.TransformTranslator<BatchBigQueryIONativeRead> {
@Override
public void translate(
BatchBigQueryIONativeRead transform,
DataflowPipelineTranslator.TranslationContext context) {
      translateReadHelper(transform, transform.transform, context);
}
    private void translateReadHelper(
BatchBigQueryIONativeRead transform,
BigQueryIO.Read.Bound originalTransform,
TranslationContext context) {
// Actual translation.
context.addStep(transform, "ParallelRead");
context.addInput(PropertyNames.FORMAT, "bigquery");
context.addInput(PropertyNames.BIGQUERY_EXPORT_FORMAT, "FORMAT_AVRO");
if (originalTransform.getQuery() != null) {
context.addInput(PropertyNames.BIGQUERY_QUERY, originalTransform.getQuery());
context.addInput(
PropertyNames.BIGQUERY_FLATTEN_RESULTS, originalTransform.getFlattenResults());
context.addInput(
PropertyNames.BIGQUERY_USE_LEGACY_SQL, originalTransform.getUseLegacySql());
} else {
TableReference table = originalTransform.getTable();
if (table.getProjectId() == null) {
        // If the user does not specify a project, we assume the table is located in the project
        // that owns the Dataflow job.
String projectIdFromOptions = context.getPipelineOptions().getProject();
LOG.warn(
"No project specified for BigQuery table \"{}.{}\". Assuming it is in \"{}\". If the"
+ " table is in a different project please specify it as a part of the BigQuery table"
+ " definition.",
table.getDatasetId(),
table.getTableId(),
projectIdFromOptions);
table.setProjectId(projectIdFromOptions);
}
context.addInput(PropertyNames.BIGQUERY_TABLE, table.getTableId());
context.addInput(PropertyNames.BIGQUERY_DATASET, table.getDatasetId());
if (table.getProjectId() != null) {
context.addInput(PropertyNames.BIGQUERY_PROJECT, table.getProjectId());
}
}
context.addValueOnlyOutput(PropertyNames.OUTPUT, context.getOutput(transform));
}
}
private static class BatchBigQueryIOWrite extends PTransform<PCollection<TableRow>, PDone> {
private final BigQueryIO.Write.Bound transform;
/**
* Builds an instance of this class from the overridden transform.
*/
@SuppressWarnings("unused") // used via reflection in DataflowPipelineRunner#apply()
public BatchBigQueryIOWrite(DataflowPipelineRunner runner, BigQueryIO.Write.Bound transform) {
this.transform = transform;
}
@Override
public PDone apply(PCollection<TableRow> input) {
if (transform.getTable() == null) {
// BigQueryIO.Write is using tableRefFunction with StreamWithDeDup.
return transform.apply(input);
} else {
return input
.apply(new BatchBigQueryIONativeWrite(transform));
}
}
}
/**
* This {@link PTransform} is used by the {@link DataflowPipelineTranslator} as a way
* to provide the native definition of the BigQuery sink.
*/
private static class BatchBigQueryIONativeWrite extends PTransform<PCollection<TableRow>, PDone> {
private final BigQueryIO.Write.Bound transform;
public BatchBigQueryIONativeWrite(BigQueryIO.Write.Bound transform) {
this.transform = transform;
}
@Override
public PDone apply(PCollection<TableRow> input) {
return PDone.in(input.getPipeline());
}
@Override
public void populateDisplayData(DisplayData.Builder builder) {
transform.populateDisplayData(builder);
}
static {
DataflowPipelineTranslator.registerTransformTranslator(
BatchBigQueryIONativeWrite.class, new BatchBigQueryIONativeWriteTranslator());
}
}
/**
* {@code BigQueryIO.Write.Bound} support code for the Dataflow backend.
*/
private static class BatchBigQueryIONativeWriteTranslator
implements TransformTranslator<BatchBigQueryIONativeWrite> {
@SuppressWarnings("unchecked")
@Override
public void translate(BatchBigQueryIONativeWrite transform,
TranslationContext context) {
translateWriteHelper(transform, transform.transform, context);
}
private void translateWriteHelper(
BatchBigQueryIONativeWrite transform,
BigQueryIO.Write.Bound originalTransform,
TranslationContext context) {
if (context.getPipelineOptions().isStreaming()) {
// Streaming is handled by the streaming runner.
throw new AssertionError(
"BigQueryIO is specified to use streaming write in batch mode.");
}
TableReference table = originalTransform.getTable();
// Actual translation.
context.addStep(transform, "ParallelWrite");
context.addInput(PropertyNames.FORMAT, "bigquery");
context.addInput(PropertyNames.BIGQUERY_TABLE,
table.getTableId());
context.addInput(PropertyNames.BIGQUERY_DATASET,
table.getDatasetId());
if (table.getProjectId() != null) {
context.addInput(PropertyNames.BIGQUERY_PROJECT, table.getProjectId());
}
if (originalTransform.getSchema() != null) {
try {
context.addInput(PropertyNames.BIGQUERY_SCHEMA,
JSON_FACTORY.toString(originalTransform.getSchema()));
} catch (IOException exn) {
throw new IllegalArgumentException("Invalid table schema.", exn);
}
}
context.addInput(
PropertyNames.BIGQUERY_CREATE_DISPOSITION,
originalTransform.getCreateDisposition().name());
context.addInput(
PropertyNames.BIGQUERY_WRITE_DISPOSITION,
originalTransform.getWriteDisposition().name());
// Set sink encoding to TableRowJsonCoder.
context.addEncodingInput(
WindowedValue.getValueOnlyCoder(TableRowJsonCoder.of()));
context.addInput(PropertyNames.PARALLEL_INPUT, context.getInput(transform));
}
}
/**
* Specialized (non-)implementation for
* {@link com.google.cloud.dataflow.sdk.io.Write.Bound Write.Bound}
* for the Dataflow runner in streaming mode.
*/
private static class StreamingWrite<T> extends PTransform<PCollection<T>, PDone> {
/**
* Builds an instance of this class from the overridden transform.
*/
@SuppressWarnings("unused") // used via reflection in DataflowPipelineRunner#apply()
public StreamingWrite(DataflowPipelineRunner runner, Write.Bound<T> transform) { }
@Override
public PDone apply(PCollection<T> input) {
throw new UnsupportedOperationException(
"The Write transform is not supported by the Dataflow streaming runner.");
}
@Override
protected String getKindString() {
return "StreamingWrite";
}
}
// ================================================================================
// PubsubIO translations
// ================================================================================
static {
DataflowPipelineTranslator.registerTransformTranslator(
PubsubIO.Read.Bound.class, new StreamingPubsubIOReadTranslator());
}
/**
* Rewrite {@link PubsubIO.Read.Bound} to the appropriate internal node.
*/
private static class StreamingPubsubIOReadTranslator implements
TransformTranslator<PubsubIO.Read.Bound> {
@Override
@SuppressWarnings({"rawtypes", "unchecked"})
public void translate(
PubsubIO.Read.Bound transform,
TranslationContext context) {
translateTyped(transform, context);
}
@SuppressWarnings("deprecation") // uses internal deprecated code deliberately.
private <T> void translateTyped(
PubsubIO.Read.Bound<T> transform,
TranslationContext context) {
checkState(context.getPipelineOptions().isStreaming(),
"StreamingPubsubIORead is only for streaming pipelines.");
context.addStep(transform, "ParallelRead");
context.addInput(PropertyNames.FORMAT, "pubsub");
if (transform.getTopic() != null) {
context.addInput(PropertyNames.PUBSUB_TOPIC,
transform.getTopic().asV1Beta1Path());
}
if (transform.getSubscription() != null) {
context.addInput(
PropertyNames.PUBSUB_SUBSCRIPTION,
transform.getSubscription().asV1Beta1Path());
}
if (transform.getTimestampLabel() != null) {
context.addInput(PropertyNames.PUBSUB_TIMESTAMP_LABEL,
transform.getTimestampLabel());
}
if (transform.getIdLabel() != null) {
context.addInput(PropertyNames.PUBSUB_ID_LABEL, transform.getIdLabel());
}
context.addValueOnlyOutput(PropertyNames.OUTPUT, context.getOutput(transform));
}
}
/**
* Suppress application of {@link PubsubUnboundedSink#apply} in streaming mode so that we
* can instead defer to Windmill's implementation.
*/
private static class StreamingPubsubIOWrite<T> extends PTransform<PCollection<T>, PDone> {
private final PubsubIO.Write.Bound<T> transform;
/**
* Builds an instance of this class from the overridden transform.
*/
public StreamingPubsubIOWrite(
DataflowPipelineRunner runner, PubsubIO.Write.Bound<T> transform) {
this.transform = transform;
}
PubsubIO.Write.Bound<T> getOverriddenTransform() {
return transform;
}
@Override
public PDone apply(PCollection<T> input) {
return PDone.in(input.getPipeline());
}
@Override
protected String getKindString() {
return "StreamingPubsubIOWrite";
}
static {
DataflowPipelineTranslator.registerTransformTranslator(
StreamingPubsubIOWrite.class, new StreamingPubsubIOWriteTranslator());
}
}
/**
* Rewrite {@link StreamingPubsubIOWrite} to the appropriate internal node.
*/
private static class StreamingPubsubIOWriteTranslator implements
TransformTranslator<StreamingPubsubIOWrite> {
@Override
@SuppressWarnings({"rawtypes", "unchecked"})
public void translate(
StreamingPubsubIOWrite transform,
TranslationContext context) {
translateTyped(transform, context);
}
@SuppressWarnings("deprecation") // uses internal deprecated code deliberately.
private <T> void translateTyped(
StreamingPubsubIOWrite<T> transform,
TranslationContext context) {
checkState(context.getPipelineOptions().isStreaming(),
"StreamingPubsubIOWrite is only for streaming pipelines.");
PubsubIO.Write.Bound<T> overriddenTransform = transform.getOverriddenTransform();
context.addStep(transform, "ParallelWrite");
context.addInput(PropertyNames.FORMAT, "pubsub");
context.addInput(PropertyNames.PUBSUB_TOPIC, overriddenTransform.getTopic().asV1Beta1Path());
if (overriddenTransform.getTimestampLabel() != null) {
context.addInput(PropertyNames.PUBSUB_TIMESTAMP_LABEL,
overriddenTransform.getTimestampLabel());
}
if (overriddenTransform.getIdLabel() != null) {
context.addInput(PropertyNames.PUBSUB_ID_LABEL, overriddenTransform.getIdLabel());
}
context.addEncodingInput(
WindowedValue.getValueOnlyCoder(overriddenTransform.getCoder()));
context.addInput(PropertyNames.PARALLEL_INPUT, context.getInput(transform));
}
}
// ================================================================================
/**
* Specialized implementation for
* {@link com.google.cloud.dataflow.sdk.io.Read.Unbounded Read.Unbounded} for the
* Dataflow runner in streaming mode.
*
* <p>In particular, if an UnboundedSource requires deduplication, then features of WindmillSink
* are leveraged to do the deduplication.
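   *
   * <p>Concretely, the expansion reads {@code (record, recordId)} pairs via {@link ReadWithIds}
   * and then applies {@link Deduplicate} when {@link UnboundedSource#requiresDeduping()} returns
   * {@code true}, or simply strips the record ids otherwise.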
*/
private static class StreamingUnboundedRead<T> extends PTransform<PInput, PCollection<T>> {
private final UnboundedSource<T, ?> source;
/**
* Builds an instance of this class from the overridden transform.
*/
@SuppressWarnings("unused") // used via reflection in DataflowPipelineRunner#apply()
public StreamingUnboundedRead(DataflowPipelineRunner runner, Read.Unbounded<T> transform) {
this.source = transform.getSource();
}
@Override
protected Coder<T> getDefaultOutputCoder() {
return source.getDefaultOutputCoder();
}
@Override
public final PCollection<T> apply(PInput input) {
source.validate();
if (source.requiresDeduping()) {
return Pipeline.applyTransform(input, new ReadWithIds<T>(source))
.apply(new Deduplicate<T>());
} else {
return Pipeline.applyTransform(input, new ReadWithIds<T>(source))
.apply(ValueWithRecordId.<T>stripIds());
}
}
/**
* {@link PTransform} that reads {@code (record,recordId)} pairs from an
* {@link UnboundedSource}.
*/
private static class ReadWithIds<T>
extends PTransform<PInput, PCollection<ValueWithRecordId<T>>> {
private final UnboundedSource<T, ?> source;
private ReadWithIds(UnboundedSource<T, ?> source) {
this.source = source;
}
@Override
public final PCollection<ValueWithRecordId<T>> apply(PInput input) {
return PCollection.<ValueWithRecordId<T>>createPrimitiveOutputInternal(
input.getPipeline(), WindowingStrategy.globalDefault(), IsBounded.UNBOUNDED);
}
@Override
protected Coder<ValueWithRecordId<T>> getDefaultOutputCoder() {
return ValueWithRecordId.ValueWithRecordIdCoder.of(source.getDefaultOutputCoder());
}
public UnboundedSource<T, ?> getSource() {
return source;
}
}
@Override
public String getKindString() {
return "Read(" + approximateSimpleName(source.getClass()) + ")";
}
static {
DataflowPipelineTranslator.registerTransformTranslator(
ReadWithIds.class, new ReadWithIdsTranslator());
}
private static class ReadWithIdsTranslator
implements DataflowPipelineTranslator.TransformTranslator<ReadWithIds<?>> {
@Override
public void translate(ReadWithIds<?> transform,
DataflowPipelineTranslator.TranslationContext context) {
ReadTranslator.translateReadHelper(transform.getSource(), transform, context);
}
}
}
/**
* Remove values with duplicate ids.
*/
private static class Deduplicate<T>
extends PTransform<PCollection<ValueWithRecordId<T>>, PCollection<T>> {
    // Use a finite set of keys to improve bundling. Without this, the key space
    // will be the space of ids, which is potentially very large and results in much
    // more per-key overhead.
private static final int NUM_RESHARD_KEYS = 10000;
@Override
public PCollection<T> apply(PCollection<ValueWithRecordId<T>> input) {
return input
.apply(WithKeys.of(new SerializableFunction<ValueWithRecordId<T>, Integer>() {
@Override
public Integer apply(ValueWithRecordId<T> value) {
return Arrays.hashCode(value.getId()) % NUM_RESHARD_KEYS;
}
}))
// Reshuffle will dedup based on ids in ValueWithRecordId by passing the data through
// WindmillSink.
.apply(Reshuffle.<Integer, ValueWithRecordId<T>>of())
.apply(ParDo.named("StripIds").of(
new DoFn<KV<Integer, ValueWithRecordId<T>>, T>() {
@Override
public void processElement(ProcessContext c) {
c.output(c.element().getValue().getValue());
}
}));
}
}
/**
* Specialized implementation for
* {@link com.google.cloud.dataflow.sdk.io.Read.Bounded Read.Bounded} for the
* Dataflow runner in streaming mode.
*/
private static class StreamingBoundedRead<T> extends PTransform<PInput, PCollection<T>> {
private final BoundedSource<T> source;
/** Builds an instance of this class from the overridden transform. */
@SuppressWarnings("unused") // used via reflection in DataflowRunner#apply()
public StreamingBoundedRead(DataflowPipelineRunner runner, Read.Bounded<T> transform) {
this.source = transform.getSource();
}
@Override
protected Coder<T> getDefaultOutputCoder() {
return source.getDefaultOutputCoder();
}
@Override
public final PCollection<T> apply(PInput input) {
source.validate();
return Pipeline.applyTransform(input, new DataflowUnboundedReadFromBoundedSource<>(source))
.setIsBoundedInternal(IsBounded.BOUNDED);
}
}
/**
* Specialized implementation for
* {@link com.google.cloud.dataflow.sdk.transforms.Create.Values Create.Values} for the
* Dataflow runner in streaming mode.
*/
private static class StreamingCreate<T> extends PTransform<PInput, PCollection<T>> {
private final Create.Values<T> transform;
/**
* Builds an instance of this class from the overridden transform.
*/
@SuppressWarnings("unused") // used via reflection in DataflowPipelineRunner#apply()
public StreamingCreate(DataflowPipelineRunner runner, Create.Values<T> transform) {
this.transform = transform;
}
/**
     * A {@link DoFn} that outputs a single {@code KV.of(null, null)} to kick off the
     * {@link GroupByKey} in the streaming create implementation.
*/
private static class OutputNullKv extends DoFn<String, KV<Void, Void>> {
@Override
public void processElement(DoFn<String, KV<Void, Void>>.ProcessContext c) throws Exception {
c.output(KV.of((Void) null, (Void) null));
}
}
/**
* A {@link DoFn} which outputs the specified elements by first encoding them to bytes using
* the specified {@link Coder} so that they are serialized as part of the {@link DoFn} but
* need not implement {@code Serializable}.
*/
private static class OutputElements<T> extends DoFn<Object, T> {
private final Coder<T> coder;
private final List<byte[]> encodedElements;
public OutputElements(Iterable<T> elems, Coder<T> coder) {
this.coder = coder;
this.encodedElements = new ArrayList<>();
for (T t : elems) {
try {
encodedElements.add(CoderUtils.encodeToByteArray(coder, t));
} catch (CoderException e) {
throw new IllegalArgumentException("Unable to encode value " + t
+ " with coder " + coder, e);
}
}
}
@Override
public void processElement(ProcessContext c) throws IOException {
for (byte[] encodedElement : encodedElements) {
c.output(CoderUtils.decodeFromByteArray(coder, encodedElement));
}
}
}
@Override
public PCollection<T> apply(PInput input) {
try {
Coder<T> coder = transform.getDefaultOutputCoder(input);
return Pipeline.applyTransform(
input, PubsubIO.Read.named("StartingSignal").subscription("_starting_signal/"))
.apply(ParDo.of(new OutputNullKv()))
.apply("GlobalSingleton", Window.<KV<Void, Void>>into(new GlobalWindows())
.triggering(AfterPane.elementCountAtLeast(1))
.withAllowedLateness(Duration.ZERO)
.discardingFiredPanes())
.apply(GroupByKey.<Void, Void>create())
            // Go back to the default windowing strategy, so that our setting of allowed lateness
            // doesn't count as the user having set it.
.setWindowingStrategyInternal(WindowingStrategy.globalDefault())
.apply(Window.<KV<Void, Iterable<Void>>>into(new GlobalWindows()))
.apply(ParDo.of(new OutputElements<>(transform.getElements(), coder)))
.setCoder(coder).setIsBoundedInternal(IsBounded.BOUNDED);
} catch (CannotProvideCoderException e) {
throw new IllegalArgumentException("Unable to infer a coder and no Coder was specified. "
+ "Please set a coder by invoking Create.withCoder() explicitly.", e);
}
}
@Override
protected String getKindString() {
return "StreamingCreate";
}
}
/**
* A specialized {@link DoFn} for writing the contents of a {@link PCollection}
* to a streaming {@link PCollectionView} backend implementation.
*/
private static class StreamingPCollectionViewWriterFn<T>
extends DoFn<Iterable<T>, T> implements DoFn.RequiresWindowAccess {
private final PCollectionView<?> view;
private final Coder<T> dataCoder;
public static <T> StreamingPCollectionViewWriterFn<T> create(
PCollectionView<?> view, Coder<T> dataCoder) {
return new StreamingPCollectionViewWriterFn<T>(view, dataCoder);
}
private StreamingPCollectionViewWriterFn(PCollectionView<?> view, Coder<T> dataCoder) {
this.view = view;
this.dataCoder = dataCoder;
}
@Override
public void processElement(ProcessContext c) throws Exception {
List<WindowedValue<T>> output = new ArrayList<>();
for (T elem : c.element()) {
output.add(WindowedValue.of(elem, c.timestamp(), c.window(), c.pane()));
}
c.windowingInternals().writePCollectionViewData(
view.getTagInternal(), output, dataCoder);
}
}
/**
* Specialized implementation for
* {@link com.google.cloud.dataflow.sdk.transforms.View.AsMap View.AsMap}
* for the Dataflow runner in streaming mode.
*/
private static class StreamingViewAsMap<K, V>
extends PTransform<PCollection<KV<K, V>>, PCollectionView<Map<K, V>>> {
private final DataflowPipelineRunner runner;
@SuppressWarnings("unused") // used via reflection in DataflowPipelineRunner#apply()
public StreamingViewAsMap(DataflowPipelineRunner runner, View.AsMap<K, V> transform) {
this.runner = runner;
}
@Override
public PCollectionView<Map<K, V>> apply(PCollection<KV<K, V>> input) {
PCollectionView<Map<K, V>> view =
PCollectionViews.mapView(
input.getPipeline(),
input.getWindowingStrategy(),
input.getCoder());
@SuppressWarnings({"rawtypes", "unchecked"})
KvCoder<K, V> inputCoder = (KvCoder) input.getCoder();
try {
inputCoder.getKeyCoder().verifyDeterministic();
} catch (NonDeterministicException e) {
runner.recordViewUsesNonDeterministicKeyCoder(this);
}
return input
.apply(Combine.globally(new Concatenate<KV<K, V>>()).withoutDefaults())
.apply(ParDo.of(StreamingPCollectionViewWriterFn.create(view, input.getCoder())))
.apply(View.CreatePCollectionView.<KV<K, V>, Map<K, V>>of(view));
}
@Override
protected String getKindString() {
return "StreamingViewAsMap";
}
}
/**
* Specialized expansion for {@link
* com.google.cloud.dataflow.sdk.transforms.View.AsMultimap View.AsMultimap} for the
* Dataflow runner in streaming mode.
*/
private static class StreamingViewAsMultimap<K, V>
extends PTransform<PCollection<KV<K, V>>, PCollectionView<Map<K, Iterable<V>>>> {
private final DataflowPipelineRunner runner;
/**
* Builds an instance of this class from the overridden transform.
*/
@SuppressWarnings("unused") // used via reflection in DataflowPipelineRunner#apply()
public StreamingViewAsMultimap(DataflowPipelineRunner runner, View.AsMultimap<K, V> transform) {
this.runner = runner;
}
@Override
public PCollectionView<Map<K, Iterable<V>>> apply(PCollection<KV<K, V>> input) {
PCollectionView<Map<K, Iterable<V>>> view =
PCollectionViews.multimapView(
input.getPipeline(),
input.getWindowingStrategy(),
input.getCoder());
@SuppressWarnings({"rawtypes", "unchecked"})
KvCoder<K, V> inputCoder = (KvCoder) input.getCoder();
try {
inputCoder.getKeyCoder().verifyDeterministic();
} catch (NonDeterministicException e) {
runner.recordViewUsesNonDeterministicKeyCoder(this);
}
return input
.apply(Combine.globally(new Concatenate<KV<K, V>>()).withoutDefaults())
.apply(ParDo.of(StreamingPCollectionViewWriterFn.create(view, input.getCoder())))
.apply(View.CreatePCollectionView.<KV<K, V>, Map<K, Iterable<V>>>of(view));
}
@Override
protected String getKindString() {
return "StreamingViewAsMultimap";
}
}
/**
* Specialized implementation for
* {@link com.google.cloud.dataflow.sdk.transforms.View.AsList View.AsList} for the
* Dataflow runner in streaming mode.
*/
private static class StreamingViewAsList<T>
extends PTransform<PCollection<T>, PCollectionView<List<T>>> {
/**
* Builds an instance of this class from the overridden transform.
*/
@SuppressWarnings("unused") // used via reflection in DataflowPipelineRunner#apply()
public StreamingViewAsList(DataflowPipelineRunner runner, View.AsList<T> transform) {}
@Override
public PCollectionView<List<T>> apply(PCollection<T> input) {
PCollectionView<List<T>> view =
PCollectionViews.listView(
input.getPipeline(),
input.getWindowingStrategy(),
input.getCoder());
return input.apply(Combine.globally(new Concatenate<T>()).withoutDefaults())
.apply(ParDo.of(StreamingPCollectionViewWriterFn.create(view, input.getCoder())))
.apply(View.CreatePCollectionView.<T, List<T>>of(view));
}
@Override
protected String getKindString() {
return "StreamingViewAsList";
}
}
/**
* Specialized implementation for
* {@link com.google.cloud.dataflow.sdk.transforms.View.AsIterable View.AsIterable} for the
* Dataflow runner in streaming mode.
*/
private static class StreamingViewAsIterable<T>
extends PTransform<PCollection<T>, PCollectionView<Iterable<T>>> {
/**
* Builds an instance of this class from the overridden transform.
*/
@SuppressWarnings("unused") // used via reflection in DataflowPipelineRunner#apply()
public StreamingViewAsIterable(DataflowPipelineRunner runner, View.AsIterable<T> transform) { }
@Override
public PCollectionView<Iterable<T>> apply(PCollection<T> input) {
PCollectionView<Iterable<T>> view =
PCollectionViews.iterableView(
input.getPipeline(),
input.getWindowingStrategy(),
input.getCoder());
return input.apply(Combine.globally(new Concatenate<T>()).withoutDefaults())
.apply(ParDo.of(StreamingPCollectionViewWriterFn.create(view, input.getCoder())))
.apply(View.CreatePCollectionView.<T, Iterable<T>>of(view));
}
@Override
protected String getKindString() {
return "StreamingViewAsIterable";
}
}
private static class WrapAsList<T> extends DoFn<T, List<T>> {
@Override
public void processElement(ProcessContext c) {
c.output(Arrays.asList(c.element()));
}
}
/**
* Specialized expansion for
* {@link com.google.cloud.dataflow.sdk.transforms.View.AsSingleton View.AsSingleton} for the
* Dataflow runner in streaming mode.
*/
private static class StreamingViewAsSingleton<T>
extends PTransform<PCollection<T>, PCollectionView<T>> {
private View.AsSingleton<T> transform;
/**
* Builds an instance of this class from the overridden transform.
*/
@SuppressWarnings("unused") // used via reflection in DataflowPipelineRunner#apply()
public StreamingViewAsSingleton(DataflowPipelineRunner runner, View.AsSingleton<T> transform) {
this.transform = transform;
}
@Override
public PCollectionView<T> apply(PCollection<T> input) {
Combine.Globally<T, T> combine = Combine.globally(
new SingletonCombine<>(transform.hasDefaultValue(), transform.defaultValue()));
if (!transform.hasDefaultValue()) {
combine = combine.withoutDefaults();
}
return input.apply(combine.asSingletonView());
}
@Override
protected String getKindString() {
return "StreamingViewAsSingleton";
}
private static class SingletonCombine<T> extends Combine.BinaryCombineFn<T> {
private boolean hasDefaultValue;
private T defaultValue;
SingletonCombine(boolean hasDefaultValue, T defaultValue) {
this.hasDefaultValue = hasDefaultValue;
this.defaultValue = defaultValue;
}
@Override
public T apply(T left, T right) {
throw new IllegalArgumentException("PCollection with more than one element "
+ "accessed as a singleton view. Consider using Combine.globally().asSingleton() to "
+ "combine the PCollection into a single value");
}
@Override
public T identity() {
if (hasDefaultValue) {
return defaultValue;
} else {
throw new IllegalArgumentException(
"Empty PCollection accessed as a singleton view. "
+ "Consider setting withDefault to provide a default value");
}
}
}
}
private static class StreamingCombineGloballyAsSingletonView<InputT, OutputT>
extends PTransform<PCollection<InputT>, PCollectionView<OutputT>> {
Combine.GloballyAsSingletonView<InputT, OutputT> transform;
/**
* Builds an instance of this class from the overridden transform.
*/
@SuppressWarnings("unused") // used via reflection in DataflowPipelineRunner#apply()
public StreamingCombineGloballyAsSingletonView(
DataflowPipelineRunner runner,
Combine.GloballyAsSingletonView<InputT, OutputT> transform) {
this.transform = transform;
}
@Override
public PCollectionView<OutputT> apply(PCollection<InputT> input) {
PCollection<OutputT> combined =
input.apply(Combine.<InputT, OutputT>globally(transform.getCombineFn())
.withoutDefaults()
.withFanout(transform.getFanout()));
PCollectionView<OutputT> view = PCollectionViews.singletonView(
combined.getPipeline(),
combined.getWindowingStrategy(),
transform.getInsertDefault(),
transform.getInsertDefault()
? transform.getCombineFn().defaultValue() : null,
combined.getCoder());
return combined
.apply(ParDo.of(new WrapAsList<OutputT>()))
.apply(ParDo.of(StreamingPCollectionViewWriterFn.create(view, combined.getCoder())))
.apply(View.CreatePCollectionView.<OutputT, OutputT>of(view));
}
@Override
protected String getKindString() {
return "StreamingCombineGloballyAsSingletonView";
}
}
/**
* Combiner that combines {@code T}s into a single {@code List<T>} containing all inputs.
*
* <p>For internal use by {@link StreamingViewAsMap}, {@link StreamingViewAsMultimap},
* {@link StreamingViewAsList}, {@link StreamingViewAsIterable}.
   * They require that the input {@link PCollection} fits in memory.
* For a large {@link PCollection} this is expected to crash!
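   *
   * <p>A typical use, as in {@link StreamingViewAsList}, is:
   * <pre>{@code
   * input.apply(Combine.globally(new Concatenate<T>()).withoutDefaults())
   * }</pre>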
*
* @param <T> the type of elements to concatenate.
*/
private static class Concatenate<T> extends CombineFn<T, List<T>, List<T>> {
@Override
public List<T> createAccumulator() {
return new ArrayList<T>();
}
@Override
public List<T> addInput(List<T> accumulator, T input) {
accumulator.add(input);
return accumulator;
}
@Override
public List<T> mergeAccumulators(Iterable<List<T>> accumulators) {
List<T> result = createAccumulator();
for (List<T> accumulator : accumulators) {
result.addAll(accumulator);
}
return result;
}
@Override
public List<T> extractOutput(List<T> accumulator) {
return accumulator;
}
@Override
public Coder<List<T>> getAccumulatorCoder(CoderRegistry registry, Coder<T> inputCoder) {
return ListCoder.of(inputCoder);
}
@Override
public Coder<List<T>> getDefaultOutputCoder(CoderRegistry registry, Coder<T> inputCoder) {
return ListCoder.of(inputCoder);
}
}
/**
 * Specialized expansion for unsupported IO transforms and DoFns that throws an error when applied.
*/
private static class UnsupportedIO<InputT extends PInput, OutputT extends POutput>
extends PTransform<InputT, OutputT> {
@Nullable
private PTransform<?, ?> transform;
@Nullable
private DoFn<?, ?> doFn;
/**
* Builds an instance of this class from the overridden transform.
*/
@SuppressWarnings("unused") // used via reflection in DataflowPipelineRunner#apply()
public UnsupportedIO(DataflowPipelineRunner runner, AvroIO.Read.Bound<?> transform) {
this.transform = transform;
}
/**
* Builds an instance of this class from the overridden transform.
*/
@SuppressWarnings("unused") // used via reflection in DataflowPipelineRunner#apply()
public UnsupportedIO(DataflowPipelineRunner runner, BigQueryIO.Read.Bound transform) {
this.transform = transform;
}
/**
* Builds an instance of this class from the overridden transform.
*/
@SuppressWarnings("unused") // used via reflection in DataflowPipelineRunner#apply()
public UnsupportedIO(DataflowPipelineRunner runner, TextIO.Read.Bound<?> transform) {
this.transform = transform;
}
/**
* Builds an instance of this class from the overridden transform.
*/
@SuppressWarnings("unused") // used via reflection in DataflowPipelineRunner#apply()
public UnsupportedIO(DataflowPipelineRunner runner, Read.Bounded<?> transform) {
this.transform = transform;
}
/**
* Builds an instance of this class from the overridden transform.
*/
@SuppressWarnings("unused") // used via reflection in DataflowPipelineRunner#apply()
public UnsupportedIO(DataflowPipelineRunner runner, Read.Unbounded<?> transform) {
this.transform = transform;
}
/**
* Builds an instance of this class from the overridden transform.
*/
@SuppressWarnings("unused") // used via reflection in DataflowPipelineRunner#apply()
public UnsupportedIO(DataflowPipelineRunner runner, AvroIO.Write.Bound<?> transform) {
this.transform = transform;
}
/**
* Builds an instance of this class from the overridden transform.
*/
@SuppressWarnings("unused") // used via reflection in DataflowPipelineRunner#apply()
public UnsupportedIO(DataflowPipelineRunner runner, TextIO.Write.Bound<?> transform) {
this.transform = transform;
}
/**
* Builds an instance of this class from the overridden doFn.
*/
@SuppressWarnings("unused") // used via reflection in DataflowPipelineRunner#apply()
public UnsupportedIO(DataflowPipelineRunner runner,
PubsubReader doFn) {
this.doFn = doFn;
}
/**
* Builds an instance of this class from the overridden doFn.
*/
@SuppressWarnings("unused") // used via reflection in DataflowPipelineRunner#apply()
public UnsupportedIO(DataflowPipelineRunner runner,
PubsubWriter doFn) {
this.doFn = doFn;
}
/**
* Builds an instance of this class from the overridden transform.
*/
@SuppressWarnings("unused") // used via reflection in DataflowPipelineRunner#apply()
public UnsupportedIO(DataflowPipelineRunner runner, PubsubUnboundedSource<?> transform) {
this.transform = transform;
}
/**
* Builds an instance of this class from the overridden transform.
*/
@SuppressWarnings("unused") // used via reflection in DataflowPipelineRunner#apply()
public UnsupportedIO(DataflowPipelineRunner runner, PubsubUnboundedSink<?> transform) {
this.transform = transform;
}
@Override
public OutputT apply(InputT input) {
String mode = input.getPipeline().getOptions().as(StreamingOptions.class).isStreaming()
? "streaming" : "batch";
String name =
transform == null
? approximateSimpleName(doFn.getClass())
: approximatePTransformName(transform.getClass());
throw new UnsupportedOperationException(
String.format("The DataflowPipelineRunner in %s mode does not support %s.", mode, name));
}
}
@Override
public String toString() {
return "DataflowPipelineRunner#" + options.getJobName();
}
/**
   * Attempts to detect all the resources the class loader has access to. This does not recurse
   * to class loader parents, which stops it from pulling in resources from the system class loader.
*
* @param classLoader The URLClassLoader to use to detect resources to stage.
* @throws IllegalArgumentException If either the class loader is not a URLClassLoader or one
* of the resources the class loader exposes is not a file resource.
* @return A list of absolute paths to the resources the class loader uses.
*/
protected static List<String> detectClassPathResourcesToStage(ClassLoader classLoader) {
if (!(classLoader instanceof URLClassLoader)) {
String message = String.format("Unable to use ClassLoader to detect classpath elements. "
+ "Current ClassLoader is %s, only URLClassLoaders are supported.", classLoader);
LOG.error(message);
throw new IllegalArgumentException(message);
}
List<String> files = new ArrayList<>();
for (URL url : ((URLClassLoader) classLoader).getURLs()) {
try {
files.add(new File(url.toURI()).getAbsolutePath());
} catch (IllegalArgumentException | URISyntaxException e) {
String message = String.format("Unable to convert url (%s) to file.", url);
LOG.error(message);
throw new IllegalArgumentException(message, e);
}
}
return files;
}
/**
* Finds the id for the running job of the given name.
*/
private String getJobIdFromName(String jobName) {
try {
ListJobsResponse listResult;
String token = null;
do {
listResult = dataflowClient.projects().jobs()
.list(options.getProject())
.setPageToken(token)
.execute();
token = listResult.getNextPageToken();
for (Job job : listResult.getJobs()) {
if (job.getName().equals(jobName)
&& MonitoringUtil.toState(job.getCurrentState()).equals(State.RUNNING)) {
return job.getId();
}
}
} while (token != null);
} catch (GoogleJsonResponseException e) {
throw new RuntimeException(
"Got error while looking up jobs: "
+ (e.getDetails() != null ? e.getDetails().getMessage() : e), e);
} catch (IOException e) {
throw new RuntimeException("Got error while looking up jobs: ", e);
}
throw new IllegalArgumentException("Could not find running job named " + jobName);
}
}