[BEAM-10670] Use non-SDF-based translation for Read by default on Spark Runner
diff --git a/runners/spark/2/src/main/java/org/apache/beam/runners/spark/structuredstreaming/SparkStructuredStreamingRunner.java b/runners/spark/2/src/main/java/org/apache/beam/runners/spark/structuredstreaming/SparkStructuredStreamingRunner.java
index 67a0e80..0b3880a 100644
--- a/runners/spark/2/src/main/java/org/apache/beam/runners/spark/structuredstreaming/SparkStructuredStreamingRunner.java
+++ b/runners/spark/2/src/main/java/org/apache/beam/runners/spark/structuredstreaming/SparkStructuredStreamingRunner.java
@@ -173,12 +173,8 @@
private TranslationContext translatePipeline(Pipeline pipeline) {
PipelineTranslator.detectTranslationMode(pipeline, options);
- // Default to using the primitive versions of Read.Bounded and Read.Unbounded if we are
- // executing an unbounded pipeline or the user specifically requested it.
- if (options.isStreaming()
- || ExperimentalOptions.hasExperiment(
- pipeline.getOptions(), "beam_fn_api_use_deprecated_read")
- || ExperimentalOptions.hasExperiment(pipeline.getOptions(), "use_deprecated_read")) {
+ if (!ExperimentalOptions.hasExperiment(pipeline.getOptions(), "use_sdf_read")) {
+ // Default to using the primitive versions of Read.Bounded and Read.Unbounded.
pipeline.replaceAll(ImmutableList.of(KafkaIO.Read.KAFKA_READ_OVERRIDE));
SplittableParDo.convertReadBasedSplittableDoFnsToPrimitiveReads(pipeline);
}
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkRunner.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkRunner.java
index 5369409..131dcec 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkRunner.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkRunner.java
@@ -161,12 +161,8 @@
// visit the pipeline to determine the translation mode
detectTranslationMode(pipeline);
- // Default to using the primitive versions of Read.Bounded and Read.Unbounded if we are
- // executing an unbounded pipeline or the user specifically requested it.
- if (pipelineOptions.isStreaming()
- || ExperimentalOptions.hasExperiment(
- pipeline.getOptions(), "beam_fn_api_use_deprecated_read")
- || ExperimentalOptions.hasExperiment(pipeline.getOptions(), "use_deprecated_read")) {
+ if (!ExperimentalOptions.hasExperiment(pipeline.getOptions(), "use_sdf_read")) {
+ // Default to using the primitive versions of Read.Bounded and Read.Unbounded.
pipeline.replaceAll(ImmutableList.of(KafkaIO.Read.KAFKA_READ_OVERRIDE));
SplittableParDo.convertReadBasedSplittableDoFnsToPrimitiveReads(pipeline);
}
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkRunnerDebugger.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkRunnerDebugger.java
index 37d9d54..2e52a87 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkRunnerDebugger.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkRunnerDebugger.java
@@ -81,15 +81,13 @@
public SparkPipelineResult run(Pipeline pipeline) {
boolean isStreaming =
options.isStreaming() || options.as(TestSparkPipelineOptions.class).isForceStreaming();
- // Default to using the primitive versions of Read.Bounded and Read.Unbounded if we are
- // executing an unbounded pipeline or the user specifically requested it.
- if (isStreaming
- || ExperimentalOptions.hasExperiment(
- pipeline.getOptions(), "beam_fn_api_use_deprecated_read")
- || ExperimentalOptions.hasExperiment(pipeline.getOptions(), "use_deprecated_read")) {
+
+ if (!ExperimentalOptions.hasExperiment(pipeline.getOptions(), "use_sdf_read")) {
+ // Default to using the primitive versions of Read.Bounded and Read.Unbounded.
pipeline.replaceAll(ImmutableList.of(KafkaIO.Read.KAFKA_READ_OVERRIDE));
SplittableParDo.convertReadBasedSplittableDoFnsToPrimitiveReads(pipeline);
}
+
JavaSparkContext jsc = new JavaSparkContext("local[1]", "Debug_Pipeline");
JavaStreamingContext jssc =
new JavaStreamingContext(jsc, new org.apache.spark.streaming.Duration(1000));
diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/SparkRunnerDebuggerTest.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/SparkRunnerDebuggerTest.java
index 6b2782d..c9bb83d 100644
--- a/runners/spark/src/test/java/org/apache/beam/runners/spark/SparkRunnerDebuggerTest.java
+++ b/runners/spark/src/test/java/org/apache/beam/runners/spark/SparkRunnerDebuggerTest.java
@@ -81,7 +81,7 @@
.apply(TextIO.write().to("!!PLACEHOLDER-OUTPUT-DIR!!").withNumShards(3).withSuffix(".txt"));
final String expectedPipeline =
- "_.<org.apache.beam.sdk.io.Read$Bounded>\n"
+ "sparkContext.<readFrom(org.apache.beam.sdk.transforms.Create$Values$CreateSource)>()\n"
+ "_.mapPartitions("
+ "new org.apache.beam.runners.spark.examples.WordCount$ExtractWordsFn())\n"
+ "_.mapPartitions(new org.apache.beam.sdk.transforms.Contextful())\n"