/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.beam.runners.dataflow;
import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.base.Preconditions.checkNotNull;
import static com.google.common.base.Preconditions.checkState;
import static com.google.common.base.Strings.isNullOrEmpty;
import static org.apache.beam.sdk.util.SerializableUtils.serializeToByteArray;
import static org.apache.beam.sdk.util.StringUtils.byteArrayToJsonString;
import static org.apache.beam.sdk.util.StringUtils.jsonStringToByteArray;
import static org.apache.beam.sdk.util.Structs.addBoolean;
import static org.apache.beam.sdk.util.Structs.addDictionary;
import static org.apache.beam.sdk.util.Structs.addList;
import static org.apache.beam.sdk.util.Structs.addLong;
import static org.apache.beam.sdk.util.Structs.addObject;
import static org.apache.beam.sdk.util.Structs.addString;
import static org.apache.beam.sdk.util.Structs.getString;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.api.services.dataflow.model.AutoscalingSettings;
import com.google.api.services.dataflow.model.DataflowPackage;
import com.google.api.services.dataflow.model.Disk;
import com.google.api.services.dataflow.model.Environment;
import com.google.api.services.dataflow.model.Job;
import com.google.api.services.dataflow.model.Step;
import com.google.api.services.dataflow.model.WorkerPool;
import com.google.common.base.Supplier;
import com.google.common.collect.BiMap;
import com.google.common.collect.ImmutableBiMap;
import com.google.common.collect.ImmutableMap;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicLong;
import javax.annotation.Nullable;
import org.apache.beam.runners.dataflow.DataflowRunner.GroupByKeyAndSortValuesOnly;
import org.apache.beam.runners.dataflow.internal.ReadTranslator;
import org.apache.beam.runners.dataflow.options.DataflowPipelineOptions;
import org.apache.beam.runners.dataflow.util.DoFnInfo;
import org.apache.beam.runners.dataflow.util.OutputReference;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.Pipeline.PipelineVisitor;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.coders.IterableCoder;
import org.apache.beam.sdk.io.Read;
import org.apache.beam.sdk.options.StreamingOptions;
import org.apache.beam.sdk.runners.TransformHierarchy;
import org.apache.beam.sdk.transforms.AppliedPTransform;
import org.apache.beam.sdk.transforms.Combine;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.Flatten;
import org.apache.beam.sdk.transforms.GroupByKey;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.View;
import org.apache.beam.sdk.transforms.display.DisplayData;
import org.apache.beam.sdk.transforms.display.HasDisplayData;
import org.apache.beam.sdk.transforms.reflect.DoFnSignatures;
import org.apache.beam.sdk.transforms.windowing.DefaultTrigger;
import org.apache.beam.sdk.transforms.windowing.Window;
import org.apache.beam.sdk.util.AppliedCombineFn;
import org.apache.beam.sdk.util.CloudObject;
import org.apache.beam.sdk.util.PropertyNames;
import org.apache.beam.sdk.util.SerializableUtils;
import org.apache.beam.sdk.util.WindowedValue;
import org.apache.beam.sdk.util.WindowingStrategy;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PCollectionTuple;
import org.apache.beam.sdk.values.PCollectionView;
import org.apache.beam.sdk.values.PInput;
import org.apache.beam.sdk.values.POutput;
import org.apache.beam.sdk.values.PValue;
import org.apache.beam.sdk.values.TupleTag;
import org.apache.beam.sdk.values.TypedPValue;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* {@link DataflowPipelineTranslator} knows how to translate {@link Pipeline} objects
* into Cloud Dataflow Service API {@link Job}s.
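 *
 * <p>A minimal usage sketch (assuming {@code options}, {@code runner}, and {@code packages}
 * are configured elsewhere):
 *
 * <pre>{@code
 * DataflowPipelineTranslator translator = DataflowPipelineTranslator.fromOptions(options);
 * JobSpecification spec = translator.translate(pipeline, runner, packages);
 * Job job = spec.getJob();
 * }</pre>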
*/
@SuppressWarnings({"rawtypes", "unchecked"})
public class DataflowPipelineTranslator {
// Must be kept in sync with their internal counterparts.
private static final Logger LOG = LoggerFactory.getLogger(DataflowPipelineTranslator.class);
private static final ObjectMapper MAPPER = new ObjectMapper();
/**
* A map from {@link PTransform} subclass to the corresponding
* {@link TransformTranslator} to use to translate that transform.
*
* <p>A static map that contains system-wide defaults.
*/
private static Map<Class, TransformTranslator> transformTranslators =
new HashMap<>();
/** Provided configuration options. */
private final DataflowPipelineOptions options;
/**
* Constructs a translator from the provided options.
*
* @param options Properties that configure the translator.
*
* @return The newly created translator.
*/
public static DataflowPipelineTranslator fromOptions(
DataflowPipelineOptions options) {
return new DataflowPipelineTranslator(options);
}
private DataflowPipelineTranslator(DataflowPipelineOptions options) {
this.options = options;
}
/**
* Translates a {@link Pipeline} into a {@code JobSpecification}.
*/
public JobSpecification translate(
Pipeline pipeline,
DataflowRunner runner,
List<DataflowPackage> packages) {
Translator translator = new Translator(pipeline, runner);
Job result = translator.translate(packages);
return new JobSpecification(result, Collections.unmodifiableMap(translator.stepNames));
}
/**
* The result of a job translation.
*
 * <p>Used to pass along the result {@link Job}, together with any state used to construct
 * the job that may be of use to other classes (e.g., the {@link PTransform} to step name
 * mapping).
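 *
 * <p>For example, a caller holding an {@link AppliedPTransform} (the hypothetical
 * {@code appliedTransform} below) can look up its Dataflow step name:
 *
 * <pre>{@code
 * JobSpecification spec = translator.translate(pipeline, runner, packages);
 * String stepName = spec.getStepNames().get(appliedTransform);
 * }</pre>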
*/
public static class JobSpecification {
private final Job job;
private final Map<AppliedPTransform<?, ?, ?>, String> stepNames;
public JobSpecification(Job job, Map<AppliedPTransform<?, ?, ?>, String> stepNames) {
this.job = job;
this.stepNames = stepNames;
}
public Job getJob() {
return job;
}
/**
* Returns the mapping of {@link AppliedPTransform AppliedPTransforms} to the internal step
* name for that {@code AppliedPTransform}.
*/
public Map<AppliedPTransform<?, ?, ?>, String> getStepNames() {
return stepNames;
}
}
/**
* Renders a {@link Job} as a string.
*/
public static String jobToString(Job job) {
try {
return MAPPER.writerWithDefaultPrettyPrinter().writeValueAsString(job);
} catch (JsonProcessingException exc) {
throw new IllegalStateException("Failed to render Job as String.", exc);
}
}
/////////////////////////////////////////////////////////////////////////////
/**
* Records that instances of the specified PTransform class
* should be translated by default by the corresponding
* {@link TransformTranslator}.
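 *
 * <p>Registering two translators for the same class throws {@link IllegalArgumentException}.
 * A sketch, using a hypothetical {@code MyTransform} and its translator:
 *
 * <pre>{@code
 * DataflowPipelineTranslator.registerTransformTranslator(
 *     MyTransform.class, new MyTransformTranslator());
 * }</pre>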
*/
public static <TransformT extends PTransform> void registerTransformTranslator(
Class<TransformT> transformClass,
TransformTranslator<? extends TransformT> transformTranslator) {
if (transformTranslators.put(transformClass, transformTranslator) != null) {
throw new IllegalArgumentException(
"defining multiple translators for " + transformClass);
}
}
/**
* Returns the {@link TransformTranslator} to use for instances of the
 * specified PTransform class, or {@code null} if none is registered.
*/
public <TransformT extends PTransform>
TransformTranslator<TransformT> getTransformTranslator(Class<TransformT> transformClass) {
return transformTranslators.get(transformClass);
}
/**
* A {@link TransformTranslator} knows how to translate
* a particular subclass of {@link PTransform} for the
* Cloud Dataflow service. It does so by
* mutating the {@link TranslationContext}.
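 *
 * <p>A typical implementation adds a step and then wires the transform's input and output
 * through the context; a minimal sketch for a hypothetical {@code MyTransform}:
 *
 * <pre>{@code
 * new TransformTranslator<MyTransform>() {
 *   public void translate(MyTransform transform, TranslationContext context) {
 *     context.addStep(transform, "ParallelDo");
 *     context.addInput(PropertyNames.PARALLEL_INPUT, context.getInput(transform));
 *     context.addOutput(context.getOutput(transform));
 *   }
 * }
 * }</pre>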
*/
public interface TransformTranslator<TransformT extends PTransform> {
void translate(TransformT transform,
TranslationContext context);
}
/**
* The interface provided to registered callbacks for interacting
* with the {@link DataflowRunner}, including reading and writing the
* values of {@link PCollection}s and side inputs ({@link PCollectionView}s).
*/
public interface TranslationContext {
/**
* Returns the configured pipeline options.
*/
DataflowPipelineOptions getPipelineOptions();
/**
 * Returns the input of the transform currently being translated.
*/
<InputT extends PInput> InputT getInput(PTransform<InputT, ?> transform);
/**
 * Returns the output of the transform currently being translated.
*/
<OutputT extends POutput> OutputT getOutput(PTransform<?, OutputT> transform);
/**
 * Returns the full name of the transform currently being translated.
*/
String getFullName(PTransform<?, ?> transform);
/**
* Adds a step to the Dataflow workflow for the given transform, with
* the given Dataflow step type.
* This step becomes "current" for the purpose of {@link #addInput} and
* {@link #addOutput}.
*/
void addStep(PTransform<?, ?> transform, String type);
/**
 * Adds a pre-defined step to the Dataflow workflow. The given PTransform should be
 * consistent with the Step, in terms of input, output, and coder types.
 *
 * <p>This is a low-level operation; when using this method, it is up to
 * the caller to ensure that names do not collide.
*/
void addStep(PTransform<?, ? extends PValue> transform, Step step);
/**
* Sets the encoding for the current Dataflow step.
*/
void addEncodingInput(Coder<?> value);
/**
* Adds an input with the given name and value to the current
* Dataflow step.
*/
void addInput(String name, Boolean value);
/**
* Adds an input with the given name and value to the current
* Dataflow step.
*/
void addInput(String name, String value);
/**
* Adds an input with the given name and value to the current
* Dataflow step.
*/
void addInput(String name, Long value);
/**
* Adds an input with the given name to the previously added Dataflow
* step, coming from the specified input PValue.
*/
void addInput(String name, PInput value);
/**
* Adds an input that is a dictionary of strings to objects.
*/
void addInput(String name, Map<String, Object> elements);
/**
* Adds an input that is a list of objects.
*/
void addInput(String name, List<? extends Map<String, Object>> elements);
/**
* Adds an output to the previously added Dataflow step,
* producing the specified output {@code PValue},
* including its {@code Coder} if a {@code TypedPValue}. If the
* {@code PValue} is a {@code PCollection}, wraps its coder inside
* a {@code WindowedValueCoder}. Returns a pipeline level unique id.
*/
long addOutput(PValue value);
/**
* Adds an output to the previously added Dataflow step,
* producing the specified output {@code PValue},
* including its {@code Coder} if a {@code TypedPValue}. If the
* {@code PValue} is a {@code PCollection}, wraps its coder inside
* a {@code ValueOnlyCoder}. Returns a pipeline level unique id.
*/
long addValueOnlyOutput(PValue value);
/**
* Adds an output to the previously added CollectionToSingleton Dataflow step,
* consuming the specified input {@code PValue} and producing the specified output
* {@code PValue}. This step requires special treatment for its
* output encoding. Returns a pipeline level unique id.
*/
long addCollectionToSingletonOutput(PValue inputValue,
PValue outputValue);
/**
 * Encodes a PValue reference as an output reference.
*/
OutputReference asOutputReference(PValue value);
}
/////////////////////////////////////////////////////////////////////////////
/**
* Translates a Pipeline into the Dataflow representation.
*/
class Translator extends PipelineVisitor.Defaults implements TranslationContext {
/**
 * An id generator used to assign unique ids to pipeline-level constructs.
 * It is deliberately wrapped inside a {@link Supplier} to prevent incorrect
 * usage of the contained {@link AtomicLong}.
*/
private final Supplier<Long> idGenerator = new Supplier<Long>() {
private final AtomicLong generator = new AtomicLong(1L);
@Override
public Long get() {
return generator.getAndIncrement();
}
};
/** The Pipeline to translate. */
private final Pipeline pipeline;
/** The runner which will execute the pipeline. */
private final DataflowRunner runner;
/** The Cloud Dataflow Job representation. */
private final Job job = new Job();
/**
 * Translator is stateful, as {@code addInput} and {@code addOutput} calls refer to the
 * current step.
*/
private Step currentStep;
/**
 * A Map from each AppliedPTransform to its unique Dataflow step name.
*/
private final Map<AppliedPTransform<?, ?, ?>, String> stepNames = new HashMap<>();
/**
* A Map from PValues to their output names used by their producer
* Dataflow steps.
*/
private final Map<POutput, String> outputNames = new HashMap<>();
/**
* A Map from PValues to the Coders used for them.
*/
private final Map<POutput, Coder<?>> outputCoders = new HashMap<>();
/**
* The transform currently being applied.
*/
private AppliedPTransform<?, ?, ?> currentTransform;
/**
* Constructs a Translator that will translate the specified
* Pipeline into Dataflow objects.
*/
public Translator(Pipeline pipeline, DataflowRunner runner) {
this.pipeline = pipeline;
this.runner = runner;
}
/**
 * Translates this Translator's pipeline into a Cloud Dataflow {@link Job}.
 *
 * @return a Job definition filled in with the type of job, the environment,
 * and the job steps.
*/
public Job translate(List<DataflowPackage> packages) {
job.setName(options.getJobName().toLowerCase());
Environment environment = new Environment();
job.setEnvironment(environment);
try {
environment.setSdkPipelineOptions(
MAPPER.readValue(MAPPER.writeValueAsBytes(options), Map.class));
} catch (IOException e) {
throw new IllegalArgumentException(
    "The specified PipelineOptions failed to serialize to JSON.", e);
}
WorkerPool workerPool = new WorkerPool();
if (options.isStreaming()) {
job.setType("JOB_TYPE_STREAMING");
} else {
job.setType("JOB_TYPE_BATCH");
workerPool.setDiskType(options.getWorkerDiskType());
}
if (options.getWorkerMachineType() != null) {
workerPool.setMachineType(options.getWorkerMachineType());
}
if (options.getUsePublicIps() != null) {
if (options.getUsePublicIps()) {
workerPool.setIpConfiguration("WORKER_IP_PUBLIC");
} else {
workerPool.setIpConfiguration("WORKER_IP_PRIVATE");
}
}
workerPool.setPackages(packages);
workerPool.setNumWorkers(options.getNumWorkers());
if (options.isStreaming()
&& (options.getExperiments() == null
|| !options.getExperiments().contains("enable_windmill_service"))) {
// Use separate data disk for streaming.
Disk disk = new Disk();
disk.setDiskType(options.getWorkerDiskType());
workerPool.setDataDisks(Collections.singletonList(disk));
}
if (!isNullOrEmpty(options.getZone())) {
workerPool.setZone(options.getZone());
}
if (!isNullOrEmpty(options.getNetwork())) {
workerPool.setNetwork(options.getNetwork());
}
if (!isNullOrEmpty(options.getSubnetwork())) {
workerPool.setSubnetwork(options.getSubnetwork());
}
if (options.getDiskSizeGb() > 0) {
workerPool.setDiskSizeGb(options.getDiskSizeGb());
}
AutoscalingSettings settings = new AutoscalingSettings();
if (options.getAutoscalingAlgorithm() != null) {
settings.setAlgorithm(options.getAutoscalingAlgorithm().getAlgorithm());
}
settings.setMaxNumWorkers(options.getMaxNumWorkers());
workerPool.setAutoscalingSettings(settings);
List<WorkerPool> workerPools = new LinkedList<>();
workerPools.add(workerPool);
environment.setWorkerPools(workerPools);
if (options.getServiceAccount() != null) {
environment.setServiceAccountEmail(options.getServiceAccount());
}
pipeline.traverseTopologically(this);
return job;
}
@Override
public DataflowPipelineOptions getPipelineOptions() {
return options;
}
@Override
public <InputT extends PInput> InputT getInput(PTransform<InputT, ?> transform) {
return (InputT) getCurrentTransform(transform).getInput();
}
@Override
public <OutputT extends POutput> OutputT getOutput(PTransform<?, OutputT> transform) {
return (OutputT) getCurrentTransform(transform).getOutput();
}
@Override
public String getFullName(PTransform<?, ?> transform) {
return getCurrentTransform(transform).getFullName();
}
private AppliedPTransform<?, ?, ?> getCurrentTransform(PTransform<?, ?> transform) {
checkArgument(
currentTransform != null && currentTransform.getTransform() == transform,
"can only be called with current transform");
return currentTransform;
}
@Override
public void leaveCompositeTransform(TransformHierarchy.Node node) {
}
@Override
public void visitPrimitiveTransform(TransformHierarchy.Node node) {
PTransform<?, ?> transform = node.getTransform();
TransformTranslator translator =
getTransformTranslator(transform.getClass());
if (translator == null) {
throw new IllegalStateException(
"no translator registered for " + transform);
}
LOG.debug("Translating {}", transform);
currentTransform = node.toAppliedPTransform();
translator.translate(transform, this);
currentTransform = null;
}
@Override
public void visitValue(PValue value, TransformHierarchy.Node producer) {
LOG.debug("Checking translation of {}", value);
if (value.getProducingTransformInternal() == null) {
throw new RuntimeException(
"internal error: expecting a PValue "
+ "to have a producingTransform");
}
if (!producer.isCompositeNode()) {
// Primitive transforms are the only ones assigned step names.
asOutputReference(value);
}
}
@Override
public void addStep(PTransform<?, ?> transform, String type) {
String stepName = genStepName();
if (stepNames.put(getCurrentTransform(transform), stepName) != null) {
throw new IllegalArgumentException(
transform + " already has a name specified");
}
// Start the next "steps" list item.
List<Step> steps = job.getSteps();
if (steps == null) {
steps = new LinkedList<>();
job.setSteps(steps);
}
currentStep = new Step();
currentStep.setName(stepName);
currentStep.setKind(type);
steps.add(currentStep);
addInput(PropertyNames.USER_NAME, getFullName(transform));
addDisplayData(stepName, transform);
}
@Override
public void addStep(PTransform<?, ? extends PValue> transform, Step original) {
Step step = original.clone();
String stepName = step.getName();
if (stepNames.put(getCurrentTransform(transform), stepName) != null) {
throw new IllegalArgumentException(transform + " already has a name specified");
}
Map<String, Object> properties = step.getProperties();
if (properties != null) {
@Nullable List<Map<String, Object>> outputInfoList = null;
try {
// TODO: This should be done via a Structs accessor.
@Nullable List<Map<String, Object>> list =
(List<Map<String, Object>>) properties.get(PropertyNames.OUTPUT_INFO);
outputInfoList = list;
} catch (Exception e) {
throw new RuntimeException("Inconsistent dataflow pipeline translation", e);
}
if (outputInfoList != null && !outputInfoList.isEmpty()) {
Map<String, Object> firstOutputPort = outputInfoList.get(0);
@Nullable String name;
try {
name = getString(firstOutputPort, PropertyNames.OUTPUT_NAME);
} catch (Exception e) {
name = null;
}
if (name != null) {
registerOutputName(getOutput(transform), name);
}
}
}
List<Step> steps = job.getSteps();
if (steps == null) {
steps = new LinkedList<>();
job.setSteps(steps);
}
currentStep = step;
steps.add(step);
}
@Override
public void addEncodingInput(Coder<?> coder) {
CloudObject encoding = SerializableUtils.ensureSerializable(coder);
addObject(getProperties(), PropertyNames.ENCODING, encoding);
}
@Override
public void addInput(String name, Boolean value) {
addBoolean(getProperties(), name, value);
}
@Override
public void addInput(String name, String value) {
addString(getProperties(), name, value);
}
@Override
public void addInput(String name, Long value) {
addLong(getProperties(), name, value);
}
@Override
public void addInput(String name, Map<String, Object> elements) {
addDictionary(getProperties(), name, elements);
}
@Override
public void addInput(String name, List<? extends Map<String, Object>> elements) {
addList(getProperties(), name, elements);
}
@Override
public void addInput(String name, PInput value) {
if (value instanceof PValue) {
addInput(name, asOutputReference((PValue) value));
} else {
throw new IllegalStateException("Input must be a PValue");
}
}
@Override
public long addOutput(PValue value) {
Coder<?> coder;
if (value instanceof TypedPValue) {
coder = ((TypedPValue<?>) value).getCoder();
if (value instanceof PCollection) {
// Wrap the PCollection element Coder inside a WindowedValueCoder.
coder = WindowedValue.getFullCoder(
coder,
((PCollection<?>) value).getWindowingStrategy().getWindowFn().windowCoder());
}
} else {
// No output coder to encode.
coder = null;
}
return addOutput(value, coder);
}
@Override
public long addValueOnlyOutput(PValue value) {
Coder<?> coder;
if (value instanceof TypedPValue) {
coder = ((TypedPValue<?>) value).getCoder();
if (value instanceof PCollection) {
// Wrap the PCollection element Coder inside a ValueOnly
// WindowedValueCoder.
coder = WindowedValue.getValueOnlyCoder(coder);
}
} else {
// No output coder to encode.
coder = null;
}
return addOutput(value, coder);
}
@Override
public long addCollectionToSingletonOutput(PValue inputValue,
PValue outputValue) {
Coder<?> inputValueCoder =
checkNotNull(outputCoders.get(inputValue));
// The inputValueCoder for the input PCollection should be some
// WindowedValueCoder of the input PCollection's element
// coder.
checkState(
inputValueCoder instanceof WindowedValue.WindowedValueCoder);
// The outputValueCoder for the output should be an
// IterableCoder of the inputValueCoder. This is a property
// of the backend "CollectionToSingleton" step.
Coder<?> outputValueCoder = IterableCoder.of(inputValueCoder);
return addOutput(outputValue, outputValueCoder);
}
/**
* Adds an output with the given name to the previously added
* Dataflow step, producing the specified output {@code PValue}
* with the given {@code Coder} (if not {@code null}).
*/
private long addOutput(PValue value, Coder<?> valueCoder) {
long id = idGenerator.get();
registerOutputName(value, Long.toString(id));
Map<String, Object> properties = getProperties();
@Nullable List<Map<String, Object>> outputInfoList = null;
try {
// TODO: This should be done via a Structs accessor.
outputInfoList = (List<Map<String, Object>>) properties.get(PropertyNames.OUTPUT_INFO);
} catch (Exception e) {
throw new RuntimeException("Inconsistent dataflow pipeline translation", e);
}
if (outputInfoList == null) {
outputInfoList = new ArrayList<>();
// TODO: This should be done via a Structs accessor.
properties.put(PropertyNames.OUTPUT_INFO, outputInfoList);
}
Map<String, Object> outputInfo = new HashMap<>();
addString(outputInfo, PropertyNames.OUTPUT_NAME, Long.toString(id));
addString(outputInfo, PropertyNames.USER_NAME, value.getName());
if (value instanceof PCollection
&& runner.doesPCollectionRequireIndexedFormat((PCollection<?>) value)) {
addBoolean(outputInfo, PropertyNames.USE_INDEXED_FORMAT, true);
}
if (valueCoder != null) {
// Verify that encoding can be decoded, in order to catch serialization
// failures as early as possible.
CloudObject encoding = SerializableUtils.ensureSerializable(valueCoder);
addObject(outputInfo, PropertyNames.ENCODING, encoding);
outputCoders.put(value, valueCoder);
}
outputInfoList.add(outputInfo);
return id;
}
private void addDisplayData(String stepName, HasDisplayData hasDisplayData) {
DisplayData displayData = DisplayData.from(hasDisplayData);
List<Map<String, Object>> list = MAPPER.convertValue(displayData, List.class);
addList(getProperties(), PropertyNames.DISPLAY_DATA, list);
}
@Override
public OutputReference asOutputReference(PValue value) {
AppliedPTransform<?, ?, ?> transform =
value.getProducingTransformInternal();
String stepName = stepNames.get(transform);
if (stepName == null) {
throw new IllegalArgumentException(transform + " doesn't have a name specified");
}
String outputName = outputNames.get(value);
if (outputName == null) {
throw new IllegalArgumentException(
"output " + value + " doesn't have a name specified");
}
return new OutputReference(stepName, outputName);
}
private Map<String, Object> getProperties() {
Map<String, Object> properties = currentStep.getProperties();
if (properties == null) {
properties = new HashMap<>();
currentStep.setProperties(properties);
}
return properties;
}
/**
 * Returns a fresh Dataflow step name ("s1", "s2", ... in order of creation).
*/
private String genStepName() {
return "s" + (stepNames.size() + 1);
}
/**
* Records the name of the given output PValue,
* within its producing transform.
*/
private void registerOutputName(POutput value, String name) {
if (outputNames.put(value, name) != null) {
throw new IllegalArgumentException(
"output " + value + " already has a name specified");
}
}
}
/////////////////////////////////////////////////////////////////////////////
@Override
public String toString() {
return "DataflowPipelineTranslator#" + hashCode();
}
///////////////////////////////////////////////////////////////////////////
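// Registers the default translators for the primitive transforms understood by the
// Dataflow service.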
static {
registerTransformTranslator(
View.CreatePCollectionView.class,
new TransformTranslator<View.CreatePCollectionView>() {
@Override
public void translate(
View.CreatePCollectionView transform,
TranslationContext context) {
translateTyped(transform, context);
}
private <ElemT, ViewT> void translateTyped(
View.CreatePCollectionView<ElemT, ViewT> transform,
TranslationContext context) {
context.addStep(transform, "CollectionToSingleton");
context.addInput(PropertyNames.PARALLEL_INPUT, context.getInput(transform));
context.addCollectionToSingletonOutput(
context.getInput(transform),
context.getOutput(transform));
}
});
DataflowPipelineTranslator.registerTransformTranslator(
Combine.GroupedValues.class,
new DataflowPipelineTranslator.TransformTranslator<Combine.GroupedValues>() {
@Override
public void translate(
Combine.GroupedValues transform,
DataflowPipelineTranslator.TranslationContext context) {
translateHelper(transform, context);
}
private <K, InputT, OutputT> void translateHelper(
final Combine.GroupedValues<K, InputT, OutputT> transform,
DataflowPipelineTranslator.TranslationContext context) {
context.addStep(transform, "CombineValues");
translateInputs(context.getInput(transform), transform.getSideInputs(), context);
AppliedCombineFn<? super K, ? super InputT, ?, OutputT> fn =
transform.getAppliedFn(
context.getInput(transform).getPipeline().getCoderRegistry(),
context.getInput(transform).getCoder(),
context.getInput(transform).getWindowingStrategy());
context.addEncodingInput(fn.getAccumulatorCoder());
context.addInput(
PropertyNames.SERIALIZED_FN,
byteArrayToJsonString(serializeToByteArray(fn)));
context.addOutput(context.getOutput(transform));
}
});
registerTransformTranslator(
Flatten.FlattenPCollectionList.class,
new TransformTranslator<Flatten.FlattenPCollectionList>() {
@Override
public void translate(
Flatten.FlattenPCollectionList transform,
TranslationContext context) {
flattenHelper(transform, context);
}
private <T> void flattenHelper(
Flatten.FlattenPCollectionList<T> transform,
TranslationContext context) {
context.addStep(transform, "Flatten");
List<OutputReference> inputs = new LinkedList<>();
for (PCollection<T> input : context.getInput(transform).getAll()) {
inputs.add(context.asOutputReference(input));
}
context.addInput(PropertyNames.INPUTS, inputs);
context.addOutput(context.getOutput(transform));
}
});
registerTransformTranslator(
GroupByKeyAndSortValuesOnly.class,
new TransformTranslator<GroupByKeyAndSortValuesOnly>() {
@Override
public void translate(
GroupByKeyAndSortValuesOnly transform,
TranslationContext context) {
groupByKeyAndSortValuesHelper(transform, context);
}
private <K1, K2, V> void groupByKeyAndSortValuesHelper(
GroupByKeyAndSortValuesOnly<K1, K2, V> transform,
TranslationContext context) {
context.addStep(transform, "GroupByKey");
context.addInput(PropertyNames.PARALLEL_INPUT, context.getInput(transform));
context.addOutput(context.getOutput(transform));
context.addInput(PropertyNames.SORT_VALUES, true);
// TODO: Add support for combiner lifting once the need arises.
context.addInput(
PropertyNames.DISALLOW_COMBINER_LIFTING, true);
}
});
registerTransformTranslator(
GroupByKey.class,
new TransformTranslator<GroupByKey>() {
@Override
public void translate(
GroupByKey transform,
TranslationContext context) {
groupByKeyHelper(transform, context);
}
private <K, V> void groupByKeyHelper(
GroupByKey<K, V> transform,
TranslationContext context) {
context.addStep(transform, "GroupByKey");
context.addInput(PropertyNames.PARALLEL_INPUT, context.getInput(transform));
context.addOutput(context.getOutput(transform));
WindowingStrategy<?, ?> windowingStrategy =
context.getInput(transform).getWindowingStrategy();
boolean isStreaming =
context.getPipelineOptions().as(StreamingOptions.class).isStreaming();
boolean disallowCombinerLifting =
!windowingStrategy.getWindowFn().isNonMerging()
|| (isStreaming && !transform.fewKeys())
// TODO: Allow combiner lifting on the non-default trigger, as appropriate.
|| !(windowingStrategy.getTrigger() instanceof DefaultTrigger);
context.addInput(
PropertyNames.DISALLOW_COMBINER_LIFTING, disallowCombinerLifting);
context.addInput(
PropertyNames.SERIALIZED_FN,
byteArrayToJsonString(serializeToByteArray(windowingStrategy)));
context.addInput(
PropertyNames.IS_MERGING_WINDOW_FN,
!windowingStrategy.getWindowFn().isNonMerging());
}
});
registerTransformTranslator(
ParDo.BoundMulti.class,
new TransformTranslator<ParDo.BoundMulti>() {
@Override
public void translate(
ParDo.BoundMulti transform,
TranslationContext context) {
translateMultiHelper(transform, context);
}
private <InputT, OutputT> void translateMultiHelper(
ParDo.BoundMulti<InputT, OutputT> transform,
TranslationContext context) {
rejectStatefulDoFn(transform.getNewFn());
context.addStep(transform, "ParallelDo");
translateInputs(context.getInput(transform), transform.getSideInputs(), context);
BiMap<Long, TupleTag<?>> outputMap =
translateOutputs(context.getOutput(transform), context);
translateFn(
transform.getNewFn(),
context.getInput(transform).getWindowingStrategy(),
transform.getSideInputs(),
context.getInput(transform).getCoder(),
context,
outputMap.inverse().get(transform.getMainOutputTag()),
outputMap);
}
});
registerTransformTranslator(
ParDo.Bound.class,
new TransformTranslator<ParDo.Bound>() {
@Override
public void translate(
ParDo.Bound transform,
TranslationContext context) {
translateSingleHelper(transform, context);
}
private <InputT, OutputT> void translateSingleHelper(
ParDo.Bound<InputT, OutputT> transform,
TranslationContext context) {
rejectStatefulDoFn(transform.getNewFn());
context.addStep(transform, "ParallelDo");
translateInputs(context.getInput(transform), transform.getSideInputs(), context);
long mainOutput = context.addOutput(context.getOutput(transform));
translateFn(
transform.getNewFn(),
context.getInput(transform).getWindowingStrategy(),
transform.getSideInputs(),
context.getInput(transform).getCoder(),
context,
mainOutput,
ImmutableMap.<Long, TupleTag<?>>of(mainOutput,
new TupleTag<>(PropertyNames.OUTPUT)));
}
});
registerTransformTranslator(
Window.Bound.class,
new DataflowPipelineTranslator.TransformTranslator<Window.Bound>() {
@Override
public void translate(
Window.Bound transform, TranslationContext context) {
translateHelper(transform, context);
}
private <T> void translateHelper(
Window.Bound<T> transform, TranslationContext context) {
context.addStep(transform, "Bucket");
context.addInput(PropertyNames.PARALLEL_INPUT, context.getInput(transform));
context.addOutput(context.getOutput(transform));
WindowingStrategy<?, ?> strategy = context.getOutput(transform).getWindowingStrategy();
byte[] serializedBytes = serializeToByteArray(strategy);
String serializedJson = byteArrayToJsonString(serializedBytes);
assert Arrays.equals(serializedBytes,
jsonStringToByteArray(serializedJson));
context.addInput(PropertyNames.SERIALIZED_FN, serializedJson);
}
});
///////////////////////////////////////////////////////////////////////////
// IO Translation.
registerTransformTranslator(Read.Bounded.class, new ReadTranslator());
}
private static void rejectStatefulDoFn(DoFn<?, ?> doFn) {
if (DoFnSignatures.getSignature(doFn.getClass()).isStateful()) {
throw new UnsupportedOperationException(
String.format(
"Found %s annotations on %s, but %s cannot yet be used with state in the %s.",
DoFn.StateId.class.getSimpleName(),
doFn.getClass().getName(),
DoFn.class.getSimpleName(),
DataflowRunner.class.getSimpleName()));
}
}
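// Adds the main input and the side inputs of a ParDo-like transform to the current step.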
private static void translateInputs(
PCollection<?> input,
List<PCollectionView<?>> sideInputs,
TranslationContext context) {
context.addInput(PropertyNames.PARALLEL_INPUT, input);
translateSideInputs(sideInputs, context);
}
// Used for ParDo
private static void translateSideInputs(
List<PCollectionView<?>> sideInputs,
TranslationContext context) {
Map<String, Object> nonParInputs = new HashMap<>();
for (PCollectionView<?> view : sideInputs) {
nonParInputs.put(
view.getTagInternal().getId(),
context.asOutputReference(view));
}
context.addInput(PropertyNames.NON_PARALLEL_INPUTS, nonParInputs);
}
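// Serializes the DoFn, together with its windowing strategy, side inputs, input coder, and
// output mapping, into the step's SERIALIZED_FN property.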
private static void translateFn(
DoFn fn,
WindowingStrategy windowingStrategy,
Iterable<PCollectionView<?>> sideInputs,
Coder inputCoder,
TranslationContext context,
long mainOutput,
Map<Long, TupleTag<?>> outputMap) {
context.addInput(PropertyNames.USER_FN, fn.getClass().getName());
context.addInput(
PropertyNames.SERIALIZED_FN,
byteArrayToJsonString(
serializeToByteArray(
DoFnInfo.forFn(
fn, windowingStrategy, sideInputs, inputCoder, mainOutput, outputMap))));
}
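// Registers each output of a multi-output ParDo with the current step and returns a map
// from the assigned output ids to their TupleTags.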
private static BiMap<Long, TupleTag<?>> translateOutputs(
PCollectionTuple outputs,
TranslationContext context) {
ImmutableBiMap.Builder<Long, TupleTag<?>> mapBuilder = ImmutableBiMap.builder();
for (Map.Entry<TupleTag<?>, PCollection<?>> entry
: outputs.getAll().entrySet()) {
TupleTag<?> tag = entry.getKey();
PCollection<?> output = entry.getValue();
mapBuilder.put(context.addOutput(output), tag);
}
return mapBuilder.build();
}
}