[BEAM-10670] Use non-SDF-based translation for Read by default on the Spark runners
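
With this change the RDD-based SparkRunner, the SparkStructuredStreamingRunner
and the SparkRunnerDebugger translate Read.Bounded and Read.Unbounded with the
primitive (non-SDF) implementations by default. Previously the primitive
translation was only applied to streaming pipelines or when the
"use_deprecated_read" / "beam_fn_api_use_deprecated_read" experiments were set;
those checks are replaced by a single condition, so the SDF-based Read
translation is now only used when the "use_sdf_read" experiment is present.
The SparkRunnerDebuggerTest expectation is updated to match the primitive
source read in the debug output (sparkContext.<readFrom(...)> instead of
Read$Bounded).

Users who still want SDF-based reads can pass the experiment on the command
line, e.g. --experiments=use_sdf_read, or set it programmatically. A minimal
sketch of the programmatic opt-in follows; only the "use_sdf_read" experiment
name comes from this change, while the class name and the surrounding wiring
are the usual Beam/Spark runner setup and purely illustrative:

    import org.apache.beam.runners.spark.SparkPipelineOptions;
    import org.apache.beam.sdk.Pipeline;
    import org.apache.beam.sdk.options.ExperimentalOptions;
    import org.apache.beam.sdk.options.PipelineOptionsFactory;

    // Illustrative class name, not part of this change.
    public class SdfReadOptIn {
      public static void main(String[] args) {
        // Parse the usual pipeline arguments into Spark runner options.
        SparkPipelineOptions options =
            PipelineOptionsFactory.fromArgs(args).withValidation().as(SparkPipelineOptions.class);
        // Opt back into SDF-based translation of Read.Bounded / Read.Unbounded.
        ExperimentalOptions.addExperiment(options.as(ExperimentalOptions.class), "use_sdf_read");
        Pipeline pipeline = Pipeline.create(options);
        // ... apply transforms as usual ...
        pipeline.run().waitUntilFinish();
      }
    }
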
diff --git a/runners/spark/2/src/main/java/org/apache/beam/runners/spark/structuredstreaming/SparkStructuredStreamingRunner.java b/runners/spark/2/src/main/java/org/apache/beam/runners/spark/structuredstreaming/SparkStructuredStreamingRunner.java
index 67a0e80..0b3880a 100644
--- a/runners/spark/2/src/main/java/org/apache/beam/runners/spark/structuredstreaming/SparkStructuredStreamingRunner.java
+++ b/runners/spark/2/src/main/java/org/apache/beam/runners/spark/structuredstreaming/SparkStructuredStreamingRunner.java
@@ -173,12 +173,8 @@
   private TranslationContext translatePipeline(Pipeline pipeline) {
     PipelineTranslator.detectTranslationMode(pipeline, options);
 
-    // Default to using the primitive versions of Read.Bounded and Read.Unbounded if we are
-    // executing an unbounded pipeline or the user specifically requested it.
-    if (options.isStreaming()
-        || ExperimentalOptions.hasExperiment(
-            pipeline.getOptions(), "beam_fn_api_use_deprecated_read")
-        || ExperimentalOptions.hasExperiment(pipeline.getOptions(), "use_deprecated_read")) {
+    if (!ExperimentalOptions.hasExperiment(pipeline.getOptions(), "use_sdf_read")) {
+      // Default to using the primitive versions of Read.Bounded and Read.Unbounded.
       pipeline.replaceAll(ImmutableList.of(KafkaIO.Read.KAFKA_READ_OVERRIDE));
       SplittableParDo.convertReadBasedSplittableDoFnsToPrimitiveReads(pipeline);
     }
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkRunner.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkRunner.java
index 5369409..131dcec 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkRunner.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkRunner.java
@@ -161,12 +161,8 @@
     // visit the pipeline to determine the translation mode
     detectTranslationMode(pipeline);
 
-    // Default to using the primitive versions of Read.Bounded and Read.Unbounded if we are
-    // executing an unbounded pipeline or the user specifically requested it.
-    if (pipelineOptions.isStreaming()
-        || ExperimentalOptions.hasExperiment(
-            pipeline.getOptions(), "beam_fn_api_use_deprecated_read")
-        || ExperimentalOptions.hasExperiment(pipeline.getOptions(), "use_deprecated_read")) {
+    if (!ExperimentalOptions.hasExperiment(pipeline.getOptions(), "use_sdf_read")) {
+      // Default to using the primitive versions of Read.Bounded and Read.Unbounded.
       pipeline.replaceAll(ImmutableList.of(KafkaIO.Read.KAFKA_READ_OVERRIDE));
       SplittableParDo.convertReadBasedSplittableDoFnsToPrimitiveReads(pipeline);
     }
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkRunnerDebugger.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkRunnerDebugger.java
index 37d9d54..2e52a87 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkRunnerDebugger.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkRunnerDebugger.java
@@ -81,15 +81,13 @@
   public SparkPipelineResult run(Pipeline pipeline) {
     boolean isStreaming =
         options.isStreaming() || options.as(TestSparkPipelineOptions.class).isForceStreaming();
-    // Default to using the primitive versions of Read.Bounded and Read.Unbounded if we are
-    // executing an unbounded pipeline or the user specifically requested it.
-    if (isStreaming
-        || ExperimentalOptions.hasExperiment(
-            pipeline.getOptions(), "beam_fn_api_use_deprecated_read")
-        || ExperimentalOptions.hasExperiment(pipeline.getOptions(), "use_deprecated_read")) {
+
+    if (!ExperimentalOptions.hasExperiment(pipeline.getOptions(), "use_sdf_read")) {
+      // Default to using the primitive versions of Read.Bounded and Read.Unbounded.
       pipeline.replaceAll(ImmutableList.of(KafkaIO.Read.KAFKA_READ_OVERRIDE));
       SplittableParDo.convertReadBasedSplittableDoFnsToPrimitiveReads(pipeline);
     }
+
     JavaSparkContext jsc = new JavaSparkContext("local[1]", "Debug_Pipeline");
     JavaStreamingContext jssc =
         new JavaStreamingContext(jsc, new org.apache.spark.streaming.Duration(1000));
diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/SparkRunnerDebuggerTest.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/SparkRunnerDebuggerTest.java
index 6b2782d..c9bb83d 100644
--- a/runners/spark/src/test/java/org/apache/beam/runners/spark/SparkRunnerDebuggerTest.java
+++ b/runners/spark/src/test/java/org/apache/beam/runners/spark/SparkRunnerDebuggerTest.java
@@ -81,7 +81,7 @@
         .apply(TextIO.write().to("!!PLACEHOLDER-OUTPUT-DIR!!").withNumShards(3).withSuffix(".txt"));
 
     final String expectedPipeline =
-        "_.<org.apache.beam.sdk.io.Read$Bounded>\n"
+        "sparkContext.<readFrom(org.apache.beam.sdk.transforms.Create$Values$CreateSource)>()\n"
             + "_.mapPartitions("
             + "new org.apache.beam.runners.spark.examples.WordCount$ExtractWordsFn())\n"
             + "_.mapPartitions(new org.apache.beam.sdk.transforms.Contextful())\n"