blob: 8c73964dfca720f4d34947bf463983c8e0b8173c [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.beam.runners.core.construction;
import static org.apache.beam.runners.core.construction.BeamUrns.getUrn;
import static org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkArgument;
import java.io.IOException;
import java.util.Collection;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.ServiceLoader;
import java.util.Set;
import javax.annotation.Nullable;
import org.apache.beam.model.pipeline.v1.RunnerApi;
import org.apache.beam.model.pipeline.v1.RunnerApi.FunctionSpec;
import org.apache.beam.model.pipeline.v1.RunnerApi.StandardPTransforms;
import org.apache.beam.model.pipeline.v1.RunnerApi.StandardPTransforms.CombineComponents;
import org.apache.beam.model.pipeline.v1.RunnerApi.StandardPTransforms.SplittableParDoComponents;
import org.apache.beam.runners.core.construction.ExternalTranslation.ExternalTranslator;
import org.apache.beam.runners.core.construction.ParDoTranslation.ParDoTranslator;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.runners.AppliedPTransform;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.display.DisplayData;
import org.apache.beam.sdk.util.common.ReflectHelpers.ObjectsClassComparator;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PInput;
import org.apache.beam.sdk.values.POutput;
import org.apache.beam.sdk.values.PValue;
import org.apache.beam.sdk.values.TupleTag;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Joiner;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableMap;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableSortedSet;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Iterables;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Sets;
/**
* Utilities for converting {@link PTransform PTransforms} to {@link RunnerApi Runner API protocol
* buffers}.
*/
public class PTransformTranslation {
// URNs resolved from StandardPTransforms.Primitives.
public static final String PAR_DO_TRANSFORM_URN = getUrn(StandardPTransforms.Primitives.PAR_DO);
public static final String FLATTEN_TRANSFORM_URN = getUrn(StandardPTransforms.Primitives.FLATTEN);
public static final String GROUP_BY_KEY_TRANSFORM_URN =
getUrn(StandardPTransforms.Primitives.GROUP_BY_KEY);
public static final String IMPULSE_TRANSFORM_URN = getUrn(StandardPTransforms.Primitives.IMPULSE);
public static final String ASSIGN_WINDOWS_TRANSFORM_URN =
getUrn(StandardPTransforms.Primitives.ASSIGN_WINDOWS);
public static final String TEST_STREAM_TRANSFORM_URN =
getUrn(StandardPTransforms.Primitives.TEST_STREAM);
public static final String MAP_WINDOWS_TRANSFORM_URN =
getUrn(StandardPTransforms.Primitives.MAP_WINDOWS);
// URNs resolved from StandardPTransforms.DeprecatedPrimitives.
/**
* @deprecated SDKs should move away from creating `Read` transforms and migrate to using Impulse
* + SplittableDoFns.
*/
@Deprecated
public static final String READ_TRANSFORM_URN =
getUrn(StandardPTransforms.DeprecatedPrimitives.READ);
/**
* @deprecated runners should move away from translating `CreatePCollectionView` and treat this as
* part of the translation for a `ParDo` side input.
*/
@Deprecated
public static final String CREATE_VIEW_TRANSFORM_URN =
getUrn(StandardPTransforms.DeprecatedPrimitives.CREATE_VIEW);
// URNs resolved from StandardPTransforms.Composites (combine transforms).
public static final String COMBINE_PER_KEY_TRANSFORM_URN =
getUrn(StandardPTransforms.Composites.COMBINE_PER_KEY);
public static final String COMBINE_GLOBALLY_TRANSFORM_URN =
getUrn(StandardPTransforms.Composites.COMBINE_GLOBALLY);
// URNs resolved from StandardPTransforms.CombineComponents.
public static final String COMBINE_GROUPED_VALUES_TRANSFORM_URN =
getUrn(CombineComponents.COMBINE_GROUPED_VALUES);
public static final String COMBINE_PER_KEY_PRECOMBINE_TRANSFORM_URN =
getUrn(CombineComponents.COMBINE_PER_KEY_PRECOMBINE);
public static final String COMBINE_PER_KEY_MERGE_ACCUMULATORS_TRANSFORM_URN =
getUrn(CombineComponents.COMBINE_PER_KEY_MERGE_ACCUMULATORS);
public static final String COMBINE_PER_KEY_EXTRACT_OUTPUTS_TRANSFORM_URN =
getUrn(CombineComponents.COMBINE_PER_KEY_EXTRACT_OUTPUTS);
// Remaining URNs resolved from StandardPTransforms.Composites.
public static final String RESHUFFLE_URN = getUrn(StandardPTransforms.Composites.RESHUFFLE);
public static final String WRITE_FILES_TRANSFORM_URN =
getUrn(StandardPTransforms.Composites.WRITE_FILES);
// URNs resolved from StandardPTransforms.SplittableParDoComponents.
public static final String SPLITTABLE_PAIR_WITH_RESTRICTION_URN =
getUrn(SplittableParDoComponents.PAIR_WITH_RESTRICTION);
public static final String SPLITTABLE_SPLIT_RESTRICTION_URN =
getUrn(SplittableParDoComponents.SPLIT_RESTRICTION);
public static final String SPLITTABLE_PROCESS_KEYED_URN =
getUrn(SplittableParDoComponents.PROCESS_KEYED_ELEMENTS);
public static final String SPLITTABLE_PROCESS_ELEMENTS_URN =
getUrn(SplittableParDoComponents.PROCESS_ELEMENTS);
public static final String SPLITTABLE_SPLIT_AND_SIZE_RESTRICTIONS_URN =
getUrn(SplittableParDoComponents.SPLIT_AND_SIZE_RESTRICTIONS);
public static final String SPLITTABLE_PROCESS_SIZED_ELEMENTS_AND_RESTRICTIONS_URN =
getUrn(SplittableParDoComponents.PROCESS_SIZED_ELEMENTS_AND_RESTRICTIONS);
// URNs resolved from RunnerApi.StandardSideInputTypes.Enum; unlike the constants above these
// identify side input types rather than transforms.
public static final String ITERABLE_SIDE_INPUT =
getUrn(RunnerApi.StandardSideInputTypes.Enum.ITERABLE);
public static final String MULTIMAP_SIDE_INPUT =
getUrn(RunnerApi.StandardSideInputTypes.Enum.MULTIMAP);
/**
 * The translators consulted, in iteration order, when selecting how to translate a transform. The
 * first one whose {@code canTranslate} accepts the transform is used.
 */
private static final Collection<TransformTranslator<?>> KNOWN_TRANSLATORS =
    loadKnownTranslators();

/**
 * Assembles the built-in translators into an {@link ImmutableSortedSet} ordered by {@link
 * ObjectsClassComparator#INSTANCE}.
 */
private static Collection<TransformTranslator<?>> loadKnownTranslators() {
  // The comparator is handed over as a raw type, so this builder creation is unchecked.
  ImmutableSortedSet.Builder<TransformTranslator<?>> translators =
      ImmutableSortedSet.<TransformTranslator<?>>orderedBy(
          (Comparator) ObjectsClassComparator.INSTANCE);
  translators.add(new RawPTransformTranslator());
  translators.add(new KnownTransformPayloadTranslator());
  translators.add(ParDoTranslator.create());
  translators.add(ExternalTranslator.create());
  return translators.build();
}

/** Not instantiable: this class only exposes static members and nested types. */
private PTransformTranslation() {}
/**
 * Converts an {@link AppliedPTransform} to its runner API proto using the first known translator
 * that accepts the underlying transform, falling back to the translator for unknown transforms.
 *
 * <p>Does not register the {@code appliedPTransform} within the provided {@link SdkComponents}.
 */
static RunnerApi.PTransform toProto(
    AppliedPTransform<?, ?, ?> appliedPTransform,
    List<AppliedPTransform<?, ?, ?>> subtransforms,
    SdkComponents components)
    throws IOException {
  for (TransformTranslator<?> candidate : KNOWN_TRANSLATORS) {
    if (candidate.canTranslate(appliedPTransform.getTransform())) {
      return candidate.translate(appliedPTransform, subtransforms, components);
    }
  }
  // No registered translator matched; emit a proto without a spec.
  return DefaultUnknownTransformTranslator.INSTANCE.translate(
      appliedPTransform, subtransforms, components);
}
/**
 * Converts a composite {@link AppliedPTransform} to a runner API proto that lists no
 * subtransforms.
 *
 * <p>This should not be used when translating a {@link Pipeline}.
 *
 * <p>Does not register the {@code appliedPTransform} within the provided {@link SdkComponents}.
 */
static RunnerApi.PTransform toProto(
    AppliedPTransform<?, ?, ?> appliedPTransform, SdkComponents components) throws IOException {
  List<AppliedPTransform<?, ?, ?>> noSubtransforms = Collections.emptyList();
  return toProto(appliedPTransform, noSubtransforms, components);
}
/**
* Returns the string under which an input or output tagged with {@code tag} is keyed in a
* translated transform, which is the tag's id.
*/
private static String toProto(TupleTag<?> tag) {
return tag.getId();
}
/**
 * Looks up the URN of a Java {@link PTransform} via the first known translator that accepts it.
 *
 * @return the URN reported by the selected translator, or {@code null} when none is known
 */
@Nullable
public static String urnForTransformOrNull(PTransform<?, ?> transform) {
  TransformTranslator<?> selected =
      Iterables.find(
          KNOWN_TRANSLATORS,
          candidate -> candidate.canTranslate(transform),
          DefaultUnknownTransformTranslator.INSTANCE);
  // Viewed as a raw type because a TransformTranslator<?> cannot accept an arbitrary PTransform.
  TransformTranslator rawSelected = selected;
  return rawSelected.getUrn(transform);
}
/**
 * Looks up the URN of a Java {@link PTransform}, failing when none is known.
 *
 * @throws IllegalStateException if {@link #urnForTransformOrNull(PTransform)} yields {@code null}
 */
public static String urnForTransform(PTransform<?, ?> transform) {
  String urn = urnForTransformOrNull(transform);
  if (urn != null) {
    return urn;
  }
  String transformClassName = transform.getClass().getName();
  throw new IllegalStateException(
      String.format("No translator known for %s", transformClassName));
}
/**
 * Reads the URN out of the spec of an already-translated transform proto.
 *
 * @return the spec's URN, or {@code null} when {@code getSpec()} returns {@code null}
 */
@Nullable
public static String urnForTransformOrNull(RunnerApi.PTransform transform) {
  FunctionSpec spec = transform.getSpec();
  // NOTE(review): if RunnerApi.PTransform is a protobuf-generated message, getSpec() presumably
  // never returns null (an unset field yields a default instance), so an unset spec would give
  // an empty string here rather than null. Confirm whether hasSpec() was intended.
  if (spec == null) {
    return null;
  }
  return spec.getUrn();
}
/**
* A translator between a Java-based {@link PTransform} and a protobuf for that transform.
*
* <p>When going to a protocol buffer message, the translator produces a payload corresponding to
* the Java representation while registering components that transform references.
*/
public interface TransformTranslator<T extends PTransform<?, ?>> {
/** Returns the URN identifying {@code transform}, or {@code null} if it has none. */
@Nullable
String getUrn(T transform);
/**
* Returns whether this translator handles {@code pTransform}. Translators are only asked to
* translate, or report a URN for, transforms they have accepted here.
*/
boolean canTranslate(PTransform<?, ?> pTransform);
/**
* Translates {@code appliedPTransform} into a runner API proto.
*
* @param appliedPTransform the transform application to translate
* @param subtransforms the applications to reference as subtransforms of the result
* @param components the components used while translating
* @throws IOException if translation fails
*/
RunnerApi.PTransform translate(
AppliedPTransform<?, ?, ?> appliedPTransform,
List<AppliedPTransform<?, ?, ?>> subtransforms,
SdkComponents components)
throws IOException;
}
/** Translates all unknown transforms to have an empty {@link FunctionSpec} and unset URN. */
private static class DefaultUnknownTransformTranslator
implements TransformTranslator<PTransform<?, ?>> {
private static final TransformTranslator<?> INSTANCE = new DefaultUnknownTransformTranslator();
@Override
public String getUrn(PTransform<?, ?> transform) {
return null;
}
@Override
public boolean canTranslate(PTransform<?, ?> pTransform) {
return true;
}
@Override
public RunnerApi.PTransform translate(
AppliedPTransform<?, ?, ?> appliedPTransform,
List<AppliedPTransform<?, ?, ?>> subtransforms,
SdkComponents components)
throws IOException {
return translateAppliedPTransform(appliedPTransform, subtransforms, components).build();
}
}
/**
 * Translator for {@link RawPTransform}: the {@link FunctionSpec} comes from the transform itself,
 * migrated into the components of the current translation.
 */
private static class RawPTransformTranslator implements TransformTranslator<RawPTransform<?, ?>> {
  @Override
  public boolean canTranslate(PTransform<?, ?> pTransform) {
    return pTransform instanceof RawPTransform;
  }

  @Override
  public String getUrn(RawPTransform transform) {
    return transform.getUrn();
  }

  @Override
  public RunnerApi.PTransform translate(
      AppliedPTransform<?, ?, ?> appliedPTransform,
      List<AppliedPTransform<?, ?, ?>> subtransforms,
      SdkComponents components)
      throws IOException {
    // Common fields first, so the spec is migrated only after inputs/outputs are registered.
    RunnerApi.PTransform.Builder builder =
        translateAppliedPTransform(appliedPTransform, subtransforms, components);

    // The raw transform was parsed against other components; migrating re-expresses its spec
    // in terms of the components of this serialization.
    RawPTransform<?, ?> rawTransform = (RawPTransform<?, ?>) appliedPTransform.getTransform();
    FunctionSpec migratedSpec = rawTransform.migrate(components);

    // Composites may legitimately have no spec, as may some pseudo-primitives that the
    // portability framework does not support yet.
    if (migratedSpec == null) {
      return builder.build();
    }
    return builder.setSpec(migratedSpec).build();
  }
}
/**
 * Translator for every transform class that has a registered {@link TransformPayloadTranslator}.
 * Such transforms share the common proto translation and differ only in their {@link
 * FunctionSpec} and URN, both of which come from the registered payload translator.
 */
private static class KnownTransformPayloadTranslator<T extends PTransform<?, ?>>
    implements TransformTranslator<T> {
  /** Payload translators keyed by the exact transform class they handle. */
  private static final Map<Class<? extends PTransform>, TransformPayloadTranslator>
      KNOWN_PAYLOAD_TRANSLATORS = loadTransformPayloadTranslators();

  /**
   * Collects the payload translators of every {@link TransformPayloadTranslatorRegistrar} found
   * by {@link ServiceLoader}.
   *
   * @throws IllegalArgumentException if a registrar contributes a class that an earlier registrar
   *     already contributed
   */
  private static Map<Class<? extends PTransform>, TransformPayloadTranslator>
      loadTransformPayloadTranslators() {
    Map<Class<? extends PTransform>, TransformPayloadTranslator> merged = new HashMap<>();
    for (TransformPayloadTranslatorRegistrar registrar :
        ServiceLoader.load(TransformPayloadTranslatorRegistrar.class)) {
      Map<Class<? extends PTransform>, TransformPayloadTranslator> contributed =
          (Map) registrar.getTransformPayloadTranslators();
      Set<Class<? extends PTransform>> duplicates =
          Sets.intersection(merged.keySet(), contributed.keySet());
      if (duplicates.isEmpty()) {
        merged.putAll(contributed);
        continue;
      }
      String duplicateNames = Joiner.on(", ").join(duplicates);
      throw new IllegalArgumentException(
          String.format("Classes already registered: %s", duplicateNames));
    }
    return ImmutableMap.copyOf(merged);
  }

  /** Accepts a transform only when its exact runtime class has a registered translator. */
  @Override
  public boolean canTranslate(PTransform pTransform) {
    return KNOWN_PAYLOAD_TRANSLATORS.containsKey(pTransform.getClass());
  }

  /** Delegates to the payload translator registered for the transform's class. */
  @Override
  public String getUrn(PTransform transform) {
    TransformPayloadTranslator payloadTranslator =
        KNOWN_PAYLOAD_TRANSLATORS.get(transform.getClass());
    return payloadTranslator.getUrn(transform);
  }

  @Override
  public RunnerApi.PTransform translate(
      AppliedPTransform<?, ?, ?> appliedPTransform,
      List<AppliedPTransform<?, ?, ?>> subtransforms,
      SdkComponents components)
      throws IOException {
    RunnerApi.PTransform.Builder builder =
        translateAppliedPTransform(appliedPTransform, subtransforms, components);

    TransformPayloadTranslator payloadTranslator =
        KNOWN_PAYLOAD_TRANSLATORS.get(appliedPTransform.getTransform().getClass());
    FunctionSpec payload = payloadTranslator.translate(appliedPTransform, components);

    // A payload translator may return no spec, in which case the proto keeps it unset.
    if (payload == null) {
      return builder.build();
    }
    return builder.setSpec(payload).build();
  }
}
/**
 * Starts a runner API proto builder for an {@link AppliedPTransform}, filling in everything except
 * the spec:
 *
 * <ul>
 *   <li>one input per entry of {@link AppliedPTransform#getInputs()}, each of which must be a
 *       {@link PCollection};
 *   <li>one output per entry of {@link AppliedPTransform#getOutputs()} that is a {@link
 *       PCollection} (other outputs are currently skipped);
 *   <li>each referenced {@link PCollection} registered with {@code components};
 *   <li>a reference to each subtransform, which must already be known to {@code components};
 *   <li>the unique name;
 *   <li>the display data.
 * </ul>
 *
 * @throws IllegalArgumentException if an input is not a {@link PCollection}
 */
static RunnerApi.PTransform.Builder translateAppliedPTransform(
    AppliedPTransform<?, ?, ?> appliedPTransform,
    List<AppliedPTransform<?, ?, ?>> subtransforms,
    SdkComponents components)
    throws IOException {
  RunnerApi.PTransform.Builder builder = RunnerApi.PTransform.newBuilder();

  for (Map.Entry<TupleTag<?>, PValue> inputEntry : appliedPTransform.getInputs().entrySet()) {
    PValue input = inputEntry.getValue();
    checkArgument(input instanceof PCollection, "Unexpected input type %s", input.getClass());
    String inputName = toProto(inputEntry.getKey());
    builder.putInputs(inputName, components.registerPCollection((PCollection<?>) input));
  }

  for (Map.Entry<TupleTag<?>, PValue> outputEntry : appliedPTransform.getOutputs().entrySet()) {
    PValue output = outputEntry.getValue();
    // TODO: Remove gating
    if (!(output instanceof PCollection)) {
      continue;
    }
    // Cannot fail while the gate above is in place; kept for when the gate is removed.
    checkArgument(output instanceof PCollection, "Unexpected output type %s", output.getClass());
    String outputName = toProto(outputEntry.getKey());
    builder.putOutputs(outputName, components.registerPCollection((PCollection<?>) output));
  }

  for (AppliedPTransform<?, ?, ?> child : subtransforms) {
    builder.addSubtransforms(components.getExistingPTransformId(child));
  }

  builder.setUniqueName(appliedPTransform.getFullName());
  DisplayData displayData = DisplayData.from(appliedPTransform.getTransform());
  builder.setDisplayData(DisplayDataTranslation.toProto(displayData));
  return builder;
}
/**
 * Converts a Java-based {@link PTransform} into the protobuf payload ({@link FunctionSpec}) for
 * that transform.
 *
 * <p>While producing the payload, implementations register the components that the payload
 * references with the supplied {@link SdkComponents}.
 */
public interface TransformPayloadTranslator<T extends PTransform<?, ?>> {
  /** Returns the URN identifying {@code transform}. */
  String getUrn(T transform);

  /**
   * Produces the payload for the given application of the transform.
   *
   * @return the spec to set on the translated transform, or {@code null} to leave it unset
   */
  @Nullable
  FunctionSpec translate(AppliedPTransform<?, ?, T> application, SdkComponents components)
      throws IOException;

  /**
   * A {@link TransformPayloadTranslator} that supplies only a URN: {@link #translate} is final and
   * always throws {@link UnsupportedOperationException}.
   */
  abstract class NotSerializable<T extends PTransform<?, ?>>
      implements TransformPayloadTranslator<T> {
    /** Creates an instance that reports {@code urn} for every transform. */
    public static NotSerializable<?> forUrn(final String urn) {
      return new NotSerializable<PTransform<?, ?>>() {
        @Override
        public String getUrn(PTransform<?, ?> transform) {
          return urn;
        }
      };
    }

    @Override
    public final FunctionSpec translate(
        AppliedPTransform<?, ?, T> transform, SdkComponents components) throws IOException {
      String transformClassName = transform.getTransform().getClass().getCanonicalName();
      throw new UnsupportedOperationException(
          String.format("%s should never be translated", transformClassName));
    }
  }
}
/**
 * A {@link PTransform} that carries its URN and payload directly, in the form of a {@link
 * FunctionSpec}.
 *
 * <p>Instances result from rehydrating transforms out of a pipeline proto, where the transform is
 * already fully expanded and its original definition may be lost. For that reason {@link #expand}
 * throws unless a subclass overrides it.
 */
public abstract static class RawPTransform<InputT extends PInput, OutputT extends POutput>
    extends PTransform<InputT, OutputT> {
  /** The payload for this transform, if any. */
  @Nullable
  public abstract FunctionSpec getSpec();

  /** The URN of this transform, taken from its spec; {@code null} when it has no spec. */
  @Nullable
  public String getUrn() {
    if (getSpec() == null) {
      return null;
    }
    return getSpec().getUrn();
  }

  /**
   * Returns the payload to use when re-serializing this transform within the given {@link
   * SdkComponents}.
   *
   * <p>The ids referenced by a rehydrated payload may conflict with those of the serialization
   * context; subclasses in that situation must re-register the components and return a new
   * payload. This default implementation returns {@link #getSpec()} unchanged.
   */
  public FunctionSpec migrate(SdkComponents components) throws IOException {
    return getSpec();
  }

  /**
   * Throws by default, but may be overridden.
   *
   * <p>A runner-specific transform is allowed to be a {@link RawPTransform} that vends its proto
   * representation directly and also to expand, which saves registering a translator.
   *
   * @throws IllegalStateException always, unless overridden
   */
  @Override
  public OutputT expand(InputT input) {
    String transformName = getClass().getSimpleName();
    throw new IllegalStateException(
        String.format(
            "%s should never be asked to expand;"
                + " it is the result of deserializing an already-constructed Pipeline",
            transformName));
  }
}
}