/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.beam.runners.spark;

import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeoutException;
import org.apache.beam.runners.core.construction.SplittableParDo;
import org.apache.beam.runners.spark.translation.EvaluationContext;
import org.apache.beam.runners.spark.translation.SparkPipelineTranslator;
import org.apache.beam.runners.spark.translation.TransformTranslator;
import org.apache.beam.runners.spark.translation.streaming.StreamingTransformTranslator;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.PipelineRunner;
import org.apache.beam.sdk.io.kafka.KafkaIO;
import org.apache.beam.sdk.options.ExperimentalOptions;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsValidator;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableList;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import org.joda.time.Duration;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Pipeline runner which translates a Beam pipeline into equivalent Spark operations, without
 * running them. Used for debugging purposes.
 *
 * <p>Example:
 *
 * <pre>{@code
 * SparkPipelineOptions options = PipelineOptionsFactory.as(SparkPipelineOptions.class);
 * options.setRunner(SparkRunnerDebugger.class);
 * Pipeline pipeline = Pipeline.create(options);
 * SparkRunnerDebugger.DebugSparkPipelineResult result =
 *     (SparkRunnerDebugger.DebugSparkPipelineResult) pipeline.run();
 * String sparkPipeline = result.getDebugString();
 * }</pre>
 */
@SuppressWarnings({
  "nullness" // TODO(https://issues.apache.org/jira/browse/BEAM-10402)
})
public final class SparkRunnerDebugger extends PipelineRunner<SparkPipelineResult> {

  private static final Logger LOG = LoggerFactory.getLogger(SparkRunnerDebugger.class);

  private final SparkPipelineOptions options;

  private SparkRunnerDebugger(SparkPipelineOptions options) {
    this.options = options;
  }

  public static SparkRunnerDebugger fromOptions(PipelineOptions options) {
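    // Preserve test-specific options when they were provided; otherwise validate plain
    // Spark options.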
    if (options instanceof TestSparkPipelineOptions) {
      TestSparkPipelineOptions testSparkPipelineOptions =
          PipelineOptionsValidator.validate(TestSparkPipelineOptions.class, options);
      return new SparkRunnerDebugger(testSparkPipelineOptions);
    } else {
      SparkPipelineOptions sparkPipelineOptions =
          PipelineOptionsValidator.validate(SparkPipelineOptions.class, options);
      return new SparkRunnerDebugger(sparkPipelineOptions);
    }
  }

  @Override
  public SparkPipelineResult run(Pipeline pipeline) {
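    // Use streaming translation when streaming is enabled, or when tests force it.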
    boolean isStreaming =
        options.isStreaming() || options.as(TestSparkPipelineOptions.class).isForceStreaming();

    if (!ExperimentalOptions.hasExperiment(pipeline.getOptions(), "use_sdf_read")) {
      // Default to using the primitive versions of Read.Bounded and Read.Unbounded.
      pipeline.replaceAll(ImmutableList.of(KafkaIO.Read.KAFKA_READ_OVERRIDE));
      SplittableParDo.convertReadBasedSplittableDoFnsToPrimitiveReads(pipeline);
    }

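    // A short-lived local context backs the translation; no Spark job is ever submitted to it.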
    JavaSparkContext jsc = new JavaSparkContext("local[1]", "Debug_Pipeline");
    JavaStreamingContext jssc =
        new JavaStreamingContext(jsc, new org.apache.spark.streaming.Duration(1000));

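    // Initialize the runner's accumulators, as a real SparkRunner execution would.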
    SparkRunner.initAccumulators(options, jsc);

    TransformTranslator.Translator translator = new TransformTranslator.Translator();

    SparkNativePipelineVisitor visitor;
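    // Pick the translator matching the pipeline's execution mode.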
    if (isStreaming) {
      SparkPipelineTranslator streamingTranslator =
          new StreamingTransformTranslator.Translator(translator);
      EvaluationContext ctxt = new EvaluationContext(jsc, pipeline, options, jssc);
      visitor = new SparkNativePipelineVisitor(streamingTranslator, ctxt);
    } else {
      EvaluationContext ctxt = new EvaluationContext(jsc, pipeline, options, jssc);
      visitor = new SparkNativePipelineVisitor(translator, ctxt);
    }

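    // Walk the pipeline graph; the visitor records the Spark equivalent of each transform
    // instead of executing it.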
    pipeline.traverseTopologically(visitor);

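    // Translation is done and nothing was run, so the local context can be torn down.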
    jsc.stop();

    String debugString = visitor.getDebugString();
    LOG.info("Translated Native Spark pipeline:\n{}", debugString);
    return new DebugSparkPipelineResult(debugString);
  }

  /**
   * PipelineResult of running a {@link Pipeline} using {@link SparkRunnerDebugger}. Use {@link
   * #getDebugString} to get a {@link String} representation of the {@link Pipeline} translated
   * into Spark native operations.
   */
  public static class DebugSparkPipelineResult extends SparkPipelineResult {

    private final String debugString;

    DebugSparkPipelineResult(String debugString) {
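      // Nothing was executed, so there is no execution future or context to pass along.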
      super(null, null);
      this.debugString = debugString;
    }

    /** Returns the Beam pipeline translated into Spark native operations. */
    String getDebugString() {
      return debugString;
    }

    @Override
    protected void stop() {
      // Empty implementation
    }

    @Override
    protected State awaitTermination(Duration duration)
        throws TimeoutException, ExecutionException, InterruptedException {
      return State.DONE;
    }
  }
}