[GEARPUMP-280] Set MAX_TIME_MILLIS to Long.MaxValue minus 1

Author: manuzhang <owenzhang1990@gmail.com>

Closes #159 from manuzhang/GEARPUMP-280.
diff --git a/core/src/main/scala/org/apache/gearpump/package.scala b/core/src/main/scala/org/apache/gearpump/package.scala
index 2f74ac4..6e20277 100644
--- a/core/src/main/scala/org/apache/gearpump/package.scala
+++ b/core/src/main/scala/org/apache/gearpump/package.scala
@@ -22,7 +22,7 @@
   type TimeStamp = Long
 
   // maximum time won't overflow when converted to milli-seconds
-  val MAX_TIME_MILLIS: Long = Long.MaxValue
+  val MAX_TIME_MILLIS: Long = Long.MaxValue - 1
 
   // minimum time won't overflow when converted to milli-seconds
   val MIN_TIME_MILLIS: Long = Long.MinValue
diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/StreamApplication.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/StreamApplication.scala
index 23350dd..435414b 100644
--- a/streaming/src/main/scala/org/apache/gearpump/streaming/StreamApplication.scala
+++ b/streaming/src/main/scala/org/apache/gearpump/streaming/StreamApplication.scala
@@ -113,7 +113,8 @@
 }
 
 object LifeTime {
-  val Immortal = LifeTime(MIN_TIME_MILLIS, MAX_TIME_MILLIS)
+  // MAX_TIME_MILLIS is Long.MaxValue - 1
+  val Immortal = LifeTime(MIN_TIME_MILLIS, MAX_TIME_MILLIS + 1)
 }
 
 /**
diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/source/Watermark.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/source/Watermark.scala
index 9c27bde..4371257 100644
--- a/streaming/src/main/scala/org/apache/gearpump/streaming/source/Watermark.scala
+++ b/streaming/src/main/scala/org/apache/gearpump/streaming/source/Watermark.scala
@@ -31,9 +31,7 @@
 
 object Watermark {
 
-  // maximum time won't overflow when converted to milli-seconds
   val MAX: Instant = Instant.ofEpochMilli(MAX_TIME_MILLIS)
 
-  // minimum time won't overflow when converted to milli-seconds
   val MIN: Instant = Instant.ofEpochMilli(MIN_TIME_MILLIS)
 }
diff --git a/streaming/src/test/scala/org/apache/gearpump/streaming/task/SubscriptionSpec.scala b/streaming/src/test/scala/org/apache/gearpump/streaming/task/SubscriptionSpec.scala
index 3635db9..9f02cef 100644
--- a/streaming/src/test/scala/org/apache/gearpump/streaming/task/SubscriptionSpec.scala
+++ b/streaming/src/test/scala/org/apache/gearpump/streaming/task/SubscriptionSpec.scala
@@ -24,7 +24,7 @@
 import org.mockito.Mockito._
 import org.scalatest.mock.MockitoSugar
 import org.scalatest.{FlatSpec, Matchers}
-import org.apache.gearpump.Message
+import org.apache.gearpump.{MAX_TIME_MILLIS, Message}
 import org.apache.gearpump.cluster.UserConfig
 import org.apache.gearpump.streaming.partitioner.{HashPartitioner, Partitioner}
 import org.apache.gearpump.streaming.task.SubscriptionSpec.NextTask
@@ -119,12 +119,12 @@
   }
 
   it should "report minClock as Long.MaxValue when there is no pending message" in {
-    val (subscription, transport) = prepare
+    val (subscription, _) = prepare
     val msg1 = new Message("1", timestamp = Instant.ofEpochMilli(70))
     subscription.sendMessage(msg1)
     assert(subscription.minClock == 70)
     subscription.receiveAck(Ack(TaskId(1, 1), 1, 1, session))
-    assert(subscription.minClock == Long.MaxValue)
+    assert(subscription.minClock == MAX_TIME_MILLIS)
   }
 
   private def randomMessage: String = new Random().nextInt.toString