[Scala Streamlet API] Add Integration Test for Source and Union Operations (#3121)

* [Scala Streamlet API] Add New Integration Test

* Documentation is added
diff --git a/heron/api/src/java/org/apache/heron/streamlet/SerializableBiFunction.java b/heron/api/src/java/org/apache/heron/streamlet/SerializableBiFunction.java
index a681221..b2fee80 100644
--- a/heron/api/src/java/org/apache/heron/streamlet/SerializableBiFunction.java
+++ b/heron/api/src/java/org/apache/heron/streamlet/SerializableBiFunction.java
@@ -25,7 +25,7 @@
 
 /**
  * All user supplied transformation functions have to be serializable.
- * Thus all Strealmet transformation definitions take Serializable
+ * Thus all Streamlet transformation definitions take Serializable
  * Functions as their input. We simply decorate java.util. function
  * definitions with a Serializable tag to ensure that any supplied
  * lambda functions automatically become serializable.
diff --git a/heron/api/src/java/org/apache/heron/streamlet/SerializableBinaryOperator.java b/heron/api/src/java/org/apache/heron/streamlet/SerializableBinaryOperator.java
index 4441d49..c8586f5 100644
--- a/heron/api/src/java/org/apache/heron/streamlet/SerializableBinaryOperator.java
+++ b/heron/api/src/java/org/apache/heron/streamlet/SerializableBinaryOperator.java
@@ -22,7 +22,7 @@
 
 /**
  * All user supplied transformation functions have to be serializable.
- * Thus all Strealmet transformation definitions take Serializable
+ * Thus all Streamlet transformation definitions take Serializable
  * Functions as their input. We simply decorate java.util. function
  * definitions with a Serializable tag to ensure that any supplied
  * lambda functions automatically become serializable.
diff --git a/integration_test/src/python/test_runner/resources/test.json b/integration_test/src/python/test_runner/resources/test.json
index 62cc92f..bb15dce 100644
--- a/integration_test/src/python/test_runner/resources/test.json
+++ b/integration_test/src/python/test_runner/resources/test.json
@@ -22,6 +22,11 @@
       "topologyName": "IntegrationTest_ScalaStreamletWithSplitAndWithStream",
       "classPath": "scala_streamlet_with_split_and_with_stream.ScalaStreamletWithSplitAndWithStream",
       "expectedResultRelativePath": "scala_streamlet_with_split_and_with_stream/ScalaStreamletWithSplitAndWithStreamResults.json"
+    },
+    {
+      "topologyName" : "IntegrationTest_ScalaStreamletWithMapAndFilterAndUnion",
+      "classPath"    : "scala_streamlet_with_map_and_filter_and_union.ScalaStreamletWithMapAndFilterAndUnion",
+      "expectedResultRelativePath" : "scala_streamlet_with_map_and_filter_and_union/ScalaStreamletWithMapAndFilterAndUnionResults.json"
     }
   ],
   "javaTopologies": [
diff --git a/integration_test/src/scala/org/apache/heron/integration_test/common/ClassicalMusicDataset.scala b/integration_test/src/scala/org/apache/heron/integration_test/common/ClassicalMusicDataset.scala
new file mode 100644
index 0000000..29388cc
--- /dev/null
+++ b/integration_test/src/scala/org/apache/heron/integration_test/common/ClassicalMusicDataset.scala
@@ -0,0 +1,52 @@
+/**
+  * Licensed to the Apache Software Foundation (ASF) under one
+  * or more contributor license agreements.  See the NOTICE file
+  * distributed with this work for additional information
+  * regarding copyright ownership.  The ASF licenses this file
+  * to you under the Apache License, Version 2.0 (the
+  * "License"); you may not use this file except in compliance
+  * with the License.  You may obtain a copy of the License at
+  *
+  *   http://www.apache.org/licenses/LICENSE-2.0
+  *
+  * Unless required by applicable law or agreed to in writing,
+  * software distributed under the License is distributed on an
+  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+  * KIND, either express or implied.  See the License for the
+  * specific language governing permissions and limitations
+  * under the License.
+  */
+package org.apache.heron.integration_test.common
+
+case class ClassicalMusic(composer: String,
+                          title: String,
+                          year: Int,
+                          keyword: String)
+
+/**
+  * Common Dataset to be used by Scala Streamlet Integration Tests
+  */
+object ClassicalMusicDataset {
+
+  val firstClassicalMusicList = List(
+    ClassicalMusic("Bach", "Bourrée In E Minor", 1717, "guitar"),
+    ClassicalMusic("Vivaldi", "Four Seasons: Winter", 1723, "rousing"),
+    ClassicalMusic("Bach", "Air On The G String", 1723, "light"),
+    ClassicalMusic("Mozart", "Symphony No. 40: I", 1788, "seductive"),
+    ClassicalMusic("Beethoven", "Symphony No. 9: Ode To Joy", 1824, "joyful"),
+    ClassicalMusic("Bizet", "Carmen: Habanera", 1875, "seductive")
+  )
+
+  val secondClassicalMusicList = List(
+    ClassicalMusic("Handel", "Water Music: Alla Hornpipe", 1717, "formal"),
+    ClassicalMusic("Vivaldi", "Four Seasons: Spring", 1723, "formal"),
+    ClassicalMusic("Bach",
+                   "Cantata 147: Jesu, Joy Of Man's Desiring",
+                   1723,
+                   "wedding"),
+    ClassicalMusic("Mozart", "Piano Sonata No. 16", 1788, "piano"),
+    ClassicalMusic("Beethoven", "Symphony No. 9: II", 1824, "powerful"),
+    ClassicalMusic("Tchaikovsky", "Piano Concerto No. 1", 1875, "piano")
+  )
+
+}
diff --git a/integration_test/src/scala/org/apache/heron/integration_test/common/ScalaIntegrationTestBase.scala b/integration_test/src/scala/org/apache/heron/integration_test/common/ScalaIntegrationTestBase.scala
index d68180b..0b9f85a 100644
--- a/integration_test/src/scala/org/apache/heron/integration_test/common/ScalaIntegrationTestBase.scala
+++ b/integration_test/src/scala/org/apache/heron/integration_test/common/ScalaIntegrationTestBase.scala
@@ -1,21 +1,21 @@
 /**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
+  * Licensed to the Apache Software Foundation (ASF) under one
+  * or more contributor license agreements.  See the NOTICE file
+  * distributed with this work for additional information
+  * regarding copyright ownership.  The ASF licenses this file
+  * to you under the Apache License, Version 2.0 (the
+  * "License"); you may not use this file except in compliance
+  * with the License.  You may obtain a copy of the License at
+  *
+  *   http://www.apache.org/licenses/LICENSE-2.0
+  *
+  * Unless required by applicable law or agreed to in writing,
+  * software distributed under the License is distributed on an
+  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+  * KIND, either express or implied.  See the License for the
+  * specific language governing permissions and limitations
+  * under the License.
+  */
 package org.apache.heron.integration_test.common
 
 import org.apache.heron.integration_test.core.TestTopologyBuilder
diff --git a/integration_test/src/scala/org/apache/heron/integration_test/topology/scala_streamlet_with_filter_and_transform/ScalaStreamletWithFilterAndTransform.scala b/integration_test/src/scala/org/apache/heron/integration_test/topology/scala_streamlet_with_filter_and_transform/ScalaStreamletWithFilterAndTransform.scala
index 3e73f2a..b2ccbae 100644
--- a/integration_test/src/scala/org/apache/heron/integration_test/topology/scala_streamlet_with_filter_and_transform/ScalaStreamletWithFilterAndTransform.scala
+++ b/integration_test/src/scala/org/apache/heron/integration_test/topology/scala_streamlet_with_filter_and_transform/ScalaStreamletWithFilterAndTransform.scala
@@ -1,21 +1,21 @@
 /**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
+  * Licensed to the Apache Software Foundation (ASF) under one
+  * or more contributor license agreements.  See the NOTICE file
+  * distributed with this work for additional information
+  * regarding copyright ownership.  The ASF licenses this file
+  * to you under the Apache License, Version 2.0 (the
+  * "License"); you may not use this file except in compliance
+  * with the License.  You may obtain a copy of the License at
+  *
+  *   http://www.apache.org/licenses/LICENSE-2.0
+  *
+  * Unless required by applicable law or agreed to in writing,
+  * software distributed under the License is distributed on an
+  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+  * KIND, either express or implied.  See the License for the
+  * specific language governing permissions and limitations
+  * under the License.
+  */
 package org.apache.heron.integration_test.topology.scala_streamlet_with_filter_and_transform
 
 import java.util.concurrent.atomic.AtomicInteger
@@ -28,9 +28,7 @@
   AbstractTestTopology,
   ScalaIntegrationTestBase
 }
-import org.apache.heron.streamlet.scala.{
-  Builder, SerializableTransformer
-}
+import org.apache.heron.streamlet.scala.{Builder, SerializableTransformer}
 
 object ScalaStreamletWithFilterAndTransform {
   def main(args: Array[String]): Unit = {
@@ -41,7 +39,7 @@
 }
 
 /**
-  * Scala Streamlet Integration Test
+  * Scala Streamlet Integration Test by covering source, filter, transform operations.
   */
 class ScalaStreamletWithFilterAndTransform(args: Array[String])
     extends AbstractTestTopology(args)
diff --git a/integration_test/src/scala/org/apache/heron/integration_test/topology/scala_streamlet_with_map_and_filter_and_union/ScalaStreamletWithMapAndFilterAndUnion.scala b/integration_test/src/scala/org/apache/heron/integration_test/topology/scala_streamlet_with_map_and_filter_and_union/ScalaStreamletWithMapAndFilterAndUnion.scala
new file mode 100644
index 0000000..9cfe6a1
--- /dev/null
+++ b/integration_test/src/scala/org/apache/heron/integration_test/topology/scala_streamlet_with_map_and_filter_and_union/ScalaStreamletWithMapAndFilterAndUnion.scala
@@ -0,0 +1,102 @@
+/**
+  * Licensed to the Apache Software Foundation (ASF) under one
+  * or more contributor license agreements.  See the NOTICE file
+  * distributed with this work for additional information
+  * regarding copyright ownership.  The ASF licenses this file
+  * to you under the Apache License, Version 2.0 (the
+  * "License"); you may not use this file except in compliance
+  * with the License.  You may obtain a copy of the License at
+  *
+  *   http://www.apache.org/licenses/LICENSE-2.0
+  *
+  * Unless required by applicable law or agreed to in writing,
+  * software distributed under the License is distributed on an
+  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+  * KIND, either express or implied.  See the License for the
+  * specific language governing permissions and limitations
+  * under the License.
+  */
+package org.apache.heron.integration_test.topology.scala_streamlet_with_map_and_filter_and_union
+
+import org.apache.heron.api.Config
+import org.apache.heron.integration_test.common.{
+  AbstractTestTopology,
+  ClassicalMusic,
+  ScalaIntegrationTestBase
+}
+import org.apache.heron.integration_test.common.ClassicalMusicDataset._
+import org.apache.heron.integration_test.core.TestTopologyBuilder
+import org.apache.heron.streamlet.Context
+import org.apache.heron.streamlet.scala.{Builder, Source}
+
+import scala.collection.mutable.Set
+
+object ScalaStreamletWithMapAndFilterAndUnion {
+
+  val filterSet = Set[String]()
+
+  def main(args: Array[String]): Unit = {
+    val conf = new Config
+    val topology = new ScalaStreamletWithMapAndFilterAndUnion(args)
+    topology.submit(conf)
+  }
+
+}
+
+/**
+  * Scala Streamlet Integration Test by covering source, map, filter and union operations.
+  */
+class ScalaStreamletWithMapAndFilterAndUnion(args: Array[String])
+    extends AbstractTestTopology(args)
+    with ScalaIntegrationTestBase {
+
+  import ScalaStreamletWithMapAndFilterAndUnion._
+
+  override protected def buildTopology(
+      testTopologyBuilder: TestTopologyBuilder): TestTopologyBuilder = {
+    val streamletBuilder = Builder.newBuilder
+    val classicalMusics1 =
+      streamletBuilder
+        .newSource(new ClassicalMusicSource(firstClassicalMusicList))
+        .setName("classical-musics")
+        .map(
+          classicalMusic =>
+            new ClassicalMusic(classicalMusic.composer.toUpperCase(),
+                               classicalMusic.title.toUpperCase(),
+                               classicalMusic.year,
+                               classicalMusic.keyword.toUpperCase()))
+        .setName("classical-musics-with-uppercase")
+
+    val classicalMusics2 = streamletBuilder
+      .newSource(new ClassicalMusicSource(secondClassicalMusicList))
+      .setName("classical-musics-2")
+
+    val unionStreamlet = classicalMusics1
+      .union(classicalMusics2)
+      .setName("classical-musics-union")
+
+    unionStreamlet
+      .map[String](classicalMusic =>
+        s"${classicalMusic.composer}-${classicalMusic.year}")
+      .setName("classical-musics-with-composer-and-year")
+      .filter(filterSet.add(_))
+      .setName("filtered-classical-musics")
+
+    build(testTopologyBuilder, streamletBuilder)
+  }
+
+}
+
+private class ClassicalMusicSource(classicalMusics: List[ClassicalMusic])
+    extends Source[ClassicalMusic] {
+
+  var list = List[ClassicalMusic]()
+
+  override def setup(context: Context): Unit = {
+    list = classicalMusics
+  }
+
+  override def get(): Iterable[ClassicalMusic] = list
+
+  override def cleanup(): Unit = ???
+}
diff --git a/integration_test/src/scala/org/apache/heron/integration_test/topology/scala_streamlet_with_map_and_filter_and_union/ScalaStreamletWithMapAndFilterAndUnionResults.json b/integration_test/src/scala/org/apache/heron/integration_test/topology/scala_streamlet_with_map_and_filter_and_union/ScalaStreamletWithMapAndFilterAndUnionResults.json
new file mode 100644
index 0000000..0eab457
--- /dev/null
+++ b/integration_test/src/scala/org/apache/heron/integration_test/topology/scala_streamlet_with_map_and_filter_and_union/ScalaStreamletWithMapAndFilterAndUnionResults.json
@@ -0,0 +1 @@
+["BACH-1717", "BACH-1723", "BEETHOVEN-1824", "BIZET-1875", "Bach-1723", "Beethoven-1824", "Handel-1717", "MOZART-1788", "Mozart-1788", "Tchaikovsky-1875", "VIVALDI-1723", "Vivaldi-1723"]
\ No newline at end of file
diff --git a/integration_test/src/scala/org/apache/heron/integration_test/topology/scala_streamlet_with_map_and_flatmap_and_filter_and_clone/ScalaStreamletWithMapAndFlatMapAndFilterAndClone.scala b/integration_test/src/scala/org/apache/heron/integration_test/topology/scala_streamlet_with_map_and_flatmap_and_filter_and_clone/ScalaStreamletWithMapAndFlatMapAndFilterAndClone.scala
index bb77987..884ba1f 100644
--- a/integration_test/src/scala/org/apache/heron/integration_test/topology/scala_streamlet_with_map_and_flatmap_and_filter_and_clone/ScalaStreamletWithMapAndFlatMapAndFilterAndClone.scala
+++ b/integration_test/src/scala/org/apache/heron/integration_test/topology/scala_streamlet_with_map_and_flatmap_and_filter_and_clone/ScalaStreamletWithMapAndFlatMapAndFilterAndClone.scala
@@ -1,21 +1,21 @@
 /**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
+  * Licensed to the Apache Software Foundation (ASF) under one
+  * or more contributor license agreements.  See the NOTICE file
+  * distributed with this work for additional information
+  * regarding copyright ownership.  The ASF licenses this file
+  * to you under the Apache License, Version 2.0 (the
+  * "License"); you may not use this file except in compliance
+  * with the License.  You may obtain a copy of the License at
+  *
+  *   http://www.apache.org/licenses/LICENSE-2.0
+  *
+  * Unless required by applicable law or agreed to in writing,
+  * software distributed under the License is distributed on an
+  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+  * KIND, either express or implied.  See the License for the
+  * specific language governing permissions and limitations
+  * under the License.
+  */
 package org.apache.heron.integration_test.topology.scala_streamlet_with_map_and_flatmap_and_filter_and_clone
 
 import scala.collection.mutable.Set
@@ -44,6 +44,9 @@
   }
 }
 
+/**
+  * Scala Streamlet Integration Test by covering source, map, flatMap, filter and clone operations.
+  */
 class ScalaStreamletWithMapAndFlatMapAndFilterAndClone(args: Array[String])
     extends AbstractTestTopology(args)
     with ScalaIntegrationTestBase {
@@ -52,6 +55,7 @@
 
   override protected def buildTopology(
       testTopologyBuilder: TestTopologyBuilder): TestTopologyBuilder = {
+
     val streamletBuilder = Builder.newBuilder
 
     val clonedStreamlet = streamletBuilder