[BEAM-8470] Add Flatten transformation translator
diff --git a/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/TranslationContext.java b/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/TranslationContext.java
index 98f77af..3c29867 100644
--- a/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/TranslationContext.java
+++ b/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/TranslationContext.java
@@ -83,6 +83,10 @@
   // --------------------------------------------------------------------------------------------
   //  Datasets methods
   // --------------------------------------------------------------------------------------------
+  @SuppressWarnings("unchecked")
+  public <T> Dataset<T> emptyDataset() {
+    return (Dataset<T>) sparkSession.emptyDataset(Encoders.bean(Void.class));
+  }
 
   @SuppressWarnings("unchecked")
   public <T> Dataset<WindowedValue<T>> getDataset(PValue value) {
diff --git a/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/FlattenPCollectionTranslatorBatch.java b/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/FlattenPCollectionTranslatorBatch.java
deleted file mode 100644
index 87a250e..0000000
--- a/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/FlattenPCollectionTranslatorBatch.java
+++ /dev/null
@@ -1,32 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.runners.spark.structuredstreaming.translation.batch;
-
-import org.apache.beam.runners.spark.structuredstreaming.translation.TransformTranslator;
-import org.apache.beam.runners.spark.structuredstreaming.translation.TranslationContext;
-import org.apache.beam.sdk.transforms.PTransform;
-import org.apache.beam.sdk.values.PCollection;
-import org.apache.beam.sdk.values.PCollectionList;
-
-class FlattenPCollectionTranslatorBatch<T>
-    implements TransformTranslator<PTransform<PCollectionList<T>, PCollection<T>>> {
-
-  @Override
-  public void translateTransform(
-      PTransform<PCollectionList<T>, PCollection<T>> transform, TranslationContext context) {}
-}
diff --git a/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/FlattenTranslatorBatch.java b/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/FlattenTranslatorBatch.java
new file mode 100644
index 0000000..2739e83
--- /dev/null
+++ b/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/FlattenTranslatorBatch.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.spark.structuredstreaming.translation.batch;
+
+import static com.google.common.base.Preconditions.checkArgument;
+
+import java.util.Map;
+import org.apache.beam.runners.spark.structuredstreaming.translation.TransformTranslator;
+import org.apache.beam.runners.spark.structuredstreaming.translation.TranslationContext;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.util.WindowedValue;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PCollectionList;
+import org.apache.beam.sdk.values.PValue;
+import org.apache.beam.sdk.values.TupleTag;
+import org.apache.spark.sql.Dataset;
+
+class FlattenTranslatorBatch<T>
+    implements TransformTranslator<PTransform<PCollectionList<T>, PCollection<T>>> {
+
+  @Override
+  public void translateTransform(
+      PTransform<PCollectionList<T>, PCollection<T>> transform, TranslationContext context) {
+    Map<TupleTag<?>, PValue> inputs = context.getInputs();
+    Dataset<WindowedValue<T>> result = null;
+
+    if (inputs.isEmpty()) {
+      result = context.emptyDataset();
+    } else {
+      for (PValue pValue : inputs.values()) {
+        checkArgument(
+            pValue instanceof PCollection,
+            "Got non-PCollection input to flatten: %s of type %s",
+            pValue,
+            pValue.getClass().getSimpleName());
+        @SuppressWarnings("unchecked")
+        PCollection<T> pCollection = (PCollection<T>) pValue;
+        Dataset<WindowedValue<T>> current = context.getDataset(pCollection);
+        if (result == null) {
+          result = current;
+        } else {
+          result = result.union(current);
+        }
+      }
+    }
+    context.putDataset(context.getOutput(), result);
+  }
+}
diff --git a/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/PipelineTranslatorBatch.java b/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/PipelineTranslatorBatch.java
index 318d74c..26f1b9c 100644
--- a/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/PipelineTranslatorBatch.java
+++ b/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/PipelineTranslatorBatch.java
@@ -56,7 +56,7 @@
     TRANSFORM_TRANSLATORS.put(PTransformTranslation.RESHUFFLE_URN, new ReshuffleTranslatorBatch());
 
     TRANSFORM_TRANSLATORS.put(
-        PTransformTranslation.FLATTEN_TRANSFORM_URN, new FlattenPCollectionTranslatorBatch());
+        PTransformTranslation.FLATTEN_TRANSFORM_URN, new FlattenTranslatorBatch());
 
     TRANSFORM_TRANSLATORS.put(
         PTransformTranslation.ASSIGN_WINDOWS_TRANSFORM_URN, new WindowAssignTranslatorBatch());