// blob: 97d5daf47a51adf9261d83e7c0935b8f67e42665 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.gearpump.integrationtest.checklist
import org.apache.log4j.Logger
import org.apache.gearpump.integrationtest.{Docker, TestSpecBase, Util}
import org.apache.gearpump.streaming._
import org.apache.gearpump.streaming.appmaster.ProcessorSummary
/**
 * Integration checks for the built-in example applications: distributed shell, wordcount,
 * wordcount(java), sol and complexdag.
 *
 * Each streaming example is run through the shared `streamingApplication` behaviour, which
 * submits the example jar, waits for the application clock to advance, and replaces a
 * processor of the submitted application.
 */
class ExampleSpec extends TestSpecBase {

  // NOTE(review): not referenced anywhere in the visible spec.
  private val LOG = Logger.getLogger(getClass)

  "distributed shell" should {
    "execute commands on machines where its executors are running" in {
      val distShellJar = cluster.queryBuiltInExampleJars("distributedshell").head
      val mainClass = "org.apache.gearpump.examples.distributedshell.DistributedShell"
      val clientClass = "org.apache.gearpump.examples.distributedshell.DistributedShellClient"

      // The id is read before submitting and then used to address the submitted application.
      val appId = restClient.getNextAvailableAppId()
      val success = restClient.submitApp(distShellJar, cluster.getWorkerHosts.length, mainClass)
      success shouldBe true
      expectAppIsRunning(appId, "DistributedShell")

      // Arguments for the shell client: run `hostname` against the application submitted above.
      val args = Array(
        clientClass,
        "-appid", appId.toString,
        "-command", "hostname"
      )
      val expectedHostNames = cluster.getWorkerHosts.map(Docker.getHostName)

      // True once every expected host name appears as a line of the captured client output
      // (empty lines and lines starting with "[INFO]" are ignored).
      def verify(): Boolean = {
        val workerNum = cluster.getWorkerHosts.length
        val result = commandLineClient.submitAppAndCaptureOutput(distShellJar,
          workerNum, args.mkString(" ")).split("\n").
          filterNot(line => line.startsWith("[INFO]") || line.isEmpty)
        expectedHostNames.forall(result.contains)
      }

      Util.retryUntil(() => verify(),
        s"executors started on all expected hosts ${expectedHostNames.mkString(", ")}")
    }
  }

  "wordcount" should {
    // NOTE(review): wordCountName and wordCountJar are not defined in this file; presumably
    // they are inherited from TestSpecBase - confirm.
    val wordCountJarNamePrefix = "wordcount_"
    behave like streamingApplication(wordCountJarNamePrefix, wordCountName)

    "can submit immediately after killing a former one" in {
      // setup
      val formerAppId = restClient.getNextAvailableAppId()
      val formerSubmissionSuccess =
        restClient.submitApp(wordCountJar, cluster.getWorkerHosts.length)
      formerSubmissionSuccess shouldBe true
      expectAppIsRunning(formerAppId, wordCountName)
      Util.retryUntil(() =>
        restClient.queryStreamingAppDetail(formerAppId).clock > 0, "app running")
      restClient.killApp(formerAppId)

      // exercise
      // NOTE(review): assumes application ids are handed out sequentially, so the next
      // submission gets formerAppId + 1 - confirm against getNextAvailableAppId().
      val appId = formerAppId + 1
      val success = restClient.submitApp(wordCountJar, cluster.getWorkerHosts.length)
      success shouldBe true
      expectAppIsRunning(appId, wordCountName)
    }
  }

  "wordcount(java)" should {
    val wordCountJavaJarNamePrefix = "wordcountjava"
    val wordCountJavaName = "wordcountJava"
    behave like streamingApplication(wordCountJavaJarNamePrefix, wordCountJavaName)
  }

  "sol" should {
    val solJarNamePrefix = "sol"
    val solName = "sol"
    behave like streamingApplication(solJarNamePrefix, solName)
  }

  "complexdag" should {
    val dynamicDagJarNamePrefix = "complexdag"
    val dynamicDagName = "dag"
    behave like streamingApplication(dynamicDagJarNamePrefix, dynamicDagName)
  }

  /**
   * Shared behaviour registered for every streaming example application.
   *
   * @param jarNamePrefix prefix handed to `cluster.queryBuiltInExampleJars`; the first jar
   *                      returned is the one that gets submitted
   * @param appName       application name handed to `expectAppIsRunning` after submission
   */
  def streamingApplication(jarNamePrefix: String, appName: String): Unit = {
    // lazy: the jar lookup is only performed when a test body first touches `jar`.
    lazy val jar = cluster.queryBuiltInExampleJars(jarNamePrefix).head

    "can obtain application clock and the clock will keep changing" in {
      // setup
      val appId = restClient.getNextAvailableAppId()
      val success = restClient.submitApp(jar, cluster.getWorkerHosts.length)
      success shouldBe true
      expectAppIsRunning(appId, appName)

      // exercise
      Util.retryUntil(() => restClient.queryStreamingAppDetail(appId).clock > 0, "app submitted")
      val formerClock = restClient.queryStreamingAppDetail(appId).clock
      Util.retryUntil(() => restClient.queryStreamingAppDetail(appId).clock > formerClock,
        "app clock is advancing")
    }

    "can change the parallelism and description of a processor" in {
      // setup
      val appId = restClient.getNextAvailableAppId()
      val formerSubmissionSuccess = restClient.submitApp(jar, cluster.getWorkerHosts.length)
      formerSubmissionSuccess shouldBe true
      expectAppIsRunning(appId, appName)
      val formerProcessors = restClient.queryStreamingAppDetail(appId).processors
      // Same exception type as a bare Option.get, but the message says which lookup failed.
      val processor0 = formerProcessors.getOrElse(0,
        throw new NoSuchElementException(s"application $appId has no processor with id 0"))
      // NOTE(review): assumes the replacement processor is registered under an id equal to
      // the previous processor count - confirm.
      val expectedProcessorId = formerProcessors.size
      val expectedParallelism = processor0.parallelism + 1
      val expectedDescription = processor0.description + "new"
      val replaceMe = new ProcessorDescription(processor0.id, processor0.taskClass,
        expectedParallelism, description = expectedDescription)

      // exercise
      val success = restClient.replaceStreamingAppProcessor(appId, replaceMe)
      success shouldBe true
      // The snapshot that satisfies the retry condition is captured through the closure so
      // the assertions below inspect exactly that snapshot; starts empty instead of null.
      var laterProcessors = Map.empty[ProcessorId, ProcessorSummary]
      Util.retryUntil(() => {
        laterProcessors = restClient.queryStreamingAppDetail(appId).processors
        laterProcessors.size == formerProcessors.size + 1
      }, "new process added")
      val replacedProcessor = laterProcessors.getOrElse(expectedProcessorId,
        throw new NoSuchElementException(
          s"application $appId has no processor with id $expectedProcessorId after replacement"))
      replacedProcessor.parallelism shouldEqual expectedParallelism
      replacedProcessor.description shouldEqual expectedDescription
    }
  }
}