/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.beam.runners.flink;
import static java.lang.String.format;
import static org.apache.beam.runners.core.construction.SplittableParDo.SPLITTABLE_PROCESS_URN;
import com.google.auto.service.AutoService;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import javax.annotation.Nullable;
import org.apache.beam.runners.core.KeyedWorkItem;
import org.apache.beam.runners.core.SplittableParDoViaKeyedWorkItems;
import org.apache.beam.runners.core.SystemReduceFn;
import org.apache.beam.runners.core.construction.PTransformTranslation;
import org.apache.beam.runners.core.construction.ParDoTranslation;
import org.apache.beam.runners.core.construction.ReadTranslation;
import org.apache.beam.runners.core.construction.SerializablePipelineOptions;
import org.apache.beam.runners.core.construction.SplittableParDo;
import org.apache.beam.runners.core.construction.TransformPayloadTranslatorRegistrar;
import org.apache.beam.runners.core.construction.UnboundedReadFromBoundedSource.BoundedToUnboundedSourceAdapter;
import org.apache.beam.runners.flink.translation.functions.FlinkAssignWindows;
import org.apache.beam.runners.flink.translation.types.CoderTypeInformation;
import org.apache.beam.runners.flink.translation.wrappers.streaming.DoFnOperator;
import org.apache.beam.runners.flink.translation.wrappers.streaming.KvToByteBufferKeySelector;
import org.apache.beam.runners.flink.translation.wrappers.streaming.SingletonKeyedWorkItem;
import org.apache.beam.runners.flink.translation.wrappers.streaming.SingletonKeyedWorkItemCoder;
import org.apache.beam.runners.flink.translation.wrappers.streaming.SplittableDoFnOperator;
import org.apache.beam.runners.flink.translation.wrappers.streaming.WindowDoFnOperator;
import org.apache.beam.runners.flink.translation.wrappers.streaming.WorkItemKeySelector;
import org.apache.beam.runners.flink.translation.wrappers.streaming.io.DedupingOperator;
import org.apache.beam.runners.flink.translation.wrappers.streaming.io.TestStreamSource;
import org.apache.beam.runners.flink.translation.wrappers.streaming.io.UnboundedSourceWrapper;
import org.apache.beam.sdk.coders.ByteArrayCoder;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.coders.CoderException;
import org.apache.beam.sdk.coders.KvCoder;
import org.apache.beam.sdk.coders.VoidCoder;
import org.apache.beam.sdk.io.BoundedSource;
import org.apache.beam.sdk.io.FileSystems;
import org.apache.beam.sdk.io.UnboundedSource;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.runners.AppliedPTransform;
import org.apache.beam.sdk.testing.TestStream;
import org.apache.beam.sdk.transforms.Combine;
import org.apache.beam.sdk.transforms.CombineFnBase.GlobalCombineFn;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.DoFnSchemaInformation;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.SerializableFunction;
import org.apache.beam.sdk.transforms.join.RawUnionValue;
import org.apache.beam.sdk.transforms.join.UnionCoder;
import org.apache.beam.sdk.transforms.reflect.DoFnSignature;
import org.apache.beam.sdk.transforms.reflect.DoFnSignatures;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
import org.apache.beam.sdk.transforms.windowing.WindowFn;
import org.apache.beam.sdk.util.AppliedCombineFn;
import org.apache.beam.sdk.util.CoderUtils;
import org.apache.beam.sdk.util.WindowedValue;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PBegin;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PCollectionTuple;
import org.apache.beam.sdk.values.PCollectionView;
import org.apache.beam.sdk.values.PValue;
import org.apache.beam.sdk.values.TupleTag;
import org.apache.beam.sdk.values.TupleTagList;
import org.apache.beam.sdk.values.ValueWithRecordId;
import org.apache.beam.sdk.values.WindowingStrategy;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.annotations.VisibleForTesting;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableMap;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Maps;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.RichFlatMapFunction;
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.api.common.functions.StoppableFunction;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.typeutils.GenericTypeInfo;
import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.state.CheckpointListener;
import org.apache.flink.runtime.state.FunctionInitializationContext;
import org.apache.flink.runtime.state.FunctionSnapshotContext;
import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
import org.apache.flink.streaming.api.operators.TwoInputStreamOperator;
import org.apache.flink.streaming.api.transformations.TwoInputTransformation;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.runtime.tasks.ProcessingTimeCallback;
import org.apache.flink.util.Collector;
import org.apache.flink.util.OutputTag;
/**
* This class contains all the mappings between Beam and Flink <b>streaming</b> transformations.
* The {@link FlinkStreamingPipelineTranslator} traverses the Beam job and comes here to translate
* the encountered Beam transformations into Flink ones, based on the mappings available in this
* class.
*/
class FlinkStreamingTransformTranslators {
// --------------------------------------------------------------------------------------------
// Transform Translator Registry
// --------------------------------------------------------------------------------------------
/** A map from a Transform URN to the translator. */
@SuppressWarnings("rawtypes")
private static final Map<String, FlinkStreamingPipelineTranslator.StreamTransformTranslator>
TRANSLATORS = new HashMap<>();
// Here you can find all the available translators.
static {
TRANSLATORS.put(PTransformTranslation.READ_TRANSFORM_URN, new ReadSourceTranslator());
TRANSLATORS.put(PTransformTranslation.PAR_DO_TRANSFORM_URN, new ParDoStreamingTranslator());
TRANSLATORS.put(SPLITTABLE_PROCESS_URN, new SplittableProcessElementsStreamingTranslator());
TRANSLATORS.put(SplittableParDo.SPLITTABLE_GBKIKWI_URN, new GBKIntoKeyedWorkItemsTranslator());
TRANSLATORS.put(
PTransformTranslation.ASSIGN_WINDOWS_TRANSFORM_URN, new WindowAssignTranslator());
TRANSLATORS.put(
PTransformTranslation.FLATTEN_TRANSFORM_URN, new FlattenPCollectionTranslator());
TRANSLATORS.put(
CreateStreamingFlinkView.CREATE_STREAMING_FLINK_VIEW_URN,
new CreateViewStreamingTranslator());
TRANSLATORS.put(PTransformTranslation.RESHUFFLE_URN, new ReshuffleTranslatorStreaming());
TRANSLATORS.put(PTransformTranslation.GROUP_BY_KEY_TRANSFORM_URN, new GroupByKeyTranslator());
TRANSLATORS.put(
PTransformTranslation.COMBINE_PER_KEY_TRANSFORM_URN, new CombinePerKeyTranslator());
TRANSLATORS.put(PTransformTranslation.TEST_STREAM_TRANSFORM_URN, new TestStreamTranslator());
}
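/**
* Returns the translator for the URN of the given transform, or {@code null} if the transform
* has no URN or no translator is registered for it.
*/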
public static FlinkStreamingPipelineTranslator.StreamTransformTranslator<?> getTranslator(
PTransform<?, ?> transform) {
@Nullable String urn = PTransformTranslation.urnForTransformOrNull(transform);
return urn == null ? null : TRANSLATORS.get(urn);
}
@SuppressWarnings("unchecked")
private static String getCurrentTransformName(FlinkStreamingTranslationContext context) {
return context.getCurrentTransform().getFullName();
}
// --------------------------------------------------------------------------------------------
// Transformation Implementations
// --------------------------------------------------------------------------------------------
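/**
* Translates an unbounded {@code Read} into a Flink source via {@link UnboundedSourceWrapper}.
* If the source requires de-duplication, the stream is keyed by record id and run through a
* {@link DedupingOperator}; otherwise, the record ids are simply stripped.
*/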
private static class UnboundedReadSourceTranslator<T>
extends FlinkStreamingPipelineTranslator.StreamTransformTranslator<
PTransform<PBegin, PCollection<T>>> {
@Override
public void translateNode(
PTransform<PBegin, PCollection<T>> transform, FlinkStreamingTranslationContext context) {
PCollection<T> output = context.getOutput(transform);
DataStream<WindowedValue<T>> source;
DataStream<WindowedValue<ValueWithRecordId<T>>> nonDedupSource;
TypeInformation<WindowedValue<T>> outputTypeInfo =
context.getTypeInfo(context.getOutput(transform));
Coder<T> coder = context.getOutput(transform).getCoder();
TypeInformation<WindowedValue<ValueWithRecordId<T>>> withIdTypeInfo =
new CoderTypeInformation<>(
WindowedValue.getFullCoder(
ValueWithRecordId.ValueWithRecordIdCoder.of(coder),
output.getWindowingStrategy().getWindowFn().windowCoder()));
UnboundedSource<T, ?> rawSource;
try {
rawSource =
ReadTranslation.unboundedSourceFromTransform(
(AppliedPTransform<PBegin, PCollection<T>, PTransform<PBegin, PCollection<T>>>)
context.getCurrentTransform());
} catch (IOException e) {
throw new RuntimeException(e);
}
String fullName = getCurrentTransformName(context);
try {
int parallelism =
context.getExecutionEnvironment().getMaxParallelism() > 0
? context.getExecutionEnvironment().getMaxParallelism()
: context.getExecutionEnvironment().getParallelism();
UnboundedSourceWrapper<T, ?> sourceWrapper =
new UnboundedSourceWrapper<>(
fullName, context.getPipelineOptions(), rawSource, parallelism);
nonDedupSource =
context
.getExecutionEnvironment()
.addSource(sourceWrapper)
.name(fullName)
.uid(fullName)
.returns(withIdTypeInfo);
if (rawSource.requiresDeduping()) {
source =
nonDedupSource
.keyBy(new ValueWithRecordIdKeySelector<>())
.transform(
"deduping",
outputTypeInfo,
new DedupingOperator<>(context.getPipelineOptions()))
.uid(format("%s/__deduplicated__", fullName));
} else {
source =
nonDedupSource
.flatMap(new StripIdsMap<>(context.getPipelineOptions()))
.returns(outputTypeInfo);
}
} catch (Exception e) {
throw new RuntimeException("Error while translating UnboundedSource: " + rawSource, e);
}
context.setOutputDataStream(output, source);
}
}
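/** Key selector that keys {@link ValueWithRecordId} elements by their record-id bytes. */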
static class ValueWithRecordIdKeySelector<T>
implements KeySelector<WindowedValue<ValueWithRecordId<T>>, ByteBuffer>,
ResultTypeQueryable<ByteBuffer> {
@Override
public ByteBuffer getKey(WindowedValue<ValueWithRecordId<T>> value) throws Exception {
return ByteBuffer.wrap(value.getValue().getId());
}
@Override
public TypeInformation<ByteBuffer> getProducedType() {
return new GenericTypeInfo<>(ByteBuffer.class);
}
}
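/** Strips the {@link ValueWithRecordId} wrapper from elements, forwarding only the value. */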
public static class StripIdsMap<T>
extends RichFlatMapFunction<WindowedValue<ValueWithRecordId<T>>, WindowedValue<T>> {
private final SerializablePipelineOptions options;
StripIdsMap(PipelineOptions options) {
this.options = new SerializablePipelineOptions(options);
}
@Override
public void open(Configuration parameters) {
// Initialize FileSystems for any coders which may want to use the FileSystem,
// see https://issues.apache.org/jira/browse/BEAM-8303
FileSystems.setDefaultPipelineOptions(options.get());
}
@Override
public void flatMap(
WindowedValue<ValueWithRecordId<T>> value, Collector<WindowedValue<T>> collector)
throws Exception {
collector.collect(value.withValue(value.getValue().getValue()));
}
}
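/**
* Dispatches a {@code Read} to the bounded or unbounded translator, depending on whether the
* output {@link PCollection} is bounded.
*/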
private static class ReadSourceTranslator<T>
extends FlinkStreamingPipelineTranslator.StreamTransformTranslator<
PTransform<PBegin, PCollection<T>>> {
private final BoundedReadSourceTranslator<T> boundedTranslator =
new BoundedReadSourceTranslator<>();
private final UnboundedReadSourceTranslator<T> unboundedTranslator =
new UnboundedReadSourceTranslator<>();
@Override
void translateNode(
PTransform<PBegin, PCollection<T>> transform, FlinkStreamingTranslationContext context) {
if (context.getOutput(transform).isBounded().equals(PCollection.IsBounded.BOUNDED)) {
boundedTranslator.translateNode(transform, context);
} else {
unboundedTranslator.translateNode(transform, context);
}
}
}
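/**
* Translates a bounded {@code Read} by adapting the {@link BoundedSource} with a {@link
* BoundedToUnboundedSourceAdapter} and running it through {@link
* UnboundedSourceWrapperNoValueWithRecordId}, which reuses the unbounded source machinery.
*/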
private static class BoundedReadSourceTranslator<T>
extends FlinkStreamingPipelineTranslator.StreamTransformTranslator<
PTransform<PBegin, PCollection<T>>> {
@Override
public void translateNode(
PTransform<PBegin, PCollection<T>> transform, FlinkStreamingTranslationContext context) {
PCollection<T> output = context.getOutput(transform);
TypeInformation<WindowedValue<T>> outputTypeInfo =
context.getTypeInfo(context.getOutput(transform));
BoundedSource<T> rawSource;
try {
rawSource =
ReadTranslation.boundedSourceFromTransform(
(AppliedPTransform<PBegin, PCollection<T>, PTransform<PBegin, PCollection<T>>>)
context.getCurrentTransform());
} catch (IOException e) {
throw new RuntimeException(e);
}
String fullName = getCurrentTransformName(context);
UnboundedSource<T, ?> adaptedRawSource = new BoundedToUnboundedSourceAdapter<>(rawSource);
DataStream<WindowedValue<T>> source;
try {
int parallelism =
context.getExecutionEnvironment().getMaxParallelism() > 0
? context.getExecutionEnvironment().getMaxParallelism()
: context.getExecutionEnvironment().getParallelism();
UnboundedSourceWrapperNoValueWithRecordId<T, ?> sourceWrapper =
new UnboundedSourceWrapperNoValueWithRecordId<>(
new UnboundedSourceWrapper<>(
fullName, context.getPipelineOptions(), adaptedRawSource, parallelism));
source =
context
.getExecutionEnvironment()
.addSource(sourceWrapper)
.name(fullName)
.uid(fullName)
.returns(outputTypeInfo);
} catch (Exception e) {
throw new RuntimeException("Error while translating BoundedSource: " + rawSource, e);
}
context.setOutputDataStream(output, source);
}
}
/** Wraps each element in a {@link RawUnionValue} with the given tag id. */
public static class ToRawUnion<T> extends RichMapFunction<T, RawUnionValue> {
private final int intTag;
private final SerializablePipelineOptions options;
ToRawUnion(int intTag, PipelineOptions pipelineOptions) {
this.intTag = intTag;
this.options = new SerializablePipelineOptions(pipelineOptions);
}
@Override
public void open(Configuration parameters) {
// Initialize FileSystems for any coders which may want to use the FileSystem,
// see https://issues.apache.org/jira/browse/BEAM-8303
FileSystems.setDefaultPipelineOptions(options.get());
}
@Override
public RawUnionValue map(T o) throws Exception {
return new RawUnionValue(intTag, o);
}
}
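/**
* Assigns an integer tag to each side input, maps every side-input stream to {@link
* RawUnionValue} with its tag, and unions them into a single stream. Returns both the
* tag-to-view mapping and the unioned stream.
*/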
private static Tuple2<Map<Integer, PCollectionView<?>>, DataStream<RawUnionValue>>
transformSideInputs(
Collection<PCollectionView<?>> sideInputs, FlinkStreamingTranslationContext context) {
// collect all side inputs
Map<TupleTag<?>, Integer> tagToIntMapping = new HashMap<>();
Map<Integer, PCollectionView<?>> intToViewMapping = new HashMap<>();
int count = 0;
for (PCollectionView<?> sideInput : sideInputs) {
TupleTag<?> tag = sideInput.getTagInternal();
intToViewMapping.put(count, sideInput);
tagToIntMapping.put(tag, count);
count++;
}
List<Coder<?>> inputCoders = new ArrayList<>();
for (PCollectionView<?> sideInput : sideInputs) {
DataStream<Object> sideInputStream = context.getInputDataStream(sideInput);
TypeInformation<Object> tpe = sideInputStream.getType();
if (!(tpe instanceof CoderTypeInformation)) {
throw new IllegalStateException("Input Stream TypeInformation is not a CoderTypeInformation.");
}
Coder<?> coder = ((CoderTypeInformation) tpe).getCoder();
inputCoders.add(coder);
}
UnionCoder unionCoder = UnionCoder.of(inputCoders);
CoderTypeInformation<RawUnionValue> unionTypeInformation =
new CoderTypeInformation<>(unionCoder);
// transform each side input to RawUnionValue and union them
DataStream<RawUnionValue> sideInputUnion = null;
for (PCollectionView<?> sideInput : sideInputs) {
TupleTag<?> tag = sideInput.getTagInternal();
final int intTag = tagToIntMapping.get(tag);
DataStream<Object> sideInputStream = context.getInputDataStream(sideInput);
DataStream<RawUnionValue> unionValueStream =
sideInputStream
.map(new ToRawUnion<>(intTag, context.getPipelineOptions()))
.returns(unionTypeInformation);
if (sideInputUnion == null) {
sideInputUnion = unionValueStream;
} else {
sideInputUnion = sideInputUnion.union(unionValueStream);
}
}
if (sideInputUnion == null) {
throw new IllegalStateException("No unioned side inputs; this indicates a bug.");
}
return new Tuple2<>(intToViewMapping, sideInputUnion);
}
/**
* Helper for translating {@code ParDo.MultiOutput} and {@link
* SplittableParDoViaKeyedWorkItems.ProcessElements}.
*/
static class ParDoTranslationHelper {
interface DoFnOperatorFactory<InputT, OutputT> {
DoFnOperator<InputT, OutputT> createDoFnOperator(
DoFn<InputT, OutputT> doFn,
String stepName,
List<PCollectionView<?>> sideInputs,
TupleTag<OutputT> mainOutputTag,
List<TupleTag<?>> additionalOutputTags,
FlinkStreamingTranslationContext context,
WindowingStrategy<?, ?> windowingStrategy,
Map<TupleTag<?>, OutputTag<WindowedValue<?>>> tagsToOutputTags,
Map<TupleTag<?>, Coder<WindowedValue<?>>> tagsToCoders,
Map<TupleTag<?>, Integer> tagsToIds,
Coder<WindowedValue<InputT>> windowedInputCoder,
Coder<InputT> inputCoder,
Map<TupleTag<?>, Coder<?>> outputCoders,
Coder keyCoder,
KeySelector<WindowedValue<InputT>, ?> keySelector,
Map<Integer, PCollectionView<?>> transformedSideInputs,
DoFnSchemaInformation doFnSchemaInformation,
Map<String, PCollectionView<?>> sideInputMapping);
}
static <InputT, OutputT> void translateParDo(
String transformName,
DoFn<InputT, OutputT> doFn,
PCollection<InputT> input,
List<PCollectionView<?>> sideInputs,
Map<TupleTag<?>, PValue> outputs,
TupleTag<OutputT> mainOutputTag,
List<TupleTag<?>> additionalOutputTags,
DoFnSchemaInformation doFnSchemaInformation,
Map<String, PCollectionView<?>> sideInputMapping,
FlinkStreamingTranslationContext context,
DoFnOperatorFactory<InputT, OutputT> doFnOperatorFactory) {
// we assume that the transformation does not change the windowing strategy.
WindowingStrategy<?, ?> windowingStrategy = input.getWindowingStrategy();
Map<TupleTag<?>, OutputTag<WindowedValue<?>>> tagsToOutputTags = Maps.newHashMap();
Map<TupleTag<?>, Coder<WindowedValue<?>>> tagsToCoders = Maps.newHashMap();
// We associate output tags with ids; an Integer is easier to serialize than a TupleTag.
// The map returned by AppliedPTransform.getOutputs() is an ImmutableMap whose implementation,
// RegularImmutableMap, iterates its entrySet in insertion order. So we can use the original
// AppliedPTransform.getOutputs() to produce deterministic ids.
Map<TupleTag<?>, Integer> tagsToIds = Maps.newHashMap();
int idCount = 0;
tagsToIds.put(mainOutputTag, idCount++);
for (Map.Entry<TupleTag<?>, PValue> entry : outputs.entrySet()) {
if (!tagsToOutputTags.containsKey(entry.getKey())) {
tagsToOutputTags.put(
entry.getKey(),
new OutputTag<WindowedValue<?>>(
entry.getKey().getId(),
(TypeInformation) context.getTypeInfo((PCollection<?>) entry.getValue())));
tagsToCoders.put(
entry.getKey(),
(Coder) context.getWindowedInputCoder((PCollection<OutputT>) entry.getValue()));
tagsToIds.put(entry.getKey(), idCount++);
}
}
SingleOutputStreamOperator<WindowedValue<OutputT>> outputStream;
Coder<WindowedValue<InputT>> windowedInputCoder = context.getWindowedInputCoder(input);
Coder<InputT> inputCoder = context.getInputCoder(input);
Map<TupleTag<?>, Coder<?>> outputCoders = context.getOutputCoders();
DataStream<WindowedValue<InputT>> inputDataStream = context.getInputDataStream(input);
Coder keyCoder = null;
KeySelector<WindowedValue<InputT>, ?> keySelector = null;
boolean stateful = false;
DoFnSignature signature = DoFnSignatures.getSignature(doFn.getClass());
if (signature.stateDeclarations().size() > 0 || signature.timerDeclarations().size() > 0) {
// Since the signature is stateful, DoFnSignatures ensures that it is also keyed.
keyCoder = ((KvCoder) input.getCoder()).getKeyCoder();
keySelector = new KvToByteBufferKeySelector(keyCoder);
inputDataStream = inputDataStream.keyBy(keySelector);
stateful = true;
} else if (doFn instanceof SplittableParDoViaKeyedWorkItems.ProcessFn) {
// we know that it is keyed on byte[]
keyCoder = ByteArrayCoder.of();
keySelector = new WorkItemKeySelector<>(keyCoder);
stateful = true;
}
CoderTypeInformation<WindowedValue<OutputT>> outputTypeInformation =
new CoderTypeInformation<>(
context.getWindowedInputCoder((PCollection<OutputT>) outputs.get(mainOutputTag)));
if (sideInputs.isEmpty()) {
DoFnOperator<InputT, OutputT> doFnOperator =
doFnOperatorFactory.createDoFnOperator(
doFn,
getCurrentTransformName(context),
sideInputs,
mainOutputTag,
additionalOutputTags,
context,
windowingStrategy,
tagsToOutputTags,
tagsToCoders,
tagsToIds,
windowedInputCoder,
inputCoder,
outputCoders,
keyCoder,
keySelector,
new HashMap<>() /* side-input mapping */,
doFnSchemaInformation,
sideInputMapping);
outputStream =
inputDataStream.transform(transformName, outputTypeInformation, doFnOperator);
} else {
Tuple2<Map<Integer, PCollectionView<?>>, DataStream<RawUnionValue>> transformedSideInputs =
transformSideInputs(sideInputs, context);
DoFnOperator<InputT, OutputT> doFnOperator =
doFnOperatorFactory.createDoFnOperator(
doFn,
getCurrentTransformName(context),
sideInputs,
mainOutputTag,
additionalOutputTags,
context,
windowingStrategy,
tagsToOutputTags,
tagsToCoders,
tagsToIds,
windowedInputCoder,
inputCoder,
outputCoders,
keyCoder,
keySelector,
transformedSideInputs.f0,
doFnSchemaInformation,
sideInputMapping);
if (stateful) {
// We have to manually construct the two-input transform because, normally, we are not
// allowed to have only one of the two inputs keyed.
KeyedStream keyedStream = (KeyedStream<?, InputT>) inputDataStream;
TwoInputTransformation<
WindowedValue<KV<?, InputT>>, RawUnionValue, WindowedValue<OutputT>>
rawFlinkTransform =
new TwoInputTransformation(
keyedStream.getTransformation(),
transformedSideInputs.f1.broadcast().getTransformation(),
transformName,
doFnOperator,
outputTypeInformation,
keyedStream.getParallelism());
rawFlinkTransform.setStateKeyType(keyedStream.getKeyType());
rawFlinkTransform.setStateKeySelectors(keyedStream.getKeySelector(), null);
outputStream =
new SingleOutputStreamOperator(
keyedStream.getExecutionEnvironment(),
rawFlinkTransform) {}; // we have to cheat around the ctor being protected
keyedStream.getExecutionEnvironment().addOperator(rawFlinkTransform);
} else {
outputStream =
inputDataStream
.connect(transformedSideInputs.f1.broadcast())
.transform(transformName, outputTypeInformation, doFnOperator);
}
}
outputStream.uid(transformName);
context.setOutputDataStream(outputs.get(mainOutputTag), outputStream);
for (Map.Entry<TupleTag<?>, PValue> entry : outputs.entrySet()) {
if (!entry.getKey().equals(mainOutputTag)) {
context.setOutputDataStream(
entry.getValue(), outputStream.getSideOutput(tagsToOutputTags.get(entry.getKey())));
}
}
}
}
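/**
* Translates {@code ParDo.MultiOutput} by extracting the {@link DoFn}, output tags, and side
* inputs via {@link ParDoTranslation} and delegating to {@link
* ParDoTranslationHelper#translateParDo} with a factory that creates a regular {@link
* DoFnOperator}.
*/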
private static class ParDoStreamingTranslator<InputT, OutputT>
extends FlinkStreamingPipelineTranslator.StreamTransformTranslator<
PTransform<PCollection<InputT>, PCollectionTuple>> {
@Override
public void translateNode(
PTransform<PCollection<InputT>, PCollectionTuple> transform,
FlinkStreamingTranslationContext context) {
DoFn<InputT, OutputT> doFn;
try {
doFn = (DoFn<InputT, OutputT>) ParDoTranslation.getDoFn(context.getCurrentTransform());
} catch (IOException e) {
throw new RuntimeException(e);
}
TupleTag<OutputT> mainOutputTag;
try {
mainOutputTag =
(TupleTag<OutputT>) ParDoTranslation.getMainOutputTag(context.getCurrentTransform());
} catch (IOException e) {
throw new RuntimeException(e);
}
List<PCollectionView<?>> sideInputs;
try {
sideInputs = ParDoTranslation.getSideInputs(context.getCurrentTransform());
} catch (IOException e) {
throw new RuntimeException(e);
}
Map<String, PCollectionView<?>> sideInputMapping =
ParDoTranslation.getSideInputMapping(context.getCurrentTransform());
TupleTagList additionalOutputTags;
try {
additionalOutputTags =
ParDoTranslation.getAdditionalOutputTags(context.getCurrentTransform());
} catch (IOException e) {
throw new RuntimeException(e);
}
DoFnSchemaInformation doFnSchemaInformation;
doFnSchemaInformation = ParDoTranslation.getSchemaInformation(context.getCurrentTransform());
ParDoTranslationHelper.translateParDo(
getCurrentTransformName(context),
doFn,
context.getInput(transform),
sideInputs,
context.getOutputs(transform),
mainOutputTag,
additionalOutputTags.getAll(),
doFnSchemaInformation,
sideInputMapping,
context,
(doFn1,
stepName,
sideInputs1,
mainOutputTag1,
additionalOutputTags1,
context1,
windowingStrategy,
tagsToOutputTags,
tagsToCoders,
tagsToIds,
windowedInputCoder,
inputCoder,
outputCoders1,
keyCoder,
keySelector,
transformedSideInputs,
doFnSchemaInformation1,
sideInputMapping1) ->
new DoFnOperator<>(
doFn1,
stepName,
windowedInputCoder,
inputCoder,
outputCoders1,
mainOutputTag1,
additionalOutputTags1,
new DoFnOperator.MultiOutputOutputManagerFactory<>(
mainOutputTag1, tagsToOutputTags, tagsToCoders, tagsToIds),
windowingStrategy,
transformedSideInputs,
sideInputs1,
context1.getPipelineOptions(),
keyCoder,
keySelector,
doFnSchemaInformation1,
sideInputMapping1));
}
}
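/**
* Translates {@link SplittableParDoViaKeyedWorkItems.ProcessElements} by delegating to {@link
* ParDoTranslationHelper#translateParDo} with a factory that creates a {@link
* SplittableDoFnOperator}.
*/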
private static class SplittableProcessElementsStreamingTranslator<
InputT, OutputT, RestrictionT, PositionT>
extends FlinkStreamingPipelineTranslator.StreamTransformTranslator<
SplittableParDoViaKeyedWorkItems.ProcessElements<
InputT, OutputT, RestrictionT, PositionT>> {
@Override
public void translateNode(
SplittableParDoViaKeyedWorkItems.ProcessElements<InputT, OutputT, RestrictionT, PositionT>
transform,
FlinkStreamingTranslationContext context) {
ParDoTranslationHelper.translateParDo(
getCurrentTransformName(context),
transform.newProcessFn(transform.getFn()),
context.getInput(transform),
transform.getSideInputs(),
context.getOutputs(transform),
transform.getMainOutputTag(),
transform.getAdditionalOutputTags().getAll(),
DoFnSchemaInformation.create(),
Collections.emptyMap(),
context,
(doFn,
stepName,
sideInputs,
mainOutputTag,
additionalOutputTags,
context1,
windowingStrategy,
tagsToOutputTags,
tagsToCoders,
tagsToIds,
windowedInputCoder,
inputCoder,
outputCoders1,
keyCoder,
keySelector,
transformedSideInputs,
doFnSchemaInformation,
sideInputMapping) ->
new SplittableDoFnOperator<>(
doFn,
stepName,
windowedInputCoder,
inputCoder,
outputCoders1,
mainOutputTag,
additionalOutputTags,
new DoFnOperator.MultiOutputOutputManagerFactory<>(
mainOutputTag, tagsToOutputTags, tagsToCoders, tagsToIds),
windowingStrategy,
transformedSideInputs,
sideInputs,
context1.getPipelineOptions(),
keyCoder,
keySelector));
}
}
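/**
* Translates {@link CreateStreamingFlinkView.CreateFlinkPCollectionView} by simply registering
* the input stream as the data stream for the view; no computation is required.
*/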
private static class CreateViewStreamingTranslator<ElemT, ViewT>
extends FlinkStreamingPipelineTranslator.StreamTransformTranslator<
CreateStreamingFlinkView.CreateFlinkPCollectionView<ElemT, ViewT>> {
@Override
public void translateNode(
CreateStreamingFlinkView.CreateFlinkPCollectionView<ElemT, ViewT> transform,
FlinkStreamingTranslationContext context) {
// just forward
DataStream<WindowedValue<List<ElemT>>> inputDataSet =
context.getInputDataStream(context.getInput(transform));
PCollectionView<ViewT> view = transform.getView();
context.setOutputDataStream(view, inputDataSet);
}
}
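/**
* Translates a window assignment by applying the {@link WindowFn} to each element in a flatMap
* via {@link FlinkAssignWindows}.
*/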
private static class WindowAssignTranslator<T>
extends FlinkStreamingPipelineTranslator.StreamTransformTranslator<
PTransform<PCollection<T>, PCollection<T>>> {
@Override
public void translateNode(
PTransform<PCollection<T>, PCollection<T>> transform,
FlinkStreamingTranslationContext context) {
@SuppressWarnings("unchecked")
WindowingStrategy<T, BoundedWindow> windowingStrategy =
(WindowingStrategy<T, BoundedWindow>) context.getOutput(transform).getWindowingStrategy();
TypeInformation<WindowedValue<T>> typeInfo =
context.getTypeInfo(context.getOutput(transform));
DataStream<WindowedValue<T>> inputDataStream =
context.getInputDataStream(context.getInput(transform));
WindowFn<T, ? extends BoundedWindow> windowFn = windowingStrategy.getWindowFn();
FlinkAssignWindows<T, ? extends BoundedWindow> assignWindowsFunction =
new FlinkAssignWindows<>(windowFn);
String fullName = context.getOutput(transform).getName();
SingleOutputStreamOperator<WindowedValue<T>> outputDataStream =
inputDataStream
.flatMap(assignWindowsFunction)
.name(fullName)
.uid(fullName)
.returns(typeInfo);
context.setOutputDataStream(context.getOutput(transform), outputDataStream);
}
}
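/**
* Translates {@code Reshuffle} as a Flink {@code rebalance()}, which redistributes the elements
* round-robin across the downstream parallel instances.
*/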
private static class ReshuffleTranslatorStreaming<K, InputT>
extends FlinkStreamingPipelineTranslator.StreamTransformTranslator<
PTransform<PCollection<KV<K, InputT>>, PCollection<KV<K, InputT>>>> {
@Override
public void translateNode(
PTransform<PCollection<KV<K, InputT>>, PCollection<KV<K, InputT>>> transform,
FlinkStreamingTranslationContext context) {
DataStream<WindowedValue<KV<K, InputT>>> inputDataSet =
context.getInputDataStream(context.getInput(transform));
context.setOutputDataStream(context.getOutput(transform), inputDataSet.rebalance());
}
}
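/**
* Translates {@code GroupByKey} by wrapping elements into {@link SingletonKeyedWorkItem}s,
* keying the stream by the encoded key, and applying a {@link WindowDoFnOperator} with a
* buffering {@link SystemReduceFn}.
*/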
private static class GroupByKeyTranslator<K, InputT>
extends FlinkStreamingPipelineTranslator.StreamTransformTranslator<
PTransform<PCollection<KV<K, InputT>>, PCollection<KV<K, Iterable<InputT>>>>> {
@Override
public void translateNode(
PTransform<PCollection<KV<K, InputT>>, PCollection<KV<K, Iterable<InputT>>>> transform,
FlinkStreamingTranslationContext context) {
PCollection<KV<K, InputT>> input = context.getInput(transform);
@SuppressWarnings("unchecked")
WindowingStrategy<?, BoundedWindow> windowingStrategy =
(WindowingStrategy<?, BoundedWindow>) input.getWindowingStrategy();
KvCoder<K, InputT> inputKvCoder = (KvCoder<K, InputT>) input.getCoder();
SingletonKeyedWorkItemCoder<K, InputT> workItemCoder =
SingletonKeyedWorkItemCoder.of(
inputKvCoder.getKeyCoder(),
inputKvCoder.getValueCoder(),
input.getWindowingStrategy().getWindowFn().windowCoder());
DataStream<WindowedValue<KV<K, InputT>>> inputDataStream = context.getInputDataStream(input);
WindowedValue.FullWindowedValueCoder<SingletonKeyedWorkItem<K, InputT>>
windowedWorkItemCoder =
WindowedValue.getFullCoder(
workItemCoder, input.getWindowingStrategy().getWindowFn().windowCoder());
CoderTypeInformation<WindowedValue<SingletonKeyedWorkItem<K, InputT>>> workItemTypeInfo =
new CoderTypeInformation<>(windowedWorkItemCoder);
DataStream<WindowedValue<SingletonKeyedWorkItem<K, InputT>>> workItemStream =
inputDataStream
.flatMap(new ToKeyedWorkItem<>(context.getPipelineOptions()))
.returns(workItemTypeInfo)
.name("ToKeyedWorkItem");
WorkItemKeySelector keySelector = new WorkItemKeySelector<>(inputKvCoder.getKeyCoder());
KeyedStream<WindowedValue<SingletonKeyedWorkItem<K, InputT>>, ByteBuffer>
keyedWorkItemStream =
workItemStream.keyBy(new WorkItemKeySelector<>(inputKvCoder.getKeyCoder()));
SystemReduceFn<K, InputT, Iterable<InputT>, Iterable<InputT>, BoundedWindow> reduceFn =
SystemReduceFn.buffering(inputKvCoder.getValueCoder());
Coder<WindowedValue<KV<K, Iterable<InputT>>>> outputCoder =
context.getWindowedInputCoder(context.getOutput(transform));
TypeInformation<WindowedValue<KV<K, Iterable<InputT>>>> outputTypeInfo =
context.getTypeInfo(context.getOutput(transform));
TupleTag<KV<K, Iterable<InputT>>> mainTag = new TupleTag<>("main output");
String fullName = getCurrentTransformName(context);
WindowDoFnOperator<K, InputT, Iterable<InputT>> doFnOperator =
new WindowDoFnOperator<>(
reduceFn,
fullName,
(Coder) windowedWorkItemCoder,
mainTag,
Collections.emptyList(),
new DoFnOperator.MultiOutputOutputManagerFactory<>(mainTag, outputCoder),
windowingStrategy,
new HashMap<>(), /* side-input mapping */
Collections.emptyList(), /* side inputs */
context.getPipelineOptions(),
inputKvCoder.getKeyCoder(),
keySelector);
// our operator expects WindowedValue<KeyedWorkItem> while our input stream
// is WindowedValue<SingletonKeyedWorkItem>, which is fine but Java doesn't like it ...
@SuppressWarnings("unchecked")
SingleOutputStreamOperator<WindowedValue<KV<K, Iterable<InputT>>>> outDataStream =
keyedWorkItemStream
.transform(fullName, outputTypeInfo, (OneInputStreamOperator) doFnOperator)
.uid(fullName);
context.setOutputDataStream(context.getOutput(transform), outDataStream);
}
}
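/**
* Translates {@code Combine.PerKey} analogously to {@link GroupByKeyTranslator}, but with a
* combining {@link SystemReduceFn}. With side inputs, the broadcast side-input stream is wired
* in through a manually constructed two-input transformation. Combines with both merging
* windows and side inputs cannot be translated here; see {@code canTranslate}.
*/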
private static class CombinePerKeyTranslator<K, InputT, OutputT>
extends FlinkStreamingPipelineTranslator.StreamTransformTranslator<
PTransform<PCollection<KV<K, InputT>>, PCollection<KV<K, OutputT>>>> {
@Override
boolean canTranslate(
PTransform<PCollection<KV<K, InputT>>, PCollection<KV<K, OutputT>>> transform,
FlinkStreamingTranslationContext context) {
// If we have a merging window strategy and side inputs, we cannot translate this as a
// proper combine. We have to group first and then run the combine over the final grouped
// values.
PCollection<KV<K, InputT>> input = context.getInput(transform);
@SuppressWarnings("unchecked")
WindowingStrategy<?, BoundedWindow> windowingStrategy =
(WindowingStrategy<?, BoundedWindow>) input.getWindowingStrategy();
return windowingStrategy.getWindowFn().isNonMerging()
|| ((Combine.PerKey) transform).getSideInputs().isEmpty();
}
@Override
public void translateNode(
PTransform<PCollection<KV<K, InputT>>, PCollection<KV<K, OutputT>>> transform,
FlinkStreamingTranslationContext context) {
String fullName = getCurrentTransformName(context);
PCollection<KV<K, InputT>> input = context.getInput(transform);
@SuppressWarnings("unchecked")
WindowingStrategy<?, BoundedWindow> windowingStrategy =
(WindowingStrategy<?, BoundedWindow>) input.getWindowingStrategy();
KvCoder<K, InputT> inputKvCoder = (KvCoder<K, InputT>) input.getCoder();
SingletonKeyedWorkItemCoder<K, InputT> workItemCoder =
SingletonKeyedWorkItemCoder.of(
inputKvCoder.getKeyCoder(),
inputKvCoder.getValueCoder(),
input.getWindowingStrategy().getWindowFn().windowCoder());
DataStream<WindowedValue<KV<K, InputT>>> inputDataStream = context.getInputDataStream(input);
WindowedValue.FullWindowedValueCoder<SingletonKeyedWorkItem<K, InputT>>
windowedWorkItemCoder =
WindowedValue.getFullCoder(
workItemCoder, input.getWindowingStrategy().getWindowFn().windowCoder());
CoderTypeInformation<WindowedValue<SingletonKeyedWorkItem<K, InputT>>> workItemTypeInfo =
new CoderTypeInformation<>(windowedWorkItemCoder);
DataStream<WindowedValue<SingletonKeyedWorkItem<K, InputT>>> workItemStream =
inputDataStream
.flatMap(new ToKeyedWorkItem<>(context.getPipelineOptions()))
.returns(workItemTypeInfo)
.name("ToKeyedWorkItem");
WorkItemKeySelector keySelector = new WorkItemKeySelector<>(inputKvCoder.getKeyCoder());
KeyedStream<WindowedValue<SingletonKeyedWorkItem<K, InputT>>, ByteBuffer>
keyedWorkItemStream = workItemStream.keyBy(keySelector);
GlobalCombineFn<? super InputT, ?, OutputT> combineFn = ((Combine.PerKey) transform).getFn();
SystemReduceFn<K, InputT, ?, OutputT, BoundedWindow> reduceFn =
SystemReduceFn.combining(
inputKvCoder.getKeyCoder(),
AppliedCombineFn.withInputCoder(
combineFn, input.getPipeline().getCoderRegistry(), inputKvCoder));
Coder<WindowedValue<KV<K, OutputT>>> outputCoder =
context.getWindowedInputCoder(context.getOutput(transform));
TypeInformation<WindowedValue<KV<K, OutputT>>> outputTypeInfo =
context.getTypeInfo(context.getOutput(transform));
List<PCollectionView<?>> sideInputs = ((Combine.PerKey) transform).getSideInputs();
if (sideInputs.isEmpty()) {
TupleTag<KV<K, OutputT>> mainTag = new TupleTag<>("main output");
WindowDoFnOperator<K, InputT, OutputT> doFnOperator =
new WindowDoFnOperator<>(
reduceFn,
fullName,
(Coder) windowedWorkItemCoder,
mainTag,
Collections.emptyList(),
new DoFnOperator.MultiOutputOutputManagerFactory<>(mainTag, outputCoder),
windowingStrategy,
new HashMap<>(), /* side-input mapping */
Collections.emptyList(), /* side inputs */
context.getPipelineOptions(),
inputKvCoder.getKeyCoder(),
keySelector);
// our operator expects WindowedValue<KeyedWorkItem> while our input stream
// is WindowedValue<SingletonKeyedWorkItem>, which is fine but Java doesn't like it ...
@SuppressWarnings("unchecked")
SingleOutputStreamOperator<WindowedValue<KV<K, OutputT>>> outDataStream =
keyedWorkItemStream
.transform(fullName, outputTypeInfo, (OneInputStreamOperator) doFnOperator)
.uid(fullName);
context.setOutputDataStream(context.getOutput(transform), outDataStream);
} else {
Tuple2<Map<Integer, PCollectionView<?>>, DataStream<RawUnionValue>> transformSideInputs =
transformSideInputs(sideInputs, context);
TupleTag<KV<K, OutputT>> mainTag = new TupleTag<>("main output");
WindowDoFnOperator<K, InputT, OutputT> doFnOperator =
new WindowDoFnOperator<>(
reduceFn,
fullName,
(Coder) windowedWorkItemCoder,
mainTag,
Collections.emptyList(),
new DoFnOperator.MultiOutputOutputManagerFactory<>(mainTag, outputCoder),
windowingStrategy,
transformSideInputs.f0,
sideInputs,
context.getPipelineOptions(),
inputKvCoder.getKeyCoder(),
keySelector);
// We have to manually construct the two-input transform because, normally, we are not
// allowed to have only one of the two inputs keyed.
TwoInputTransformation<
WindowedValue<SingletonKeyedWorkItem<K, InputT>>,
RawUnionValue,
WindowedValue<KV<K, OutputT>>>
rawFlinkTransform =
new TwoInputTransformation<>(
keyedWorkItemStream.getTransformation(),
transformSideInputs.f1.broadcast().getTransformation(),
transform.getName(),
(TwoInputStreamOperator) doFnOperator,
outputTypeInfo,
keyedWorkItemStream.getParallelism());
rawFlinkTransform.setStateKeyType(keyedWorkItemStream.getKeyType());
rawFlinkTransform.setStateKeySelectors(keyedWorkItemStream.getKeySelector(), null);
@SuppressWarnings({"unchecked", "rawtypes"})
SingleOutputStreamOperator<WindowedValue<KV<K, OutputT>>> outDataStream =
new SingleOutputStreamOperator(
keyedWorkItemStream.getExecutionEnvironment(),
rawFlinkTransform) {}; // we have to cheat around the ctor being protected
keyedWorkItemStream.getExecutionEnvironment().addOperator(rawFlinkTransform);
context.setOutputDataStream(context.getOutput(transform), outDataStream);
}
}
}
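/**
* Translates {@link SplittableParDoViaKeyedWorkItems.GBKIntoKeyedWorkItems} by wrapping
* elements into {@link SingletonKeyedWorkItem}s in the global window and keying the stream by
* the encoded key.
*/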
private static class GBKIntoKeyedWorkItemsTranslator<K, InputT>
extends FlinkStreamingPipelineTranslator.StreamTransformTranslator<
PTransform<PCollection<KV<K, InputT>>, PCollection<KeyedWorkItem<K, InputT>>>> {
@Override
boolean canTranslate(
PTransform<PCollection<KV<K, InputT>>, PCollection<KeyedWorkItem<K, InputT>>> transform,
FlinkStreamingTranslationContext context) {
return true;
}
@Override
public void translateNode(
PTransform<PCollection<KV<K, InputT>>, PCollection<KeyedWorkItem<K, InputT>>> transform,
FlinkStreamingTranslationContext context) {
PCollection<KV<K, InputT>> input = context.getInput(transform);
KvCoder<K, InputT> inputKvCoder = (KvCoder<K, InputT>) input.getCoder();
SingletonKeyedWorkItemCoder<K, InputT> workItemCoder =
SingletonKeyedWorkItemCoder.of(
inputKvCoder.getKeyCoder(),
inputKvCoder.getValueCoder(),
input.getWindowingStrategy().getWindowFn().windowCoder());
WindowedValue.ValueOnlyWindowedValueCoder<SingletonKeyedWorkItem<K, InputT>>
windowedWorkItemCoder = WindowedValue.getValueOnlyCoder(workItemCoder);
CoderTypeInformation<WindowedValue<SingletonKeyedWorkItem<K, InputT>>> workItemTypeInfo =
new CoderTypeInformation<>(windowedWorkItemCoder);
DataStream<WindowedValue<KV<K, InputT>>> inputDataStream = context.getInputDataStream(input);
DataStream<WindowedValue<SingletonKeyedWorkItem<K, InputT>>> workItemStream =
inputDataStream
.flatMap(new ToKeyedWorkItemInGlobalWindow<>(context.getPipelineOptions()))
.returns(workItemTypeInfo)
.name("ToKeyedWorkItem");
KeyedStream<WindowedValue<SingletonKeyedWorkItem<K, InputT>>, ByteBuffer>
keyedWorkItemStream =
workItemStream.keyBy(new WorkItemKeySelector<>(inputKvCoder.getKeyCoder()));
context.setOutputDataStream(context.getOutput(transform), keyedWorkItemStream);
}
}
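/**
* Like {@link ToKeyedWorkItem}, but emits each work item in the global window instead of
* retaining the original windows.
*/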
private static class ToKeyedWorkItemInGlobalWindow<K, InputT>
extends RichFlatMapFunction<
WindowedValue<KV<K, InputT>>, WindowedValue<SingletonKeyedWorkItem<K, InputT>>> {
private final SerializablePipelineOptions options;
ToKeyedWorkItemInGlobalWindow(PipelineOptions options) {
this.options = new SerializablePipelineOptions(options);
}
@Override
public void open(Configuration parameters) {
// Initialize FileSystems for any coders which may want to use the FileSystem,
// see https://issues.apache.org/jira/browse/BEAM-8303
FileSystems.setDefaultPipelineOptions(options.get());
}
@Override
public void flatMap(
WindowedValue<KV<K, InputT>> inWithMultipleWindows,
Collector<WindowedValue<SingletonKeyedWorkItem<K, InputT>>> out)
throws Exception {
// We need to wrap the value as one work item per window for now, since otherwise
// the PushbackSideInputRunner will not correctly determine whether side inputs are
// ready.
//
// This is tracked as https://issues.apache.org/jira/browse/BEAM-1850
for (WindowedValue<KV<K, InputT>> in : inWithMultipleWindows.explodeWindows()) {
SingletonKeyedWorkItem<K, InputT> workItem =
new SingletonKeyedWorkItem<>(
in.getValue().getKey(), in.withValue(in.getValue().getValue()));
out.collect(WindowedValue.valueInGlobalWindow(workItem));
}
}
}
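/**
* Translates {@code Flatten} as a union of the input streams. Streams that are used as input
* multiple times are first passed through an identity flatMap, and a Flatten without inputs
* becomes a dummy source that never emits an element.
*/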
private static class FlattenPCollectionTranslator<T>
extends FlinkStreamingPipelineTranslator.StreamTransformTranslator<
PTransform<PCollection<T>, PCollection<T>>> {
@Override
public void translateNode(
PTransform<PCollection<T>, PCollection<T>> transform,
FlinkStreamingTranslationContext context) {
Map<TupleTag<?>, PValue> allInputs = context.getInputs(transform);
if (allInputs.isEmpty()) {
// Create an empty dummy source to satisfy downstream operations. We cannot create an
// empty source in Flink, therefore we have to add a flatMap that simply never forwards
// the single element.
DataStreamSource<String> dummySource =
context.getExecutionEnvironment().fromElements("dummy");
DataStream<WindowedValue<T>> result =
dummySource
.<WindowedValue<T>>flatMap(
(s, collector) -> {
// never return anything
})
.returns(
new CoderTypeInformation<>(
WindowedValue.getFullCoder(
(Coder<T>) VoidCoder.of(), GlobalWindow.Coder.INSTANCE)));
context.setOutputDataStream(context.getOutput(transform), result);
} else {
DataStream<T> result = null;
// Determine the DataStreams that we use as input several times. For those, we need to
// uniquify the input streams because Flink seems to swallow watermarks when we union a
// stream with itself.
Map<DataStream<T>, Integer> duplicates = new HashMap<>();
for (PValue input : allInputs.values()) {
DataStream<T> current = context.getInputDataStream(input);
Integer oldValue = duplicates.put(current, 1);
if (oldValue != null) {
duplicates.put(current, oldValue + 1);
}
}
for (PValue input : allInputs.values()) {
DataStream<T> current = context.getInputDataStream(input);
final Integer timesRequired = duplicates.get(current);
if (timesRequired > 1) {
current =
current.flatMap(
new FlatMapFunction<T, T>() {
private static final long serialVersionUID = 1L;
@Override
public void flatMap(T t, Collector<T> collector) throws Exception {
collector.collect(t);
}
});
}
result = (result == null) ? current : result.union(current);
}
context.setOutputDataStream(context.getOutput(transform), result);
}
}
}
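/**
* Wraps each {@code KV} element into a {@link SingletonKeyedWorkItem}, emitting one work item
* per window.
*/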
static class ToKeyedWorkItem<K, InputT>
extends RichFlatMapFunction<
WindowedValue<KV<K, InputT>>, WindowedValue<SingletonKeyedWorkItem<K, InputT>>> {
private final SerializablePipelineOptions options;
ToKeyedWorkItem(PipelineOptions options) {
this.options = new SerializablePipelineOptions(options);
}
@Override
public void open(Configuration parameters) {
// Initialize FileSystems for any coders which may want to use the FileSystem,
// see https://issues.apache.org/jira/browse/BEAM-8303
FileSystems.setDefaultPipelineOptions(options.get());
}
@Override
public void flatMap(
WindowedValue<KV<K, InputT>> inWithMultipleWindows,
Collector<WindowedValue<SingletonKeyedWorkItem<K, InputT>>> out)
throws Exception {
// We need to wrap the value as one work item per window for now, since otherwise
// the PushbackSideInputRunner will not correctly determine whether side inputs are
// ready.
//
// This is tracked as https://issues.apache.org/jira/browse/BEAM-1850
for (WindowedValue<KV<K, InputT>> in : inWithMultipleWindows.explodeWindows()) {
SingletonKeyedWorkItem<K, InputT> workItem =
new SingletonKeyedWorkItem<>(
in.getValue().getKey(), in.withValue(in.getValue().getValue()));
out.collect(in.withValue(workItem));
}
}
}
/**
* A translator just to vend the URN. This will need to be moved to runners-core-construction-java
* once SDF is reorganized appropriately.
*/
private static class SplittableParDoProcessElementsTranslator
extends PTransformTranslation.TransformPayloadTranslator.NotSerializable<
SplittableParDoViaKeyedWorkItems.ProcessElements<?, ?, ?, ?>> {
private SplittableParDoProcessElementsTranslator() {}
@Override
public String getUrn(SplittableParDoViaKeyedWorkItems.ProcessElements<?, ?, ?, ?> transform) {
return SPLITTABLE_PROCESS_URN;
}
}
/** Registers classes specialized to the Flink runner. */
@AutoService(TransformPayloadTranslatorRegistrar.class)
public static class FlinkTransformsRegistrar implements TransformPayloadTranslatorRegistrar {
@Override
public Map<
? extends Class<? extends PTransform>,
? extends PTransformTranslation.TransformPayloadTranslator>
getTransformPayloadTranslators() {
return ImmutableMap
.<Class<? extends PTransform>, PTransformTranslation.TransformPayloadTranslator>builder()
.put(
CreateStreamingFlinkView.CreateFlinkPCollectionView.class,
new CreateStreamingFlinkViewPayloadTranslator())
.put(
SplittableParDoViaKeyedWorkItems.ProcessElements.class,
new SplittableParDoProcessElementsTranslator())
.put(
SplittableParDoViaKeyedWorkItems.GBKIntoKeyedWorkItems.class,
new SplittableParDoGbkIntoKeyedWorkItemsPayloadTranslator())
.build();
}
}
/**
* A translator just to vend the URN. This will need to be moved to runners-core-construction-java
* once SDF is reorganized appropriately.
*/
private static class SplittableParDoProcessElementsPayloadTranslator
extends PTransformTranslation.TransformPayloadTranslator.NotSerializable<
SplittableParDoViaKeyedWorkItems.ProcessElements<?, ?, ?, ?>> {
private SplittableParDoProcessElementsPayloadTranslator() {}
@Override
public String getUrn(SplittableParDoViaKeyedWorkItems.ProcessElements<?, ?, ?, ?> transform) {
return SPLITTABLE_PROCESS_URN;
}
}
/**
* A translator just to vend the URN. This will need to be moved to runners-core-construction-java
* once SDF is reorganized appropriately.
*/
private static class SplittableParDoGbkIntoKeyedWorkItemsPayloadTranslator
extends PTransformTranslation.TransformPayloadTranslator.NotSerializable<
SplittableParDoViaKeyedWorkItems.GBKIntoKeyedWorkItems<?, ?>> {
private SplittableParDoGbkIntoKeyedWorkItemsPayloadTranslator() {}
@Override
public String getUrn(SplittableParDoViaKeyedWorkItems.GBKIntoKeyedWorkItems<?, ?> transform) {
return SplittableParDo.SPLITTABLE_GBKIKWI_URN;
}
}
/** A translator just to vend the URN. */
private static class CreateStreamingFlinkViewPayloadTranslator
extends PTransformTranslation.TransformPayloadTranslator.NotSerializable<
CreateStreamingFlinkView.CreateFlinkPCollectionView<?, ?>> {
private CreateStreamingFlinkViewPayloadTranslator() {}
@Override
public String getUrn(CreateStreamingFlinkView.CreateFlinkPCollectionView<?, ?> transform) {
return CreateStreamingFlinkView.CREATE_STREAMING_FLINK_VIEW_URN;
}
}
/** A translator to support {@link TestStream} with Flink. */
private static class TestStreamTranslator<T>
extends FlinkStreamingPipelineTranslator.StreamTransformTranslator<TestStream<T>> {
@Override
void translateNode(TestStream<T> testStream, FlinkStreamingTranslationContext context) {
Coder<T> valueCoder = testStream.getValueCoder();
// Coder for the elements in the TestStream
TestStream.TestStreamCoder<T> testStreamCoder = TestStream.TestStreamCoder.of(valueCoder);
final byte[] payload;
try {
payload = CoderUtils.encodeToByteArray(testStreamCoder, testStream);
} catch (CoderException e) {
throw new RuntimeException("Could not encode TestStream.", e);
}
SerializableFunction<byte[], TestStream<T>> testStreamDecoder =
bytes -> {
try {
return CoderUtils.decodeFromByteArray(
TestStream.TestStreamCoder.of(valueCoder), bytes);
} catch (CoderException e) {
throw new RuntimeException("Can't decode TestStream payload.", e);
}
};
WindowedValue.FullWindowedValueCoder<T> elementCoder =
WindowedValue.getFullCoder(valueCoder, GlobalWindow.Coder.INSTANCE);
DataStreamSource<WindowedValue<T>> source =
context
.getExecutionEnvironment()
.addSource(
new TestStreamSource<>(testStreamDecoder, payload),
new CoderTypeInformation<>(elementCoder));
context.setOutputDataStream(context.getOutput(testStream), source);
}
}
/**
* Wrapper for {@link UnboundedSourceWrapper} that simplifies the output type by removing the
* {@link ValueWithRecordId} wrapper.
*/
static class UnboundedSourceWrapperNoValueWithRecordId<
OutputT, CheckpointMarkT extends UnboundedSource.CheckpointMark>
extends RichParallelSourceFunction<WindowedValue<OutputT>>
implements ProcessingTimeCallback,
StoppableFunction,
CheckpointListener,
CheckpointedFunction {
private final UnboundedSourceWrapper<OutputT, CheckpointMarkT> unboundedSourceWrapper;
@VisibleForTesting
UnboundedSourceWrapper<OutputT, CheckpointMarkT> getUnderlyingSource() {
return unboundedSourceWrapper;
}
UnboundedSourceWrapperNoValueWithRecordId(
UnboundedSourceWrapper<OutputT, CheckpointMarkT> unboundedSourceWrapper) {
this.unboundedSourceWrapper = unboundedSourceWrapper;
}
@Override
public void open(Configuration parameters) throws Exception {
unboundedSourceWrapper.setRuntimeContext(getRuntimeContext());
unboundedSourceWrapper.open(parameters);
}
@Override
public void run(SourceContext<WindowedValue<OutputT>> ctx) throws Exception {
unboundedSourceWrapper.run(new SourceContextWrapper(ctx));
}
@Override
public void initializeState(FunctionInitializationContext context) throws Exception {
unboundedSourceWrapper.initializeState(context);
}
@Override
public void snapshotState(FunctionSnapshotContext context) throws Exception {
unboundedSourceWrapper.snapshotState(context);
}
@Override
public void notifyCheckpointComplete(long checkpointId) throws Exception {
unboundedSourceWrapper.notifyCheckpointComplete(checkpointId);
}
@Override
public void stop() {
unboundedSourceWrapper.stop();
}
@Override
public void cancel() {
unboundedSourceWrapper.cancel();
}
@Override
public void onProcessingTime(long timestamp) throws Exception {
unboundedSourceWrapper.onProcessingTime(timestamp);
}
private final class SourceContextWrapper
implements SourceContext<WindowedValue<ValueWithRecordId<OutputT>>> {
private final SourceContext<WindowedValue<OutputT>> ctx;
private SourceContextWrapper(SourceContext<WindowedValue<OutputT>> ctx) {
this.ctx = ctx;
}
@Override
public void collect(WindowedValue<ValueWithRecordId<OutputT>> element) {
OutputT originalValue = element.getValue().getValue();
WindowedValue<OutputT> output =
WindowedValue.of(
originalValue, element.getTimestamp(), element.getWindows(), element.getPane());
ctx.collect(output);
}
@Override
public void collectWithTimestamp(
WindowedValue<ValueWithRecordId<OutputT>> element, long timestamp) {
OutputT originalValue = element.getValue().getValue();
WindowedValue<OutputT> output =
WindowedValue.of(
originalValue, element.getTimestamp(), element.getWindows(), element.getPane());
ctx.collectWithTimestamp(output, timestamp);
}
@Override
public void emitWatermark(Watermark mark) {
ctx.emitWatermark(mark);
}
@Override
public void markAsTemporarilyIdle() {
ctx.markAsTemporarilyIdle();
}
@Override
public Object getCheckpointLock() {
return ctx.getCheckpointLock();
}
@Override
public void close() {
ctx.close();
}
}
}
}