[BEAM-8470] Add Flatten transformation translator
diff --git a/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/TranslationContext.java b/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/TranslationContext.java
index 98f77af..3c29867 100644
--- a/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/TranslationContext.java
+++ b/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/TranslationContext.java
@@ -83,6 +83,10 @@
// --------------------------------------------------------------------------------------------
// Datasets methods
// --------------------------------------------------------------------------------------------
+ @SuppressWarnings("unchecked")
+ public <T> Dataset<T> emptyDataset() {
+ return (Dataset<T>) sparkSession.emptyDataset(Encoders.bean(Void.class));
+ }
@SuppressWarnings("unchecked")
public <T> Dataset<WindowedValue<T>> getDataset(PValue value) {
diff --git a/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/FlattenPCollectionTranslatorBatch.java b/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/FlattenPCollectionTranslatorBatch.java
deleted file mode 100644
index 87a250e..0000000
--- a/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/FlattenPCollectionTranslatorBatch.java
+++ /dev/null
@@ -1,32 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.runners.spark.structuredstreaming.translation.batch;
-
-import org.apache.beam.runners.spark.structuredstreaming.translation.TransformTranslator;
-import org.apache.beam.runners.spark.structuredstreaming.translation.TranslationContext;
-import org.apache.beam.sdk.transforms.PTransform;
-import org.apache.beam.sdk.values.PCollection;
-import org.apache.beam.sdk.values.PCollectionList;
-
-class FlattenPCollectionTranslatorBatch<T>
- implements TransformTranslator<PTransform<PCollectionList<T>, PCollection<T>>> {
-
- @Override
- public void translateTransform(
- PTransform<PCollectionList<T>, PCollection<T>> transform, TranslationContext context) {}
-}
diff --git a/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/FlattenTranslatorBatch.java b/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/FlattenTranslatorBatch.java
new file mode 100644
index 0000000..2739e83
--- /dev/null
+++ b/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/FlattenTranslatorBatch.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.spark.structuredstreaming.translation.batch;
+
+import static com.google.common.base.Preconditions.checkArgument;
+
+import java.util.Map;
+import org.apache.beam.runners.spark.structuredstreaming.translation.TransformTranslator;
+import org.apache.beam.runners.spark.structuredstreaming.translation.TranslationContext;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.util.WindowedValue;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PCollectionList;
+import org.apache.beam.sdk.values.PValue;
+import org.apache.beam.sdk.values.TupleTag;
+import org.apache.spark.sql.Dataset;
+
+class FlattenTranslatorBatch<T>
+ implements TransformTranslator<PTransform<PCollectionList<T>, PCollection<T>>> {
+
+ @Override
+ public void translateTransform(
+ PTransform<PCollectionList<T>, PCollection<T>> transform, TranslationContext context) {
+ Map<TupleTag<?>, PValue> inputs = context.getInputs();
+ Dataset<WindowedValue<T>> result = null;
+
+ if (inputs.isEmpty()) {
+ result = context.emptyDataset();
+ } else {
+ for (PValue pValue : inputs.values()) {
+ checkArgument(
+ pValue instanceof PCollection,
+ "Got non-PCollection input to flatten: %s of type %s",
+ pValue,
+ pValue.getClass().getSimpleName());
+ @SuppressWarnings("unchecked")
+ PCollection<T> pCollection = (PCollection<T>) pValue;
+ Dataset<WindowedValue<T>> current = context.getDataset(pCollection);
+ if (result == null) {
+ result = current;
+ } else {
+ result = result.union(current);
+ }
+ }
+ }
+ context.putDataset(context.getOutput(), result);
+ }
+}
diff --git a/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/PipelineTranslatorBatch.java b/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/PipelineTranslatorBatch.java
index 318d74c..26f1b9c 100644
--- a/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/PipelineTranslatorBatch.java
+++ b/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/PipelineTranslatorBatch.java
@@ -56,7 +56,7 @@
TRANSFORM_TRANSLATORS.put(PTransformTranslation.RESHUFFLE_URN, new ReshuffleTranslatorBatch());
TRANSFORM_TRANSLATORS.put(
- PTransformTranslation.FLATTEN_TRANSFORM_URN, new FlattenPCollectionTranslatorBatch());
+ PTransformTranslation.FLATTEN_TRANSFORM_URN, new FlattenTranslatorBatch());
TRANSFORM_TRANSLATORS.put(
PTransformTranslation.ASSIGN_WINDOWS_TRANSFORM_URN, new WindowAssignTranslatorBatch());