/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
 */

package org.apache.beam.runners.direct.portable;

import static org.apache.beam.runners.core.construction.SyntheticComponents.uniqueId;
import static org.apache.beam.vendor.guava.v20_0.com.google.common.base.Preconditions.checkArgument;
import static org.apache.beam.vendor.guava.v20_0.com.google.common.base.Preconditions.checkState;
import static org.apache.beam.vendor.guava.v20_0.com.google.common.collect.Iterables.getOnlyElement;

import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.stream.Collectors;
import org.apache.beam.model.fnexecution.v1.ProvisionApi.ProvisionInfo;
import org.apache.beam.model.fnexecution.v1.ProvisionApi.Resources;
import org.apache.beam.model.pipeline.v1.RunnerApi;
import org.apache.beam.model.pipeline.v1.RunnerApi.Coder;
import org.apache.beam.model.pipeline.v1.RunnerApi.Components;
import org.apache.beam.model.pipeline.v1.RunnerApi.ComponentsOrBuilder;
import org.apache.beam.model.pipeline.v1.RunnerApi.FunctionSpec;
import org.apache.beam.model.pipeline.v1.RunnerApi.MessageWithComponents;
import org.apache.beam.model.pipeline.v1.RunnerApi.PCollection;
import org.apache.beam.model.pipeline.v1.RunnerApi.PTransform;
import org.apache.beam.model.pipeline.v1.RunnerApi.PTransform.Builder;
import org.apache.beam.model.pipeline.v1.RunnerApi.Pipeline;
import org.apache.beam.model.pipeline.v1.RunnerApi.SdkFunctionSpec;
import org.apache.beam.runners.core.construction.ModelCoders;
import org.apache.beam.runners.core.construction.ModelCoders.KvCoderComponents;
import org.apache.beam.runners.core.construction.PTransformTranslation;
import org.apache.beam.runners.core.construction.PipelineOptionsTranslation;
import org.apache.beam.runners.core.construction.graph.ExecutableStage;
import org.apache.beam.runners.core.construction.graph.GreedyPipelineFuser;
import org.apache.beam.runners.core.construction.graph.PipelineNode.PCollectionNode;
import org.apache.beam.runners.core.construction.graph.PipelineNode.PTransformNode;
import org.apache.beam.runners.core.construction.graph.PipelineValidator;
import org.apache.beam.runners.core.construction.graph.ProtoOverrides;
import org.apache.beam.runners.core.construction.graph.ProtoOverrides.TransformReplacement;
import org.apache.beam.runners.core.construction.graph.QueryablePipeline;
import org.apache.beam.runners.direct.ExecutableGraph;
import org.apache.beam.runners.fnexecution.GrpcContextHeaderAccessorProvider;
import org.apache.beam.runners.fnexecution.GrpcFnServer;
import org.apache.beam.runners.fnexecution.InProcessServerFactory;
import org.apache.beam.runners.fnexecution.ServerFactory;
import org.apache.beam.runners.fnexecution.artifact.ArtifactRetrievalService;
import org.apache.beam.runners.fnexecution.artifact.BeamFileSystemArtifactRetrievalService;
import org.apache.beam.runners.fnexecution.control.ControlClientPool;
import org.apache.beam.runners.fnexecution.control.FnApiControlClientPoolService;
import org.apache.beam.runners.fnexecution.control.JobBundleFactory;
import org.apache.beam.runners.fnexecution.control.MapControlClientPool;
import org.apache.beam.runners.fnexecution.control.SingleEnvironmentInstanceJobBundleFactory;
import org.apache.beam.runners.fnexecution.data.GrpcDataService;
import org.apache.beam.runners.fnexecution.environment.DockerEnvironmentFactory;
import org.apache.beam.runners.fnexecution.environment.EmbeddedEnvironmentFactory;
import org.apache.beam.runners.fnexecution.environment.EnvironmentFactory;
import org.apache.beam.runners.fnexecution.logging.GrpcLoggingService;
import org.apache.beam.runners.fnexecution.logging.Slf4jLogWriter;
import org.apache.beam.runners.fnexecution.provisioning.StaticGrpcProvisionService;
import org.apache.beam.runners.fnexecution.state.GrpcStateService;
import org.apache.beam.runners.fnexecution.wire.LengthPrefixUnknownCoders;
import org.apache.beam.sdk.fn.IdGenerator;
import org.apache.beam.sdk.fn.IdGenerators;
import org.apache.beam.sdk.fn.stream.OutboundObserverFactory;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.vendor.grpc.v1p13p1.com.google.protobuf.Struct;
import org.apache.beam.vendor.guava.v20_0.com.google.common.annotations.VisibleForTesting;
import org.apache.beam.vendor.guava.v20_0.com.google.common.collect.ImmutableList;
import org.apache.beam.vendor.guava.v20_0.com.google.common.collect.Iterables;
import org.apache.beam.vendor.guava.v20_0.com.google.common.collect.Maps;
import org.apache.beam.vendor.guava.v20_0.com.google.common.collect.Sets;
import org.joda.time.Duration;
import org.joda.time.Instant;

/**
 * The "ReferenceRunner" engine implementation. The ReferenceRunner uses the Beam portability
 * framework to execute a {@link Pipeline} on a single machine.
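 *
 * <p>A minimal usage sketch; the pipeline proto, pipeline options {@link Struct}, and artifact
 * retrieval token ({@code pipelineProto}, {@code optionsStruct}, and {@code retrievalToken}
 * below) are assumed to have been produced elsewhere, for example by a job server:
 *
 * <pre>{@code
 * ReferenceRunner runner =
 *     ReferenceRunner.forPipeline(pipelineProto, optionsStruct, retrievalToken);
 * runner.execute();
 * }</pre>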
*/
public class ReferenceRunner {
private final RunnerApi.Pipeline pipeline;
private final Struct options;
private final String artifactRetrievalToken;
private final EnvironmentType environmentType;
  private final IdGenerator idGenerator = IdGenerators.incrementingLongs();

  /** @param environmentType the type of environment in which to launch the SDK harness */
private ReferenceRunner(
Pipeline p, Struct options, String artifactRetrievalToken, EnvironmentType environmentType) {
this.pipeline = executable(p);
this.options = options;
this.environmentType = environmentType;
this.artifactRetrievalToken = artifactRetrievalToken;
  }

  /**
* Creates a "ReferenceRunner" engine for a single pipeline with a Dockerized SDK harness.
*
* @param p Pipeline being executed for this job.
* @param options PipelineOptions for this job.
* @param artifactRetrievalToken Token to retrieve artifacts that have been staged.
*/
public static ReferenceRunner forPipeline(
RunnerApi.Pipeline p, Struct options, String artifactRetrievalToken) {
return new ReferenceRunner(p, options, artifactRetrievalToken, EnvironmentType.DOCKER);
  }

  static ReferenceRunner forInProcessPipeline(RunnerApi.Pipeline p, Struct options) {
return new ReferenceRunner(p, options, "", EnvironmentType.IN_PROCESS);
}
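
  /**
   * Rewrites the pipeline proto into the form the reference runner executes: replaces the
   * splittable ProcessKeyed and GroupByKey transforms with runner-local primitives, fuses the
   * result into executable stages, and folds FEED_SDF nodes into their consuming stages. The
   * pipeline is validated before and after the rewrite.
   */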
private RunnerApi.Pipeline executable(RunnerApi.Pipeline original) {
RunnerApi.Pipeline p = original;
PipelineValidator.validate(p);
p =
ProtoOverrides.updateTransform(
PTransformTranslation.SPLITTABLE_PROCESS_KEYED_URN,
p,
new SplittableProcessKeyedReplacer());
p =
ProtoOverrides.updateTransform(
PTransformTranslation.GROUP_BY_KEY_TRANSFORM_URN, p, new PortableGroupByKeyReplacer());
p = GreedyPipelineFuser.fuse(p).toPipeline();
p = foldFeedSDFIntoExecutableStage(p);
PipelineValidator.validate(p);
return p;
  }

  private static Set<PCollectionNode> getKeyedPCollections(
ExecutableGraph<PTransformNode, PCollectionNode> graph) {
    // This mimics the KeyedPValueTrackingVisitor behavior of the regular direct runner, but
    // without propagating keyed-ness through key-preserving DoFns. That is not yet necessary,
    // but will be once we implement state and timers.
    // See https://issues.apache.org/jira/browse/BEAM-4557.
Set<PCollectionNode> res = Sets.newHashSet();
Set<String> keyedProducers =
Sets.newHashSet(DirectGroupByKey.DIRECT_GBKO_URN, DirectGroupByKey.DIRECT_GABW_URN);
for (PTransformNode transform : graph.getExecutables()) {
if (keyedProducers.contains(transform.getTransform().getSpec().getUrn())) {
res.addAll(graph.getProduced(transform));
}
}
return res;
  }

  /**
   * Starts all of the required Fn API services, then configures and starts the {@link
   * ExecutorServiceParallelExecutor}, blocking until the pipeline completes.
   */
public void execute() throws Exception {
ExecutableGraph<PTransformNode, PCollectionNode> graph = PortableGraph.forPipeline(pipeline);
BundleFactory bundleFactory = ImmutableListBundleFactory.create();
EvaluationContext ctxt =
EvaluationContext.create(Instant::new, bundleFactory, graph, getKeyedPCollections(graph));
RootProviderRegistry rootRegistry = RootProviderRegistry.javaPortableRegistry(bundleFactory);
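    // Use at least three threads, matching the Java DirectRunner's parallelism floor.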
int targetParallelism = Math.max(Runtime.getRuntime().availableProcessors(), 3);
ServerFactory serverFactory = createServerFactory();
ControlClientPool controlClientPool = MapControlClientPool.create();
ExecutorService dataExecutor = Executors.newCachedThreadPool();
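    // Provision info served to the SDK harness. The reference runner executes a single local
    // job, so placeholder job and worker identifiers suffice.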
ProvisionInfo provisionInfo =
ProvisionInfo.newBuilder()
.setJobId("id")
.setJobName("reference")
.setPipelineOptions(options)
.setWorkerId("foo")
.setResourceLimits(Resources.getDefaultInstance())
.setRetrievalToken(artifactRetrievalToken)
.build();
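    // Start the Fn API services (logging, artifact retrieval, provisioning, control, data, and
    // state) that the SDK harness connects to.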
try (GrpcFnServer<GrpcLoggingService> logging =
GrpcFnServer.allocatePortAndCreateFor(
GrpcLoggingService.forWriter(Slf4jLogWriter.getDefault()), serverFactory);
GrpcFnServer<ArtifactRetrievalService> artifact =
GrpcFnServer.allocatePortAndCreateFor(
BeamFileSystemArtifactRetrievalService.create(), serverFactory);
GrpcFnServer<StaticGrpcProvisionService> provisioning =
GrpcFnServer.allocatePortAndCreateFor(
StaticGrpcProvisionService.create(provisionInfo), serverFactory);
GrpcFnServer<FnApiControlClientPoolService> control =
GrpcFnServer.allocatePortAndCreateFor(
FnApiControlClientPoolService.offeringClientsToPool(
controlClientPool.getSink(),
GrpcContextHeaderAccessorProvider.getHeaderAccessor()),
serverFactory);
GrpcFnServer<GrpcDataService> data =
GrpcFnServer.allocatePortAndCreateFor(
GrpcDataService.create(dataExecutor, OutboundObserverFactory.serverDirect()),
serverFactory);
GrpcFnServer<GrpcStateService> state =
GrpcFnServer.allocatePortAndCreateFor(GrpcStateService.create(), serverFactory)) {
EnvironmentFactory environmentFactory =
createEnvironmentFactory(control, logging, artifact, provisioning, controlClientPool);
JobBundleFactory jobBundleFactory =
SingleEnvironmentInstanceJobBundleFactory.create(
environmentFactory, data, state, idGenerator);
TransformEvaluatorRegistry transformRegistry =
TransformEvaluatorRegistry.portableRegistry(
graph,
pipeline.getComponents(),
bundleFactory,
jobBundleFactory,
EvaluationContextStepStateAndTimersProvider.forContext(ctxt));
ExecutorServiceParallelExecutor executor =
ExecutorServiceParallelExecutor.create(
targetParallelism, rootRegistry, transformRegistry, graph, ctxt);
executor.start();
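      // A zero duration blocks until the pipeline finishes rather than timing out.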
executor.waitUntilFinish(Duration.ZERO);
} finally {
dataExecutor.shutdown();
}
  }

  private ServerFactory createServerFactory() {
switch (environmentType) {
case DOCKER:
return ServerFactory.createDefault();
case IN_PROCESS:
return InProcessServerFactory.create();
default:
throw new IllegalArgumentException(
String.format("Unknown %s %s", EnvironmentType.class.getSimpleName(), environmentType));
}
  }

  private EnvironmentFactory createEnvironmentFactory(
GrpcFnServer<FnApiControlClientPoolService> control,
GrpcFnServer<GrpcLoggingService> logging,
GrpcFnServer<ArtifactRetrievalService> artifact,
GrpcFnServer<StaticGrpcProvisionService> provisioning,
ControlClientPool controlClient) {
switch (environmentType) {
case DOCKER:
return new DockerEnvironmentFactory.Provider(PipelineOptionsTranslation.fromProto(options))
.createEnvironmentFactory(
control, logging, artifact, provisioning, controlClient, idGenerator);
case IN_PROCESS:
return EmbeddedEnvironmentFactory.create(
PipelineOptionsFactory.create(), logging, control, controlClient.getSource());
default:
throw new IllegalArgumentException(
String.format("Unknown %s %s", EnvironmentType.class.getSimpleName(), environmentType));
}
}
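
  /**
   * Replaces a {@link PTransformTranslation#GROUP_BY_KEY_TRANSFORM_URN} transform with a {@link
   * DirectGroupByKey#DIRECT_GBKO_URN} (group elements into keyed work items) followed by a {@link
   * DirectGroupByKey#DIRECT_GABW_URN} (group the values of each work item by window).
   */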
@VisibleForTesting
static class PortableGroupByKeyReplacer implements TransformReplacement {
@Override
public MessageWithComponents getReplacement(String gbkId, ComponentsOrBuilder components) {
PTransform gbk = components.getTransformsOrThrow(gbkId);
checkArgument(
PTransformTranslation.GROUP_BY_KEY_TRANSFORM_URN.equals(gbk.getSpec().getUrn()),
"URN must be %s, got %s",
PTransformTranslation.GROUP_BY_KEY_TRANSFORM_URN,
gbk.getSpec().getUrn());
PTransform.Builder newTransform = gbk.toBuilder();
Components.Builder newComponents = Components.newBuilder();
String inputId = getOnlyElement(gbk.getInputsMap().values());
// Add the GBKO transform
String kwiCollectionId =
uniqueId(String.format("%s.%s", inputId, "kwi"), components::containsPcollections);
{
PCollection input = components.getPcollectionsOrThrow(inputId);
Coder inputCoder = components.getCodersOrThrow(input.getCoderId());
KvCoderComponents kvComponents = ModelCoders.getKvCoderComponents(inputCoder);
String windowCoderId =
components
.getWindowingStrategiesOrThrow(input.getWindowingStrategyId())
.getWindowCoderId();
        // This coder isn't actually required for the pipeline to function properly: the KWIs can
        // be passed around as pure Java objects with no coding of the values. It is included here
        // to better approximate a full pipeline.
Coder kwiCoder =
Coder.newBuilder()
.setSpec(
SdkFunctionSpec.newBuilder()
.setSpec(FunctionSpec.newBuilder().setUrn("beam:direct:keyedworkitem:v1")))
.addAllComponentCoderIds(
ImmutableList.of(
kvComponents.keyCoderId(), kvComponents.valueCoderId(), windowCoderId))
.build();
String kwiCoderId =
uniqueId(
String.format("kwi(%s:%s)", kvComponents.keyCoderId(), kvComponents.valueCoderId()),
components::containsCoders);
        // The kwi PCollection has the same WindowingStrategy as the input, as no merging will
        // have been performed, so elements remain in their original windows.
PCollection kwi =
input.toBuilder().setUniqueName(kwiCollectionId).setCoderId(kwiCoderId).build();
String gbkoId = uniqueId(String.format("%s/GBKO", gbkId), components::containsTransforms);
PTransform gbko =
PTransform.newBuilder()
.setUniqueName(String.format("%s/GBKO", gbk.getUniqueName()))
.putAllInputs(gbk.getInputsMap())
.setSpec(FunctionSpec.newBuilder().setUrn(DirectGroupByKey.DIRECT_GBKO_URN))
.putOutputs("output", kwiCollectionId)
.build();
newTransform.addSubtransforms(gbkoId);
newComponents
.putCoders(kwiCoderId, kwiCoder)
.putPcollections(kwiCollectionId, kwi)
.putTransforms(gbkoId, gbko);
}
// Add the GABW transform
{
String gabwId = uniqueId(String.format("%s/GABW", gbkId), components::containsTransforms);
PTransform gabw =
PTransform.newBuilder()
.setUniqueName(String.format("%s/GABW", gbk.getUniqueName()))
.putInputs("input", kwiCollectionId)
.setSpec(FunctionSpec.newBuilder().setUrn(DirectGroupByKey.DIRECT_GABW_URN))
.putAllOutputs(gbk.getOutputsMap())
.build();
newTransform.addSubtransforms(gabwId);
newComponents.putTransforms(gabwId, gabw);
}
return MessageWithComponents.newBuilder()
.setPtransform(newTransform)
.setComponents(newComponents)
.build();
}
  }

  /**
* Replaces the {@link PTransformTranslation#SPLITTABLE_PROCESS_KEYED_URN} with a {@link
* DirectGroupByKey#DIRECT_GBKO_URN} (construct keyed work items) followed by a {@link
* SplittableRemoteStageEvaluatorFactory#FEED_SDF_URN} (convert the keyed work items to
* element/restriction pairs that later go into {@link
* PTransformTranslation#SPLITTABLE_PROCESS_ELEMENTS_URN}).
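   *
   * <p>Schematically (a sketch of the resulting sub-transform chain, not the exact proto):
   *
   * <pre>
   * input -> RawGBK -> FeedSDF -> RunSDF -> outputs
   * </pre>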
*/
@VisibleForTesting
static class SplittableProcessKeyedReplacer implements TransformReplacement {
@Override
public MessageWithComponents getReplacement(String spkId, ComponentsOrBuilder components) {
PTransform spk = components.getTransformsOrThrow(spkId);
checkArgument(
PTransformTranslation.SPLITTABLE_PROCESS_KEYED_URN.equals(spk.getSpec().getUrn()),
"URN must be %s, got %s",
PTransformTranslation.SPLITTABLE_PROCESS_KEYED_URN,
spk.getSpec().getUrn());
Components.Builder newComponents = Components.newBuilder();
newComponents.putAllCoders(components.getCodersMap());
Builder newPTransform = spk.toBuilder();
String inputId = getOnlyElement(spk.getInputsMap().values());
PCollection input = components.getPcollectionsOrThrow(inputId);
// This is a Coder<KV<String, KV<ElementT, RestrictionT>>>
Coder inputCoder = components.getCodersOrThrow(input.getCoderId());
KvCoderComponents kvComponents = ModelCoders.getKvCoderComponents(inputCoder);
String windowCoderId =
components
.getWindowingStrategiesOrThrow(input.getWindowingStrategyId())
.getWindowCoderId();
      // === Construct a raw GBK returning KeyedWorkItems ===
String kwiCollectionId =
uniqueId(String.format("%s.kwi", spkId), components::containsPcollections);
{
        // This coder isn't actually required for the pipeline to function properly: the KWIs can
        // be passed around as pure Java objects with no coding of the values. It is included here
        // to better approximate a full pipeline.
Coder kwiCoder =
Coder.newBuilder()
.setSpec(
SdkFunctionSpec.newBuilder()
.setSpec(FunctionSpec.newBuilder().setUrn("beam:direct:keyedworkitem:v1")))
.addAllComponentCoderIds(
ImmutableList.of(
kvComponents.keyCoderId(), kvComponents.valueCoderId(), windowCoderId))
.build();
String kwiCoderId =
uniqueId(
String.format(
"keyed_work_item(%s:%s)",
kvComponents.keyCoderId(), kvComponents.valueCoderId()),
components::containsCoders);
PCollection kwiCollection =
input.toBuilder().setUniqueName(kwiCollectionId).setCoderId(kwiCoderId).build();
String rawGbkId =
uniqueId(String.format("%s/RawGBK", spkId), components::containsTransforms);
PTransform rawGbk =
PTransform.newBuilder()
.setUniqueName(String.format("%s/RawGBK", spk.getUniqueName()))
.putAllInputs(spk.getInputsMap())
.setSpec(FunctionSpec.newBuilder().setUrn(DirectGroupByKey.DIRECT_GBKO_URN))
.putOutputs("output", kwiCollectionId)
.build();
newComponents
.putCoders(kwiCoderId, kwiCoder)
.putPcollections(kwiCollectionId, kwiCollection)
.putTransforms(rawGbkId, rawGbk);
newPTransform.addSubtransforms(rawGbkId);
}
// === Construct a "Feed SDF" operation that converts KWI to KV<ElementT, RestrictionT> ===
String feedSDFCollectionId =
uniqueId(String.format("%s.feed", spkId), components::containsPcollections);
{
String elementRestrictionCoderId = kvComponents.valueCoderId();
String feedSDFCoderId =
LengthPrefixUnknownCoders.addLengthPrefixedCoder(
elementRestrictionCoderId, newComponents, false);
PCollection feedSDFCollection =
input.toBuilder().setUniqueName(feedSDFCollectionId).setCoderId(feedSDFCoderId).build();
String feedSDFId =
uniqueId(String.format("%s/FeedSDF", spkId), components::containsTransforms);
PTransform feedSDF =
PTransform.newBuilder()
.setUniqueName(String.format("%s/FeedSDF", spk.getUniqueName()))
.putInputs("input", kwiCollectionId)
.setSpec(
FunctionSpec.newBuilder()
.setUrn(SplittableRemoteStageEvaluatorFactory.FEED_SDF_URN))
.putOutputs("output", feedSDFCollectionId)
.build();
newComponents
.putPcollections(feedSDFCollectionId, feedSDFCollection)
.putTransforms(feedSDFId, feedSDF);
newPTransform.addSubtransforms(feedSDFId);
}
      // === Construct the SPLITTABLE_PROCESS_ELEMENTS operation ===
{
String runSDFId =
uniqueId(String.format("%s/RunSDF", spkId), components::containsTransforms);
PTransform runSDF =
PTransform.newBuilder()
.setUniqueName(String.format("%s/RunSDF", spk.getUniqueName()))
.putInputs("input", feedSDFCollectionId)
.setSpec(
FunctionSpec.newBuilder()
.setUrn(PTransformTranslation.SPLITTABLE_PROCESS_ELEMENTS_URN)
.setPayload(spk.getSpec().getPayload()))
.putAllOutputs(spk.getOutputsMap())
.build();
newComponents.putTransforms(runSDFId, runSDF);
newPTransform.addSubtransforms(runSDFId);
}
return MessageWithComponents.newBuilder()
.setPtransform(newPTransform.build())
.setComponents(newComponents)
.build();
}
  }

  /**
   * Finds FEED_SDF nodes followed by an ExecutableStage and replaces each such pair with a single
   * {@link SplittableRemoteStageEvaluatorFactory#URN} stage that feeds the ExecutableStage
   * directly, relying on the first instruction in the stage being an SDF.
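   *
   * <p>Schematically (a sketch of the rewrite, not the exact proto):
   *
   * <pre>
   * RawGBK -> FeedSDF -> ExecutableStage(RunSDF, ...)
   *   becomes
   * RawGBK -> SplittableRemoteStage(RunSDF, ...)
   * </pre>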
*/
private static Pipeline foldFeedSDFIntoExecutableStage(Pipeline p) {
Pipeline.Builder newPipeline = p.toBuilder();
Components.Builder newPipelineComponents = newPipeline.getComponentsBuilder();
QueryablePipeline q = QueryablePipeline.forPipeline(p);
String feedSdfUrn = SplittableRemoteStageEvaluatorFactory.FEED_SDF_URN;
List<PTransformNode> feedSDFNodes =
q.getTransforms().stream()
.filter(node -> node.getTransform().getSpec().getUrn().equals(feedSdfUrn))
.collect(Collectors.toList());
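    // Maps the id of each consuming ExecutableStage to the FEED_SDF node that feeds it.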
Map<String, PTransformNode> stageToFeeder = Maps.newHashMap();
for (PTransformNode node : feedSDFNodes) {
PCollectionNode output = Iterables.getOnlyElement(q.getOutputPCollections(node));
PTransformNode consumer = Iterables.getOnlyElement(q.getPerElementConsumers(output));
String consumerUrn = consumer.getTransform().getSpec().getUrn();
checkState(
consumerUrn.equals(ExecutableStage.URN),
"Expected all FeedSDF nodes to be consumed by an ExecutableStage, "
+ "but %s is consumed by %s which is %s",
node.getId(),
consumer.getId(),
consumerUrn);
stageToFeeder.put(consumer.getId(), node);
}
// Copy over root transforms except for the excluded FEED_SDF transforms.
Set<String> feedSDFIds =
feedSDFNodes.stream().map(PTransformNode::getId).collect(Collectors.toSet());
newPipeline.clearRootTransformIds();
for (String rootId : p.getRootTransformIdsList()) {
if (!feedSDFIds.contains(rootId)) {
newPipeline.addRootTransformIds(rootId);
}
}
    // Copy over all transforms, skipping FEED_SDF transforms and replacing the ExecutableStages
    // that consume them.
for (PTransformNode node : q.getTransforms()) {
if (feedSDFNodes.contains(node)) {
// These transforms are skipped and handled separately.
continue;
}
if (!stageToFeeder.containsKey(node.getId())) {
        // This transform is unchanged; copy it over as-is.
newPipelineComponents.putTransforms(node.getId(), node.getTransform());
continue;
}
// "node" is an ExecutableStage transform feeding from an SDF.
PTransformNode feedSDFNode = stageToFeeder.get(node.getId());
PCollectionNode rawGBKOutput =
Iterables.getOnlyElement(q.getPerElementInputPCollections(feedSDFNode));
// Replace the ExecutableStage transform.
newPipelineComponents.putTransforms(
node.getId(),
node.getTransform()
.toBuilder()
.mergeSpec(
                  // Change the URN from ExecutableStage.URN to the URN of the reference runner's
                  // splittable executable stage evaluator.
FunctionSpec.newBuilder()
.setUrn(SplittableRemoteStageEvaluatorFactory.URN)
.build())
.putInputs(
// The splittable executable stage now reads from the raw GBK, instead of
// from the now non-existent FEED_SDF.
Iterables.getOnlyElement(node.getTransform().getInputsMap().keySet()),
rawGBKOutput.getId())
.build());
}
return newPipeline.build();
  }

  private enum EnvironmentType {
DOCKER,
IN_PROCESS
}
}