[Streamlet Scala API] Add Scala Streamlet Integration Tests Part I (#2826)
* Scala Streamlet Integration Tests Part I
* Minor change is applied to TextTransformer
* Fix Scala Streamlet Integration Test Packaging Problem
* Fix Scala Streamlet Integration Test Result Problem
* Add Fix for Travis
* Add Fix for Travis
diff --git a/.gitignore b/.gitignore
index 927928b..12fbf25 100644
--- a/.gitignore
+++ b/.gitignore
@@ -132,3 +132,6 @@
# Visual Studio Code
.vscode
+
+# integration_test
+results/
\ No newline at end of file
diff --git a/heron/api/src/java/com/twitter/heron/streamlet/impl/BuilderImpl.java b/heron/api/src/java/com/twitter/heron/streamlet/impl/BuilderImpl.java
index e00eb14..b982453 100644
--- a/heron/api/src/java/com/twitter/heron/streamlet/impl/BuilderImpl.java
+++ b/heron/api/src/java/com/twitter/heron/streamlet/impl/BuilderImpl.java
@@ -11,10 +11,8 @@
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
-
package com.twitter.heron.streamlet.impl;
-
import java.util.HashSet;
import java.util.LinkedList;
import java.util.List;
@@ -60,6 +58,10 @@
*/
public TopologyBuilder build() {
TopologyBuilder builder = new TopologyBuilder();
+ return build(builder);
+ }
+
+ public TopologyBuilder build(TopologyBuilder builder) {
Set<String> stageNames = new HashSet<>();
for (StreamletImpl<?> streamlet : sources) {
streamlet.build(builder, stageNames);
diff --git a/heron/api/src/scala/com/twitter/heron/streamlet/scala/SerializableTransformer.scala b/heron/api/src/scala/com/twitter/heron/streamlet/scala/SerializableTransformer.scala
index 7867448..7bbbdd2 100644
--- a/heron/api/src/scala/com/twitter/heron/streamlet/scala/SerializableTransformer.scala
+++ b/heron/api/src/scala/com/twitter/heron/streamlet/scala/SerializableTransformer.scala
@@ -13,8 +13,6 @@
// limitations under the License.
package com.twitter.heron.streamlet.scala
-import java.io.Serializable
-
import com.twitter.heron.streamlet.Context
/**
diff --git a/heron/api/src/scala/com/twitter/heron/streamlet/scala/impl/BuilderImpl.scala b/heron/api/src/scala/com/twitter/heron/streamlet/scala/impl/BuilderImpl.scala
index d6cd4c8..4458d70 100644
--- a/heron/api/src/scala/com/twitter/heron/streamlet/scala/impl/BuilderImpl.scala
+++ b/heron/api/src/scala/com/twitter/heron/streamlet/scala/impl/BuilderImpl.scala
@@ -37,4 +37,7 @@
def build(): TopologyBuilder =
builder.asInstanceOf[JavaBuilderImpl].build()
+ def build(topologyBuilder: TopologyBuilder): TopologyBuilder =
+ builder.asInstanceOf[JavaBuilderImpl].build(topologyBuilder)
+
}
diff --git a/integration_test/src/python/test_runner/main.py b/integration_test/src/python/test_runner/main.py
index 861a951..f5d4d1c 100644
--- a/integration_test/src/python/test_runner/main.py
+++ b/integration_test/src/python/test_runner/main.py
@@ -293,11 +293,15 @@
http_server_host_port = "%s:%d" % (args.http_server_hostname, args.http_server_port)
- if args.tests_bin_path.endswith(".jar"):
+ if args.tests_bin_path.endswith("scala-integration-tests.jar"):
+ test_topologies = filter_test_topologies(conf["scalaTopologies"], args.test_topology_pattern)
+ topology_classpath_prefix = conf["topologyClasspathPrefix"]
+ extra_topology_args = "-s http://%s/state" % http_server_host_port
+ elif args.tests_bin_path.endswith("integration-tests.jar"):
test_topologies = filter_test_topologies(conf["javaTopologies"], args.test_topology_pattern)
topology_classpath_prefix = conf["topologyClasspathPrefix"]
extra_topology_args = "-s http://%s/state" % http_server_host_port
- elif args.tests_bin_path.endswith(".pex"):
+ elif args.tests_bin_path.endswith("heron_integ_topology.pex"):
test_topologies = filter_test_topologies(conf["pythonTopologies"], args.test_topology_pattern)
topology_classpath_prefix = ""
extra_topology_args = ""
diff --git a/integration_test/src/python/test_runner/resources/test.json b/integration_test/src/python/test_runner/resources/test.json
index 049c58c..ba0addd 100644
--- a/integration_test/src/python/test_runner/resources/test.json
+++ b/integration_test/src/python/test_runner/resources/test.json
@@ -7,6 +7,13 @@
"cliConfigPath" : "$HOME/.heron/conf",
"topologyClasspathPrefix" : "com.twitter.heron.integration_test.topology.",
"releasePackageUri" : "scheme://role/name/version",
+ "scalaTopologies": [
+ {
+ "topologyName" : "IntegrationTest_ScalaStreamletWithFilterAndTransform",
+ "classPath" : "scala_streamlet_with_filter_and_transform.ScalaStreamletWithFilterAndTransform",
+ "expectedResultRelativePath" : "scala_streamlet_with_filter_and_transform/ScalaStreamletWithFilterAndTransformResults.json"
+ }
+ ],
"javaTopologies": [
{
"topologyName" : "IntegrationTest_FieldsGrouping",
diff --git a/integration_test/src/scala/BUILD b/integration_test/src/scala/BUILD
new file mode 100644
index 0000000..48b7d81
--- /dev/null
+++ b/integration_test/src/scala/BUILD
@@ -0,0 +1,28 @@
+licenses(["notice"])
+
+package(default_visibility = ["//visibility:public"])
+
+filegroup(
+ name = "test-data-files",
+ srcs = glob(["**/*.json"]),
+)
+
+scala_binary(
+ name = "scala-integration-tests-unshaded",
+ srcs = glob(["com/twitter/heron/integration_test/**/*.scala"]),
+ deps = [
+ "//heron/api/src/java:api-java",
+ "//heron/api/src/scala:api-scala",
+ "//integration_test/src/java:common",
+ "//integration_test/src/java:core",
+ "//heron/api/src/java:api-java-low-level"
+ ],
+ main_class = "com.twitter.heron.integration_test.topology.scala_streamlet_with_filter_and_transform.ScalaStreamletWithFilterAndTransform"
+)
+
+genrule(
+ name = 'scala-integration-tests',
+ srcs = [":scala-integration-tests-unshaded_deploy.jar"],
+ outs = ["scala-integration-tests.jar"],
+ cmd = "cp $< $@"
+)
\ No newline at end of file
diff --git a/integration_test/src/scala/com/twitter/heron/integration_test/common/ScalaIntegrationTestBase.scala b/integration_test/src/scala/com/twitter/heron/integration_test/common/ScalaIntegrationTestBase.scala
new file mode 100644
index 0000000..9563a1f
--- /dev/null
+++ b/integration_test/src/scala/com/twitter/heron/integration_test/common/ScalaIntegrationTestBase.scala
@@ -0,0 +1,32 @@
+// Copyright 2018 Twitter. All rights reserved.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+package com.twitter.heron.integration_test.common
+
+import com.twitter.heron.integration_test.core.TestTopologyBuilder
+import com.twitter.heron.streamlet.scala.Builder
+import com.twitter.heron.streamlet.scala.impl.BuilderImpl
+
+/**
+ * Scala Integration Test Base
+ */
+trait ScalaIntegrationTestBase extends Serializable {
+
+ protected def build(testTopologyBuilder: TestTopologyBuilder,
+ streamletBuilder: Builder): TestTopologyBuilder = {
+ val streamletBuilderImpl = streamletBuilder.asInstanceOf[BuilderImpl]
+ val topologyBuilder = streamletBuilderImpl.build(testTopologyBuilder)
+ topologyBuilder.asInstanceOf[TestTopologyBuilder]
+ }
+
+}
diff --git a/integration_test/src/scala/com/twitter/heron/integration_test/topology/scala_streamlet_with_filter_and_transform/ScalaStreamletWithFilterAndTransform.scala b/integration_test/src/scala/com/twitter/heron/integration_test/topology/scala_streamlet_with_filter_and_transform/ScalaStreamletWithFilterAndTransform.scala
new file mode 100644
index 0000000..3408032
--- /dev/null
+++ b/integration_test/src/scala/com/twitter/heron/integration_test/topology/scala_streamlet_with_filter_and_transform/ScalaStreamletWithFilterAndTransform.scala
@@ -0,0 +1,71 @@
+// Copyright 2018 Twitter. All rights reserved.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+package com.twitter.heron.integration_test.topology.scala_streamlet_with_filter_and_transform
+
+import java.util.concurrent.atomic.AtomicInteger
+
+import com.twitter.heron.api.Config
+import com.twitter.heron.integration_test.common.{
+ AbstractTestTopology,
+ ScalaIntegrationTestBase
+}
+import com.twitter.heron.integration_test.core.TestTopologyBuilder
+import com.twitter.heron.streamlet.Context
+import com.twitter.heron.streamlet.scala.{Builder, SerializableTransformer}
+
+object ScalaStreamletWithFilterAndTransform {
+ def main(args: Array[String]): Unit = {
+ val conf = new Config
+ val topology = new ScalaStreamletWithFilterAndTransform(args)
+ topology.submit(conf)
+ }
+}
+
+/**
+ * Scala Streamlet Integration Test
+ */
+@SerialVersionUID(-7280407024398984674L)
+class ScalaStreamletWithFilterAndTransform(args: Array[String])
+ extends AbstractTestTopology(args)
+ with ScalaIntegrationTestBase {
+
+ override protected def buildTopology(
+ testTopologyBuilder: TestTopologyBuilder): TestTopologyBuilder = {
+ val atomicInteger = new AtomicInteger
+
+ val streamletBuilder = Builder.newBuilder
+
+ streamletBuilder
+ .newSource(() => atomicInteger.getAndIncrement())
+ .setName("incremented-numbers")
+ .filter((i: Int) => i <= 7)
+ .setName("positive-numbers-lower-than-8")
+ .transform[String](new TextTransformer())
+ .setName("numbers-transformed-to-text")
+
+ build(testTopologyBuilder, streamletBuilder)
+ }
+
+}
+
+private class TextTransformer extends SerializableTransformer[Int, String] {
+ private val alphabet = List("a", "b", "c", "d", "e", "f", "g", "h")
+
+ override def setup(context: Context): Unit = {}
+
+ override def transform(i: Int, fun: String => Unit): Unit =
+ fun(s"${alphabet(i)}-$i".toUpperCase)
+
+ override def cleanup(): Unit = {}
+}
diff --git a/integration_test/src/scala/com/twitter/heron/integration_test/topology/scala_streamlet_with_filter_and_transform/ScalaStreamletWithFilterAndTransformResults.json b/integration_test/src/scala/com/twitter/heron/integration_test/topology/scala_streamlet_with_filter_and_transform/ScalaStreamletWithFilterAndTransformResults.json
new file mode 100644
index 0000000..dda7256
--- /dev/null
+++ b/integration_test/src/scala/com/twitter/heron/integration_test/topology/scala_streamlet_with_filter_and_transform/ScalaStreamletWithFilterAndTransformResults.json
@@ -0,0 +1 @@
+["A-0", "B-1", "C-2", "D-3", "E-4", "F-5", "G-6", "H-7"]
\ No newline at end of file
diff --git a/scripts/applatix/javatests.sh b/scripts/applatix/javatests.sh
index 0744aaa..e19d5df 100755
--- a/scripts/applatix/javatests.sh
+++ b/scripts/applatix/javatests.sh
@@ -9,14 +9,29 @@
# integration test binaries have to be specified as absolute path
JAVA_INTEGRATION_TESTS_BIN="${HOME}/.herontests/lib/integration-tests.jar"
+SCALA_INTEGRATION_TESTS_BIN="${HOME}/.herontests/lib/scala-integration-tests.jar"
-# run the java integration test
-T="heron integration_test java"
+# initialize http-server for integration tests
+T="heron integration_test http-server initialization"
start_timer "$T"
${HOME}/bin/http-server 8080 &
http_server_id=$!
trap "kill -9 $http_server_id" SIGINT SIGTERM EXIT
+end_timer "$T"
+# run the scala integration test
+T="heron integration_test scala"
+start_timer "$T"
+${HOME}/bin/test-runner \
+ -hc heron -tb ${SCALA_INTEGRATION_TESTS_BIN} \
+ -rh localhost -rp 8080\
+ -tp ${HOME}/.herontests/data/scala \
+ -cl local -rl heron-staging -ev devel
+end_timer "$T"
+
+# run the java integration test
+T="heron integration_test java"
+start_timer "$T"
${HOME}/bin/test-runner \
-hc heron -tb ${JAVA_INTEGRATION_TESTS_BIN} \
-rh localhost -rp 8080\
diff --git a/scripts/applatix/test.sh b/scripts/applatix/test.sh
index c346dea..8f7391d 100755
--- a/scripts/applatix/test.sh
+++ b/scripts/applatix/test.sh
@@ -18,6 +18,7 @@
# integration test binaries have to be specified as absolute path
JAVA_INTEGRATION_TESTS_BIN="${HOME}/.herontests/lib/integration-tests.jar"
PYTHON_INTEGRATION_TESTS_BIN="${HOME}/.herontests/lib/heron_integ_topology.pex"
+SCALA_INTEGRATION_TESTS_BIN="${HOME}/.herontests/lib/scala-integration-tests.jar"
# install client
T="heron client install"
@@ -37,13 +38,27 @@
python ${UTILS}/save-logs.py "heron_tests_install.txt" ./heron-tests-install.sh --user
end_timer "$T"
-# run the java integration test
-T="heron integration_test java"
+# initialize http-server for integration tests
+T="heron integration_test http-server initialization"
start_timer "$T"
${HOME}/bin/http-server 8080 &
http_server_id=$!
trap "kill -9 $http_server_id" SIGINT SIGTERM EXIT
+end_timer "$T"
+# run the scala integration test
+T="heron integration_test scala"
+start_timer "$T"
+${HOME}/bin/test-runner \
+ -hc heron -tb ${SCALA_INTEGRATION_TESTS_BIN} \
+ -rh localhost -rp 8080\
+ -tp ${HOME}/.herontests/data/scala \
+ -cl local -rl heron-staging -ev devel
+end_timer "$T"
+
+# run the java integration test
+T="heron integration_test java"
+start_timer "$T"
${HOME}/bin/test-runner \
-hc heron -tb ${JAVA_INTEGRATION_TESTS_BIN} \
-rh localhost -rp 8080\
diff --git a/scripts/packages/BUILD b/scripts/packages/BUILD
index facfb2b..eada6e2 100644
--- a/scripts/packages/BUILD
+++ b/scripts/packages/BUILD
@@ -344,6 +344,15 @@
)
pkg_tar(
+ name = "heron-tests-data-scala",
+ package_dir = "data/scala",
+ srcs = [
+ "//integration_test/src/scala:test-data-files",
+ ],
+ strip_prefix = '/integration_test/src/scala/com/twitter/heron/integration_test/topology/'
+)
+
+pkg_tar(
name = "heron-tests-data-java",
package_dir = "data/java",
srcs = [
@@ -365,6 +374,7 @@
name = "heron-tests-lib",
package_dir = "lib",
srcs = [
+ "//integration_test/src/scala:scala-integration-tests",
"//integration_test/src/java:integration-tests",
"//integration_test/src/python/integration_test/topology:heron_integ_topology",
],
@@ -376,6 +386,7 @@
srcs = generated_release_files,
deps = [
":heron-tests-bin",
+ ":heron-tests-data-scala",
":heron-tests-data-java",
":heron-tests-data-python",
":heron-tests-lib",
diff --git a/scripts/run_integration_test.sh b/scripts/run_integration_test.sh
index 903f879..d70881b 100755
--- a/scripts/run_integration_test.sh
+++ b/scripts/run_integration_test.sh
@@ -8,10 +8,12 @@
JAVA_TESTS_DIR="integration_test/src/java/com/twitter/heron/integration_test/topology"
PYTHON_TESTS_DIR="integration_test/src/python/integration_test/topology"
+SCALA_TESTS_DIR="integration_test/src/scala/com/twitter/heron/integration_test/topology"
# integration test binaries have to be specified as absolute path
JAVA_INTEGRATION_TESTS_BIN="${PWD}/bazel-genfiles/integration_test/src/java/integration-tests.jar"
PYTHON_INTEGRATION_TESTS_BIN="${PWD}/bazel-bin/integration_test/src/python/integration_test/topology/heron_integ_topology.pex"
+SCALA_INTEGRATION_TESTS_BIN="${PWD}/bazel-genfiles/integration_test/src/scala/scala-integration-tests.jar"
CORE_PKG="file://${PWD}/bazel-bin/scripts/packages/heron-core.tar.gz"
@@ -28,6 +30,13 @@
http_server_id=$!
trap "kill -9 $http_server_id" SIGINT SIGTERM EXIT
+# run the scala integration tests
+${TEST_RUNNER} \
+ -hc ~/.heron/bin/heron -tb ${SCALA_INTEGRATION_TESTS_BIN} \
+ -rh localhost -rp 8080 \
+ -tp ${SCALA_TESTS_DIR} \
+ -cl local -rl heron-staging -ev devel -pi ${CORE_PKG}
+
# run the java integration tests
${TEST_RUNNER} \
-hc ~/.heron/bin/heron -tb ${JAVA_INTEGRATION_TESTS_BIN} \
diff --git a/scripts/travis/test.sh b/scripts/travis/test.sh
index 420a56f..2cece60 100755
--- a/scripts/travis/test.sh
+++ b/scripts/travis/test.sh
@@ -15,6 +15,7 @@
# integration test binaries have to be specified as absolute path
JAVA_INTEGRATION_TESTS_BIN="${HOME}/.herontests/lib/integration-tests.jar"
PYTHON_INTEGRATION_TESTS_BIN="${HOME}/.herontests/lib/heron_integ_topology.pex"
+SCALA_INTEGRATION_TESTS_BIN="${HOME}/.herontests/lib/scala-integration-tests.jar"
# build test related jar
T="heron build integration_test"
@@ -40,13 +41,27 @@
python ./bazel-bin/integration_test/src/python/local_test_runner/local-test-runner
end_timer "$T"
-# run the java integration test
-T="heron integration_test java"
+# initialize http-server for integration tests
+T="heron integration_test http-server initialization"
start_timer "$T"
${HOME}/bin/http-server 8080 &
http_server_id=$!
trap "kill -9 $http_server_id" SIGINT SIGTERM EXIT
+end_timer "$T"
+# run the scala integration test
+T="heron integration_test scala"
+start_timer "$T"
+${HOME}/bin/test-runner \
+ -hc heron -tb ${SCALA_INTEGRATION_TESTS_BIN} \
+ -rh localhost -rp 8080\
+ -tp ${HOME}/.herontests/data/scala \
+ -cl local -rl heron-staging -ev devel
+end_timer "$T"
+
+# run the java integration test
+T="heron integration_test java"
+start_timer "$T"
${HOME}/bin/test-runner \
-hc heron -tb ${JAVA_INTEGRATION_TESTS_BIN} \
-rh localhost -rp 8080\