blob: 0f851d9588d98c85c2bcaa7a85c3ad0320fbb480 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.beam.runners.spark.structuredstreaming.translation;
import org.apache.beam.runners.core.construction.PTransformTranslation;
import org.apache.beam.runners.spark.structuredstreaming.translation.batch.PipelineTranslatorBatch;
import org.apache.beam.runners.spark.structuredstreaming.translation.streaming.PipelineTranslatorStreaming;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.options.StreamingOptions;
import org.apache.beam.sdk.runners.TransformHierarchy;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PValue;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* {@link Pipeline.PipelineVisitor} that translates the Beam operators to their Spark counterparts.
* It also does the pipeline preparation: mode detection, transforms replacement, classpath
* preparation. If we have a streaming job, it is instantiated as a {@link
* PipelineTranslatorStreaming}. If we have a batch job, it is instantiated as a {@link
* PipelineTranslatorBatch}.
*/
@SuppressWarnings({
"nullness" // TODO(https://github.com/apache/beam/issues/20497)
})
public abstract class PipelineTranslator extends Pipeline.PipelineVisitor.Defaults {
private int depth = 0;
private static final Logger LOG = LoggerFactory.getLogger(PipelineTranslator.class);
protected AbstractTranslationContext translationContext;
// --------------------------------------------------------------------------------------------
// Pipeline preparation methods
// --------------------------------------------------------------------------------------------
public static void replaceTransforms(Pipeline pipeline, StreamingOptions options) {
pipeline.replaceAll(SparkTransformOverrides.getDefaultOverrides(options.isStreaming()));
}
/**
* Visit the pipeline to determine the translation mode (batch/streaming) and update options
* accordingly.
*/
public static void detectTranslationMode(Pipeline pipeline, StreamingOptions options) {
TranslationModeDetector detector = new TranslationModeDetector();
pipeline.traverseTopologically(detector);
if (detector.getTranslationMode().equals(TranslationMode.STREAMING)) {
options.setStreaming(true);
}
}
/** The translation mode of the Beam Pipeline. */
private enum TranslationMode {
/** Uses the batch mode. */
BATCH,
/** Uses the streaming mode. */
STREAMING
}
/** Traverses the Pipeline to determine the {@link TranslationMode} for this pipeline. */
private static class TranslationModeDetector extends Pipeline.PipelineVisitor.Defaults {
private static final Logger LOG = LoggerFactory.getLogger(TranslationModeDetector.class);
private TranslationMode translationMode;
TranslationModeDetector(TranslationMode defaultMode) {
this.translationMode = defaultMode;
}
TranslationModeDetector() {
this(TranslationMode.BATCH);
}
TranslationMode getTranslationMode() {
return translationMode;
}
@Override
public void visitValue(PValue value, TransformHierarchy.Node producer) {
if (translationMode.equals(TranslationMode.BATCH)) {
if (value instanceof PCollection
&& ((PCollection) value).isBounded() == PCollection.IsBounded.UNBOUNDED) {
LOG.info(
"Found unbounded PCollection {}. Switching to streaming execution.", value.getName());
translationMode = TranslationMode.STREAMING;
}
}
}
}
// --------------------------------------------------------------------------------------------
// Pipeline utility methods
// --------------------------------------------------------------------------------------------
/**
* Utility formatting method.
*
* @param n number of spaces to generate
* @return String with "|" followed by n spaces
*/
private static String genSpaces(int n) {
StringBuilder builder = new StringBuilder();
for (int i = 0; i < n; i++) {
builder.append("| ");
}
return builder.toString();
}
/** Get a {@link TransformTranslator} for the given {@link TransformHierarchy.Node}. */
protected abstract TransformTranslator<?> getTransformTranslator(TransformHierarchy.Node node);
/** Apply the given TransformTranslator to the given node. */
private <T extends PTransform<?, ?>> void applyTransformTranslator(
TransformHierarchy.Node node, TransformTranslator<?> transformTranslator) {
// create the applied PTransform on the translationContext
translationContext.setCurrentTransform(node.toAppliedPTransform(getPipeline()));
// avoid type capture
@SuppressWarnings("unchecked")
T typedTransform = (T) node.getTransform();
@SuppressWarnings("unchecked")
TransformTranslator<T> typedTransformTranslator = (TransformTranslator<T>) transformTranslator;
// apply the transformTranslator
typedTransformTranslator.translateTransform(typedTransform, translationContext);
}
// --------------------------------------------------------------------------------------------
// Pipeline visitor entry point
// --------------------------------------------------------------------------------------------
/**
* Translates the pipeline by passing this class as a visitor.
*
* @param pipeline The pipeline to be translated
*/
public void translate(Pipeline pipeline) {
LOG.debug("starting translation of the pipeline using {}", getClass().getName());
pipeline.traverseTopologically(this);
}
// --------------------------------------------------------------------------------------------
// Pipeline Visitor Methods
// --------------------------------------------------------------------------------------------
@Override
public CompositeBehavior enterCompositeTransform(TransformHierarchy.Node node) {
LOG.debug("{} enterCompositeTransform- {}", genSpaces(depth), node.getFullName());
depth++;
TransformTranslator<?> transformTranslator = getTransformTranslator(node);
if (transformTranslator != null) {
applyTransformTranslator(node, transformTranslator);
LOG.debug("{} translated- {}", genSpaces(depth), node.getFullName());
return CompositeBehavior.DO_NOT_ENTER_TRANSFORM;
} else {
return CompositeBehavior.ENTER_TRANSFORM;
}
}
@Override
public void leaveCompositeTransform(TransformHierarchy.Node node) {
depth--;
LOG.debug("{} leaveCompositeTransform- {}", genSpaces(depth), node.getFullName());
}
@Override
public void visitPrimitiveTransform(TransformHierarchy.Node node) {
LOG.debug("{} visitPrimitiveTransform- {}", genSpaces(depth), node.getFullName());
// get the transformation corresponding to the node we are
// currently visiting and translate it into its Spark alternative.
TransformTranslator<?> transformTranslator = getTransformTranslator(node);
if (transformTranslator == null) {
String transformUrn = PTransformTranslation.urnForTransform(node.getTransform());
throw new UnsupportedOperationException(
"The transform " + transformUrn + " is currently not supported.");
}
applyTransformTranslator(node, transformTranslator);
}
public AbstractTranslationContext getTranslationContext() {
return translationContext;
}
}