Merge pull request #13715: [BEAM-11593] Move SparkStructuredStreamingRunnerRegistrar to its own package
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkRunnerRegistrar.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkRunnerRegistrar.java
index 79f2a4e..cfc0737 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkRunnerRegistrar.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkRunnerRegistrar.java
@@ -18,8 +18,6 @@
 package org.apache.beam.runners.spark;
 
 import com.google.auto.service.AutoService;
-import org.apache.beam.runners.spark.structuredstreaming.SparkStructuredStreamingPipelineOptions;
-import org.apache.beam.runners.spark.structuredstreaming.SparkStructuredStreamingRunner;
 import org.apache.beam.sdk.PipelineRunner;
 import org.apache.beam.sdk.options.PipelineOptions;
 import org.apache.beam.sdk.options.PipelineOptionsRegistrar;
@@ -41,8 +39,7 @@
   public static class Runner implements PipelineRunnerRegistrar {
     @Override
     public Iterable<Class<? extends PipelineRunner<?>>> getPipelineRunners() {
-      return ImmutableList.of(
-          SparkRunner.class, TestSparkRunner.class, SparkStructuredStreamingRunner.class);
+      return ImmutableList.of(SparkRunner.class, TestSparkRunner.class);
     }
   }
 
@@ -52,9 +49,7 @@
     @Override
     public Iterable<Class<? extends PipelineOptions>> getPipelineOptions() {
       return ImmutableList.of(
-          SparkPipelineOptions.class,
-          SparkStructuredStreamingPipelineOptions.class,
-          SparkPortableStreamingPipelineOptions.class);
+          SparkPipelineOptions.class, SparkPortableStreamingPipelineOptions.class);
     }
   }
 }
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/SparkStructuredStreamingRunnerRegistrar.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/SparkStructuredStreamingRunnerRegistrar.java
new file mode 100644
index 0000000..fb759da
--- /dev/null
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/SparkStructuredStreamingRunnerRegistrar.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.spark.structuredstreaming;
+
+import com.google.auto.service.AutoService;
+import org.apache.beam.sdk.PipelineRunner;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.options.PipelineOptionsRegistrar;
+import org.apache.beam.sdk.runners.PipelineRunnerRegistrar;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableList;
+
+/**
+ * Contains the {@link PipelineRunnerRegistrar} and {@link PipelineOptionsRegistrar} for the {@link
+ * SparkStructuredStreamingRunner}.
+ *
+ * <p>{@link AutoService} will register Spark's implementations of the {@link PipelineRunner} and
+ * {@link PipelineOptions} as available pipeline runner services.
+ */
+public final class SparkStructuredStreamingRunnerRegistrar {
+  private SparkStructuredStreamingRunnerRegistrar() {}
+
+  /** Registers the {@link SparkStructuredStreamingRunner}. */
+  @AutoService(PipelineRunnerRegistrar.class)
+  public static class Runner implements PipelineRunnerRegistrar {
+    @Override
+    public Iterable<Class<? extends PipelineRunner<?>>> getPipelineRunners() {
+      return ImmutableList.of(SparkStructuredStreamingRunner.class);
+    }
+  }
+
+  /** Registers the {@link SparkStructuredStreamingPipelineOptions}. */
+  @AutoService(PipelineOptionsRegistrar.class)
+  public static class Options implements PipelineOptionsRegistrar {
+    @Override
+    public Iterable<Class<? extends PipelineOptions>> getPipelineOptions() {
+      return ImmutableList.of(SparkStructuredStreamingPipelineOptions.class);
+    }
+  }
+}
diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/SparkRunnerRegistrarTest.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/SparkRunnerRegistrarTest.java
index 652bb8f..2276ef8 100644
--- a/runners/spark/src/test/java/org/apache/beam/runners/spark/SparkRunnerRegistrarTest.java
+++ b/runners/spark/src/test/java/org/apache/beam/runners/spark/SparkRunnerRegistrarTest.java
@@ -21,8 +21,6 @@
 import static org.junit.Assert.fail;
 
 import java.util.ServiceLoader;
-import org.apache.beam.runners.spark.structuredstreaming.SparkStructuredStreamingPipelineOptions;
-import org.apache.beam.runners.spark.structuredstreaming.SparkStructuredStreamingRunner;
 import org.apache.beam.sdk.options.PipelineOptionsRegistrar;
 import org.apache.beam.sdk.runners.PipelineRunnerRegistrar;
 import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableList;
@@ -37,18 +35,14 @@
   @Test
   public void testOptions() {
     assertEquals(
-        ImmutableList.of(
-            SparkPipelineOptions.class,
-            SparkStructuredStreamingPipelineOptions.class,
-            SparkPortableStreamingPipelineOptions.class),
+        ImmutableList.of(SparkPipelineOptions.class, SparkPortableStreamingPipelineOptions.class),
         new SparkRunnerRegistrar.Options().getPipelineOptions());
   }
 
   @Test
   public void testRunners() {
     assertEquals(
-        ImmutableList.of(
-            SparkRunner.class, TestSparkRunner.class, SparkStructuredStreamingRunner.class),
+        ImmutableList.of(SparkRunner.class, TestSparkRunner.class),
         new SparkRunnerRegistrar.Runner().getPipelineRunners());
   }
 
diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/structuredstreaming/SparkStructuredStreamingRunnerRegistrarTest.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/structuredstreaming/SparkStructuredStreamingRunnerRegistrarTest.java
new file mode 100644
index 0000000..30d8297
--- /dev/null
+++ b/runners/spark/src/test/java/org/apache/beam/runners/spark/structuredstreaming/SparkStructuredStreamingRunnerRegistrarTest.java
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.spark.structuredstreaming;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.fail;
+
+import java.util.ServiceLoader;
+import org.apache.beam.sdk.options.PipelineOptionsRegistrar;
+import org.apache.beam.sdk.runners.PipelineRunnerRegistrar;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableList;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Lists;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+/** Test {@link SparkStructuredStreamingRunnerRegistrar}. */
+@RunWith(JUnit4.class)
+public class SparkStructuredStreamingRunnerRegistrarTest {
+  @Test
+  public void testOptions() {
+    assertEquals(
+        ImmutableList.of(SparkStructuredStreamingPipelineOptions.class),
+        new SparkStructuredStreamingRunnerRegistrar.Options().getPipelineOptions());
+  }
+
+  @Test
+  public void testRunners() {
+    assertEquals(
+        ImmutableList.of(SparkStructuredStreamingRunner.class),
+        new SparkStructuredStreamingRunnerRegistrar.Runner().getPipelineRunners());
+  }
+
+  @Test
+  public void testServiceLoaderForOptions() {
+    for (PipelineOptionsRegistrar registrar :
+        Lists.newArrayList(ServiceLoader.load(PipelineOptionsRegistrar.class).iterator())) {
+      if (registrar instanceof SparkStructuredStreamingRunnerRegistrar.Options) {
+        return;
+      }
+    }
+    fail("Expected to find " + SparkStructuredStreamingRunnerRegistrar.Options.class);
+  }
+
+  @Test
+  public void testServiceLoaderForRunner() {
+    for (PipelineRunnerRegistrar registrar :
+        Lists.newArrayList(ServiceLoader.load(PipelineRunnerRegistrar.class).iterator())) {
+      if (registrar instanceof SparkStructuredStreamingRunnerRegistrar.Runner) {
+        return;
+      }
+    }
+    fail("Expected to find " + SparkStructuredStreamingRunnerRegistrar.Runner.class);
+  }
+}