[GEARPUMP-358] Decrease the frequency of watermark calculation

Be sure to do all of the following to help us incorporate your contribution
quickly and easily:

 - [ ] Make sure the commit message is formatted like:
   `[GEARPUMP-<Jira issue #>] Meaningful description of pull request`
 - [ ] Make sure tests pass via `sbt clean test`.
 - [ ] Make sure old documentation affected by the pull request has been updated and new documentation added for new functionality.

Author: huafengw <fvunicorn@gmail.com>

Closes #232 from huafengw/regre.
diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/task/TaskActor.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/task/TaskActor.scala
index fb2aaed..b43457e 100644
--- a/streaming/src/main/scala/org/apache/gearpump/streaming/task/TaskActor.scala
+++ b/streaming/src/main/scala/org/apache/gearpump/streaming/task/TaskActor.scala
@@ -87,6 +87,7 @@
   private var life = taskContextData.life
   private var subscriptions = List.empty[(Int, Subscription)]
   private[task] var sessionId = NONE_SESSION
+  private var minClockReported: Boolean = true
 
   // Reports to appMaster with my address
   express.registerLocalActor(TaskId.toLong(taskId), self)
@@ -181,7 +182,10 @@
 
     case watermark@Watermark(instant) =>
       assert(sender.eq(self), "Watermark should only be sent from Task to itself")
-      onUpstreamMinClock(instant)
+      if (minClockReported) {
+        onUpstreamMinClock(instant)
+        minClockReported = false
+      }
       receiveMessage(watermark.toMessage, sender)
 
     case UpstreamMinClock(upstreamClock) =>
@@ -352,6 +356,7 @@
     val update = UpdateClock(taskId, watermark.toEpochMilli)
     context.system.scheduler.scheduleOnce(CLOCK_REPORT_INTERVAL) {
       taskContextData.appMaster ! update
+      minClockReported = true
     }
   }