blob: a3a5a1f36a8660b060583ed0e55973bd71fb78da [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.beam.runners.core.construction;
import static com.google.common.base.Preconditions.checkArgument;
import com.google.auto.value.AutoValue;
import com.google.common.base.Joiner;
import com.google.common.base.MoreObjects;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Sets;
import java.io.IOException;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.ServiceLoader;
import java.util.Set;
import javax.annotation.Nullable;
import org.apache.beam.model.pipeline.v1.RunnerApi;
import org.apache.beam.model.pipeline.v1.RunnerApi.FunctionSpec;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.runners.AppliedPTransform;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.display.DisplayData;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PInput;
import org.apache.beam.sdk.values.POutput;
import org.apache.beam.sdk.values.PValue;
import org.apache.beam.sdk.values.TupleTag;
/**
* Utilities for converting {@link PTransform PTransforms} to and from {@link RunnerApi Runner API
* protocol buffers}.
*/
public class PTransformTranslation {
public static final String PAR_DO_TRANSFORM_URN = "urn:beam:transform:pardo:v1";
public static final String FLATTEN_TRANSFORM_URN = "urn:beam:transform:flatten:v1";
public static final String GROUP_BY_KEY_TRANSFORM_URN = "urn:beam:transform:groupbykey:v1";
public static final String READ_TRANSFORM_URN = "urn:beam:transform:read:v1";
public static final String WINDOW_TRANSFORM_URN = "urn:beam:transform:window:v1";
public static final String TEST_STREAM_TRANSFORM_URN = "urn:beam:transform:teststream:v1";
// Not strictly a primitive transform
public static final String COMBINE_TRANSFORM_URN = "urn:beam:transform:combine:v1";
public static final String RESHUFFLE_URN = "urn:beam:transform:reshuffle:v1";
// Less well-known. And where shall these live?
public static final String WRITE_FILES_TRANSFORM_URN = "urn:beam:transform:write_files:0.1";
/**
* @deprecated runners should move away from translating `CreatePCollectionView` and treat this as
* part of the translation for a `ParDo` side input.
*/
@Deprecated
public static final String CREATE_VIEW_TRANSFORM_URN = "urn:beam:transform:create_view:v1";
private static final Map<Class<? extends PTransform>, TransformPayloadTranslator>
KNOWN_PAYLOAD_TRANSLATORS = loadTransformPayloadTranslators();
private static final Map<String, TransformPayloadTranslator> KNOWN_REHYDRATORS =
loadTransformRehydrators();
private static final TransformPayloadTranslator<?> DEFAULT_REHYDRATOR =
new RawPTransformTranslator();
private static Map<Class<? extends PTransform>, TransformPayloadTranslator>
loadTransformPayloadTranslators() {
HashMap<Class<? extends PTransform>, TransformPayloadTranslator> translators = new HashMap<>();
for (TransformPayloadTranslatorRegistrar registrar :
ServiceLoader.load(TransformPayloadTranslatorRegistrar.class)) {
Map<Class<? extends PTransform>, TransformPayloadTranslator> newTranslators =
(Map) registrar.getTransformPayloadTranslators();
Set<Class<? extends PTransform>> alreadyRegistered =
Sets.intersection(translators.keySet(), newTranslators.keySet());
if (!alreadyRegistered.isEmpty()) {
throw new IllegalArgumentException(
String.format(
"Classes already registered: %s", Joiner.on(", ").join(alreadyRegistered)));
}
translators.putAll(newTranslators);
}
return ImmutableMap.copyOf(translators);
}
private static Map<String, TransformPayloadTranslator> loadTransformRehydrators() {
HashMap<String, TransformPayloadTranslator> rehydrators = new HashMap<>();
for (TransformPayloadTranslatorRegistrar registrar :
ServiceLoader.load(TransformPayloadTranslatorRegistrar.class)) {
Map<String, ? extends TransformPayloadTranslator> newRehydrators =
registrar.getTransformRehydrators();
Set<String> alreadyRegistered =
Sets.intersection(rehydrators.keySet(), newRehydrators.keySet());
if (!alreadyRegistered.isEmpty()) {
throw new IllegalArgumentException(
String.format(
"URNs already registered: %s", Joiner.on(", ").join(alreadyRegistered)));
}
rehydrators.putAll(newRehydrators);
}
return ImmutableMap.copyOf(rehydrators);
}
private PTransformTranslation() {}
/**
* Translates an {@link AppliedPTransform} into a runner API proto.
*
* <p>Does not register the {@code appliedPTransform} within the provided {@link SdkComponents}.
*/
static RunnerApi.PTransform toProto(
AppliedPTransform<?, ?, ?> appliedPTransform,
List<AppliedPTransform<?, ?, ?>> subtransforms,
SdkComponents components)
throws IOException {
// TODO include DisplayData https://issues.apache.org/jira/browse/BEAM-2645
RunnerApi.PTransform.Builder transformBuilder = RunnerApi.PTransform.newBuilder();
for (Map.Entry<TupleTag<?>, PValue> taggedInput : appliedPTransform.getInputs().entrySet()) {
checkArgument(
taggedInput.getValue() instanceof PCollection,
"Unexpected input type %s",
taggedInput.getValue().getClass());
transformBuilder.putInputs(
toProto(taggedInput.getKey()),
components.registerPCollection((PCollection<?>) taggedInput.getValue()));
}
for (Map.Entry<TupleTag<?>, PValue> taggedOutput : appliedPTransform.getOutputs().entrySet()) {
// TODO: Remove gating
if (taggedOutput.getValue() instanceof PCollection) {
checkArgument(
taggedOutput.getValue() instanceof PCollection,
"Unexpected output type %s",
taggedOutput.getValue().getClass());
transformBuilder.putOutputs(
toProto(taggedOutput.getKey()),
components.registerPCollection((PCollection<?>) taggedOutput.getValue()));
}
}
for (AppliedPTransform<?, ?, ?> subtransform : subtransforms) {
transformBuilder.addSubtransforms(components.getExistingPTransformId(subtransform));
}
transformBuilder.setUniqueName(appliedPTransform.getFullName());
transformBuilder.setDisplayData(
DisplayDataTranslation.toProto(DisplayData.from(appliedPTransform.getTransform())));
PTransform<?, ?> transform = appliedPTransform.getTransform();
// A RawPTransform directly vends its payload. Because it will generally be
// a subclass, we cannot do dictionary lookup in KNOWN_PAYLOAD_TRANSLATORS.
if (transform instanceof RawPTransform) {
// The raw transform was parsed in the context of other components; this puts it in the
// context of our current serialization
FunctionSpec spec = ((RawPTransform<?, ?>) transform).migrate(components);
// A composite transform is permitted to have a null spec. There are also some pseudo-
// primitives not yet supported by the portability framework that have null specs
if (spec != null) {
transformBuilder.setSpec(spec);
}
} else if (KNOWN_PAYLOAD_TRANSLATORS.containsKey(transform.getClass())) {
transformBuilder.setSpec(
KNOWN_PAYLOAD_TRANSLATORS
.get(transform.getClass())
.translate(appliedPTransform, components));
}
return transformBuilder.build();
}
/**
* Translates a {@link RunnerApi.PTransform} to a {@link RawPTransform} specialized for the URN
* and spec.
*/
static RawPTransform<?, ?> rehydrate(
RunnerApi.PTransform protoTransform, RehydratedComponents rehydratedComponents)
throws IOException {
@Nullable
TransformPayloadTranslator<?> rehydrator =
KNOWN_REHYDRATORS.get(
protoTransform.getSpec() == null ? null : protoTransform.getSpec().getUrn());
if (rehydrator == null) {
return DEFAULT_REHYDRATOR.rehydrate(protoTransform, rehydratedComponents);
} else {
return rehydrator.rehydrate(protoTransform, rehydratedComponents);
}
}
/**
* Translates a composite {@link AppliedPTransform} into a runner API proto with no component
* transforms.
*
* <p>This should not be used when translating a {@link Pipeline}.
*
* <p>Does not register the {@code appliedPTransform} within the provided {@link SdkComponents}.
*/
static RunnerApi.PTransform toProto(
AppliedPTransform<?, ?, ?> appliedPTransform, SdkComponents components) throws IOException {
return toProto(
appliedPTransform, Collections.<AppliedPTransform<?, ?, ?>>emptyList(), components);
}
private static String toProto(TupleTag<?> tag) {
return tag.getId();
}
/** Returns the URN for the transform if it is known, otherwise {@code null}. */
@Nullable
public static String urnForTransformOrNull(PTransform<?, ?> transform) {
// A RawPTransform directly vends its URN. Because it will generally be
// a subclass, we cannot do dictionary lookup in KNOWN_PAYLOAD_TRANSLATORS.
if (transform instanceof RawPTransform) {
return ((RawPTransform) transform).getUrn();
}
TransformPayloadTranslator translator = KNOWN_PAYLOAD_TRANSLATORS.get(transform.getClass());
if (translator == null) {
return null;
}
return translator.getUrn(transform);
}
/** Returns the URN for the transform if it is known, otherwise throws. */
public static String urnForTransform(PTransform<?, ?> transform) {
String urn = urnForTransformOrNull(transform);
if (urn == null) {
throw new IllegalStateException(
String.format("No translator known for %s", transform.getClass().getName()));
}
return urn;
}
/**
* A bi-directional translator between a Java-based {@link PTransform} and a protobuf payload for
* that transform.
*
* <p>When going to a protocol buffer message, the translator produces a payload corresponding to
* the Java representation while registering components that payload references.
*
* <p>When "rehydrating" a protocol buffer message, the translator returns a {@link RawPTransform}
* - because the transform may not be Java-based, it is not possible to rebuild a Java-based
* {@link PTransform}. The resulting {@link RawPTransform} subclass encapsulates the knowledge of
* which components are referenced in the payload.
*/
public interface TransformPayloadTranslator<T extends PTransform<?, ?>> {
String getUrn(T transform);
FunctionSpec translate(AppliedPTransform<?, ?, T> application, SdkComponents components)
throws IOException;
RawPTransform<?, ?> rehydrate(
RunnerApi.PTransform protoTransform, RehydratedComponents rehydratedComponents)
throws IOException;
/**
* A {@link TransformPayloadTranslator} for transforms that contain no references to components,
* so they do not need a specialized rehydration.
*/
abstract class WithDefaultRehydration<T extends PTransform<?, ?>>
implements TransformPayloadTranslator<T> {
@Override
public final RawPTransform<?, ?> rehydrate(
RunnerApi.PTransform protoTransform, RehydratedComponents rehydratedComponents)
throws IOException {
return UnknownRawPTransform.forSpec(protoTransform.getSpec());
}
}
/**
* A {@link TransformPayloadTranslator} for transforms that contain no references to components,
* so they do not need a specialized rehydration.
*/
abstract class NotSerializable<T extends PTransform<?, ?>>
implements TransformPayloadTranslator<T> {
public static NotSerializable<?> forUrn(final String urn) {
return new NotSerializable<PTransform<?, ?>>() {
@Override
public String getUrn(PTransform<?, ?> transform) {
return urn;
}
};
}
@Override
public final FunctionSpec translate(
AppliedPTransform<?, ?, T> transform, SdkComponents components) throws IOException {
throw new UnsupportedOperationException(
String.format(
"%s should never be translated",
transform.getTransform().getClass().getCanonicalName()));
}
@Override
public final RawPTransform<?, ?> rehydrate(
RunnerApi.PTransform protoTransform, RehydratedComponents rehydratedComponents)
throws IOException {
throw new UnsupportedOperationException(
String.format(
"%s.rehydrate should never be called; there is no serialized form",
getClass().getCanonicalName()));
}
}
}
/**
* A {@link PTransform} that indicates its URN and payload directly.
*
* <p>This is the result of rehydrating transforms from a pipeline proto. There is no {@link
* #expand} method since the definition of the transform may be lost. The transform is already
* fully expanded in the pipeline proto.
*/
public abstract static class RawPTransform<InputT extends PInput, OutputT extends POutput>
extends PTransform<InputT, OutputT> {
/** The URN for this transform, if standardized. */
@Nullable
public String getUrn() {
return getSpec() == null ? null : getSpec().getUrn();
}
/** The payload for this transform, if any. */
@Nullable
public abstract FunctionSpec getSpec();
/**
* Build a new payload set in the context of the given {@link SdkComponents}, if applicable.
*
* <p>When re-serializing this transform, the ids reference in the rehydrated payload may
* conflict with those defined by the serialization context. In that case, the components must
* be re-registered and a new payload returned.
*/
public FunctionSpec migrate(SdkComponents components) throws IOException {
return getSpec();
}
/**
* By default, throws an exception, but can be overridden.
*
* <p>It is permissible for runner-specific transforms to be both a {@link RawPTransform} that
* directly vends its proto representation and also to expand, for convenience of not having to
* register a translator.
*/
@Override
public OutputT expand(InputT input) {
throw new IllegalStateException(
String.format(
"%s should never be asked to expand;"
+ " it is the result of deserializing an already-constructed Pipeline",
getClass().getSimpleName()));
}
}
@AutoValue
abstract static class UnknownRawPTransform extends RawPTransform<PInput, POutput> {
@Override
public String getUrn() {
return getSpec() == null ? null : getSpec().getUrn();
}
@Nullable
public abstract RunnerApi.FunctionSpec getSpec();
public static UnknownRawPTransform forSpec(RunnerApi.FunctionSpec spec) {
return new AutoValue_PTransformTranslation_UnknownRawPTransform(spec);
}
@Override
public POutput expand(PInput input) {
throw new IllegalStateException(
String.format(
"%s should never be asked to expand;"
+ " it is the result of deserializing an already-constructed Pipeline",
getClass().getSimpleName()));
}
@Override
public String toString() {
return MoreObjects.toStringHelper(this)
.add("urn", getUrn())
.add("payload", getSpec())
.toString();
}
public RunnerApi.FunctionSpec getSpecForComponents(SdkComponents components) {
return getSpec();
}
}
/** A translator that uses the explicit URN and payload from a {@link RawPTransform}. */
public static class RawPTransformTranslator
implements TransformPayloadTranslator<RawPTransform<?, ?>> {
@Override
public String getUrn(RawPTransform<?, ?> transform) {
return transform.getUrn();
}
@Override
public FunctionSpec translate(
AppliedPTransform<?, ?, RawPTransform<?, ?>> transform, SdkComponents components)
throws IOException {
return transform.getTransform().migrate(components);
}
@Override
public RawPTransform<?, ?> rehydrate(
RunnerApi.PTransform protoTransform, RehydratedComponents rehydratedComponents) {
return UnknownRawPTransform.forSpec(protoTransform.getSpec());
}
}
}