[GEARPUMP-274] Set lower bound of time to Long.MinValue milliseconds

Author: manuzhang <owenzhang1990@gmail.com>

Closes #151 from manuzhang/time.
diff --git a/core/src/main/scala/org/apache/gearpump/package.scala b/core/src/main/scala/org/apache/gearpump/package.scala
index b1118d3..2f74ac4 100644
--- a/core/src/main/scala/org/apache/gearpump/package.scala
+++ b/core/src/main/scala/org/apache/gearpump/package.scala
@@ -20,5 +20,10 @@
 
 package object gearpump {
   type TimeStamp = Long
-  val LatestTime = -1
+
+  // maximum time won't overflow when converted to milli-seconds
+  val MAX_TIME_MILLIS: Long = Long.MaxValue
+
+  // minimum time won't overflow when converted to milli-seconds
+  val MIN_TIME_MILLIS: Long = Long.MinValue
 }
diff --git a/experiments/storm/src/main/scala/org/apache/gearpump/experiments/storm/util/StormOutputCollector.scala b/experiments/storm/src/main/scala/org/apache/gearpump/experiments/storm/util/StormOutputCollector.scala
index 1b239b5..fd023a9 100644
--- a/experiments/storm/src/main/scala/org/apache/gearpump/experiments/storm/util/StormOutputCollector.scala
+++ b/experiments/storm/src/main/scala/org/apache/gearpump/experiments/storm/util/StormOutputCollector.scala
@@ -28,7 +28,7 @@
 import backtype.storm.tuple.Fields
 import backtype.storm.utils.Utils
 import org.slf4j.Logger
-import org.apache.gearpump._
+import org.apache.gearpump.{MIN_TIME_MILLIS, Message, TimeStamp}
 import org.apache.gearpump.experiments.storm.topology.GearpumpTuple
 import org.apache.gearpump.experiments.storm.util.StormUtil._
 import org.apache.gearpump.streaming.ProcessorId
@@ -56,7 +56,7 @@
         streamGroupers, componentToProcessorId, values)
     }
     new StormOutputCollector(stormTaskId, taskToComponent, targets, getTargetPartitionsFn,
-      taskContext, LatestTime)
+      taskContext, MIN_TIME_MILLIS)
   }
 
   /**
diff --git a/experiments/storm/src/test/scala/org/apache/gearpump/experiments/storm/util/StormOutputCollectorSpec.scala b/experiments/storm/src/test/scala/org/apache/gearpump/experiments/storm/util/StormOutputCollectorSpec.scala
index e0e9e61..6b894da 100644
--- a/experiments/storm/src/test/scala/org/apache/gearpump/experiments/storm/util/StormOutputCollectorSpec.scala
+++ b/experiments/storm/src/test/scala/org/apache/gearpump/experiments/storm/util/StormOutputCollectorSpec.scala
@@ -28,17 +28,17 @@
 import org.scalatest.prop.PropertyChecks
 import org.scalatest.{Matchers, PropSpec}
 
-import org.apache.gearpump._
+import org.apache.gearpump.{Message, MIN_TIME_MILLIS, TimeStamp}
 import org.apache.gearpump.experiments.storm.topology.GearpumpTuple
 import org.apache.gearpump.streaming.MockUtil
 
 class StormOutputCollectorSpec
   extends PropSpec with PropertyChecks with Matchers with MockitoSugar {
 
-  val stormTaskId = 0
-  val streamIdGen = Gen.alphaStr
-  val valuesGen = Gen.listOf[String](Gen.alphaStr).map(_.asJava.asInstanceOf[JList[AnyRef]])
-  val timestampGen = Gen.chooseNum[Long](0L, 1000L)
+  private val stormTaskId = 0
+  private val streamIdGen = Gen.alphaStr
+  private val valuesGen = Gen.listOf[String](Gen.alphaStr).map(_.asJava.asInstanceOf[JList[AnyRef]])
+  private val timestampGen = Gen.chooseNum[Long](0L, 1000L)
 
   property("StormOutputCollector emits tuple values into a stream") {
     forAll(timestampGen, streamIdGen, valuesGen) {
@@ -53,7 +53,7 @@
           targetStormTaskIds))
         val taskContext = MockUtil.mockTaskContext
         val stormOutputCollector = new StormOutputCollector(stormTaskId, taskToComponent,
-          targets, getTargetPartitionsFn, taskContext, LatestTime)
+          targets, getTargetPartitionsFn, taskContext, MIN_TIME_MILLIS)
 
         when(targets.containsKey(streamId)).thenReturn(false)
         stormOutputCollector.emit(streamId, values) shouldBe StormOutputCollector.EMPTY_LIST
@@ -86,7 +86,7 @@
           targetStormTaskIds))
         val taskContext = MockUtil.mockTaskContext
         val stormOutputCollector = new StormOutputCollector(stormTaskId, taskToComponent,
-          targets, getTargetPartitionsFn, taskContext, LatestTime)
+          targets, getTargetPartitionsFn, taskContext, MIN_TIME_MILLIS)
 
         when(targets.containsKey(streamId)).thenReturn(false)
         verify(taskContext, times(0)).output(anyObject[Message])
diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/StreamApplication.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/StreamApplication.scala
index d4b3719..23350dd 100644
--- a/streaming/src/main/scala/org/apache/gearpump/streaming/StreamApplication.scala
+++ b/streaming/src/main/scala/org/apache/gearpump/streaming/StreamApplication.scala
@@ -20,10 +20,8 @@
 
 import scala.language.implicitConversions
 import scala.reflect.ClassTag
-
 import akka.actor.ActorSystem
-
-import org.apache.gearpump.TimeStamp
+import org.apache.gearpump.{MAX_TIME_MILLIS, MIN_TIME_MILLIS, TimeStamp}
 import org.apache.gearpump.cluster._
 import org.apache.gearpump.streaming.partitioner.{HashPartitioner, Partitioner, PartitionerDescription, PartitionerObject}
 import org.apache.gearpump.streaming.appmaster.AppMaster
@@ -115,7 +113,7 @@
 }
 
 object LifeTime {
-  val Immortal = LifeTime(0L, Long.MaxValue)
+  val Immortal = LifeTime(MIN_TIME_MILLIS, MAX_TIME_MILLIS)
 }
 
 /**
diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/appmaster/ClockService.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/appmaster/ClockService.scala
index 0a2999d..2a24b66 100644
--- a/streaming/src/main/scala/org/apache/gearpump/streaming/appmaster/ClockService.scala
+++ b/streaming/src/main/scala/org/apache/gearpump/streaming/appmaster/ClockService.scala
@@ -25,7 +25,7 @@
 
 import akka.actor.{Actor, ActorRef, Cancellable, Stash}
 import com.google.common.primitives.Longs
-import org.apache.gearpump.TimeStamp
+import org.apache.gearpump.{MIN_TIME_MILLIS, TimeStamp}
 import org.apache.gearpump.cluster.ClientToMaster.GetStallingTasks
 import org.apache.gearpump.streaming.AppMasterToMaster.StallingTasks
 import org.apache.gearpump.streaming._
@@ -53,8 +53,8 @@
   import context.dispatcher
 
   private val healthChecker = new HealthChecker(stallingThresholdSeconds = 60)
-  private var healthCheckScheduler: Cancellable = null
-  private var snapshotScheduler: Cancellable = null
+  private var healthCheckScheduler: Cancellable = _
+  private var snapshotScheduler: Cancellable = _
 
   override def receive: Receive = null
 
@@ -88,7 +88,7 @@
   // We use Array instead of List for Performance consideration
   private var processorClocks = Array.empty[ProcessorClock]
 
-  private var checkpointClocks: Map[TaskId, Vector[TimeStamp]] = null
+  private var checkpointClocks: Map[TaskId, Vector[TimeStamp]] = _
 
   private var minCheckpointClock: Option[TimeStamp] = None
 
@@ -337,7 +337,8 @@
   case object HealthCheck
 
   class ProcessorClock(val processorId: ProcessorId, val life: LifeTime, val parallelism: Int,
-      private var _min: TimeStamp = 0L, private var _taskClocks: Array[TimeStamp] = null) {
+      private var _min: TimeStamp = MIN_TIME_MILLIS,
+      private var _taskClocks: Array[TimeStamp] = null) {
 
     def copy(life: LifeTime): ProcessorClock = {
       new ProcessorClock(processorId, life, parallelism, _min, _taskClocks)
@@ -370,7 +371,7 @@
   class HealthChecker(stallingThresholdSeconds: Int) {
     private val LOG: Logger = LogUtil.getLogger(getClass)
 
-    private var minClock: ClockValue = null
+    private var minClock: ClockValue = _
     private val stallingThresholdMilliseconds = stallingThresholdSeconds * 1000
     // 60 seconds
     private var stallingTasks = Array.empty[TaskId]
diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/source/Watermark.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/source/Watermark.scala
index 1f8d3a1..9c27bde 100644
--- a/streaming/src/main/scala/org/apache/gearpump/streaming/source/Watermark.scala
+++ b/streaming/src/main/scala/org/apache/gearpump/streaming/source/Watermark.scala
@@ -20,6 +20,7 @@
 import java.time.Instant
 
 import org.apache.gearpump.Message
+import org.apache.gearpump.{MAX_TIME_MILLIS, MIN_TIME_MILLIS}
 
 /**
  * message used by source task to report source watermark.
@@ -31,5 +32,8 @@
 object Watermark {
 
   // maximum time won't overflow when converted to milli-seconds
-  val MAX = Instant.ofEpochMilli(Long.MaxValue)
+  val MAX: Instant = Instant.ofEpochMilli(MAX_TIME_MILLIS)
+
+  // minimum time won't overflow when converted to milli-seconds
+  val MIN: Instant = Instant.ofEpochMilli(MIN_TIME_MILLIS)
 }
diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/task/Subscription.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/task/Subscription.scala
index 3e68580..c6817f5 100644
--- a/streaming/src/main/scala/org/apache/gearpump/streaming/task/Subscription.scala
+++ b/streaming/src/main/scala/org/apache/gearpump/streaming/task/Subscription.scala
@@ -26,7 +26,7 @@
 import org.apache.gearpump.streaming.LifeTime
 import org.apache.gearpump.streaming.task.Subscription._
 import org.apache.gearpump.util.LogUtil
-import org.apache.gearpump.{Message, TimeStamp}
+import org.apache.gearpump.{MAX_TIME_MILLIS, Message, MIN_TIME_MILLIS, TimeStamp}
 
 /**
  * Manges the output and message clock for single downstream processor
@@ -58,8 +58,8 @@
   private val pendingMessageCount: Array[Short] = new Array[Short](parallelism)
   private val candidateMinClockSince: Array[Short] = new Array[Short](parallelism)
 
-  private val minClockValue: Array[TimeStamp] = Array.fill(parallelism)(Long.MaxValue)
-  private val candidateMinClock: Array[TimeStamp] = Array.fill(parallelism)(Long.MaxValue)
+  private val minClockValue: Array[TimeStamp] = Array.fill(parallelism)(MAX_TIME_MILLIS)
+  private val candidateMinClock: Array[TimeStamp] = Array.fill(parallelism)(MAX_TIME_MILLIS)
 
   private var maxPendingCount: Short = 0
 
@@ -133,7 +133,7 @@
     }
   }
 
-  private var lastFlushTime: Long = 0L
+  private var lastFlushTime: Long = MIN_TIME_MILLIS
   private val FLUSH_INTERVAL = 5 * 1000 // ms
   private def needFlush: Boolean = {
     System.currentTimeMillis() - lastFlushTime > FLUSH_INTERVAL &&
diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/task/TaskActor.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/task/TaskActor.scala
index 14c2b59..318ebf8 100644
--- a/streaming/src/main/scala/org/apache/gearpump/streaming/task/TaskActor.scala
+++ b/streaming/src/main/scala/org/apache/gearpump/streaming/task/TaskActor.scala
@@ -33,7 +33,7 @@
 import org.apache.gearpump.streaming.ExecutorToAppMaster._
 import org.apache.gearpump.streaming.ProcessorId
 import org.apache.gearpump.util.{LogUtil, TimeOutScheduler}
-import org.apache.gearpump.{Message, TimeStamp}
+import org.apache.gearpump.{MAX_TIME_MILLIS, Message, MIN_TIME_MILLIS, TimeStamp}
 
 /**
  *
@@ -46,8 +46,8 @@
     val task: TaskWrapper,
     inputSerializerPool: SerializationFramework)
     extends Actor with ExpressTransport with TimeOutScheduler {
-  var upstreamMinClock: TimeStamp = 0L
-  private var _minClock: TimeStamp = 0L
+  private var upstreamMinClock: TimeStamp = MIN_TIME_MILLIS
+  private var _minClock: TimeStamp = MIN_TIME_MILLIS
   private var minClockReported: Boolean = true
 
   def serializerPool: SerializationFramework = inputSerializerPool
@@ -246,7 +246,7 @@
 
       receiveMessage(watermark.toMessage, sender)
 
-    case upstream@UpstreamMinClock(upstreamClock) =>
+    case UpstreamMinClock(upstreamClock) =>
       updateUpstreamMinClock(upstreamClock)
 
     case ChangeTask(_, dagVersion, life, subscribers) =>
@@ -316,7 +316,7 @@
       task.onWatermarkProgress(Instant.ofEpochMilli(this.upstreamMinClock))
     }
 
-    val subMinClock = subscriptions.foldLeft(Long.MaxValue) { (min, sub) =>
+    val subMinClock = subscriptions.foldLeft(MAX_TIME_MILLIS) { (min, sub) =>
       val subMin = sub._2.minClock
       // A subscription is holding back the _minClock;
       // we send AckRequest to its tasks to push _minClock forward