[Scala Streamlet API] Add Integration Test for Source and Union Operations (#3121)
* [Scala Streamlet API] Add New Integration Test
* Documentation is added
diff --git a/heron/api/src/java/org/apache/heron/streamlet/SerializableBiFunction.java b/heron/api/src/java/org/apache/heron/streamlet/SerializableBiFunction.java
index a681221..b2fee80 100644
--- a/heron/api/src/java/org/apache/heron/streamlet/SerializableBiFunction.java
+++ b/heron/api/src/java/org/apache/heron/streamlet/SerializableBiFunction.java
@@ -25,7 +25,7 @@
/**
* All user supplied transformation functions have to be serializable.
- * Thus all Strealmet transformation definitions take Serializable
+ * Thus all Streamlet transformation definitions take Serializable
* Functions as their input. We simply decorate java.util. function
* definitions with a Serializable tag to ensure that any supplied
* lambda functions automatically become serializable.
diff --git a/heron/api/src/java/org/apache/heron/streamlet/SerializableBinaryOperator.java b/heron/api/src/java/org/apache/heron/streamlet/SerializableBinaryOperator.java
index 4441d49..c8586f5 100644
--- a/heron/api/src/java/org/apache/heron/streamlet/SerializableBinaryOperator.java
+++ b/heron/api/src/java/org/apache/heron/streamlet/SerializableBinaryOperator.java
@@ -22,7 +22,7 @@
/**
* All user supplied transformation functions have to be serializable.
- * Thus all Strealmet transformation definitions take Serializable
+ * Thus all Streamlet transformation definitions take Serializable
* Functions as their input. We simply decorate java.util. function
* definitions with a Serializable tag to ensure that any supplied
* lambda functions automatically become serializable.
diff --git a/integration_test/src/python/test_runner/resources/test.json b/integration_test/src/python/test_runner/resources/test.json
index 62cc92f..bb15dce 100644
--- a/integration_test/src/python/test_runner/resources/test.json
+++ b/integration_test/src/python/test_runner/resources/test.json
@@ -22,6 +22,11 @@
"topologyName": "IntegrationTest_ScalaStreamletWithSplitAndWithStream",
"classPath": "scala_streamlet_with_split_and_with_stream.ScalaStreamletWithSplitAndWithStream",
"expectedResultRelativePath": "scala_streamlet_with_split_and_with_stream/ScalaStreamletWithSplitAndWithStreamResults.json"
+ },
+ {
+ "topologyName" : "IntegrationTest_ScalaStreamletWithMapAndFilterAndUnion",
+ "classPath" : "scala_streamlet_with_map_and_filter_and_union.ScalaStreamletWithMapAndFilterAndUnion",
+ "expectedResultRelativePath" : "scala_streamlet_with_map_and_filter_and_union/ScalaStreamletWithMapAndFilterAndUnionResults.json"
}
],
"javaTopologies": [
diff --git a/integration_test/src/scala/org/apache/heron/integration_test/common/ClassicalMusicDataset.scala b/integration_test/src/scala/org/apache/heron/integration_test/common/ClassicalMusicDataset.scala
new file mode 100644
index 0000000..29388cc
--- /dev/null
+++ b/integration_test/src/scala/org/apache/heron/integration_test/common/ClassicalMusicDataset.scala
@@ -0,0 +1,52 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.heron.integration_test.common
+
+case class ClassicalMusic(composer: String,
+ title: String,
+ year: Int,
+ keyword: String)
+
+/**
+ * Common Dataset to be used by Scala Streamlet Integration Tests
+ */
+object ClassicalMusicDataset {
+
+ val firstClassicalMusicList = List(
+ ClassicalMusic("Bach", "Bourrée In E Minor", 1717, "guitar"),
+ ClassicalMusic("Vivaldi", "Four Seasons: Winter", 1723, "rousing"),
+ ClassicalMusic("Bach", "Air On The G String", 1723, "light"),
+ ClassicalMusic("Mozart", "Symphony No. 40: I", 1788, "seductive"),
+ ClassicalMusic("Beethoven", "Symphony No. 9: Ode To Joy", 1824, "joyful"),
+ ClassicalMusic("Bizet", "Carmen: Habanera", 1875, "seductive")
+ )
+
+ val secondClassicalMusicList = List(
+ ClassicalMusic("Handel", "Water Music: Alla Hornpipe", 1717, "formal"),
+ ClassicalMusic("Vivaldi", "Four Seasons: Spring", 1723, "formal"),
+ ClassicalMusic("Bach",
+ "Cantata 147: Jesu, Joy Of Man's Desiring",
+ 1723,
+ "wedding"),
+ ClassicalMusic("Mozart", "Piano Sonata No. 16", 1788, "piano"),
+ ClassicalMusic("Beethoven", "Symphony No. 9: II", 1824, "powerful"),
+ ClassicalMusic("Tchaikovsky", "Piano Concerto No. 1", 1875, "piano")
+ )
+
+}
diff --git a/integration_test/src/scala/org/apache/heron/integration_test/common/ScalaIntegrationTestBase.scala b/integration_test/src/scala/org/apache/heron/integration_test/common/ScalaIntegrationTestBase.scala
index d68180b..0b9f85a 100644
--- a/integration_test/src/scala/org/apache/heron/integration_test/common/ScalaIntegrationTestBase.scala
+++ b/integration_test/src/scala/org/apache/heron/integration_test/common/ScalaIntegrationTestBase.scala
@@ -1,21 +1,21 @@
/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
package org.apache.heron.integration_test.common
import org.apache.heron.integration_test.core.TestTopologyBuilder
diff --git a/integration_test/src/scala/org/apache/heron/integration_test/topology/scala_streamlet_with_filter_and_transform/ScalaStreamletWithFilterAndTransform.scala b/integration_test/src/scala/org/apache/heron/integration_test/topology/scala_streamlet_with_filter_and_transform/ScalaStreamletWithFilterAndTransform.scala
index 3e73f2a..b2ccbae 100644
--- a/integration_test/src/scala/org/apache/heron/integration_test/topology/scala_streamlet_with_filter_and_transform/ScalaStreamletWithFilterAndTransform.scala
+++ b/integration_test/src/scala/org/apache/heron/integration_test/topology/scala_streamlet_with_filter_and_transform/ScalaStreamletWithFilterAndTransform.scala
@@ -1,21 +1,21 @@
/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
package org.apache.heron.integration_test.topology.scala_streamlet_with_filter_and_transform
import java.util.concurrent.atomic.AtomicInteger
@@ -28,9 +28,7 @@
AbstractTestTopology,
ScalaIntegrationTestBase
}
-import org.apache.heron.streamlet.scala.{
- Builder, SerializableTransformer
-}
+import org.apache.heron.streamlet.scala.{Builder, SerializableTransformer}
object ScalaStreamletWithFilterAndTransform {
def main(args: Array[String]): Unit = {
@@ -41,7 +39,7 @@
}
/**
- * Scala Streamlet Integration Test
+ * Scala Streamlet Integration Test by covering source, filter, transform operations.
*/
class ScalaStreamletWithFilterAndTransform(args: Array[String])
extends AbstractTestTopology(args)
diff --git a/integration_test/src/scala/org/apache/heron/integration_test/topology/scala_streamlet_with_map_and_filter_and_union/ScalaStreamletWithMapAndFilterAndUnion.scala b/integration_test/src/scala/org/apache/heron/integration_test/topology/scala_streamlet_with_map_and_filter_and_union/ScalaStreamletWithMapAndFilterAndUnion.scala
new file mode 100644
index 0000000..9cfe6a1
--- /dev/null
+++ b/integration_test/src/scala/org/apache/heron/integration_test/topology/scala_streamlet_with_map_and_filter_and_union/ScalaStreamletWithMapAndFilterAndUnion.scala
@@ -0,0 +1,102 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.heron.integration_test.topology.scala_streamlet_with_map_and_filter_and_union
+
+import org.apache.heron.api.Config
+import org.apache.heron.integration_test.common.{
+ AbstractTestTopology,
+ ClassicalMusic,
+ ScalaIntegrationTestBase
+}
+import org.apache.heron.integration_test.common.ClassicalMusicDataset._
+import org.apache.heron.integration_test.core.TestTopologyBuilder
+import org.apache.heron.streamlet.Context
+import org.apache.heron.streamlet.scala.{Builder, Source}
+
+import scala.collection.mutable.Set
+
+object ScalaStreamletWithMapAndFilterAndUnion {
+
+ val filterSet = Set[String]()
+
+ def main(args: Array[String]): Unit = {
+ val conf = new Config
+ val topology = new ScalaStreamletWithMapAndFilterAndUnion(args)
+ topology.submit(conf)
+ }
+
+}
+
+/**
+ * Scala Streamlet Integration Test by covering source, map, filter and union operations.
+ */
+class ScalaStreamletWithMapAndFilterAndUnion(args: Array[String])
+ extends AbstractTestTopology(args)
+ with ScalaIntegrationTestBase {
+
+ import ScalaStreamletWithMapAndFilterAndUnion._
+
+ override protected def buildTopology(
+ testTopologyBuilder: TestTopologyBuilder): TestTopologyBuilder = {
+ val streamletBuilder = Builder.newBuilder
+ val classicalMusics1 =
+ streamletBuilder
+ .newSource(new ClassicalMusicSource(firstClassicalMusicList))
+ .setName("classical-musics")
+ .map(
+ classicalMusic =>
+ new ClassicalMusic(classicalMusic.composer.toUpperCase(),
+ classicalMusic.title.toUpperCase(),
+ classicalMusic.year,
+ classicalMusic.keyword.toUpperCase()))
+ .setName("classical-musics-with-uppercase")
+
+ val classicalMusics2 = streamletBuilder
+ .newSource(new ClassicalMusicSource(secondClassicalMusicList))
+ .setName("classical-musics-2")
+
+ val unionStreamlet = classicalMusics1
+ .union(classicalMusics2)
+ .setName("classical-musics-union")
+
+ unionStreamlet
+ .map[String](classicalMusic =>
+ s"${classicalMusic.composer}-${classicalMusic.year}")
+ .setName("classical-musics-with-composer-and-year")
+ .filter(filterSet.add(_))
+ .setName("filtered-classical-musics")
+
+ build(testTopologyBuilder, streamletBuilder)
+ }
+
+}
+
+private class ClassicalMusicSource(classicalMusics: List[ClassicalMusic])
+ extends Source[ClassicalMusic] {
+
+ var list = List[ClassicalMusic]()
+
+ override def setup(context: Context): Unit = {
+ list = classicalMusics
+ }
+
+ override def get(): Iterable[ClassicalMusic] = list
+
+ override def cleanup(): Unit = ???
+}
diff --git a/integration_test/src/scala/org/apache/heron/integration_test/topology/scala_streamlet_with_map_and_filter_and_union/ScalaStreamletWithMapAndFilterAndUnionResults.json b/integration_test/src/scala/org/apache/heron/integration_test/topology/scala_streamlet_with_map_and_filter_and_union/ScalaStreamletWithMapAndFilterAndUnionResults.json
new file mode 100644
index 0000000..0eab457
--- /dev/null
+++ b/integration_test/src/scala/org/apache/heron/integration_test/topology/scala_streamlet_with_map_and_filter_and_union/ScalaStreamletWithMapAndFilterAndUnionResults.json
@@ -0,0 +1 @@
+["BACH-1717", "BACH-1723", "BEETHOVEN-1824", "BIZET-1875", "Bach-1723", "Beethoven-1824", "Handel-1717", "MOZART-1788", "Mozart-1788", "Tchaikovsky-1875", "VIVALDI-1723", "Vivaldi-1723"]
\ No newline at end of file
diff --git a/integration_test/src/scala/org/apache/heron/integration_test/topology/scala_streamlet_with_map_and_flatmap_and_filter_and_clone/ScalaStreamletWithMapAndFlatMapAndFilterAndClone.scala b/integration_test/src/scala/org/apache/heron/integration_test/topology/scala_streamlet_with_map_and_flatmap_and_filter_and_clone/ScalaStreamletWithMapAndFlatMapAndFilterAndClone.scala
index bb77987..884ba1f 100644
--- a/integration_test/src/scala/org/apache/heron/integration_test/topology/scala_streamlet_with_map_and_flatmap_and_filter_and_clone/ScalaStreamletWithMapAndFlatMapAndFilterAndClone.scala
+++ b/integration_test/src/scala/org/apache/heron/integration_test/topology/scala_streamlet_with_map_and_flatmap_and_filter_and_clone/ScalaStreamletWithMapAndFlatMapAndFilterAndClone.scala
@@ -1,21 +1,21 @@
/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
package org.apache.heron.integration_test.topology.scala_streamlet_with_map_and_flatmap_and_filter_and_clone
import scala.collection.mutable.Set
@@ -44,6 +44,9 @@
}
}
+/**
+ * Scala Streamlet Integration Test by covering source, map, flatMap, filter and clone operations.
+ */
class ScalaStreamletWithMapAndFlatMapAndFilterAndClone(args: Array[String])
extends AbstractTestTopology(args)
with ScalaIntegrationTestBase {
@@ -52,6 +55,7 @@
override protected def buildTopology(
testTopologyBuilder: TestTopologyBuilder): TestTopologyBuilder = {
+
val streamletBuilder = Builder.newBuilder
val clonedStreamlet = streamletBuilder