/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.beam.runners.core.construction;

import static org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkArgument;

import com.google.auto.service.AutoService;
import java.io.IOException;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ThreadLocalRandom;
import javax.annotation.Nullable;
import org.apache.beam.model.pipeline.v1.RunnerApi;
import org.apache.beam.model.pipeline.v1.RunnerApi.FunctionSpec;
import org.apache.beam.model.pipeline.v1.RunnerApi.ParDoPayload;
import org.apache.beam.model.pipeline.v1.RunnerApi.Parameter;
import org.apache.beam.model.pipeline.v1.RunnerApi.SdkFunctionSpec;
import org.apache.beam.model.pipeline.v1.RunnerApi.SideInput;
import org.apache.beam.model.pipeline.v1.RunnerApi.StateSpec;
import org.apache.beam.model.pipeline.v1.RunnerApi.TimerSpec;
import org.apache.beam.runners.core.construction.PTransformTranslation.TransformPayloadTranslator;
import org.apache.beam.runners.core.construction.ParDoTranslation.ParDoLike;
import org.apache.beam.sdk.annotations.Experimental;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.coders.KvCoder;
import org.apache.beam.sdk.runners.AppliedPTransform;
import org.apache.beam.sdk.runners.PTransformOverrideFactory;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.DoFnSchemaInformation;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.ParDo.MultiOutput;
import org.apache.beam.sdk.transforms.SerializableFunction;
import org.apache.beam.sdk.transforms.WithKeys;
import org.apache.beam.sdk.transforms.reflect.DoFnInvoker;
import org.apache.beam.sdk.transforms.reflect.DoFnInvokers;
import org.apache.beam.sdk.transforms.reflect.DoFnSignature;
import org.apache.beam.sdk.transforms.reflect.DoFnSignatures;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PCollectionTuple;
import org.apache.beam.sdk.values.PCollectionView;
import org.apache.beam.sdk.values.PCollectionViews;
import org.apache.beam.sdk.values.PValue;
import org.apache.beam.sdk.values.TupleTag;
import org.apache.beam.sdk.values.TupleTagList;
import org.apache.beam.sdk.values.WindowingStrategy;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableMap;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Maps;
import org.joda.time.Instant;

/**
* A utility transform that executes a <a
* href="https://s.apache.org/splittable-do-fn">splittable</a> {@link DoFn} by expanding it into a
* network of simpler transforms:
*
* <ol>
* <li>Pair each element with an initial restriction
* <li>Split each restriction into sub-restrictions
* <li>Explode windows, since splitting within each window has to happen independently
* <li>Assign a unique key to each element/restriction pair
* <li>Process the keyed element/restriction pairs in a runner-specific way with the splittable
* {@link DoFn}'s {@link DoFn.ProcessElement} method.
* </ol>
*
* <p>This transform is intended as a helper for internal use by runners when implementing {@code
* ParDo.of(splittable DoFn)}, but not for direct use by pipeline writers.
*/
@Experimental(Experimental.Kind.SPLITTABLE_DO_FN)
public class SplittableParDo<InputT, OutputT, RestrictionT>
extends PTransform<PCollection<InputT>, PCollectionTuple> {
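  /*
   * For orientation, a rough sketch of the kind of DoFn this transform expands. The class and
   * method names below are purely illustrative (not part of Beam), and the exact parameter shapes
   * of a splittable DoFn depend on the SDK version in use:
   *
   *   class ReadRangeFn extends DoFn<String, Long> {
   *     @ProcessElement
   *     public void process(ProcessContext c, OffsetRangeTracker tracker) {
   *       for (long i = tracker.currentRestriction().getFrom(); tracker.tryClaim(i); ++i) {
   *         c.output(i);
   *       }
   *     }
   *
   *     @GetInitialRestriction
   *     public OffsetRange getInitialRestriction(String element) {
   *       return new OffsetRange(0, Long.parseLong(element));
   *     }
   *   }
   */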
/**
* A {@link PTransformOverrideFactory} that overrides a <a
* href="https://s.apache.org/splittable-do-fn">Splittable DoFn</a> with {@link SplittableParDo}.
*/
public static class OverrideFactory<InputT, OutputT>
implements PTransformOverrideFactory<
PCollection<InputT>, PCollectionTuple, MultiOutput<InputT, OutputT>> {
@Override
public PTransformReplacement<PCollection<InputT>, PCollectionTuple> getReplacementTransform(
AppliedPTransform<PCollection<InputT>, PCollectionTuple, MultiOutput<InputT, OutputT>>
transform) {
return PTransformReplacement.of(
PTransformReplacements.getSingletonMainInput(transform), forAppliedParDo(transform));
}
@Override
public Map<PValue, ReplacementOutput> mapOutputs(
Map<TupleTag<?>, PValue> outputs, PCollectionTuple newOutput) {
return ReplacementOutputs.tagged(outputs, newOutput);
}
}
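  /*
   * A hedged sketch (not part of this class) of how a runner might install the override above via
   * pipeline surgery, assuming it matches splittable ParDos with
   * PTransformMatchers.splittableParDoMulti() from this package:
   *
   *   pipeline.replaceAll(
   *       Collections.singletonList(
   *           PTransformOverride.of(
   *               PTransformMatchers.splittableParDoMulti(),
   *               new SplittableParDo.OverrideFactory<>())));
   */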
private final DoFn<InputT, OutputT> doFn;
private final List<PCollectionView<?>> sideInputs;
private final TupleTag<OutputT> mainOutputTag;
private final TupleTagList additionalOutputTags;
private final Map<TupleTag<?>, Coder<?>> outputTagsToCoders;
public static final String SPLITTABLE_PROCESS_URN =
"beam:runners_core:transforms:splittable_process:v1";
public static final String SPLITTABLE_GBKIKWI_URN =
"beam:runners_core:transforms:splittable_gbkikwi:v1";
private SplittableParDo(
DoFn<InputT, OutputT> doFn,
List<PCollectionView<?>> sideInputs,
TupleTag<OutputT> mainOutputTag,
TupleTagList additionalOutputTags,
Map<TupleTag<?>, Coder<?>> outputTagsToCoders) {
checkArgument(
DoFnSignatures.getSignature(doFn.getClass()).processElement().isSplittable(),
"fn must be a splittable DoFn");
this.doFn = doFn;
this.sideInputs = sideInputs;
this.mainOutputTag = mainOutputTag;
this.additionalOutputTags = additionalOutputTags;
this.outputTagsToCoders = outputTagsToCoders;
}
/**
* Creates the transform for a {@link ParDo}-compatible {@link AppliedPTransform}.
*
   * <p>The input transform may have been deserialized, so it is not necessarily a {@link ParDo};
   * {@link ParDoTranslation} is used to extract the relevant fields instead.
*/
@SuppressWarnings({"unchecked", "rawtypes"})
public static <InputT, OutputT> SplittableParDo<InputT, OutputT, ?> forAppliedParDo(
AppliedPTransform<PCollection<InputT>, PCollectionTuple, ?> parDo) {
checkArgument(parDo != null, "parDo must not be null");
try {
Map<TupleTag<?>, Coder<?>> outputTagsToCoders = Maps.newHashMap();
for (Map.Entry<TupleTag<?>, PValue> entry : parDo.getOutputs().entrySet()) {
outputTagsToCoders.put(entry.getKey(), ((PCollection) entry.getValue()).getCoder());
}
return new SplittableParDo(
ParDoTranslation.getDoFn(parDo),
ParDoTranslation.getSideInputs(parDo),
ParDoTranslation.getMainOutputTag(parDo),
ParDoTranslation.getAdditionalOutputTags(parDo),
outputTagsToCoders);
} catch (IOException exc) {
throw new RuntimeException(exc);
}
}
@Override
public PCollectionTuple expand(PCollection<InputT> input) {
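    // Obtain the restriction coder from the DoFn's @GetRestrictionCoder method, or fall back to
    // inferring one from the pipeline's coder registry.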
Coder<RestrictionT> restrictionCoder =
DoFnInvokers.invokerFor(doFn)
.invokeGetRestrictionCoder(input.getPipeline().getCoderRegistry());
Coder<KV<InputT, RestrictionT>> splitCoder = KvCoder.of(input.getCoder(), restrictionCoder);
PCollection<KV<byte[], KV<InputT, RestrictionT>>> keyedRestrictions =
input
.apply(
"Pair with initial restriction",
ParDo.of(new PairWithRestrictionFn<InputT, OutputT, RestrictionT>(doFn)))
.setCoder(splitCoder)
.apply("Split restriction", ParDo.of(new SplitRestrictionFn<>(doFn)))
.setCoder(splitCoder)
// ProcessFn requires all input elements to be in a single window and have a single
// element per work item. This must precede the unique keying so each key has a single
// associated element.
.apply("Explode windows", ParDo.of(new ExplodeWindowsFn<>()))
.apply("Assign unique key", WithKeys.of(new RandomUniqueKeyFn<>()));
return keyedRestrictions.apply(
"ProcessKeyedElements",
new ProcessKeyedElements<>(
doFn,
input.getCoder(),
restrictionCoder,
(WindowingStrategy<InputT, ?>) input.getWindowingStrategy(),
sideInputs,
mainOutputTag,
additionalOutputTags,
outputTagsToCoders));
}
@Override
public Map<TupleTag<?>, PValue> getAdditionalInputs() {
return PCollectionViews.toAdditionalInputs(sideInputs);
}
  /**
   * A {@link DoFn} that forces each of its outputs to be in exactly one window. It declares a
   * {@link BoundedWindow} parameter, which tells the runner that it observes the window of its
   * input element; the runner must therefore invoke it once per window, so each output also lands
   * in a single window. For example, an element assigned to three sliding windows is emitted as
   * three single-window copies.
   */
private static class ExplodeWindowsFn<InputT> extends DoFn<InputT, InputT> {
@ProcessElement
public void process(ProcessContext c, BoundedWindow window) {
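      // The BoundedWindow parameter is not used directly; declaring it forces the runner to
      // invoke this DoFn once per window, which places each output in a single window.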
c.output(c.element());
}
}
/**
* Runner-specific primitive {@link PTransform} that invokes the {@link DoFn.ProcessElement}
* method for a splittable {@link DoFn} on each {@link KV} of the input {@link PCollection} of
* {@link KV KVs} keyed with arbitrary but globally unique keys.
*/
public static class ProcessKeyedElements<InputT, OutputT, RestrictionT>
extends PTransform<PCollection<KV<byte[], KV<InputT, RestrictionT>>>, PCollectionTuple> {
private final DoFn<InputT, OutputT> fn;
private final Coder<InputT> elementCoder;
private final Coder<RestrictionT> restrictionCoder;
private final WindowingStrategy<InputT, ?> windowingStrategy;
private final List<PCollectionView<?>> sideInputs;
private final TupleTag<OutputT> mainOutputTag;
private final TupleTagList additionalOutputTags;
private final Map<TupleTag<?>, Coder<?>> outputTagsToCoders;
    /**
     * @param fn the splittable {@link DoFn}.
     * @param elementCoder the {@link Coder} for elements of the input collection.
     * @param restrictionCoder the {@link Coder} for restrictions of the splittable {@link DoFn}.
     * @param windowingStrategy the {@link WindowingStrategy} of the input collection.
     * @param sideInputs list of side inputs that should be available to the {@link DoFn}.
     * @param mainOutputTag {@link TupleTag Tag} of the {@link DoFn DoFn's} main output.
     * @param additionalOutputTags {@link TupleTagList Tags} of the {@link DoFn DoFn's} additional
     *     outputs.
     * @param outputTagsToCoders A map from output tag to the coder for that output, which should
     *     provide mappings for the main and all additional tags.
     */
public ProcessKeyedElements(
DoFn<InputT, OutputT> fn,
Coder<InputT> elementCoder,
Coder<RestrictionT> restrictionCoder,
WindowingStrategy<InputT, ?> windowingStrategy,
List<PCollectionView<?>> sideInputs,
TupleTag<OutputT> mainOutputTag,
TupleTagList additionalOutputTags,
Map<TupleTag<?>, Coder<?>> outputTagsToCoders) {
this.fn = fn;
this.elementCoder = elementCoder;
this.restrictionCoder = restrictionCoder;
this.windowingStrategy = windowingStrategy;
this.sideInputs = sideInputs;
this.mainOutputTag = mainOutputTag;
this.additionalOutputTags = additionalOutputTags;
this.outputTagsToCoders = outputTagsToCoders;
}
public DoFn<InputT, OutputT> getFn() {
return fn;
}
public Coder<InputT> getElementCoder() {
return elementCoder;
}
public Coder<RestrictionT> getRestrictionCoder() {
return restrictionCoder;
}
public WindowingStrategy<InputT, ?> getInputWindowingStrategy() {
return windowingStrategy;
}
public List<PCollectionView<?>> getSideInputs() {
return sideInputs;
}
public TupleTag<OutputT> getMainOutputTag() {
return mainOutputTag;
}
public TupleTagList getAdditionalOutputTags() {
return additionalOutputTags;
}
public Map<TupleTag<?>, Coder<?>> getOutputTagsToCoders() {
return outputTagsToCoders;
}
@Override
public PCollectionTuple expand(PCollection<KV<byte[], KV<InputT, RestrictionT>>> input) {
return createPrimitiveOutputFor(
input, fn, mainOutputTag, additionalOutputTags, outputTagsToCoders, windowingStrategy);
}
public static <OutputT> PCollectionTuple createPrimitiveOutputFor(
PCollection<?> input,
DoFn<?, OutputT> fn,
TupleTag<OutputT> mainOutputTag,
TupleTagList additionalOutputTags,
Map<TupleTag<?>, Coder<?>> outputTagsToCoders,
WindowingStrategy<?, ?> windowingStrategy) {
DoFnSignature signature = DoFnSignatures.getSignature(fn.getClass());
PCollectionTuple outputs =
PCollectionTuple.ofPrimitiveOutputsInternal(
input.getPipeline(),
TupleTagList.of(mainOutputTag).and(additionalOutputTags.getAll()),
outputTagsToCoders,
windowingStrategy,
input.isBounded().and(signature.isBoundedPerElement()));
// Set output type descriptor similarly to how ParDo.MultiOutput does it.
outputs.get(mainOutputTag).setTypeDescriptor(fn.getOutputTypeDescriptor());
return outputs;
}
@Override
public Map<TupleTag<?>, PValue> getAdditionalInputs() {
return PCollectionViews.toAdditionalInputs(sideInputs);
}
}
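  /*
   * Note: runners provide the actual implementation of the ProcessKeyedElements primitive above,
   * typically by overriding it once more (for example, runners-core expands it via keyed work
   * items in SplittableParDoViaKeyedWorkItems) or by evaluating it natively.
   */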
  /** Registers {@link ProcessKeyedElementsTranslator}. */
@AutoService(TransformPayloadTranslatorRegistrar.class)
public static class Registrar implements TransformPayloadTranslatorRegistrar {
@Override
public Map<? extends Class<? extends PTransform>, ? extends TransformPayloadTranslator>
getTransformPayloadTranslators() {
return ImmutableMap.<Class<? extends PTransform>, TransformPayloadTranslator>builder()
.put(ProcessKeyedElements.class, new ProcessKeyedElementsTranslator())
.build();
}
}
/** A translator for {@link ProcessKeyedElements}. */
public static class ProcessKeyedElementsTranslator
implements PTransformTranslation.TransformPayloadTranslator<ProcessKeyedElements<?, ?, ?>> {
public static TransformPayloadTranslator create() {
return new ProcessKeyedElementsTranslator();
}
private ProcessKeyedElementsTranslator() {}
@Override
public String getUrn(ProcessKeyedElements<?, ?, ?> transform) {
return PTransformTranslation.SPLITTABLE_PROCESS_KEYED_URN;
}
@Override
public FunctionSpec translate(
AppliedPTransform<?, ?, ProcessKeyedElements<?, ?, ?>> transform, SdkComponents components)
throws IOException {
ProcessKeyedElements<?, ?, ?> pke = transform.getTransform();
final DoFn<?, ?> fn = pke.getFn();
final DoFnSignature signature = DoFnSignatures.getSignature(fn.getClass());
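      // Register the restriction coder with the components up front so the ParDoLike below can
      // refer to it by id.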
final String restrictionCoderId = components.registerCoder(pke.getRestrictionCoder());
ParDoPayload payload =
ParDoTranslation.payloadForParDoLike(
new ParDoLike() {
@Override
public SdkFunctionSpec translateDoFn(SdkComponents newComponents) {
// Schemas not yet supported on splittable DoFn.
return ParDoTranslation.translateDoFn(
fn,
pke.getMainOutputTag(),
Collections.emptyMap(),
DoFnSchemaInformation.create(),
newComponents);
}
@Override
public List<Parameter> translateParameters() {
return ParDoTranslation.translateParameters(
signature.processElement().extraParameters());
}
@Override
public Map<String, SideInput> translateSideInputs(SdkComponents components) {
return ParDoTranslation.translateSideInputs(pke.getSideInputs(), components);
}
@Override
public Map<String, StateSpec> translateStateSpecs(SdkComponents components) {
// SDFs don't have state.
return ImmutableMap.of();
}
@Override
public Map<String, TimerSpec> translateTimerSpecs(SdkComponents components) {
// SDFs don't have timers.
return ImmutableMap.of();
}
@Override
public boolean isSplittable() {
return true;
}
@Override
public String translateRestrictionCoderId(SdkComponents newComponents) {
return restrictionCoderId;
}
},
components);
return RunnerApi.FunctionSpec.newBuilder()
.setUrn(getUrn(pke))
.setPayload(payload.toByteString())
.build();
}
}
  /**
   * Assigns a random, statistically unique key to each element of the input collection, so the
   * output contains the same elements as the input, but per-key state and timers become
   * effectively per-element.
   */
private static class RandomUniqueKeyFn<T> implements SerializableFunction<T, byte[]> {
@Override
public byte[] apply(T input) {
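      // 128 random bytes make key collisions practically impossible, so downstream per-key state
      // and timers behave as if they were per-element.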
byte[] key = new byte[128];
ThreadLocalRandom.current().nextBytes(key);
return key;
}
}
/**
* Pairs each input element with its initial restriction using the given splittable {@link DoFn}.
*/
private static class PairWithRestrictionFn<InputT, OutputT, RestrictionT>
extends DoFn<InputT, KV<InputT, RestrictionT>> {
    private final DoFn<InputT, OutputT> fn;
// Initialized in setup()
private transient @Nullable DoFnInvoker<InputT, OutputT> invoker;
PairWithRestrictionFn(DoFn<InputT, OutputT> fn) {
this.fn = fn;
}
@Setup
public void setup() {
invoker = DoFnInvokers.invokerFor(fn);
invoker.invokeSetup();
}
@ProcessElement
public void processElement(ProcessContext context) {
context.output(
KV.of(context.element(), invoker.invokeGetInitialRestriction(context.element())));
}
@Teardown
public void tearDown() {
invoker.invokeTeardown();
invoker = null;
}
}
  /** Splits the restriction using the given {@link DoFn.SplitRestriction} method. */
private static class SplitRestrictionFn<InputT, RestrictionT>
extends DoFn<KV<InputT, RestrictionT>, KV<InputT, RestrictionT>> {
private final DoFn<InputT, ?> splittableFn;
// Initialized in setup()
private transient @Nullable DoFnInvoker<InputT, ?> invoker;
SplitRestrictionFn(DoFn<InputT, ?> splittableFn) {
this.splittableFn = splittableFn;
}
@Setup
public void setup() {
invoker = DoFnInvokers.invokerFor(splittableFn);
invoker.invokeSetup();
}
@ProcessElement
public void processElement(final ProcessContext c) {
final InputT element = c.element().getKey();
invoker.invokeSplitRestriction(
element,
c.element().getValue(),
new OutputReceiver<RestrictionT>() {
@Override
public void output(RestrictionT part) {
c.output(KV.of(element, part));
}
@Override
public void outputWithTimestamp(RestrictionT part, Instant timestamp) {
throw new UnsupportedOperationException();
}
});
}
@Teardown
public void tearDown() {
invoker.invokeTeardown();
invoker = null;
}
}
}