blob: 81d2cbcca92a86930aa9452d1ba5f03027615509 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.beam.runners.core.construction;
import static org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkArgument;
import static org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkNotNull;
import java.io.IOException;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import org.apache.beam.model.pipeline.v1.RunnerApi;
import org.apache.beam.model.pipeline.v1.RunnerApi.Components;
import org.apache.beam.model.pipeline.v1.RunnerApi.Environment;
import org.apache.beam.sdk.annotations.Experimental;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PortablePipelineOptions;
import org.apache.beam.sdk.runners.AppliedPTransform;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.util.NameUtils;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.WindowingStrategy;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.BiMap;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.HashBiMap;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Iterables;
/**
 * SDK objects that will be represented at some later point within a {@link Components} object.
 *
 * <p>Every kind of SDK object (transform, PCollection, windowing strategy, coder, environment) is
 * tracked in its own bidirectional object-to-id map, while the translated protos are accumulated
 * in a single {@link RunnerApi.Components} builder that {@link #toComponents()} builds on demand.
 */
public class SdkComponents {
  /** Prefix prepended to every id generated by {@code uniqify}. */
  private final String newIdPrefix;

  /** Accumulates every proto registered with, or merged into, this instance. */
  private final RunnerApi.Components.Builder componentsBuilder = RunnerApi.Components.newBuilder();

  private final BiMap<AppliedPTransform<?, ?, ?>, String> transformIds = HashBiMap.create();
  private final BiMap<PCollection<?>, String> pCollectionIds = HashBiMap.create();
  private final BiMap<WindowingStrategy<?, ?>, String> windowingStrategyIds = HashBiMap.create();
  private final BiMap<Coder<?>, String> coderIds = HashBiMap.create();
  private final BiMap<Environment, String> environmentIds = HashBiMap.create();

  /** Ids taken by merged {@link Components}; {@code uniqify} never hands these out again. */
  private final Set<String> reservedIds = new HashSet<>();

  /**
   * Id of the environment registered by {@link #create(PipelineOptions)}, or {@code null} when
   * this instance was not created from pipeline options.
   */
  private String defaultEnvironmentId;

  /** Create a new {@link SdkComponents} with no components. */
  public static SdkComponents create() {
    return new SdkComponents(RunnerApi.Components.getDefaultInstance(), "");
  }

  /**
   * Create a new {@link SdkComponents}, importing all items from the provided {@link Components}
   * object.
   *
   * <p>WARNING: This action might cause some duplicate items to be created.
   */
  public static SdkComponents create(RunnerApi.Components components) {
    return new SdkComponents(components, "");
  }

  /**
   * Create a new {@link SdkComponents} from the provided {@link Components} and additionally bind
   * each of the given ids to its already-materialized SDK object, so that later registrations of
   * those objects return the given ids.
   */
  /*package*/ static SdkComponents create(
      RunnerApi.Components components,
      Map<String, AppliedPTransform<?, ?, ?>> transforms,
      Map<String, PCollection<?>> pCollections,
      Map<String, WindowingStrategy<?, ?>> windowingStrategies,
      Map<String, Coder<?>> coders,
      Map<String, Environment> environments) {
    SdkComponents sdkComponents = SdkComponents.create(components);
    // The arguments are keyed id -> object, so they are written through the inverse views.
    sdkComponents.transformIds.inverse().putAll(transforms);
    sdkComponents.pCollectionIds.inverse().putAll(pCollections);
    sdkComponents.windowingStrategyIds.inverse().putAll(windowingStrategies);
    sdkComponents.coderIds.inverse().putAll(coders);
    sdkComponents.environmentIds.inverse().putAll(environments);
    return sdkComponents;
  }

  /**
   * Create a new, otherwise empty {@link SdkComponents} whose default environment is derived from
   * the {@link PortablePipelineOptions} view of the given options.
   *
   * <p>The environment is registered immediately and its id is what {@link
   * #getOnlyEnvironmentId()} subsequently returns.
   */
  public static SdkComponents create(PipelineOptions options) {
    SdkComponents sdkComponents = create();
    PortablePipelineOptions portableOptions = options.as(PortablePipelineOptions.class);
    Environment defaultEnvironment =
        Environments.createOrGetDefaultEnvironment(
            portableOptions.getDefaultEnvironmentType(),
            portableOptions.getDefaultEnvironmentConfig());
    sdkComponents.defaultEnvironmentId = sdkComponents.registerEnvironment(defaultEnvironment);
    return sdkComponents;
  }

  /**
   * @param components initial components to merge in; {@code null} is treated as "nothing to
   *     merge"
   * @param newIdPrefix prefix for every id generated by this instance
   */
  private SdkComponents(RunnerApi.Components components, String newIdPrefix) {
    this.newIdPrefix = newIdPrefix;
    if (components != null) {
      mergeFrom(components);
    }
  }

  /**
   * Merge Components proto into this SdkComponents instance.
   *
   * <p>Every id used by the given components (of any kind) is reserved so that it is never
   * generated for a newly registered object, the environments are recorded so that registering an
   * equal {@link Environment} returns the merged id, and finally the protos themselves are merged
   * into the underlying builder.
   *
   * <p>Note that the environments are stored in a {@link BiMap}, which rejects binding one value
   * to two different keys: merging an {@link Environment} that is already known under a different
   * id is expected to fail with an {@link IllegalArgumentException}.
   */
  public void mergeFrom(RunnerApi.Components components) {
    reservedIds.addAll(components.getTransformsMap().keySet());
    reservedIds.addAll(components.getPcollectionsMap().keySet());
    reservedIds.addAll(components.getWindowingStrategiesMap().keySet());
    reservedIds.addAll(components.getCodersMap().keySet());
    reservedIds.addAll(components.getEnvironmentsMap().keySet());
    environmentIds.inverse().putAll(components.getEnvironmentsMap());
    componentsBuilder.mergeFrom(components);
  }

  /**
   * Returns an SdkComponents like this one, but which will prefix all newly generated ids with the
   * given string.
   *
   * <p>Useful for ensuring independently-constructed components have non-overlapping ids.
   *
   * <p>The returned instance is an independent copy: it starts from a snapshot of the components
   * built so far, knows every object-to-id binding of this instance, and keeps the same default
   * environment.
   */
  public SdkComponents withNewIdPrefix(String newIdPrefix) {
    SdkComponents sdkComponents = new SdkComponents(componentsBuilder.build(), newIdPrefix);
    sdkComponents.transformIds.putAll(transformIds);
    sdkComponents.pCollectionIds.putAll(pCollectionIds);
    sdkComponents.windowingStrategyIds.putAll(windowingStrategyIds);
    sdkComponents.coderIds.putAll(coderIds);
    sdkComponents.environmentIds.putAll(environmentIds);
    // Without this the copy would forget its default environment and getOnlyEnvironmentId()
    // would fall back to requiring exactly one registered environment.
    sdkComponents.defaultEnvironmentId = defaultEnvironmentId;
    return sdkComponents;
  }

  /**
   * Registers the provided {@link AppliedPTransform} into this {@link SdkComponents}, returning a
   * unique ID for the {@link AppliedPTransform}. Multiple registrations of the same {@link
   * AppliedPTransform} will return the same unique ID.
   *
   * <p>All of the children must already be registered within this {@link SdkComponents}.
   *
   * @param appliedPTransform the transform application to register
   * @param children the child applications; only inspected (and required to be non-null) when the
   *     transform has not been translated yet
   * @return the id under which the transform is stored in the components
   * @throws IOException if translating the transform to its proto form fails
   */
  public String registerPTransform(
      AppliedPTransform<?, ?, ?> appliedPTransform, List<AppliedPTransform<?, ?, ?>> children)
      throws IOException {
    String name = getApplicationName(appliedPTransform);
    // If this transform is present in the components, nothing to do. return the existing name.
    // Otherwise the transform must be translated and added to the components.
    if (componentsBuilder.getTransformsOrDefault(name, null) != null) {
      return name;
    }
    checkNotNull(children, "child nodes may not be null");
    componentsBuilder.putTransforms(
        name, PTransformTranslation.toProto(appliedPTransform, children, this));
    return name;
  }

  /**
   * Gets the ID for the provided {@link AppliedPTransform}. The provided {@link AppliedPTransform}
   * will not be added to the components produced by this {@link SdkComponents} until it is
   * translated via {@link #registerPTransform(AppliedPTransform, List)}.
   *
   * <p>A previously assigned id is reused; otherwise a fresh one is derived from the full name of
   * the application (or a fixed placeholder when that name is empty) and remembered.
   */
  private String getApplicationName(AppliedPTransform<?, ?, ?> appliedPTransform) {
    String existing = transformIds.get(appliedPTransform);
    if (existing != null) {
      return existing;
    }
    String fullName = appliedPTransform.getFullName();
    String baseName = fullName.isEmpty() ? "unnamed-ptransform" : fullName;
    String name = uniqify(baseName, transformIds.values());
    transformIds.put(appliedPTransform, name);
    return name;
  }

  /**
   * Returns the id already assigned to the given {@link AppliedPTransform}.
   *
   * @throws IllegalArgumentException if no id has been assigned to the transform yet
   */
  String getExistingPTransformId(AppliedPTransform<?, ?, ?> appliedPTransform) {
    checkArgument(
        transformIds.containsKey(appliedPTransform),
        "%s %s has not been previously registered",
        AppliedPTransform.class.getSimpleName(),
        appliedPTransform);
    return transformIds.get(appliedPTransform);
  }

  /**
   * Returns the id already assigned to the given {@link AppliedPTransform}.
   *
   * @throws IllegalArgumentException if no id has been assigned to the transform yet
   */
  public String getPTransformIdOrThrow(AppliedPTransform<?, ?, ?> appliedPTransform) {
    String existing = transformIds.get(appliedPTransform);
    checkArgument(existing != null, "PTransform id not found for: %s", appliedPTransform);
    return existing;
  }

  /**
   * Registers the provided {@link PCollection} into this {@link SdkComponents}, returning a unique
   * ID for the {@link PCollection}. Multiple registrations of the same {@link PCollection} will
   * return the same unique ID.
   *
   * @throws IOException if translating the PCollection to its proto form fails
   */
  public String registerPCollection(PCollection<?> pCollection) throws IOException {
    String existing = pCollectionIds.get(pCollection);
    if (existing != null) {
      return existing;
    }
    String uniqueName = uniqify(pCollection.getName(), pCollectionIds.values());
    // The id is recorded before translating, so it is already visible while toProto runs.
    pCollectionIds.put(pCollection, uniqueName);
    componentsBuilder.putPcollections(
        uniqueName, PCollectionTranslation.toProto(pCollection, this));
    return uniqueName;
  }

  /**
   * Registers the provided {@link WindowingStrategy} into this {@link SdkComponents}, returning a
   * unique ID for the {@link WindowingStrategy}. Multiple registrations of the same {@link
   * WindowingStrategy} will return the same unique ID.
   *
   * @throws IOException if translating the windowing strategy to its proto form fails
   */
  public String registerWindowingStrategy(WindowingStrategy<?, ?> windowingStrategy)
      throws IOException {
    String existing = windowingStrategyIds.get(windowingStrategy);
    if (existing != null) {
      return existing;
    }
    // Base name has the shape "<strategy name>(<window fn name>)".
    String baseName =
        String.format(
            "%s(%s)",
            NameUtils.approximateSimpleName(windowingStrategy),
            NameUtils.approximateSimpleName(windowingStrategy.getWindowFn()));
    String name = uniqify(baseName, windowingStrategyIds.values());
    windowingStrategyIds.put(windowingStrategy, name);
    RunnerApi.WindowingStrategy windowingStrategyProto =
        WindowingStrategyTranslation.toProto(windowingStrategy, this);
    componentsBuilder.putWindowingStrategies(name, windowingStrategyProto);
    return name;
  }

  /**
   * Registers the provided {@link Coder} into this {@link SdkComponents}, returning a unique ID for
   * the {@link Coder}. Multiple registrations of the same {@link Coder} will return the same unique
   * ID.
   *
   * <p>Coders are looked up in a hash-based {@link BiMap} keyed by the coder itself, so "the same
   * coder" is decided by the coder's own {@link Object#equals(Object)} and {@link
   * Object#hashCode()}: two distinct but equal coder instances share one id.
   *
   * <p>NOTE(review): this comment previously stated that coders are stored by identity so that
   * equal coders with incompatible binary formats are kept apart. The map used here does not do
   * that; confirm which behaviour is intended.
   *
   * @throws IOException if translating the coder to its proto form fails
   */
  public String registerCoder(Coder<?> coder) throws IOException {
    String existing = coderIds.get(coder);
    if (existing != null) {
      return existing;
    }
    String baseName = NameUtils.approximateSimpleName(coder);
    String name = uniqify(baseName, coderIds.values());
    coderIds.put(coder, name);
    RunnerApi.Coder coderProto = CoderTranslation.toProto(coder, this);
    componentsBuilder.putCoders(name, coderProto);
    return name;
  }

  /**
   * Registers the provided {@link Environment} into this {@link SdkComponents}, returning a unique
   * ID for the {@link Environment}. Multiple registrations of the same {@link Environment} will
   * return the same unique ID.
   *
   * <p>New ids are derived from the URN of the environment.
   */
  public String registerEnvironment(Environment env) {
    String existing = environmentIds.get(env);
    if (existing != null) {
      return existing;
    }
    String name = uniqify(env.getUrn(), environmentIds.values());
    environmentIds.put(env, name);
    componentsBuilder.putEnvironments(name, env);
    return name;
  }

  /**
   * Returns the id of the environment that translated components should use.
   *
   * <p>This is the default environment when one is known (see {@link
   * #create(PipelineOptions)}); otherwise it is the single environment present in the components.
   *
   * @throws java.util.NoSuchElementException if there is no default and no environment at all
   * @throws IllegalArgumentException if there is no default and more than one environment
   */
  public String getOnlyEnvironmentId() {
    // TODO Support multiple environments. The environment should be decided by the translation.
    if (defaultEnvironmentId != null) {
      return defaultEnvironmentId;
    }
    return Iterables.getOnlyElement(componentsBuilder.getEnvironmentsMap().keySet());
  }

  /**
   * Derives an id from {@code baseName} that is neither contained in {@code existing} nor
   * reserved by merged components.
   *
   * <p>The first candidate is the id prefix followed by the base name; on a clash the counter
   * values 1, 2, 3, ... are appended in turn until a free id is found.
   */
  private String uniqify(String baseName, Set<String> existing) {
    String stem = newIdPrefix + baseName;
    String candidate = stem;
    int increment = 1;
    while (existing.contains(candidate) || reservedIds.contains(candidate)) {
      candidate = stem + increment;
      increment++;
    }
    return candidate;
  }

  /**
   * Convert this {@link SdkComponents} into a {@link RunnerApi.Components}, including all of the
   * contained {@link Coder coders}, {@link WindowingStrategy windowing strategies}, {@link
   * PCollection PCollections}, and {@link PTransform PTransforms}.
   */
  @Experimental
  public RunnerApi.Components toComponents() {
    return componentsBuilder.build();
  }
}