[Streamlet Scala API] Add Scala Streamlet Integration Tests Part I (#2826)

* Scala Streamlet Integration Tests Part I

* Apply minor change to TextTransformer

* Fix Scala Streamlet Integration Test Packaging Problem

* Fix Scala Streamlet Integration Test Result Problem

* Add Fix for Travis

* Add Fix for Travis
diff --git a/.gitignore b/.gitignore
index 927928b..12fbf25 100644
--- a/.gitignore
+++ b/.gitignore
@@ -132,3 +132,6 @@
 
 # Visual Studio Code
 .vscode
+
+# integration_test
+results/
\ No newline at end of file
diff --git a/heron/api/src/java/com/twitter/heron/streamlet/impl/BuilderImpl.java b/heron/api/src/java/com/twitter/heron/streamlet/impl/BuilderImpl.java
index e00eb14..b982453 100644
--- a/heron/api/src/java/com/twitter/heron/streamlet/impl/BuilderImpl.java
+++ b/heron/api/src/java/com/twitter/heron/streamlet/impl/BuilderImpl.java
@@ -11,10 +11,8 @@
 //  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 //  See the License for the specific language governing permissions and
 //  limitations under the License.
-
 package com.twitter.heron.streamlet.impl;
 
-
 import java.util.HashSet;
 import java.util.LinkedList;
 import java.util.List;
@@ -60,6 +58,10 @@
    */
   public TopologyBuilder build() {
     TopologyBuilder builder = new TopologyBuilder();
+    return build(builder);
+  }
+
+  public TopologyBuilder build(TopologyBuilder builder) {
     Set<String> stageNames = new HashSet<>();
     for (StreamletImpl<?> streamlet : sources) {
       streamlet.build(builder, stageNames);
diff --git a/heron/api/src/scala/com/twitter/heron/streamlet/scala/SerializableTransformer.scala b/heron/api/src/scala/com/twitter/heron/streamlet/scala/SerializableTransformer.scala
index 7867448..7bbbdd2 100644
--- a/heron/api/src/scala/com/twitter/heron/streamlet/scala/SerializableTransformer.scala
+++ b/heron/api/src/scala/com/twitter/heron/streamlet/scala/SerializableTransformer.scala
@@ -13,8 +13,6 @@
 //  limitations under the License.
 package com.twitter.heron.streamlet.scala
 
-import java.io.Serializable
-
 import com.twitter.heron.streamlet.Context
 
 /**
diff --git a/heron/api/src/scala/com/twitter/heron/streamlet/scala/impl/BuilderImpl.scala b/heron/api/src/scala/com/twitter/heron/streamlet/scala/impl/BuilderImpl.scala
index d6cd4c8..4458d70 100644
--- a/heron/api/src/scala/com/twitter/heron/streamlet/scala/impl/BuilderImpl.scala
+++ b/heron/api/src/scala/com/twitter/heron/streamlet/scala/impl/BuilderImpl.scala
@@ -37,4 +37,7 @@
   def build(): TopologyBuilder =
     builder.asInstanceOf[JavaBuilderImpl].build()
 
+  def build(topologyBuilder: TopologyBuilder): TopologyBuilder =
+    builder.asInstanceOf[JavaBuilderImpl].build(topologyBuilder)
+
 }
diff --git a/integration_test/src/python/test_runner/main.py b/integration_test/src/python/test_runner/main.py
index 861a951..f5d4d1c 100644
--- a/integration_test/src/python/test_runner/main.py
+++ b/integration_test/src/python/test_runner/main.py
@@ -293,11 +293,15 @@
 
   http_server_host_port = "%s:%d" % (args.http_server_hostname, args.http_server_port)
 
-  if args.tests_bin_path.endswith(".jar"):
+  if args.tests_bin_path.endswith("scala-integration-tests.jar"):
+    test_topologies = filter_test_topologies(conf["scalaTopologies"], args.test_topology_pattern)
+    topology_classpath_prefix = conf["topologyClasspathPrefix"]
+    extra_topology_args = "-s http://%s/state" % http_server_host_port
+  elif args.tests_bin_path.endswith("integration-tests.jar"):
     test_topologies = filter_test_topologies(conf["javaTopologies"], args.test_topology_pattern)
     topology_classpath_prefix = conf["topologyClasspathPrefix"]
     extra_topology_args = "-s http://%s/state" % http_server_host_port
-  elif args.tests_bin_path.endswith(".pex"):
+  elif args.tests_bin_path.endswith("heron_integ_topology.pex"):
     test_topologies = filter_test_topologies(conf["pythonTopologies"], args.test_topology_pattern)
     topology_classpath_prefix = ""
     extra_topology_args = ""
diff --git a/integration_test/src/python/test_runner/resources/test.json b/integration_test/src/python/test_runner/resources/test.json
index 049c58c..ba0addd 100644
--- a/integration_test/src/python/test_runner/resources/test.json
+++ b/integration_test/src/python/test_runner/resources/test.json
@@ -7,6 +7,13 @@
   "cliConfigPath" : "$HOME/.heron/conf",
   "topologyClasspathPrefix" : "com.twitter.heron.integration_test.topology.",
   "releasePackageUri" : "scheme://role/name/version",
+  "scalaTopologies": [
+    {
+      "topologyName" : "IntegrationTest_ScalaStreamletWithFilterAndTransform",
+      "classPath"    : "scala_streamlet_with_filter_and_transform.ScalaStreamletWithFilterAndTransform",
+      "expectedResultRelativePath" : "scala_streamlet_with_filter_and_transform/ScalaStreamletWithFilterAndTransformResults.json"
+    }
+  ],
   "javaTopologies": [
     {
       "topologyName" : "IntegrationTest_FieldsGrouping",
diff --git a/integration_test/src/scala/BUILD b/integration_test/src/scala/BUILD
new file mode 100644
index 0000000..48b7d81
--- /dev/null
+++ b/integration_test/src/scala/BUILD
@@ -0,0 +1,28 @@
+licenses(["notice"])
+
+package(default_visibility = ["//visibility:public"])
+
+filegroup(
+    name = "test-data-files",
+    srcs = glob(["**/*.json"]),
+)
+
+scala_binary(
+    name = "scala-integration-tests-unshaded",
+    srcs = glob(["com/twitter/heron/integration_test/**/*.scala"]),
+    deps = [
+        "//heron/api/src/java:api-java",
+        "//heron/api/src/scala:api-scala",
+        "//integration_test/src/java:common",
+        "//integration_test/src/java:core",
+        "//heron/api/src/java:api-java-low-level"
+    ],
+    main_class = "com.twitter.heron.integration_test.topology.scala_streamlet_with_filter_and_transform.ScalaStreamletWithFilterAndTransform"
+)
+
+genrule(
+    name = 'scala-integration-tests',
+    srcs = [":scala-integration-tests-unshaded_deploy.jar"],
+    outs = ["scala-integration-tests.jar"],
+    cmd  = "cp $< $@"
+)
\ No newline at end of file
diff --git a/integration_test/src/scala/com/twitter/heron/integration_test/common/ScalaIntegrationTestBase.scala b/integration_test/src/scala/com/twitter/heron/integration_test/common/ScalaIntegrationTestBase.scala
new file mode 100644
index 0000000..9563a1f
--- /dev/null
+++ b/integration_test/src/scala/com/twitter/heron/integration_test/common/ScalaIntegrationTestBase.scala
@@ -0,0 +1,32 @@
+//  Copyright 2018 Twitter. All rights reserved.
+//
+//  Licensed under the Apache License, Version 2.0 (the "License");
+//  you may not use this file except in compliance with the License.
+//  You may obtain a copy of the License at
+//
+//  http://www.apache.org/licenses/LICENSE-2.0
+//
+//  Unless required by applicable law or agreed to in writing, software
+//  distributed under the License is distributed on an "AS IS" BASIS,
+//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+//  See the License for the specific language governing permissions and
+//  limitations under the License.
+package com.twitter.heron.integration_test.common
+
+import com.twitter.heron.integration_test.core.TestTopologyBuilder
+import com.twitter.heron.streamlet.scala.Builder
+import com.twitter.heron.streamlet.scala.impl.BuilderImpl
+
+/**
+  * Base trait for Scala integration tests: builds a streamlet Builder into a TestTopologyBuilder.
+  */
+trait ScalaIntegrationTestBase extends Serializable {
+
+  protected def build(testTopologyBuilder: TestTopologyBuilder,
+                      streamletBuilder: Builder): TestTopologyBuilder = {
+    val streamletBuilderImpl = streamletBuilder.asInstanceOf[BuilderImpl]
+    val topologyBuilder = streamletBuilderImpl.build(testTopologyBuilder)
+    topologyBuilder.asInstanceOf[TestTopologyBuilder]
+  }
+
+}
diff --git a/integration_test/src/scala/com/twitter/heron/integration_test/topology/scala_streamlet_with_filter_and_transform/ScalaStreamletWithFilterAndTransform.scala b/integration_test/src/scala/com/twitter/heron/integration_test/topology/scala_streamlet_with_filter_and_transform/ScalaStreamletWithFilterAndTransform.scala
new file mode 100644
index 0000000..3408032
--- /dev/null
+++ b/integration_test/src/scala/com/twitter/heron/integration_test/topology/scala_streamlet_with_filter_and_transform/ScalaStreamletWithFilterAndTransform.scala
@@ -0,0 +1,71 @@
+//  Copyright 2018 Twitter. All rights reserved.
+//
+//  Licensed under the Apache License, Version 2.0 (the "License");
+//  you may not use this file except in compliance with the License.
+//  You may obtain a copy of the License at
+//
+//  http://www.apache.org/licenses/LICENSE-2.0
+//
+//  Unless required by applicable law or agreed to in writing, software
+//  distributed under the License is distributed on an "AS IS" BASIS,
+//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+//  See the License for the specific language governing permissions and
+//  limitations under the License.
+package com.twitter.heron.integration_test.topology.scala_streamlet_with_filter_and_transform
+
+import java.util.concurrent.atomic.AtomicInteger
+
+import com.twitter.heron.api.Config
+import com.twitter.heron.integration_test.common.{
+  AbstractTestTopology,
+  ScalaIntegrationTestBase
+}
+import com.twitter.heron.integration_test.core.TestTopologyBuilder
+import com.twitter.heron.streamlet.Context
+import com.twitter.heron.streamlet.scala.{Builder, SerializableTransformer}
+
+object ScalaStreamletWithFilterAndTransform {
+  def main(args: Array[String]): Unit = {
+    val conf = new Config
+    val topology = new ScalaStreamletWithFilterAndTransform(args)
+    topology.submit(conf)
+  }
+}
+
+/**
+  * Scala Streamlet integration test: keeps source numbers <= 7 and transforms each one to text.
+  */
+@SerialVersionUID(-7280407024398984674L)
+class ScalaStreamletWithFilterAndTransform(args: Array[String])
+    extends AbstractTestTopology(args)
+    with ScalaIntegrationTestBase {
+
+  override protected def buildTopology(
+      testTopologyBuilder: TestTopologyBuilder): TestTopologyBuilder = {
+    val atomicInteger = new AtomicInteger
+
+    val streamletBuilder = Builder.newBuilder
+
+    streamletBuilder
+      .newSource(() => atomicInteger.getAndIncrement())
+      .setName("incremented-numbers")
+      .filter((i: Int) => i <= 7)
+      .setName("positive-numbers-lower-than-8")
+      .transform[String](new TextTransformer())
+      .setName("numbers-transformed-to-text")
+
+    build(testTopologyBuilder, streamletBuilder)
+  }
+
+}
+
+private class TextTransformer extends SerializableTransformer[Int, String] {
+  private val alphabet = List("a", "b", "c", "d", "e", "f", "g", "h")
+
+  override def setup(context: Context): Unit = {}
+
+  override def transform(i: Int, fun: String => Unit): Unit =
+    fun(s"${alphabet(i)}-$i".toUpperCase)
+
+  override def cleanup(): Unit = {}
+}
diff --git a/integration_test/src/scala/com/twitter/heron/integration_test/topology/scala_streamlet_with_filter_and_transform/ScalaStreamletWithFilterAndTransformResults.json b/integration_test/src/scala/com/twitter/heron/integration_test/topology/scala_streamlet_with_filter_and_transform/ScalaStreamletWithFilterAndTransformResults.json
new file mode 100644
index 0000000..dda7256
--- /dev/null
+++ b/integration_test/src/scala/com/twitter/heron/integration_test/topology/scala_streamlet_with_filter_and_transform/ScalaStreamletWithFilterAndTransformResults.json
@@ -0,0 +1 @@
+["A-0", "B-1", "C-2", "D-3", "E-4", "F-5", "G-6", "H-7"]
\ No newline at end of file
diff --git a/scripts/applatix/javatests.sh b/scripts/applatix/javatests.sh
index 0744aaa..e19d5df 100755
--- a/scripts/applatix/javatests.sh
+++ b/scripts/applatix/javatests.sh
@@ -9,14 +9,29 @@
 
 # integration test binaries have to be specified as absolute path
 JAVA_INTEGRATION_TESTS_BIN="${HOME}/.herontests/lib/integration-tests.jar"
+SCALA_INTEGRATION_TESTS_BIN="${HOME}/.herontests/lib/scala-integration-tests.jar"
 
-# run the java integration test
-T="heron integration_test java"
+# initialize http-server for integration tests
+T="heron integration_test http-server initialization"
 start_timer "$T"
 ${HOME}/bin/http-server 8080 &
 http_server_id=$!
 trap "kill -9 $http_server_id" SIGINT SIGTERM EXIT
+end_timer "$T"
 
+# run the scala integration test
+T="heron integration_test scala"
+start_timer "$T"
+${HOME}/bin/test-runner \
+  -hc heron -tb ${SCALA_INTEGRATION_TESTS_BIN} \
+  -rh localhost -rp 8080\
+  -tp ${HOME}/.herontests/data/scala \
+  -cl local -rl heron-staging -ev devel
+end_timer "$T"
+
+# run the java integration test
+T="heron integration_test java"
+start_timer "$T"
 ${HOME}/bin/test-runner \
   -hc heron -tb ${JAVA_INTEGRATION_TESTS_BIN} \
   -rh localhost -rp 8080\
diff --git a/scripts/applatix/test.sh b/scripts/applatix/test.sh
index c346dea..8f7391d 100755
--- a/scripts/applatix/test.sh
+++ b/scripts/applatix/test.sh
@@ -18,6 +18,7 @@
 # integration test binaries have to be specified as absolute path
 JAVA_INTEGRATION_TESTS_BIN="${HOME}/.herontests/lib/integration-tests.jar"
 PYTHON_INTEGRATION_TESTS_BIN="${HOME}/.herontests/lib/heron_integ_topology.pex"
+SCALA_INTEGRATION_TESTS_BIN="${HOME}/.herontests/lib/scala-integration-tests.jar"
 
 # install client
 T="heron client install"
@@ -37,13 +38,27 @@
 python ${UTILS}/save-logs.py "heron_tests_install.txt" ./heron-tests-install.sh --user
 end_timer "$T"
 
-# run the java integration test
-T="heron integration_test java"
+# initialize http-server for integration tests
+T="heron integration_test http-server initialization"
 start_timer "$T"
 ${HOME}/bin/http-server 8080 &
 http_server_id=$!
 trap "kill -9 $http_server_id" SIGINT SIGTERM EXIT
+end_timer "$T"
 
+# run the scala integration test
+T="heron integration_test scala"
+start_timer "$T"
+${HOME}/bin/test-runner \
+  -hc heron -tb ${SCALA_INTEGRATION_TESTS_BIN} \
+  -rh localhost -rp 8080\
+  -tp ${HOME}/.herontests/data/scala \
+  -cl local -rl heron-staging -ev devel
+end_timer "$T"
+
+# run the java integration test
+T="heron integration_test java"
+start_timer "$T"
 ${HOME}/bin/test-runner \
   -hc heron -tb ${JAVA_INTEGRATION_TESTS_BIN} \
   -rh localhost -rp 8080\
diff --git a/scripts/packages/BUILD b/scripts/packages/BUILD
index facfb2b..eada6e2 100644
--- a/scripts/packages/BUILD
+++ b/scripts/packages/BUILD
@@ -344,6 +344,15 @@
 )
 
 pkg_tar(
+    name = "heron-tests-data-scala",
+    package_dir = "data/scala",
+    srcs = [
+        "//integration_test/src/scala:test-data-files",
+    ],
+    strip_prefix = '/integration_test/src/scala/com/twitter/heron/integration_test/topology/'
+)
+
+pkg_tar(
     name = "heron-tests-data-java",
     package_dir = "data/java",
     srcs = [
@@ -365,6 +374,7 @@
     name = "heron-tests-lib",
     package_dir = "lib",
     srcs = [
+       "//integration_test/src/scala:scala-integration-tests",
        "//integration_test/src/java:integration-tests",
        "//integration_test/src/python/integration_test/topology:heron_integ_topology",
     ],
@@ -376,6 +386,7 @@
     srcs = generated_release_files,
     deps = [
         ":heron-tests-bin",
+        ":heron-tests-data-scala",
         ":heron-tests-data-java",
         ":heron-tests-data-python",
         ":heron-tests-lib",
diff --git a/scripts/run_integration_test.sh b/scripts/run_integration_test.sh
index 903f879..d70881b 100755
--- a/scripts/run_integration_test.sh
+++ b/scripts/run_integration_test.sh
@@ -8,10 +8,12 @@
 
 JAVA_TESTS_DIR="integration_test/src/java/com/twitter/heron/integration_test/topology"
 PYTHON_TESTS_DIR="integration_test/src/python/integration_test/topology"
+SCALA_TESTS_DIR="integration_test/src/scala/com/twitter/heron/integration_test/topology"
 
 # integration test binaries have to be specified as absolute path
 JAVA_INTEGRATION_TESTS_BIN="${PWD}/bazel-genfiles/integration_test/src/java/integration-tests.jar"
 PYTHON_INTEGRATION_TESTS_BIN="${PWD}/bazel-bin/integration_test/src/python/integration_test/topology/heron_integ_topology.pex"
+SCALA_INTEGRATION_TESTS_BIN="${PWD}/bazel-genfiles/integration_test/src/scala/scala-integration-tests.jar"
 
 CORE_PKG="file://${PWD}/bazel-bin/scripts/packages/heron-core.tar.gz"
 
@@ -28,6 +30,13 @@
 http_server_id=$!
 trap "kill -9 $http_server_id" SIGINT SIGTERM EXIT
 
+# run the scala integration tests
+${TEST_RUNNER} \
+  -hc ~/.heron/bin/heron -tb ${SCALA_INTEGRATION_TESTS_BIN} \
+  -rh localhost -rp 8080 \
+  -tp ${SCALA_TESTS_DIR} \
+  -cl local -rl heron-staging -ev devel -pi ${CORE_PKG}
+
 # run the java integration tests
 ${TEST_RUNNER} \
   -hc ~/.heron/bin/heron -tb ${JAVA_INTEGRATION_TESTS_BIN} \
diff --git a/scripts/travis/test.sh b/scripts/travis/test.sh
index 420a56f..2cece60 100755
--- a/scripts/travis/test.sh
+++ b/scripts/travis/test.sh
@@ -15,6 +15,7 @@
 # integration test binaries have to be specified as absolute path
 JAVA_INTEGRATION_TESTS_BIN="${HOME}/.herontests/lib/integration-tests.jar"
 PYTHON_INTEGRATION_TESTS_BIN="${HOME}/.herontests/lib/heron_integ_topology.pex"
+SCALA_INTEGRATION_TESTS_BIN="${HOME}/.herontests/lib/scala-integration-tests.jar"
 
 # build test related jar
 T="heron build integration_test"
@@ -40,13 +41,27 @@
 python ./bazel-bin/integration_test/src/python/local_test_runner/local-test-runner
 end_timer "$T"
 
-# run the java integration test
-T="heron integration_test java"
+# initialize http-server for integration tests
+T="heron integration_test http-server initialization"
 start_timer "$T"
 ${HOME}/bin/http-server 8080 &
 http_server_id=$!
 trap "kill -9 $http_server_id" SIGINT SIGTERM EXIT
+end_timer "$T"
 
+# run the scala integration test
+T="heron integration_test scala"
+start_timer "$T"
+${HOME}/bin/test-runner \
+  -hc heron -tb ${SCALA_INTEGRATION_TESTS_BIN} \
+  -rh localhost -rp 8080\
+  -tp ${HOME}/.herontests/data/scala \
+  -cl local -rl heron-staging -ev devel
+end_timer "$T"
+
+# run the java integration test
+T="heron integration_test java"
+start_timer "$T"
 ${HOME}/bin/test-runner \
   -hc heron -tb ${JAVA_INTEGRATION_TESTS_BIN} \
   -rh localhost -rp 8080\