/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.gearpump.integrationtest.checklist

import org.apache.gearpump.integrationtest.kafka.{KafkaCluster, MessageLossDetector, SimpleKafkaReader}
import org.apache.gearpump.integrationtest.storm.StormClient
import org.apache.gearpump.integrationtest.{TestSpecBase, Util}

/**
 * Checks the compatibility of running Storm applications (Storm 0.9 and 0.10) over Gearpump.
 */
class StormCompatibilitySpec extends TestSpecBase {

  private lazy val stormClient = {
    new StormClient(cluster, restClient)
  }
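
  // Suffixes identifying the built-in integration-test jars for the two supported Storm releases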
  val `version0.9` = "09"
  val `version0.10` = "010"

  override def beforeAll(): Unit = {
    super.beforeAll()
    stormClient.start()
  }

  override def afterAll(): Unit = {
    stormClient.shutDown()
    super.afterAll()
  }
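
  // Runs the given test body once against each supported Storm version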
  def withStorm(testCode: String => Unit): Unit = {
    testCode(`version0.9`)
    testCode(`version0.10`)
  }

  def getTopologyName(name: String, stormVersion: String): String = {
    s"${name}_$stormVersion"
  }
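
  // Looks up the built-in integration-test jar that matches the given Storm version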
  def getStormJar(stormVersion: String): String = {
    cluster.queryBuiltInITJars(s"storm$stormVersion").head
  }
"Storm over Gearpump" should withStorm {
stormVersion =>
s"support basic topologies ($stormVersion)" in {
val stormJar = getStormJar(stormVersion)
val topologyName = getTopologyName("exclamation", stormVersion)
// exercise
val appId = stormClient.submitStormApp(
jar = stormJar,
mainClass = "storm.starter.ExclamationTopology",
args = topologyName,
appName = topologyName)
// verify
Util.retryUntil(() => restClient.queryStreamingAppDetail(appId).clock > 0, "app running")
}
s"support to run a python version of wordcount ($stormVersion)" in {
val stormJar = getStormJar(stormVersion)
val topologyName = getTopologyName("wordcount", stormVersion)
// exercise
val appId = stormClient.submitStormApp(
jar = stormJar,
mainClass = "storm.starter.WordCountTopology",
args = topologyName,
appName = topologyName)
// verify
Util.retryUntil(() => restClient.queryStreamingAppDetail(appId).clock > 0, "app running")
}
s"support DRPC ($stormVersion)" in {
// ReachTopology computes the Twitter url reached by users and their followers
// using Storm Distributed RPC feature
// input (user and follower) data are already prepared in memory
val stormJar = getStormJar(stormVersion)
val topologyName = getTopologyName("reach", stormVersion)
stormClient.submitStormApp(
jar = stormJar,
mainClass = "storm.starter.ReachTopology",
args = topologyName,
appName = topologyName)
val drpcClient = stormClient.getDRPCClient(cluster.getNetworkGateway)
// verify
Util.retryUntil(() => {
drpcClient.execute("reach", "notaurl.com") == "0"
}, "drpc reach == 0")
drpcClient.execute("reach", "foo.com/blog/1") shouldEqual "16"
drpcClient.execute("reach", "engineering.twitter.com/blog/5") shouldEqual "14"
}
s"support tick tuple ($stormVersion)" in {
val stormJar = getStormJar(stormVersion)
val topologyName = getTopologyName("slidingWindowCounts", stormVersion)
// exercise
val appId = stormClient.submitStormApp(
jar = stormJar,
mainClass = "storm.starter.RollingTopWords",
args = s"$topologyName remote",
appName = topologyName)
// verify
Util.retryUntil(() => restClient.queryStreamingAppDetail(appId).clock > 0, "app running")
}
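
      // End-to-end at-least-once check: a number sequence is produced into a source topic,
      // routed through the Storm Kafka topology into a sink topic, an executor is killed
      // midway, and the sink topic must eventually contain every produced number.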
s"support at-least-once semantics with Storm's Kafka connector ($stormVersion)" in {
val stormJar = getStormJar(stormVersion)
val topologyName = getTopologyName("storm_kafka", stormVersion)
val stormKafkaTopology =
s"org.apache.gearpump.integrationtest.storm.Storm${stormVersion}KafkaTopology"
import org.apache.gearpump.integrationtest.kafka.KafkaCluster._
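
        // withKafkaCluster provides a Kafka cluster (with its ZooKeeper) scoped to this block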
        withKafkaCluster(cluster) {
          kafkaCluster =>
            val sourcePartitionNum = 2
            val sinkPartitionNum = 1
            val zookeeper = kafkaCluster.getZookeeperConnectString
            val brokerList = kafkaCluster.getBrokerListConnectString
            val sourceTopic = "topic1"
            val sinkTopic = "topic2"
            val args = Array("-topologyName", topologyName, "-sourceTopic", sourceTopic,
              "-sinkTopic", sinkTopic, "-zookeeperConnect", zookeeper, "-brokerList", brokerList,
              "-spoutNum", s"$sourcePartitionNum", "-boltNum", s"$sinkPartitionNum"
            )
            kafkaCluster.createTopic(sourceTopic, sourcePartitionNum)

            // produce a number sequence (1, 2, 3, ...) into the source topic
            withDataProducer(sourceTopic, brokerList) { producer =>
              val appId = stormClient.submitStormApp(
                jar = stormJar,
                mainClass = stormKafkaTopology,
                args = args.mkString(" "),
                appName = topologyName)
              Util.retryUntil(() =>
                restClient.queryStreamingAppDetail(appId).clock > 0, "app running")
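
              // The test assumes executor ids increase monotonically: the max id marks the
              // most recently launched executor, and a larger max later on signals that a
              // replacement executor has come up.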
              // kill executor and verify at-least-once is guaranteed on application restart
              val executorToKill = restClient.queryExecutorBrief(appId).map(_.executorId).max
              restClient.killExecutor(appId, executorToKill) shouldBe true
              Util.retryUntil(() =>
                restClient.queryExecutorBrief(appId).map(_.executorId).max > executorToKill,
                s"executor $executorToKill killed")
              // verify no message loss: every number the producer wrote must eventually be
              // read back from the sink topic
              val detector = new MessageLossDetector(producer.lastWriteNum)
              val kafkaReader =
                new SimpleKafkaReader(detector, sinkTopic, host = kafkaCluster.advertisedHost,
                  port = kafkaCluster.advertisedPort)
              Util.retryUntil(() => {
                kafkaReader.read()
                detector.allReceived
              }, "all Kafka messages read")
            }
        }
      }
  }
}