/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.beam.runners.apex.translation;

import com.datatorrent.api.DAG;
import java.util.HashMap;
import java.util.Map;
import org.apache.beam.runners.apex.ApexPipelineOptions;
import org.apache.beam.runners.apex.ApexRunner.CreateApexPCollectionView;
import org.apache.beam.runners.apex.translation.operators.ApexProcessFnOperator;
import org.apache.beam.runners.apex.translation.operators.ApexReadUnboundedInputOperator;
import org.apache.beam.runners.core.SplittableParDoViaKeyedWorkItems;
import org.apache.beam.runners.core.SplittableParDoViaKeyedWorkItems.GBKIntoKeyedWorkItems;
import org.apache.beam.runners.core.construction.PrimitiveCreate;
import org.apache.beam.runners.core.construction.UnboundedReadFromBoundedSource.BoundedToUnboundedSourceAdapter;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.Read;
import org.apache.beam.sdk.runners.TransformHierarchy;
import org.apache.beam.sdk.transforms.Flatten;
import org.apache.beam.sdk.transforms.GroupByKey;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.View.CreatePCollectionView;
import org.apache.beam.sdk.transforms.windowing.Window;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PValue;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
* {@link ApexPipelineTranslator} translates {@link Pipeline} objects
 * into the Apex logical plan ({@link DAG}).
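 *
 * <p>A minimal usage sketch, assuming an {@link ApexPipelineOptions} instance
 * {@code options}, a {@link Pipeline} {@code pipeline}, and a target Apex
 * {@link DAG} {@code dag} are already available:
 *
 * <pre>{@code
 * ApexPipelineTranslator translator = new ApexPipelineTranslator(options);
 * translator.translate(pipeline, dag);
 * }</pre>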
*/
@SuppressWarnings({"rawtypes", "unchecked"})
public class ApexPipelineTranslator extends Pipeline.PipelineVisitor.Defaults {
private static final Logger LOG = LoggerFactory.getLogger(ApexPipelineTranslator.class);

  /**
* A map from {@link PTransform} subclass to the corresponding
* {@link TransformTranslator} to use to translate that transform.
*/
private static final Map<Class<? extends PTransform>, TransformTranslator>
transformTranslators = new HashMap<>();
private final TranslationContext translationContext;

  static {
// register TransformTranslators
registerTransformTranslator(ParDo.MultiOutput.class, new ParDoTranslator<>());
registerTransformTranslator(SplittableParDoViaKeyedWorkItems.ProcessElements.class,
new ParDoTranslator.SplittableProcessElementsTranslator());
registerTransformTranslator(GBKIntoKeyedWorkItems.class,
new GBKIntoKeyedWorkItemsTranslator());
registerTransformTranslator(Read.Unbounded.class, new ReadUnboundedTranslator());
registerTransformTranslator(Read.Bounded.class, new ReadBoundedTranslator());
registerTransformTranslator(GroupByKey.class, new GroupByKeyTranslator());
registerTransformTranslator(Flatten.PCollections.class,
new FlattenPCollectionTranslator());
registerTransformTranslator(PrimitiveCreate.class, new CreateValuesTranslator());
registerTransformTranslator(CreateApexPCollectionView.class,
new CreateApexPCollectionViewTranslator());
registerTransformTranslator(CreatePCollectionView.class,
new CreatePCollectionViewTranslator());
registerTransformTranslator(Window.Assign.class, new WindowAssignTranslator());
  }

public ApexPipelineTranslator(ApexPipelineOptions options) {
this.translationContext = new TranslationContext(options);
}
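
  /**
   * Traverses the given {@link Pipeline} topologically, translating each
   * primitive transform it visits, and then populates the supplied Apex
   * {@link DAG} via the {@link TranslationContext}.
   */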
public void translate(Pipeline pipeline, DAG dag) {
pipeline.traverseTopologically(this);
translationContext.populateDAG(dag);
  }

@Override
public CompositeBehavior enterCompositeTransform(TransformHierarchy.Node node) {
LOG.debug("entering composite transform {}", node.getTransform());
return CompositeBehavior.ENTER_TRANSFORM;
  }

@Override
public void leaveCompositeTransform(TransformHierarchy.Node node) {
LOG.debug("leaving composite transform {}", node.getTransform());
  }

@Override
public void visitPrimitiveTransform(TransformHierarchy.Node node) {
LOG.debug("visiting transform {}", node.getTransform());
PTransform transform = node.getTransform();
TransformTranslator translator = getTransformTranslator(transform.getClass());
if (null == translator) {
throw new UnsupportedOperationException(
"no translator registered for " + transform);
}
translationContext.setCurrentTransform(node.toAppliedPTransform(getPipeline()));
translator.translate(transform, translationContext);
  }

@Override
public void visitValue(PValue value, TransformHierarchy.Node producer) {
LOG.debug("visiting value {}", value);
}

  /**
   * Records that instances of the specified {@link PTransform} class
   * should be translated by the given {@link TransformTranslator}.
   */
private static <TransformT extends PTransform> void registerTransformTranslator(
Class<TransformT> transformClass,
TransformTranslator<? extends TransformT> transformTranslator) {
if (transformTranslators.put(transformClass, transformTranslator) != null) {
throw new IllegalArgumentException(
"defining multiple translators for " + transformClass);
}
}

  /**
* Returns the {@link TransformTranslator} to use for instances of the
* specified PTransform class, or null if none registered.
*/
private <TransformT extends PTransform<?, ?>>
TransformTranslator<TransformT> getTransformTranslator(Class<TransformT> transformClass) {
return transformTranslators.get(transformClass);
}
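
  /**
   * Translates {@link Read.Bounded} by wrapping the bounded source in a
   * {@link BoundedToUnboundedSourceAdapter} and reading it through
   * {@link ApexReadUnboundedInputOperator}.
   */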
private static class ReadBoundedTranslator<T> implements TransformTranslator<Read.Bounded<T>> {
private static final long serialVersionUID = 1L;
@Override
public void translate(Read.Bounded<T> transform, TranslationContext context) {
// TODO: adapter is visibleForTesting
BoundedToUnboundedSourceAdapter unboundedSource = new BoundedToUnboundedSourceAdapter<>(
transform.getSource());
ApexReadUnboundedInputOperator<T, ?> operator = new ApexReadUnboundedInputOperator<>(
unboundedSource, true, context.getPipelineOptions());
context.addOperator(operator, operator.output);
}
}
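
  /**
   * Translates {@link CreateApexPCollectionView} by registering the side-input
   * view it creates with the {@link TranslationContext}.
   */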
private static class CreateApexPCollectionViewTranslator<ElemT, ViewT>
implements TransformTranslator<CreateApexPCollectionView<ElemT, ViewT>> {
private static final long serialVersionUID = 1L;
@Override
public void translate(
CreateApexPCollectionView<ElemT, ViewT> transform, TranslationContext context) {
context.addView(transform.getView());
LOG.debug("view {}", transform.getView().getName());
}
}
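
  /**
   * Translates {@link CreatePCollectionView} by registering the side-input
   * view it creates with the {@link TranslationContext}.
   */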
private static class CreatePCollectionViewTranslator<ElemT, ViewT>
implements TransformTranslator<CreatePCollectionView<ElemT, ViewT>> {
private static final long serialVersionUID = 1L;
@Override
public void translate(
CreatePCollectionView<ElemT, ViewT> transform, TranslationContext context) {
context.addView(transform.getView());
LOG.debug("view {}", transform.getView().getName());
}
}
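
  /**
   * Translates {@link GBKIntoKeyedWorkItems} using
   * {@link ApexProcessFnOperator#toKeyedWorkItems} to turn each {@code KV}
   * element into a keyed work item.
   */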
private static class GBKIntoKeyedWorkItemsTranslator<K, InputT>
      implements TransformTranslator<GBKIntoKeyedWorkItems<K, InputT>> {
    private static final long serialVersionUID = 1L;
@Override
public void translate(
GBKIntoKeyedWorkItems<K, InputT> transform, TranslationContext context) {
// https://issues.apache.org/jira/browse/BEAM-1850
ApexProcessFnOperator<KV<K, InputT>> operator = ApexProcessFnOperator.toKeyedWorkItems(
context.getPipelineOptions());
context.addOperator(operator, operator.outputPort);
context.addStream(context.getInput(), operator.inputPort);
}
}
}