[GEARPUMP-358] Decrease the frequency of watermark calculation
Be sure to do all of the following to help us incorporate your contribution
quickly and easily:
- [ ] Make sure the commit message is formatted like:
`[GEARPUMP-<Jira issue #>] Meaningful description of pull request`
- [ ] Make sure tests pass via `sbt clean test`.
- [ ] Make sure old documentation affected by the pull request has been updated and new documentation added for new functionality.
Author: huafengw <fvunicorn@gmail.com>
Closes #232 from huafengw/regre.
diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/task/TaskActor.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/task/TaskActor.scala
index fb2aaed..b43457e 100644
--- a/streaming/src/main/scala/org/apache/gearpump/streaming/task/TaskActor.scala
+++ b/streaming/src/main/scala/org/apache/gearpump/streaming/task/TaskActor.scala
@@ -87,6 +87,7 @@
private var life = taskContextData.life
private var subscriptions = List.empty[(Int, Subscription)]
private[task] var sessionId = NONE_SESSION
+ private var minClockReported: Boolean = true
// Reports to appMaster with my address
express.registerLocalActor(TaskId.toLong(taskId), self)
@@ -181,7 +182,10 @@
case watermark@Watermark(instant) =>
assert(sender.eq(self), "Watermark should only be sent from Task to itself")
- onUpstreamMinClock(instant)
+ if (minClockReported) {
+ onUpstreamMinClock(instant)
+ minClockReported = false
+ }
receiveMessage(watermark.toMessage, sender)
case UpstreamMinClock(upstreamClock) =>
@@ -352,6 +356,7 @@
val update = UpdateClock(taskId, watermark.toEpochMilli)
context.system.scheduler.scheduleOnce(CLOCK_REPORT_INTERVAL) {
taskContextData.appMaster ! update
+ minClockReported = true
}
}