/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.beam.runners.dataflow;
import static org.apache.beam.runners.dataflow.util.Structs.addBoolean;
import static org.apache.beam.runners.dataflow.util.Structs.addDictionary;
import static org.apache.beam.runners.dataflow.util.Structs.addList;
import static org.apache.beam.runners.dataflow.util.Structs.addLong;
import static org.apache.beam.runners.dataflow.util.Structs.addObject;
import static org.apache.beam.runners.dataflow.util.Structs.addString;
import static org.apache.beam.runners.dataflow.util.Structs.getString;
import static org.apache.beam.sdk.options.ExperimentalOptions.hasExperiment;
import static org.apache.beam.sdk.util.SerializableUtils.serializeToByteArray;
import static org.apache.beam.sdk.util.StringUtils.byteArrayToJsonString;
import static org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkArgument;
import static org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkNotNull;
import static org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkState;
import static org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Strings.isNullOrEmpty;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.api.services.dataflow.model.AutoscalingSettings;
import com.google.api.services.dataflow.model.DataflowPackage;
import com.google.api.services.dataflow.model.Disk;
import com.google.api.services.dataflow.model.Environment;
import com.google.api.services.dataflow.model.Job;
import com.google.api.services.dataflow.model.Step;
import com.google.api.services.dataflow.model.WorkerPool;
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicLong;
import java.util.stream.Collectors;
import javax.annotation.Nullable;
import org.apache.beam.model.pipeline.v1.RunnerApi;
import org.apache.beam.runners.core.construction.Environments;
import org.apache.beam.runners.core.construction.ParDoTranslation;
import org.apache.beam.runners.core.construction.PipelineTranslation;
import org.apache.beam.runners.core.construction.SdkComponents;
import org.apache.beam.runners.core.construction.SplittableParDo;
import org.apache.beam.runners.core.construction.TransformInputs;
import org.apache.beam.runners.core.construction.WindowingStrategyTranslation;
import org.apache.beam.runners.dataflow.BatchViewOverrides.GroupByKeyAndSortValuesOnly;
import org.apache.beam.runners.dataflow.DataflowRunner.CombineGroupedValues;
import org.apache.beam.runners.dataflow.PrimitiveParDoSingleFactory.ParDoSingle;
import org.apache.beam.runners.dataflow.TransformTranslator.StepTranslationContext;
import org.apache.beam.runners.dataflow.TransformTranslator.TranslationContext;
import org.apache.beam.runners.dataflow.options.DataflowPipelineOptions;
import org.apache.beam.runners.dataflow.util.CloudObject;
import org.apache.beam.runners.dataflow.util.CloudObjects;
import org.apache.beam.runners.dataflow.util.OutputReference;
import org.apache.beam.runners.dataflow.util.PropertyNames;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.Pipeline.PipelineVisitor;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.coders.IterableCoder;
import org.apache.beam.sdk.extensions.gcp.options.GcpOptions;
import org.apache.beam.sdk.io.Read;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.StreamingOptions;
import org.apache.beam.sdk.runners.AppliedPTransform;
import org.apache.beam.sdk.runners.TransformHierarchy;
import org.apache.beam.sdk.transforms.Combine;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.DoFnSchemaInformation;
import org.apache.beam.sdk.transforms.Flatten;
import org.apache.beam.sdk.transforms.GroupByKey;
import org.apache.beam.sdk.transforms.Materializations;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.View;
import org.apache.beam.sdk.transforms.display.DisplayData;
import org.apache.beam.sdk.transforms.display.HasDisplayData;
import org.apache.beam.sdk.transforms.reflect.DoFnInvokers;
import org.apache.beam.sdk.transforms.reflect.DoFnSignature;
import org.apache.beam.sdk.transforms.reflect.DoFnSignatures;
import org.apache.beam.sdk.transforms.windowing.DefaultTrigger;
import org.apache.beam.sdk.transforms.windowing.Window;
import org.apache.beam.sdk.util.AppliedCombineFn;
import org.apache.beam.sdk.util.DoFnInfo;
import org.apache.beam.sdk.util.WindowedValue;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PCollectionView;
import org.apache.beam.sdk.values.PInput;
import org.apache.beam.sdk.values.POutput;
import org.apache.beam.sdk.values.PValue;
import org.apache.beam.sdk.values.TupleTag;
import org.apache.beam.sdk.values.WindowingStrategy;
import org.apache.beam.vendor.grpc.v1p21p0.com.google.protobuf.TextFormat;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.annotations.VisibleForTesting;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Supplier;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Iterables;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* {@link DataflowPipelineTranslator} knows how to translate {@link Pipeline} objects into Cloud
* Dataflow Service API {@link Job}s.
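*
* <p>A minimal usage sketch (assuming already configured {@link DataflowPipelineOptions}, a
* {@link DataflowRunner}, and a list of staged {@link DataflowPackage}s; {@code stagedPackages}
* is an illustrative name, not part of this API):
*
* <pre>{@code
* DataflowPipelineOptions options = ...;
* Pipeline pipeline = Pipeline.create(options);
* // ... apply transforms to the pipeline ...
*
* DataflowPipelineTranslator translator = DataflowPipelineTranslator.fromOptions(options);
* JobSpecification spec =
*     translator.translate(pipeline, DataflowRunner.fromOptions(options), stagedPackages);
* Job job = spec.getJob();
* }</pre>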
*/
@SuppressWarnings({"rawtypes", "unchecked"})
@VisibleForTesting
public class DataflowPipelineTranslator {
// Must be kept in sync with their internal counterparts.
private static final Logger LOG = LoggerFactory.getLogger(DataflowPipelineTranslator.class);
private static final ObjectMapper MAPPER = new ObjectMapper();
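/**
* Serializes the given {@link WindowingStrategy} to its portable proto representation, using a
* Docker environment built from the worker harness container image derived from {@code options}
* as the default environment.
*/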
private static byte[] serializeWindowingStrategy(
WindowingStrategy<?, ?> windowingStrategy, PipelineOptions options) {
try {
SdkComponents sdkComponents = SdkComponents.create();
String workerHarnessContainerImageURL =
DataflowRunner.getContainerImageForJob(options.as(DataflowPipelineOptions.class));
RunnerApi.Environment defaultEnvironmentForDataflow =
Environments.createDockerEnvironment(workerHarnessContainerImageURL);
sdkComponents.registerEnvironment(defaultEnvironmentForDataflow);
return WindowingStrategyTranslation.toMessageProto(windowingStrategy, sdkComponents)
.toByteArray();
} catch (Exception e) {
throw new RuntimeException(
String.format("Unable to format windowing strategy %s as bytes", windowingStrategy), e);
}
}
/**
* A map from {@link PTransform} subclass to the corresponding {@link TransformTranslator} to use
* to translate that transform.
*
* <p>A static map that contains system-wide defaults.
*/
private static Map<Class, TransformTranslator> transformTranslators = new HashMap<>();
/** Provided configuration options. */
private final DataflowPipelineOptions options;
/**
* Constructs a translator from the provided options.
*
* @param options Properties that configure the translator.
* @return The newly created translator.
*/
public static DataflowPipelineTranslator fromOptions(DataflowPipelineOptions options) {
return new DataflowPipelineTranslator(options);
}
private DataflowPipelineTranslator(DataflowPipelineOptions options) {
this.options = options;
}
/** Translates a {@link Pipeline} into a {@code JobSpecification}. */
public JobSpecification translate(
Pipeline pipeline, DataflowRunner runner, List<DataflowPackage> packages) {
// Capture the sdkComponents for lookup during step translations.
SdkComponents sdkComponents = SdkComponents.create();
String workerHarnessContainerImageURL =
DataflowRunner.getContainerImageForJob(options.as(DataflowPipelineOptions.class));
RunnerApi.Environment defaultEnvironmentForDataflow =
Environments.createDockerEnvironment(workerHarnessContainerImageURL);
sdkComponents.registerEnvironment(defaultEnvironmentForDataflow);
RunnerApi.Pipeline pipelineProto = PipelineTranslation.toProto(pipeline, sdkComponents, true);
LOG.debug("Portable pipeline proto:\n{}", TextFormat.printToString(pipelineProto));
Translator translator = new Translator(pipeline, runner, sdkComponents);
Job result = translator.translate(packages);
return new JobSpecification(
result, pipelineProto, Collections.unmodifiableMap(translator.stepNames));
}
/**
* The result of a job translation.
*
* <p>Used to pass the result {@link Job} and any state that was used to construct the job that
* may be of use to other classes (e.g., the {@link PTransform} to step name mapping).
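*
* <p>A sketch of how a caller might inspect the result, assuming {@code spec} is a {@code
* JobSpecification} returned by {@link DataflowPipelineTranslator#translate}:
*
* <pre>{@code
* Job job = spec.getJob();
* for (Map.Entry<AppliedPTransform<?, ?, ?>, String> entry : spec.getStepNames().entrySet()) {
*   System.out.println(entry.getKey().getFullName() + " -> " + entry.getValue());
* }
* }</pre>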
*/
public static class JobSpecification {
private final Job job;
private final Map<AppliedPTransform<?, ?, ?>, String> stepNames;
private final RunnerApi.Pipeline pipelineProto;
public JobSpecification(
Job job,
RunnerApi.Pipeline pipelineProto,
Map<AppliedPTransform<?, ?, ?>, String> stepNames) {
this.job = job;
this.pipelineProto = pipelineProto;
this.stepNames = stepNames;
}
public Job getJob() {
return job;
}
public RunnerApi.Pipeline getPipelineProto() {
return pipelineProto;
}
/**
* Returns the mapping of {@link AppliedPTransform AppliedPTransforms} to the internal step name
* for that {@code AppliedPTransform}.
*/
public Map<AppliedPTransform<?, ?, ?>, String> getStepNames() {
return stepNames;
}
}
/** Renders a {@link Job} as a string. */
public static String jobToString(Job job) {
try {
return MAPPER.writerWithDefaultPrettyPrinter().writeValueAsString(job);
} catch (JsonProcessingException exc) {
throw new IllegalStateException("Failed to render Job as String.", exc);
}
}
/////////////////////////////////////////////////////////////////////////////
/**
* Records that instances of the specified {@link PTransform} class should, by default, be
* translated by the corresponding {@link TransformTranslator}.
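*
* <p>A sketch of registering a translator for a hypothetical PCollection-to-PCollection {@code
* MyTransform}; the transform class and the step kind {@code "MyStep"} are illustrative only,
* not part of the Dataflow service API:
*
* <pre>{@code
* DataflowPipelineTranslator.registerTransformTranslator(
*     MyTransform.class,
*     new TransformTranslator<MyTransform>() {
*       public void translate(MyTransform transform, TranslationContext context) {
*         StepTranslationContext stepContext = context.addStep(transform, "MyStep");
*         stepContext.addInput(PropertyNames.PARALLEL_INPUT, context.getInput(transform));
*         stepContext.addOutput(PropertyNames.OUTPUT, context.getOutput(transform));
*       }
*     });
* }</pre>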
*/
public static <TransformT extends PTransform> void registerTransformTranslator(
Class<TransformT> transformClass,
TransformTranslator<? extends TransformT> transformTranslator) {
if (transformTranslators.put(transformClass, transformTranslator) != null) {
throw new IllegalArgumentException("defining multiple translators for " + transformClass);
}
}
/**
* Returns the {@link TransformTranslator} to use for instances of the specified {@link PTransform}
* class, or {@code null} if none is registered.
*/
public <TransformT extends PTransform> TransformTranslator<TransformT> getTransformTranslator(
Class<TransformT> transformClass) {
return transformTranslators.get(transformClass);
}
/////////////////////////////////////////////////////////////////////////////
/**
* Translates a Pipeline into the Dataflow representation.
*
* <p>For internal use only.
*/
class Translator extends PipelineVisitor.Defaults implements TranslationContext {
/**
* An id generator to be used when giving unique ids for pipeline level constructs. This is
* purposely wrapped inside of a {@link Supplier} to prevent the incorrect usage of the {@link
* AtomicLong} that is contained.
*/
private final Supplier<Long> idGenerator =
new Supplier<Long>() {
private final AtomicLong generator = new AtomicLong(1L);
@Override
public Long get() {
return generator.getAndIncrement();
}
};
/** The Pipeline to translate. */
private final Pipeline pipeline;
/** The runner which will execute the pipeline. */
private final DataflowRunner runner;
/** The Cloud Dataflow Job representation. */
private final Job job = new Job();
/** A Map from AppliedPTransform to their unique Dataflow step names. */
private final Map<AppliedPTransform<?, ?, ?>, String> stepNames = new HashMap<>();
/**
* A Map from {@link PValue} to the {@link AppliedPTransform} that produces that {@link PValue}.
*/
private final Map<PValue, AppliedPTransform<?, ?, ?>> producers = new HashMap<>();
/** A Map from PValues to their output names used by their producer Dataflow steps. */
private final Map<PValue, String> outputNames = new HashMap<>();
/** A Map from PValues to the Coders used for them. */
private final Map<PValue, Coder<?>> outputCoders = new HashMap<>();
/**
* The component maps of the portable pipeline, so they can be referred to by id in the output
* of translation.
*/
private final SdkComponents sdkComponents;
/** The transform currently being applied. */
private AppliedPTransform<?, ?, ?> currentTransform;
/** A stack of all composite PTransforms encompassing the current transform. */
private ArrayDeque<TransformHierarchy.Node> parents = new ArrayDeque<>();
/** Constructs a Translator that will translate the specified Pipeline into Dataflow objects. */
public Translator(Pipeline pipeline, DataflowRunner runner, SdkComponents sdkComponents) {
this.pipeline = pipeline;
this.runner = runner;
this.sdkComponents = sdkComponents;
}
/**
* Translates this Translator's pipeline into a Cloud Dataflow {@link Job}.
*
* @return a Job definition filled in with the type of job, the environment, and the job steps.
*/
public Job translate(List<DataflowPackage> packages) {
job.setName(options.getJobName().toLowerCase());
Environment environment = new Environment();
job.setEnvironment(environment);
WorkerPool workerPool = new WorkerPool();
// If Streaming Engine is enabled, set the corresponding experiments so that it is enabled on
// the back end as well. If Streaming Engine is not enabled, make sure the experiments are not
// set either.
if (options.isEnableStreamingEngine()) {
List<String> experiments = options.getExperiments();
if (experiments == null) {
experiments = new ArrayList<String>();
} else {
experiments = new ArrayList<String>(experiments);
}
if (!experiments.contains(GcpOptions.STREAMING_ENGINE_EXPERIMENT)) {
experiments.add(GcpOptions.STREAMING_ENGINE_EXPERIMENT);
}
if (!experiments.contains(GcpOptions.WINDMILL_SERVICE_EXPERIMENT)) {
experiments.add(GcpOptions.WINDMILL_SERVICE_EXPERIMENT);
}
options.setExperiments(experiments);
} else {
List<String> experiments = options.getExperiments();
if (experiments != null) {
if (experiments.contains(GcpOptions.STREAMING_ENGINE_EXPERIMENT)
|| experiments.contains(GcpOptions.WINDMILL_SERVICE_EXPERIMENT)) {
throw new IllegalArgumentException(
"Streaming engine both disabled and enabled: enableStreamingEngine is set to false, but enable_windmill_service and/or enable_streaming_engine are present. It is recommended you only set enableStreamingEngine.");
}
}
}
if (options.isStreaming()) {
job.setType("JOB_TYPE_STREAMING");
} else {
job.setType("JOB_TYPE_BATCH");
workerPool.setDiskType(options.getWorkerDiskType());
}
if (options.getWorkerMachineType() != null) {
workerPool.setMachineType(options.getWorkerMachineType());
}
if (options.getUsePublicIps() != null) {
if (options.getUsePublicIps()) {
workerPool.setIpConfiguration("WORKER_IP_PUBLIC");
} else {
workerPool.setIpConfiguration("WORKER_IP_PRIVATE");
}
}
workerPool.setPackages(packages);
workerPool.setNumWorkers(options.getNumWorkers());
if (options.getLabels() != null) {
job.setLabels(options.getLabels());
}
if (options.isStreaming() && !hasExperiment(options, "enable_windmill_service")) {
// Use separate data disk for streaming.
Disk disk = new Disk();
disk.setDiskType(options.getWorkerDiskType());
workerPool.setDataDisks(Collections.singletonList(disk));
}
if (!isNullOrEmpty(options.getZone())) {
workerPool.setZone(options.getZone());
}
if (!isNullOrEmpty(options.getNetwork())) {
workerPool.setNetwork(options.getNetwork());
}
if (!isNullOrEmpty(options.getSubnetwork())) {
workerPool.setSubnetwork(options.getSubnetwork());
}
if (options.getDiskSizeGb() > 0) {
workerPool.setDiskSizeGb(options.getDiskSizeGb());
}
AutoscalingSettings settings = new AutoscalingSettings();
if (options.getAutoscalingAlgorithm() != null) {
settings.setAlgorithm(options.getAutoscalingAlgorithm().getAlgorithm());
}
settings.setMaxNumWorkers(options.getMaxNumWorkers());
workerPool.setAutoscalingSettings(settings);
List<WorkerPool> workerPools = new ArrayList<>();
workerPools.add(workerPool);
environment.setWorkerPools(workerPools);
if (options.getServiceAccount() != null) {
environment.setServiceAccountEmail(options.getServiceAccount());
}
if (options.getDataflowKmsKey() != null) {
environment.setServiceKmsKeyName(options.getDataflowKmsKey());
}
pipeline.traverseTopologically(this);
return job;
}
@Override
public DataflowPipelineOptions getPipelineOptions() {
return options;
}
@Override
public <InputT extends PInput> Map<TupleTag<?>, PValue> getInputs(
PTransform<InputT, ?> transform) {
return getCurrentTransform(transform).getInputs();
}
@Override
public <InputT extends PValue> InputT getInput(PTransform<InputT, ?> transform) {
return (InputT)
Iterables.getOnlyElement(
TransformInputs.nonAdditionalInputs(getCurrentTransform(transform)));
}
@Override
public <OutputT extends POutput> Map<TupleTag<?>, PValue> getOutputs(
PTransform<?, OutputT> transform) {
return getCurrentTransform(transform).getOutputs();
}
@Override
public <OutputT extends PValue> OutputT getOutput(PTransform<?, OutputT> transform) {
return (OutputT) Iterables.getOnlyElement(getOutputs(transform).values());
}
@Override
public String getFullName(PTransform<?, ?> transform) {
return getCurrentTransform(transform).getFullName();
}
@Override
public AppliedPTransform<?, ?, ?> getCurrentTransform() {
return currentTransform;
}
private AppliedPTransform<?, ?, ?> getCurrentTransform(PTransform<?, ?> transform) {
checkArgument(
currentTransform != null && currentTransform.getTransform() == transform,
"can only be called with current transform");
return currentTransform;
}
@Override
public void visitPrimitiveTransform(TransformHierarchy.Node node) {
PTransform<?, ?> transform = node.getTransform();
TransformTranslator translator = getTransformTranslator(transform.getClass());
checkState(
translator != null,
"no translator registered for primitive transform %s at node %s",
transform,
node.getFullName());
LOG.debug("Translating {}", transform);
currentTransform = node.toAppliedPTransform(getPipeline());
translator.translate(transform, this);
currentTransform = null;
}
@Override
public void visitValue(PValue value, TransformHierarchy.Node producer) {
LOG.debug("Checking translation of {}", value);
// Primitive transforms are the only ones assigned step names.
if (producer.getTransform() instanceof CreateDataflowView
&& !hasExperiment(options, "beam_fn_api")) {
// CreateDataflowView produces a dummy output (as it must be a primitive transform)
// but in the Dataflow Job graph produces only the view and not the output PCollection.
asOutputReference(
((CreateDataflowView) producer.getTransform()).getView(),
producer.toAppliedPTransform(getPipeline()));
return;
} else if (producer.getTransform() instanceof View.CreatePCollectionView
&& hasExperiment(options, "beam_fn_api")) {
// View.CreatePCollectionView produces a dummy output (as it must be a primitive transform)
// but in the Dataflow Job graph produces only the view and not the output PCollection.
asOutputReference(
((View.CreatePCollectionView) producer.getTransform()).getView(),
producer.toAppliedPTransform(getPipeline()));
return;
}
asOutputReference(value, producer.toAppliedPTransform(getPipeline()));
}
@Override
public StepTranslator addStep(PTransform<?, ?> transform, String type) {
String stepName = genStepName();
if (stepNames.put(getCurrentTransform(transform), stepName) != null) {
throw new IllegalArgumentException(transform + " already has a name specified");
}
// Start the next "steps" list item.
List<Step> steps = job.getSteps();
if (steps == null) {
steps = new ArrayList<>();
job.setSteps(steps);
}
Step step = new Step();
step.setName(stepName);
step.setKind(type);
steps.add(step);
StepTranslator stepContext = new StepTranslator(this, step);
stepContext.addInput(PropertyNames.USER_NAME, getFullName(transform));
stepContext.addDisplayData(step, stepName, transform);
LOG.info("Adding {} as step {}", getCurrentTransform(transform).getFullName(), stepName);
return stepContext;
}
@Override
public OutputReference asOutputReference(PValue value, AppliedPTransform<?, ?, ?> producer) {
String stepName = stepNames.get(producer);
checkArgument(stepName != null, "%s doesn't have a name specified", producer);
String outputName = outputNames.get(value);
checkArgument(outputName != null, "output %s doesn't have a name specified", value);
return new OutputReference(stepName, outputName);
}
@Override
public SdkComponents getSdkComponents() {
return sdkComponents;
}
@Override
public AppliedPTransform<?, ?, ?> getProducer(PValue value) {
return checkNotNull(
producers.get(value),
"Unknown producer for value %s while translating step %s",
value,
currentTransform.getFullName());
}
/** Returns a fresh Dataflow step name. */
private String genStepName() {
return "s" + (stepNames.size() + 1);
}
/** Records the name of the given output PValue, within its producing transform. */
private void registerOutputName(PValue value, String name) {
if (outputNames.put(value, name) != null) {
throw new IllegalArgumentException("output " + value + " already has a name specified");
}
}
@Override
public CompositeBehavior enterCompositeTransform(TransformHierarchy.Node node) {
if (!node.isRootNode()) {
parents.addFirst(node);
}
return CompositeBehavior.ENTER_TRANSFORM;
}
@Override
public void leaveCompositeTransform(TransformHierarchy.Node node) {
if (!node.isRootNode()) {
parents.removeFirst();
}
}
@Override
public AppliedPTransform<?, ?, ?> getCurrentParent() {
if (parents.isEmpty()) {
return null;
} else {
return parents.peekFirst().toAppliedPTransform(getPipeline());
}
}
}
static class StepTranslator implements StepTranslationContext {
private final Translator translator;
private final Step step;
private StepTranslator(Translator translator, Step step) {
this.translator = translator;
this.step = step;
}
private Map<String, Object> getProperties() {
return DataflowPipelineTranslator.getProperties(step);
}
@Override
public void addEncodingInput(Coder<?> coder) {
CloudObject encoding = translateCoder(coder, translator);
addObject(getProperties(), PropertyNames.ENCODING, encoding);
}
@Override
public void addInput(String name, Boolean value) {
addBoolean(getProperties(), name, value);
}
@Override
public void addInput(String name, String value) {
addString(getProperties(), name, value);
}
@Override
public void addInput(String name, Long value) {
addLong(getProperties(), name, value);
}
@Override
public void addInput(String name, Map<String, Object> elements) {
addDictionary(getProperties(), name, elements);
}
@Override
public void addInput(String name, List<? extends Map<String, Object>> elements) {
addList(getProperties(), name, elements);
}
@Override
public void addInput(String name, PInput value) {
if (value instanceof PValue) {
PValue pvalue = (PValue) value;
addInput(name, translator.asOutputReference(pvalue, translator.getProducer(pvalue)));
} else {
throw new IllegalStateException("Input must be a PValue");
}
}
@Override
public void addOutput(String name, PCollection<?> value) {
translator.producers.put(value, translator.currentTransform);
// Wrap the PCollection element Coder inside a WindowedValueCoder.
Coder<?> coder =
WindowedValue.getFullCoder(
value.getCoder(), value.getWindowingStrategy().getWindowFn().windowCoder());
addOutput(name, value, coder);
}
@Override
public void addCollectionToSingletonOutput(
PCollection<?> inputValue, String outputName, PCollectionView<?> outputValue) {
translator.producers.put(outputValue, translator.currentTransform);
Coder<?> inputValueCoder = checkNotNull(translator.outputCoders.get(inputValue));
// The inputValueCoder for the input PCollection should be some WindowedValueCoder of the
// input PCollection's element coder.
checkState(inputValueCoder instanceof WindowedValue.WindowedValueCoder);
// The outputValueCoder for the output should be an
// IterableCoder of the inputValueCoder. This is a property
// of the backend "CollectionToSingleton" step.
Coder<?> outputValueCoder = IterableCoder.of(inputValueCoder);
addOutput(outputName, outputValue, outputValueCoder);
}
/**
* Adds an output with the given name to the previously added Dataflow step, producing the
* specified output {@code PValue} with the given {@code Coder} (if not {@code null}).
*/
private void addOutput(String name, PValue value, Coder<?> valueCoder) {
translator.registerOutputName(value, name);
Map<String, Object> properties = getProperties();
@Nullable List<Map<String, Object>> outputInfoList = null;
try {
// TODO: This should be done via a Structs accessor.
outputInfoList = (List<Map<String, Object>>) properties.get(PropertyNames.OUTPUT_INFO);
} catch (Exception e) {
throw new RuntimeException("Inconsistent dataflow pipeline translation", e);
}
if (outputInfoList == null) {
outputInfoList = new ArrayList<>();
// TODO: This should be done via a Structs accessor.
properties.put(PropertyNames.OUTPUT_INFO, outputInfoList);
}
Map<String, Object> outputInfo = new HashMap<>();
addString(outputInfo, PropertyNames.OUTPUT_NAME, name);
String stepName = getString(properties, PropertyNames.USER_NAME);
String generatedName = String.format("%s.out%d", stepName, outputInfoList.size());
addString(outputInfo, PropertyNames.USER_NAME, generatedName);
if ((value instanceof PCollection
&& translator.runner.doesPCollectionRequireIndexedFormat((PCollection<?>) value))
|| ((value instanceof PCollectionView)
&& (Materializations.MULTIMAP_MATERIALIZATION_URN.equals(
((PCollectionView) value).getViewFn().getMaterialization().getUrn())))) {
addBoolean(outputInfo, PropertyNames.USE_INDEXED_FORMAT, true);
}
if (valueCoder != null) {
// Verify that encoding can be decoded, in order to catch serialization
// failures as early as possible.
CloudObject encoding = translateCoder(valueCoder, translator);
addObject(outputInfo, PropertyNames.ENCODING, encoding);
translator.outputCoders.put(value, valueCoder);
}
outputInfoList.add(outputInfo);
}
private void addDisplayData(Step step, String stepName, HasDisplayData hasDisplayData) {
DisplayData displayData = DisplayData.from(hasDisplayData);
List<Map<String, Object>> list = MAPPER.convertValue(displayData, List.class);
addList(getProperties(), PropertyNames.DISPLAY_DATA, list);
}
}
/////////////////////////////////////////////////////////////////////////////
@Override
public String toString() {
return "DataflowPipelineTranslator#" + hashCode();
}
private static Map<String, Object> getProperties(Step step) {
Map<String, Object> properties = step.getProperties();
if (properties == null) {
properties = new HashMap<>();
step.setProperties(properties);
}
return properties;
}
///////////////////////////////////////////////////////////////////////////
static {
registerTransformTranslator(
View.CreatePCollectionView.class,
new TransformTranslator<View.CreatePCollectionView>() {
@Override
public void translate(View.CreatePCollectionView transform, TranslationContext context) {
translateTyped(transform, context);
}
private <ElemT, ViewT> void translateTyped(
View.CreatePCollectionView<ElemT, ViewT> transform, TranslationContext context) {
StepTranslationContext stepContext =
context.addStep(transform, "CollectionToSingleton");
PCollection<ElemT> input = context.getInput(transform);
stepContext.addInput(PropertyNames.PARALLEL_INPUT, input);
WindowingStrategy<?, ?> windowingStrategy = input.getWindowingStrategy();
stepContext.addInput(
PropertyNames.WINDOWING_STRATEGY,
byteArrayToJsonString(
serializeWindowingStrategy(windowingStrategy, context.getPipelineOptions())));
stepContext.addInput(
PropertyNames.IS_MERGING_WINDOW_FN,
!windowingStrategy.getWindowFn().isNonMerging());
stepContext.addCollectionToSingletonOutput(
input, PropertyNames.OUTPUT, transform.getView());
}
});
registerTransformTranslator(
CreateDataflowView.class,
new TransformTranslator<CreateDataflowView>() {
@Override
public void translate(CreateDataflowView transform, TranslationContext context) {
translateTyped(transform, context);
}
private <ElemT, ViewT> void translateTyped(
CreateDataflowView<ElemT, ViewT> transform, TranslationContext context) {
StepTranslationContext stepContext =
context.addStep(transform, "CollectionToSingleton");
PCollection<ElemT> input = context.getInput(transform);
stepContext.addInput(PropertyNames.PARALLEL_INPUT, input);
stepContext.addCollectionToSingletonOutput(
input, PropertyNames.OUTPUT, transform.getView());
}
});
DataflowPipelineTranslator.registerTransformTranslator(
DataflowRunner.CombineGroupedValues.class,
new TransformTranslator<CombineGroupedValues>() {
@Override
public void translate(CombineGroupedValues transform, TranslationContext context) {
translateHelper(transform, context);
}
private <K, InputT, OutputT> void translateHelper(
final CombineGroupedValues<K, InputT, OutputT> primitiveTransform,
TranslationContext context) {
Combine.GroupedValues<K, InputT, OutputT> originalTransform =
primitiveTransform.getOriginalCombine();
StepTranslationContext stepContext =
context.addStep(primitiveTransform, "CombineValues");
translateInputs(
stepContext,
context.getInput(primitiveTransform),
originalTransform.getSideInputs(),
context);
AppliedCombineFn<? super K, ? super InputT, ?, OutputT> fn =
originalTransform.getAppliedFn(
context.getInput(primitiveTransform).getPipeline().getCoderRegistry(),
context.getInput(primitiveTransform).getCoder(),
context.getInput(primitiveTransform).getWindowingStrategy());
stepContext.addEncodingInput(fn.getAccumulatorCoder());
List<String> experiments = context.getPipelineOptions().getExperiments();
boolean isFnApi = experiments != null && experiments.contains("beam_fn_api");
if (isFnApi) {
String ptransformId =
context.getSdkComponents().getPTransformIdOrThrow(context.getCurrentParent());
stepContext.addInput(PropertyNames.SERIALIZED_FN, ptransformId);
} else {
stepContext.addInput(
PropertyNames.SERIALIZED_FN, byteArrayToJsonString(serializeToByteArray(fn)));
}
stepContext.addOutput(PropertyNames.OUTPUT, context.getOutput(primitiveTransform));
}
});
registerTransformTranslator(
Flatten.PCollections.class,
new TransformTranslator<Flatten.PCollections>() {
@Override
public void translate(Flatten.PCollections transform, TranslationContext context) {
flattenHelper(transform, context);
}
private <T> void flattenHelper(
Flatten.PCollections<T> transform, TranslationContext context) {
StepTranslationContext stepContext = context.addStep(transform, "Flatten");
List<OutputReference> inputs = new ArrayList<>();
for (PValue input : context.getInputs(transform).values()) {
inputs.add(context.asOutputReference(input, context.getProducer(input)));
}
stepContext.addInput(PropertyNames.INPUTS, inputs);
stepContext.addOutput(PropertyNames.OUTPUT, context.getOutput(transform));
}
});
registerTransformTranslator(
GroupByKeyAndSortValuesOnly.class,
new TransformTranslator<GroupByKeyAndSortValuesOnly>() {
@Override
public void translate(GroupByKeyAndSortValuesOnly transform, TranslationContext context) {
groupByKeyAndSortValuesHelper(transform, context);
}
private <K1, K2, V> void groupByKeyAndSortValuesHelper(
GroupByKeyAndSortValuesOnly<K1, K2, V> transform, TranslationContext context) {
StepTranslationContext stepContext = context.addStep(transform, "GroupByKey");
PCollection<KV<K1, KV<K2, V>>> input = context.getInput(transform);
stepContext.addInput(PropertyNames.PARALLEL_INPUT, input);
stepContext.addOutput(PropertyNames.OUTPUT, context.getOutput(transform));
stepContext.addInput(PropertyNames.SORT_VALUES, true);
// TODO: Add support for combiner lifting once the need arises.
stepContext.addInput(PropertyNames.DISALLOW_COMBINER_LIFTING, true);
}
});
registerTransformTranslator(
GroupByKey.class,
new TransformTranslator<GroupByKey>() {
@Override
public void translate(GroupByKey transform, TranslationContext context) {
groupByKeyHelper(transform, context);
}
private <K, V> void groupByKeyHelper(
GroupByKey<K, V> transform, TranslationContext context) {
StepTranslationContext stepContext = context.addStep(transform, "GroupByKey");
PCollection<KV<K, V>> input = context.getInput(transform);
stepContext.addInput(PropertyNames.PARALLEL_INPUT, input);
stepContext.addOutput(PropertyNames.OUTPUT, context.getOutput(transform));
WindowingStrategy<?, ?> windowingStrategy = input.getWindowingStrategy();
boolean isStreaming =
context.getPipelineOptions().as(StreamingOptions.class).isStreaming();
boolean allowCombinerLifting =
windowingStrategy.getWindowFn().isNonMerging()
&& windowingStrategy.getWindowFn().assignsToOneWindow();
if (isStreaming) {
allowCombinerLifting &= transform.fewKeys();
// TODO: Allow combiner lifting on the non-default trigger, as appropriate.
allowCombinerLifting &= (windowingStrategy.getTrigger() instanceof DefaultTrigger);
}
stepContext.addInput(PropertyNames.DISALLOW_COMBINER_LIFTING, !allowCombinerLifting);
stepContext.addInput(
PropertyNames.SERIALIZED_FN,
byteArrayToJsonString(
serializeWindowingStrategy(windowingStrategy, context.getPipelineOptions())));
stepContext.addInput(
PropertyNames.IS_MERGING_WINDOW_FN,
!windowingStrategy.getWindowFn().isNonMerging());
}
});
registerTransformTranslator(
ParDo.MultiOutput.class,
new TransformTranslator<ParDo.MultiOutput>() {
@Override
public void translate(ParDo.MultiOutput transform, TranslationContext context) {
translateMultiHelper(transform, context);
}
private <InputT, OutputT> void translateMultiHelper(
ParDo.MultiOutput<InputT, OutputT> transform, TranslationContext context) {
StepTranslationContext stepContext = context.addStep(transform, "ParallelDo");
DoFnSchemaInformation doFnSchemaInformation;
doFnSchemaInformation =
ParDoTranslation.getSchemaInformation(context.getCurrentTransform());
Map<String, PCollectionView<?>> sideInputMapping =
ParDoTranslation.getSideInputMapping(context.getCurrentTransform());
Map<TupleTag<?>, Coder<?>> outputCoders =
context.getOutputs(transform).entrySet().stream()
.collect(
Collectors.toMap(
Map.Entry::getKey, e -> ((PCollection) e.getValue()).getCoder()));
translateInputs(
stepContext,
context.getInput(transform),
transform.getSideInputs().values(),
context);
translateOutputs(context.getOutputs(transform), stepContext);
String ptransformId =
context.getSdkComponents().getPTransformIdOrThrow(context.getCurrentTransform());
translateFn(
stepContext,
ptransformId,
transform.getFn(),
context.getInput(transform).getWindowingStrategy(),
transform.getSideInputs().values(),
context.getInput(transform).getCoder(),
context,
transform.getMainOutputTag(),
outputCoders,
doFnSchemaInformation,
sideInputMapping);
// TODO: Move this logic into translateFn once the legacy ProcessKeyedElements is
// removed.
if (context.isFnApi()) {
DoFnSignature signature = DoFnSignatures.signatureForDoFn(transform.getFn());
if (signature.processElement().isSplittable()) {
Coder<?> restrictionCoder =
DoFnInvokers.invokerFor(transform.getFn())
.invokeGetRestrictionCoder(
context.getInput(transform).getPipeline().getCoderRegistry());
stepContext.addInput(
PropertyNames.RESTRICTION_ENCODING, translateCoder(restrictionCoder, context));
}
}
}
});
registerTransformTranslator(
ParDoSingle.class,
new TransformTranslator<ParDoSingle>() {
@Override
public void translate(ParDoSingle transform, TranslationContext context) {
translateSingleHelper(transform, context);
}
private <InputT, OutputT> void translateSingleHelper(
ParDoSingle<InputT, OutputT> transform, TranslationContext context) {
DoFnSchemaInformation doFnSchemaInformation;
doFnSchemaInformation =
ParDoTranslation.getSchemaInformation(context.getCurrentTransform());
Map<String, PCollectionView<?>> sideInputMapping =
ParDoTranslation.getSideInputMapping(context.getCurrentTransform());
StepTranslationContext stepContext = context.addStep(transform, "ParallelDo");
Map<TupleTag<?>, Coder<?>> outputCoders =
context.getOutputs(transform).entrySet().stream()
.collect(
Collectors.toMap(
Map.Entry::getKey, e -> ((PCollection) e.getValue()).getCoder()));
translateInputs(
stepContext,
context.getInput(transform),
transform.getSideInputs().values(),
context);
stepContext.addOutput(
transform.getMainOutputTag().getId(), context.getOutput(transform));
String ptransformId =
context.getSdkComponents().getPTransformIdOrThrow(context.getCurrentTransform());
translateFn(
stepContext,
ptransformId,
transform.getFn(),
context.getInput(transform).getWindowingStrategy(),
transform.getSideInputs().values(),
context.getInput(transform).getCoder(),
context,
transform.getMainOutputTag(),
outputCoders,
doFnSchemaInformation,
sideInputMapping);
// TODO: Move this logic into translateFn once the legacy ProcessKeyedElements is
// removed.
if (context.isFnApi()) {
DoFnSignature signature = DoFnSignatures.signatureForDoFn(transform.getFn());
if (signature.processElement().isSplittable()) {
Coder<?> restrictionCoder =
DoFnInvokers.invokerFor(transform.getFn())
.invokeGetRestrictionCoder(
context.getInput(transform).getPipeline().getCoderRegistry());
stepContext.addInput(
PropertyNames.RESTRICTION_ENCODING, translateCoder(restrictionCoder, context));
}
}
}
});
registerTransformTranslator(
Window.Assign.class,
new TransformTranslator<Window.Assign>() {
@Override
public void translate(Window.Assign transform, TranslationContext context) {
translateHelper(transform, context);
}
private <T> void translateHelper(Window.Assign<T> transform, TranslationContext context) {
StepTranslationContext stepContext = context.addStep(transform, "Bucket");
PCollection<T> input = context.getInput(transform);
stepContext.addInput(PropertyNames.PARALLEL_INPUT, input);
stepContext.addOutput(PropertyNames.OUTPUT, context.getOutput(transform));
WindowingStrategy<?, ?> strategy = context.getOutput(transform).getWindowingStrategy();
byte[] serializedBytes =
serializeWindowingStrategy(strategy, context.getPipelineOptions());
String serializedJson = byteArrayToJsonString(serializedBytes);
stepContext.addInput(PropertyNames.SERIALIZED_FN, serializedJson);
}
});
///////////////////////////////////////////////////////////////////////////
// IO Translation.
registerTransformTranslator(Read.Bounded.class, new ReadTranslator());
///////////////////////////////////////////////////////////////////////////
// Legacy Splittable DoFn translation.
registerTransformTranslator(
SplittableParDo.ProcessKeyedElements.class,
new TransformTranslator<SplittableParDo.ProcessKeyedElements>() {
@Override
public void translate(
SplittableParDo.ProcessKeyedElements transform, TranslationContext context) {
translateTyped(transform, context);
}
private <InputT, OutputT, RestrictionT> void translateTyped(
SplittableParDo.ProcessKeyedElements<InputT, OutputT, RestrictionT> transform,
TranslationContext context) {
DoFnSchemaInformation doFnSchemaInformation;
doFnSchemaInformation =
ParDoTranslation.getSchemaInformation(context.getCurrentTransform());
Map<String, PCollectionView<?>> sideInputMapping =
ParDoTranslation.getSideInputMapping(context.getCurrentTransform());
StepTranslationContext stepContext =
context.addStep(transform, "SplittableProcessKeyed");
Map<TupleTag<?>, Coder<?>> outputCoders =
context.getOutputs(transform).entrySet().stream()
.collect(
Collectors.toMap(
Map.Entry::getKey, e -> ((PCollection) e.getValue()).getCoder()));
translateInputs(
stepContext, context.getInput(transform), transform.getSideInputs(), context);
translateOutputs(context.getOutputs(transform), stepContext);
String ptransformId =
context.getSdkComponents().getPTransformIdOrThrow(context.getCurrentTransform());
translateFn(
stepContext,
ptransformId,
transform.getFn(),
transform.getInputWindowingStrategy(),
transform.getSideInputs(),
transform.getElementCoder(),
context,
transform.getMainOutputTag(),
outputCoders,
doFnSchemaInformation,
sideInputMapping);
stepContext.addInput(
PropertyNames.RESTRICTION_CODER,
translateCoder(transform.getRestrictionCoder(), context));
}
});
}
private static void translateInputs(
StepTranslationContext stepContext,
PCollection<?> input,
Iterable<PCollectionView<?>> sideInputs,
TranslationContext context) {
stepContext.addInput(PropertyNames.PARALLEL_INPUT, input);
translateSideInputs(stepContext, sideInputs, context);
}
// Used for ParDo
private static void translateSideInputs(
StepTranslationContext stepContext,
Iterable<PCollectionView<?>> sideInputs,
TranslationContext context) {
Map<String, Object> nonParInputs = new HashMap<>();
for (PCollectionView<?> view : sideInputs) {
nonParInputs.put(
view.getTagInternal().getId(),
context.asOutputReference(view, context.getProducer(view)));
}
stepContext.addInput(PropertyNames.NON_PARALLEL_INPUTS, nonParInputs);
}
private static void translateFn(
StepTranslationContext stepContext,
String ptransformId,
DoFn fn,
WindowingStrategy windowingStrategy,
Iterable<PCollectionView<?>> sideInputs,
Coder inputCoder,
TranslationContext context,
TupleTag<?> mainOutput,
Map<TupleTag<?>, Coder<?>> outputCoders,
DoFnSchemaInformation doFnSchemaInformation,
Map<String, PCollectionView<?>> sideInputMapping) {
DoFnSignature signature = DoFnSignatures.getSignature(fn.getClass());
if (signature.usesState() || signature.usesTimers()) {
DataflowRunner.verifyStateSupported(fn);
DataflowRunner.verifyStateSupportForWindowingStrategy(windowingStrategy);
}
stepContext.addInput(PropertyNames.USER_FN, fn.getClass().getName());
// The Fn API does not need the additional metadata in the wrapper, and the wrapper is only
// Java-serializable, hence not suitable for portable execution.
if (context.isFnApi()) {
stepContext.addInput(PropertyNames.SERIALIZED_FN, ptransformId);
} else {
stepContext.addInput(
PropertyNames.SERIALIZED_FN,
byteArrayToJsonString(
serializeToByteArray(
DoFnInfo.forFn(
fn,
windowingStrategy,
sideInputs,
inputCoder,
outputCoders,
mainOutput,
doFnSchemaInformation,
sideInputMapping))));
}
// Setting USES_KEYED_STATE will cause an ungrouped shuffle, which works
// in streaming but does not work in batch.
if (context.getPipelineOptions().isStreaming()
&& (signature.usesState() || signature.usesTimers())) {
stepContext.addInput(PropertyNames.USES_KEYED_STATE, "true");
}
}
private static void translateOutputs(
Map<TupleTag<?>, PValue> outputs, StepTranslationContext stepContext) {
for (Map.Entry<TupleTag<?>, PValue> taggedOutput : outputs.entrySet()) {
TupleTag<?> tag = taggedOutput.getKey();
checkArgument(
taggedOutput.getValue() instanceof PCollection,
"Non %s returned from Multi-output %s",
PCollection.class.getSimpleName(),
stepContext);
stepContext.addOutput(tag.getId(), (PCollection<?>) taggedOutput.getValue());
}
}
private static CloudObject translateCoder(Coder<?> coder, TranslationContext context) {
return CloudObjects.asCloudObject(coder, context.isFnApi() ? context.getSdkComponents() : null);
}
}