package org.apache.gearpump.streaming.appmaster
import{ActorSystem, Props}
import akka.testkit.{ImplicitSender, TestKit, TestProbe}
import org.apache.gearpump.cluster.{TestUtil, UserConfig}
import org.apache.gearpump.streaming.partitioner.{HashPartitioner, Partitioner, PartitionerDescription}
import org.apache.gearpump.streaming.appmaster.ClockService.{ChangeToNewDAG, ChangeToNewDAGSuccess, HealthChecker, ProcessorClock}
import org.apache.gearpump.streaming.appmaster.ClockServiceSpec.Store
import org.apache.gearpump.streaming.task.{GetLatestMinClock, GetStartClock, UpstreamMinClock, _}
import org.apache.gearpump.streaming.{DAG, LifeTime, ProcessorDescription}
import org.apache.gearpump.util.Graph
import org.apache.gearpump.util.Graph._
import org.scalatest.{BeforeAndAfterAll, Matchers, WordSpecLike}
import scala.concurrent.{Future, Promise}
class ClockServiceSpec(_system: ActorSystem) extends TestKit(_system) with ImplicitSender
with WordSpecLike with Matchers with BeforeAndAfterAll {
def this() = this(ActorSystem("ClockServiceSpec", TestUtil.DEFAULT_CONFIG))
val hash = Partitioner[HashPartitioner]
val task1 = ProcessorDescription(id = 0, taskClass = classOf[TaskActor].getName, parallelism = 1)
val task2 = ProcessorDescription(id = 1, taskClass = classOf[TaskActor].getName, parallelism = 1)
val dag = DAG(Graph(task1 ~ hash ~> task2))
override def afterAll(): Unit = {
"The ClockService" should {
"maintain a global view of message timestamp in the application" in {
val store = new Store()
val startClock = 100L
store.put(ClockService.START_CLOCK, startClock)
val clockService = system.actorOf(Props(new ClockService(dag, store)))
clockService ! GetLatestMinClock
// task(0,0): clock(101); task(1,0): clock(100)
clockService ! UpdateClock(TaskId(0, 0), 101)
// Min clock is updated
clockService ! GetLatestMinClock
// task(0,0): clock(101); task(1,0): clock(101)
clockService ! UpdateClock(TaskId(1, 0), 101)
// Upstream is Task(0, 0), 101
// Min clock is updated
clockService ! GetLatestMinClock
"act on ChangeToNewDAG and make sure downstream clock smaller than upstreams" in {
val store = new Store()
val startClock = 100L
store.put(ClockService.START_CLOCK, startClock)
val clockService = system.actorOf(Props(new ClockService(dag, store)))
val task = TestProbe()
clockService.tell(UpdateClock(TaskId(0, 0), 200), task.ref)
val task3 = ProcessorDescription(id = 3, taskClass = classOf[TaskActor].getName,
parallelism = 1)
val task4 = ProcessorDescription(id = 4, taskClass = classOf[TaskActor].getName,
parallelism = 1)
val task5 = ProcessorDescription(id = 5, taskClass = classOf[TaskActor].getName,
parallelism = 1)
val dagAddMiddleNode = DAG(Graph(
task1 ~ hash ~> task2,
task1 ~ hash ~> task3,
task3 ~ hash ~> task2,
task2 ~ hash ~> task4,
task5 ~ hash ~> task1
), version = 1)
val user = TestProbe()
clockService.tell(ChangeToNewDAG(dagAddMiddleNode), user.ref)
val clocks = user.expectMsgPF() {
case ChangeToNewDAGSuccess(newDagClocks) =>
// For intermediate task, pick its upstream as initial clock
assert(clocks( == clocks(
// For sink task, pick its upstream as initial clock
assert(clocks( == clocks(
// For source task, set the initial clock as startClock
assert(clocks( == startClock)
"maintain global checkpoint time" in {
val store = new Store()
val startClock = 100L
store.put(ClockService.START_CLOCK, startClock)
val clockService = system.actorOf(Props(new ClockService(dag, store)))
clockService ! UpdateClock(TaskId(0, 0), 200L)
clockService ! UpdateClock(TaskId(1, 0), 200L)
clockService ! GetStartClock
val conf = UserConfig.empty.withBoolean("state.checkpoint.enable", value = true)
val task3 = ProcessorDescription(id = 3, taskClass = classOf[TaskActor].getName,
parallelism = 1, taskConf = conf)
val task4 = ProcessorDescription(id = 4, taskClass = classOf[TaskActor].getName,
parallelism = 1, taskConf = conf)
val dagWithStateTasks = DAG(Graph(
task1 ~ hash ~> task2,
task1 ~ hash ~> task3,
task3 ~ hash ~> task2,
task2 ~ hash ~> task4
), version = 1)
val taskId3 = TaskId(3, 0)
val taskId4 = TaskId(4, 0)
clockService ! ChangeToNewDAG(dagWithStateTasks)
clockService ! UpdateCheckpointClock(taskId3, startClock)
clockService ! UpdateCheckpointClock(taskId4, startClock)
clockService ! GetStartClock
clockService ! UpdateCheckpointClock(taskId3, 200L)
clockService ! UpdateCheckpointClock(taskId4, 300L)
clockService ! GetStartClock
clockService ! UpdateCheckpointClock(taskId3, 300L)
clockService ! GetStartClock
"ProcessorClock" should {
"maintain the min clock of current processor" in {
val processorId = 0
val parallism = 3
val clock = new ProcessorClock(processorId, LifeTime.Immortal, parallism)
clock.updateMinClock(0, 101)
assert(clock.min == 100L)
clock.updateMinClock(1, 102)
assert(clock.min == 100L)
clock.updateMinClock(2, 103)
assert(clock.min == 101L)
"HealthChecker" should {
"report stalling if the clock is not advancing" in {
val healthChecker = new HealthChecker(stallingThresholdSeconds = 1)
val source = ProcessorDescription(id = 0, taskClass = null, parallelism = 1)
val sourceClock = new ProcessorClock(0, LifeTime.Immortal, 1)
val sink = ProcessorDescription(id = 1, taskClass = null, parallelism = 1)
val sinkClock = new ProcessorClock(1, LifeTime.Immortal, 1)
val graph = Graph.empty[ProcessorDescription, PartitionerDescription]
graph.addEdge(source, PartitionerDescription(null), sink)
val dag = DAG(graph)
val clocks = Map(
0 -> sourceClock,
1 -> sinkClock
sourceClock.updateMinClock(0, 100L)
sinkClock.updateMinClock(0, 100L)
// Clock advances from 0 to 100, there is no stalling.
healthChecker.check(currentMinClock = 100, clocks, dag, 200)
healthChecker.getReport.stallingTasks shouldBe List.empty[TaskId]
// Clock not advancing.
// Pasted time exceed the stalling threshold, report stalling
healthChecker.check(currentMinClock = 100, clocks, dag, 1300)
// The source task is stalling the clock
healthChecker.getReport.stallingTasks shouldBe List(TaskId(0, 0))
// Advance the source clock
sourceClock.updateMinClock(0, 101L)
healthChecker.check(currentMinClock = 100, clocks, dag, 1300)
// The sink task is stalling the clock
healthChecker.getReport.stallingTasks shouldBe List(TaskId(1, 0))
object ClockServiceSpec {
class Store extends AppDataStore {
private var map = Map.empty[String, Any]
def put(key: String, value: Any): Future[Any] = {
map = map + (key -> value)
def get(key: String): Future[Any] = {
Promise.successful(map.getOrElse(key, null)).future