blob: c4276bd47363c8a60e5a53e9f2c1a484ecf64a16 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.gearpump.integrationtest.checklist
import org.apache.gearpump.integrationtest.{TestSpecBase, Util}
import org.apache.gearpump.metrics.Metrics.Meter
import org.apache.gearpump.streaming._
import org.apache.gearpump.streaming.appmaster.ProcessorSummary
class DynamicDagSpec extends TestSpecBase {
val sourceTaskClass = "org.apache.gearpump.streaming.examples.sol.SOLStreamProducer"
val sinkTaskClass = "org.apache.gearpump.streaming.examples.sol.SOLStreamProcessor"
lazy val solJar = cluster.queryBuiltInExampleJars("sol").head
"dynamic dag" should {
"can retrieve a list of built-in partitioner classes" in {
val partitioners = restClient.queryBuiltInPartitioners()
partitioners.length should be > 0
partitioners.foreach(clazz =>
clazz should startWith("org.apache.gearpump.streaming.partitioner.")
)
}
"can compose a wordcount application from scratch" in {
// todo: blocked by #1450
}
"can replace downstream with SOLStreamProcessor (new processor will have metrics)" in {
// setup
val appId = expectWordCountJarSubmittedWithAppId()
// exercise
val formerProcessors = restClient.queryStreamingAppDetail(appId).processors
replaceProcessor(appId, 1, sinkTaskClass)
var laterProcessors: Map[ProcessorId, ProcessorSummary] = null
Util.retryUntil(() => {
laterProcessors = restClient.queryStreamingAppDetail(appId).processors
laterProcessors.size == formerProcessors.size + 1
}, "new processor successfully added")
processorHasThroughput(appId, laterProcessors.keySet.max, "receiveThroughput")
}
"can replace upstream with SOLStreamProducer (new processor will have metrics)" in {
// setup
val appId = expectWordCountJarSubmittedWithAppId()
// exercise
val formerProcessors = restClient.queryStreamingAppDetail(appId).processors
replaceProcessor(appId, 0, sourceTaskClass)
var laterProcessors: Map[ProcessorId, ProcessorSummary] = null
Util.retryUntil(() => {
laterProcessors = restClient.queryStreamingAppDetail(appId).processors
laterProcessors.size == formerProcessors.size + 1
}, "new processor added")
processorHasThroughput(appId, laterProcessors.keySet.max, "sendThroughput")
}
"fall back to last dag version when replacing a processor failid" in {
// setup
val appId = expectWordCountJarSubmittedWithAppId()
// exercise
val formerProcessors = restClient.queryStreamingAppDetail(appId).processors
replaceProcessor(appId, 1, sinkTaskClass)
var laterProcessors: Map[ProcessorId, ProcessorSummary] = null
Util.retryUntil(() => {
laterProcessors = restClient.queryStreamingAppDetail(appId).processors
laterProcessors.size == formerProcessors.size + 1
}, "new processor added")
processorHasThroughput(appId, laterProcessors.keySet.max, "receiveThroughput")
val fakeTaskClass = "org.apache.gearpump.streaming.examples.wordcount.Fake"
replaceProcessor(appId, laterProcessors.keySet.max, fakeTaskClass)
Util.retryUntil(() => {
val processorsAfterFailure = restClient.queryStreamingAppDetail(appId).processors
processorsAfterFailure.size == laterProcessors.size
}, "new processor added")
val currentClock = restClient.queryStreamingAppDetail(appId).clock
Util.retryUntil(() => restClient.queryStreamingAppDetail(appId).clock > currentClock,
"app clock is advancing")
}
"fall back to last dag version when AppMaster HA triggered" in {
// setup
val appId = expectWordCountJarSubmittedWithAppId()
// exercise
val formerAppMaster = restClient.queryApp(appId).appMasterPath
val formerProcessors = restClient.queryStreamingAppDetail(appId).processors
replaceProcessor(appId, 1, sinkTaskClass)
var laterProcessors: Map[ProcessorId, ProcessorSummary] = null
Util.retryUntil(() => {
laterProcessors = restClient.queryStreamingAppDetail(appId).processors
laterProcessors.size == formerProcessors.size + 1
}, "new processor added")
processorHasThroughput(appId, laterProcessors.keySet.max, "receiveThroughput")
restClient.killAppMaster(appId) shouldBe true
Util.retryUntil(() => restClient.queryApp(appId).appMasterPath != formerAppMaster,
"new AppMaster created")
val processors = restClient.queryStreamingAppDetail(appId).processors
processors.size shouldEqual laterProcessors.size
}
}
private def expectWordCountJarSubmittedWithAppId(): Int = {
val appId = restClient.getNextAvailableAppId()
val success = restClient.submitApp(wordCountJar, cluster.getWorkerHosts.length)
success shouldBe true
expectAppIsRunning(appId, wordCountName)
Util.retryUntil(() => restClient.queryStreamingAppDetail(appId).clock > 0, "app running")
appId
}
private def replaceProcessor(
appId: Int,
formerProcessorId: Int,
newTaskClass: String,
newProcessorDescription: String = "",
newParallelism: Int = 1): Unit = {
val uploadedJar = restClient.uploadJar(solJar)
val replaceMe = new ProcessorDescription(formerProcessorId, newTaskClass,
newParallelism, newProcessorDescription,
jar = uploadedJar)
// exercise
val success = restClient.replaceStreamingAppProcessor(appId, replaceMe)
success shouldBe true
}
private def processorHasThroughput(appId: Int, processorId: Int, metrics: String): Unit = {
Util.retryUntil(() => {
val actual = restClient.queryStreamingAppMetrics(appId, current = false,
path = "processor" + processorId)
val throughput = actual.metrics.filter(_.value.name.endsWith(metrics))
throughput.size should be > 0
throughput.forall(_.value.asInstanceOf[Meter].count > 0L)
}, "new processor has message received")
}
}