// blob: aa147070808750dbe06972cc653cb45bb34869d1 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.beam.runners.core.construction.graph;
import static org.apache.beam.runners.core.construction.BeamUrns.getUrn;
import static org.apache.beam.vendor.guava.v20_0.com.google.common.base.Preconditions.checkArgument;
import java.util.Map;
import org.apache.beam.model.pipeline.v1.RunnerApi;
import org.apache.beam.model.pipeline.v1.RunnerApi.CombinePayload;
import org.apache.beam.model.pipeline.v1.RunnerApi.Components;
import org.apache.beam.model.pipeline.v1.RunnerApi.ExecutableStagePayload;
import org.apache.beam.model.pipeline.v1.RunnerApi.PCollection;
import org.apache.beam.model.pipeline.v1.RunnerApi.PTransform;
import org.apache.beam.model.pipeline.v1.RunnerApi.ParDoPayload;
import org.apache.beam.model.pipeline.v1.RunnerApi.StandardPTransforms.CombineComponents;
import org.apache.beam.model.pipeline.v1.RunnerApi.StandardPTransforms.Composites;
import org.apache.beam.model.pipeline.v1.RunnerApi.StandardPTransforms.Primitives;
import org.apache.beam.model.pipeline.v1.RunnerApi.StandardPTransforms.SplittableParDoComponents;
import org.apache.beam.model.pipeline.v1.RunnerApi.TestStreamPayload;
import org.apache.beam.model.pipeline.v1.RunnerApi.WindowIntoPayload;
import org.apache.beam.model.pipeline.v1.RunnerApi.WindowingStrategy;
import org.apache.beam.vendor.guava.v20_0.com.google.common.collect.ImmutableMap;
import org.apache.beam.vendor.guava.v20_0.com.google.common.collect.Maps;
/**
 * Validates well-formedness of a pipeline. It is recommended to use this class on any user-supplied
 * Pipeline protos, and after any transformations on the pipeline, to verify that the
 * transformations didn't break well-formedness.
 *
 * <p>The checks are referential-integrity checks over a {@link Components} message: every id that
 * a transform, PCollection, windowing strategy or coder refers to must be present in the same
 * {@link Components}, and unique names must not collide. Transforms whose URN has an entry in
 * {@link #VALIDATORS} additionally have their payload parsed and checked.
 *
 * <p>Structural violations are reported through {@code checkArgument}, i.e. as {@link
 * IllegalArgumentException}. Any failure raised by a URN-specific validator (including payload
 * parse errors) is rethrown as a {@link RuntimeException} that carries the original exception as
 * its cause.
 */
public class PipelineValidator {
  /** A URN-specific check applied to a transform after the generic reference checks passed. */
  @FunctionalInterface
  private interface TransformValidator {
    /**
     * Validates a single transform.
     *
     * @param transformId id of the transform within {@code components}
     * @param transform the transform to validate
     * @param components the components the transform's references are resolved against
     * @throws Exception if the payload cannot be parsed or a check fails
     */
    void validate(String transformId, PTransform transform, Components components) throws Exception;
  }

  /** URN-specific validators, keyed by transform URN. URNs without an entry get no extra checks. */
  private static final ImmutableMap<String, TransformValidator> VALIDATORS =
      ImmutableMap.<String, TransformValidator>builder()
          .put(getUrn(Primitives.PAR_DO), PipelineValidator::validateParDo)
          // Nothing to validate for FLATTEN, GROUP_BY_KEY, IMPULSE
          .put(getUrn(Primitives.ASSIGN_WINDOWS), PipelineValidator::validateAssignWindows)
          .put(getUrn(Primitives.TEST_STREAM), PipelineValidator::validateTestStream)
          // Nothing to validate for MAP_WINDOWS, READ, CREATE_VIEW.
          .put(getUrn(Composites.COMBINE_PER_KEY), PipelineValidator::validateCombine)
          .put(getUrn(Composites.COMBINE_GLOBALLY), PipelineValidator::validateCombine)
          // Nothing to validate for RESHUFFLE and WRITE_FILES
          .put(getUrn(CombineComponents.COMBINE_PGBKCV), PipelineValidator::validateCombine)
          .put(
              getUrn(CombineComponents.COMBINE_MERGE_ACCUMULATORS),
              PipelineValidator::validateCombine)
          .put(
              getUrn(CombineComponents.COMBINE_EXTRACT_OUTPUTS), PipelineValidator::validateCombine)
          .put(
              getUrn(CombineComponents.COMBINE_PER_KEY_PRECOMBINE),
              PipelineValidator::validateCombine)
          .put(
              getUrn(CombineComponents.COMBINE_PER_KEY_MERGE_ACCUMULATORS),
              PipelineValidator::validateCombine)
          .put(
              getUrn(CombineComponents.COMBINE_PER_KEY_EXTRACT_OUTPUTS),
              PipelineValidator::validateCombine)
          .put(getUrn(CombineComponents.COMBINE_GROUPED_VALUES), PipelineValidator::validateCombine)
          .put(
              getUrn(SplittableParDoComponents.PAIR_WITH_RESTRICTION),
              PipelineValidator::validateParDo)
          .put(
              getUrn(SplittableParDoComponents.SPLIT_RESTRICTION), PipelineValidator::validateParDo)
          .put(
              getUrn(SplittableParDoComponents.PROCESS_KEYED_ELEMENTS),
              PipelineValidator::validateParDo)
          .put(ExecutableStage.URN, PipelineValidator::validateExecutableStage)
          .build();

  /**
   * Validates that the given pipeline is well-formed.
   *
   * <p>First checks that every root transform id is present in the pipeline's components, then
   * validates the components themselves.
   *
   * @param p the pipeline proto to validate
   * @throws IllegalArgumentException if a structural check fails
   * @throws RuntimeException if a URN-specific transform validator fails; the cause is attached
   */
  public static void validate(RunnerApi.Pipeline p) {
    Components components = p.getComponents();
    for (String transformId : p.getRootTransformIdsList()) {
      checkArgument(
          components.containsTransforms(transformId),
          "Root transform id %s is unknown",
          transformId);
    }
    validateComponents("pipeline", components);
  }

  /**
   * Validates all transforms, PCollections, windowing strategies and coders of {@code components},
   * in that order.
   *
   * @param context prefix used in error messages to say which components are being validated
   * @param components the components to validate
   */
  private static void validateComponents(String context, Components components) {
    validateTransforms(context, components);
    validatePCollections(context, components);
    validateWindowingStrategies(context, components);
    validateCoders(context, components);
  }

  /** Checks that transform unique names do not collide and validates each transform. */
  private static void validateTransforms(String context, Components components) {
    Map<String, String> idsByUniqueName = Maps.newHashMap();
    for (Map.Entry<String, PTransform> entry : components.getTransformsMap().entrySet()) {
      String transformId = entry.getKey();
      PTransform transform = entry.getValue();
      String previousId = idsByUniqueName.put(transform.getUniqueName(), transformId);
      // A transform is allowed to not have unique_name set, but, obviously,
      // there can be only one such transform with an empty name.
      // It's allowed for the (only) root transform to have the empty unique_name.
      checkArgument(
          previousId == null,
          "%s: Transforms %s and %s both have unique_name \"%s\"",
          context,
          transformId,
          previousId,
          transform.getUniqueName());
      validateTransform(transformId, transform, components);
    }
  }

  /**
   * Checks that every PCollection has a non-empty, non-colliding unique name and refers to a known
   * coder and a known windowing strategy.
   */
  private static void validatePCollections(String context, Components components) {
    Map<String, String> idsByUniqueName = Maps.newHashMap();
    for (Map.Entry<String, PCollection> entry : components.getPcollectionsMap().entrySet()) {
      String pcollectionId = entry.getKey();
      PCollection pc = entry.getValue();
      checkArgument(
          !pc.getUniqueName().isEmpty(),
          "%s: PCollection %s does not have a unique_name set",
          context,
          pcollectionId);
      String previousId = idsByUniqueName.put(pc.getUniqueName(), pcollectionId);
      checkArgument(
          previousId == null,
          "%s: PCollections %s and %s both have unique_name \"%s\"",
          context,
          pcollectionId,
          previousId,
          pc.getUniqueName());
      checkArgument(
          components.containsCoders(pc.getCoderId()),
          "%s: PCollection %s uses unknown coder %s",
          context,
          pcollectionId,
          pc.getCoderId());
      checkArgument(
          components.containsWindowingStrategies(pc.getWindowingStrategyId()),
          "%s: PCollection %s uses unknown windowing strategy %s",
          context,
          pcollectionId,
          pc.getWindowingStrategyId());
    }
  }

  /** Checks that every windowing strategy refers to a known window coder. */
  private static void validateWindowingStrategies(String context, Components components) {
    for (Map.Entry<String, WindowingStrategy> entry :
        components.getWindowingStrategiesMap().entrySet()) {
      WindowingStrategy strategy = entry.getValue();
      checkArgument(
          components.containsCoders(strategy.getWindowCoderId()),
          "%s: WindowingStrategy %s uses unknown coder %s",
          context,
          entry.getKey(),
          strategy.getWindowCoderId());
    }
  }

  /** Checks that every component coder id referenced by a coder is itself a known coder. */
  private static void validateCoders(String context, Components components) {
    for (Map.Entry<String, RunnerApi.Coder> entry : components.getCodersMap().entrySet()) {
      for (String componentCoderId : entry.getValue().getComponentCoderIdsList()) {
        checkArgument(
            components.containsCoders(componentCoderId),
            "%s: Coder %s uses unknown component coder %s",
            context,
            entry.getKey(),
            componentCoderId);
      }
    }
  }

  /**
   * Validates a single transform: its subtransforms, inputs and outputs must all be present in
   * {@code components}, and if its URN has a registered validator, that validator must pass.
   *
   * @param id id of the transform, used in error messages
   * @param transform the transform to validate
   * @param components the components the transform's references are resolved against
   * @throws IllegalArgumentException if a subtransform, input or output reference is unknown
   * @throws RuntimeException if the URN-specific validator throws; the cause is attached
   */
  private static void validateTransform(String id, PTransform transform, Components components) {
    for (String subtransformId : transform.getSubtransformsList()) {
      checkArgument(
          components.containsTransforms(subtransformId),
          "Transform %s references unknown subtransform %s",
          id,
          subtransformId);
    }

    for (Map.Entry<String, String> input : transform.getInputsMap().entrySet()) {
      checkArgument(
          components.containsPcollections(input.getValue()),
          "Transform %s input %s points to unknown PCollection %s",
          id,
          input.getKey(),
          input.getValue());
    }

    for (Map.Entry<String, String> output : transform.getOutputsMap().entrySet()) {
      checkArgument(
          components.containsPcollections(output.getValue()),
          "Transform %s output %s points to unknown PCollection %s",
          id,
          output.getKey(),
          output.getValue());
    }

    TransformValidator validator = VALIDATORS.get(transform.getSpec().getUrn());
    if (validator == null) {
      // No URN-specific checks are registered for this kind of transform.
      return;
    }
    try {
      validator.validate(id, transform, components);
    } catch (Exception e) {
      throw new RuntimeException(String.format("Failed to validate transform %s", id), e);
    }
  }

  /**
   * Parses the transform's payload as a {@link ParDoPayload} and checks that every side input is
   * listed among the transform's inputs and that the restriction coder, if set, is known.
   */
  private static void validateParDo(String id, PTransform transform, Components components)
      throws Exception {
    ParDoPayload payload = ParDoPayload.parseFrom(transform.getSpec().getPayload());
    // side_inputs
    for (String sideInputId : payload.getSideInputsMap().keySet()) {
      checkArgument(
          transform.containsInputs(sideInputId),
          "Transform %s side input %s is not listed in the transform's inputs",
          id,
          sideInputId);
    }
    // TODO: Validate state_specs and timer_specs
    String restrictionCoderId = payload.getRestrictionCoderId();
    if (!restrictionCoderId.isEmpty()) {
      checkArgument(
          components.containsCoders(restrictionCoderId),
          "Transform %s uses unknown restriction coder %s",
          id,
          restrictionCoderId);
    }
  }

  /** Checks only that the transform's payload parses as a {@link WindowIntoPayload}. */
  private static void validateAssignWindows(String id, PTransform transform, Components components)
      throws Exception {
    WindowIntoPayload.parseFrom(transform.getSpec().getPayload());
  }

  /** Checks only that the transform's payload parses as a {@link TestStreamPayload}. */
  private static void validateTestStream(String id, PTransform transform, Components components)
      throws Exception {
    TestStreamPayload.parseFrom(transform.getSpec().getPayload());
  }

  /**
   * Parses the transform's payload as a {@link CombinePayload} and checks that its accumulator
   * coder is known.
   */
  private static void validateCombine(String id, PTransform transform, Components components)
      throws Exception {
    CombinePayload payload = CombinePayload.parseFrom(transform.getSpec().getPayload());
    // Both placeholders need an argument: the transform id first, then the coder id.
    checkArgument(
        components.containsCoders(payload.getAccumulatorCoderId()),
        "Transform %s uses unknown accumulator coder id %s",
        id,
        payload.getAccumulatorCoderId());
  }

  /**
   * Parses the transform's payload as an {@link ExecutableStagePayload} and validates the stage:
   * its input must be one of the enclosing transform's inputs, it must contain at least one
   * transform, all listed transforms and outputs must exist in the stage's own components, and
   * those components must themselves be valid.
   *
   * @param id id of the enclosing transform, used in error messages
   * @param transform the transform carrying the executable stage payload
   * @param outerComponents components of the enclosing pipeline; not consulted, because the stage
   *     is validated against the components embedded in its payload
   */
  private static void validateExecutableStage(
      String id, PTransform transform, Components outerComponents) throws Exception {
    ExecutableStagePayload payload =
        ExecutableStagePayload.parseFrom(transform.getSpec().getPayload());
    // Everything within an ExecutableStagePayload uses only the stage's components.
    Components components = payload.getComponents();

    checkArgument(
        transform.getInputsMap().containsValue(payload.getInput()),
        "ExecutableStage %s uses unknown input %s",
        id,
        payload.getInput());

    checkArgument(
        !payload.getTransformsList().isEmpty(), "ExecutableStage %s contains no transforms", id);

    for (String subtransformId : payload.getTransformsList()) {
      checkArgument(
          components.containsTransforms(subtransformId),
          "ExecutableStage %s uses unknown transform %s",
          id,
          subtransformId);
    }
    for (String outputId : payload.getOutputsList()) {
      checkArgument(
          components.containsPcollections(outputId),
          "ExecutableStage %s uses unknown output %s",
          id,
          outputId);
    }

    validateComponents("ExecutableStage " + id, components);

    // TODO: Also validate that side inputs of all transforms within components.getTransforms()
    // are contained within payload.getSideInputsList()
  }
}