[GEARPUMP-320] Handle stashed messages after onStart
Author: manuzhang <owenzhang1990@gmail.com>
Closes #192 from manuzhang/fix_start.
diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/task/TaskActor.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/task/TaskActor.scala
index 1b90146..1fb61bd 100644
--- a/streaming/src/main/scala/org/apache/gearpump/streaming/task/TaskActor.scala
+++ b/streaming/src/main/scala/org/apache/gearpump/streaming/task/TaskActor.scala
@@ -275,16 +275,16 @@
subscriptions.foreach(_._2.start())
- stashQueue.asScala.foreach { item =>
- handleMessages(item.sender).apply(item.msg)
- }
- stashQueue.clear()
-
// Put this as the last step so that the subscription is already initialized.
// Message sending in current Task before onStart will not be delivered to
// target
onStart(Instant.ofEpochMilli(startClock))
+ stashQueue.asScala.foreach { item =>
+ handleMessages(item.sender).apply(item.msg)
+ }
+ stashQueue.clear()
+
taskContextData.appMaster ! GetUpstreamMinClock(taskId)
context.become(handleMessages(sender))
}