[GEARPUMP-280] Set MAX_TIME_MILLIS to Long.MaxValue minus 1
Author: manuzhang <owenzhang1990@gmail.com>
Closes #159 from manuzhang/GEARPUMP-280.
diff --git a/core/src/main/scala/org/apache/gearpump/package.scala b/core/src/main/scala/org/apache/gearpump/package.scala
index 2f74ac4..6e20277 100644
--- a/core/src/main/scala/org/apache/gearpump/package.scala
+++ b/core/src/main/scala/org/apache/gearpump/package.scala
@@ -22,7 +22,7 @@
type TimeStamp = Long
// maximum time won't overflow when converted to milli-seconds
- val MAX_TIME_MILLIS: Long = Long.MaxValue
+ val MAX_TIME_MILLIS: Long = Long.MaxValue - 1
// minimum time won't overflow when converted to milli-seconds
val MIN_TIME_MILLIS: Long = Long.MinValue
diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/StreamApplication.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/StreamApplication.scala
index 23350dd..435414b 100644
--- a/streaming/src/main/scala/org/apache/gearpump/streaming/StreamApplication.scala
+++ b/streaming/src/main/scala/org/apache/gearpump/streaming/StreamApplication.scala
@@ -113,7 +113,8 @@
}
object LifeTime {
- val Immortal = LifeTime(MIN_TIME_MILLIS, MAX_TIME_MILLIS)
+ // MAX_TIME_MILLIS is Long.MaxValue - 1
+ val Immortal = LifeTime(MIN_TIME_MILLIS, MAX_TIME_MILLIS + 1)
}
/**
diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/source/Watermark.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/source/Watermark.scala
index 9c27bde..4371257 100644
--- a/streaming/src/main/scala/org/apache/gearpump/streaming/source/Watermark.scala
+++ b/streaming/src/main/scala/org/apache/gearpump/streaming/source/Watermark.scala
@@ -31,9 +31,7 @@
object Watermark {
- // maximum time won't overflow when converted to milli-seconds
val MAX: Instant = Instant.ofEpochMilli(MAX_TIME_MILLIS)
- // minimum time won't overflow when converted to milli-seconds
val MIN: Instant = Instant.ofEpochMilli(MIN_TIME_MILLIS)
}
diff --git a/streaming/src/test/scala/org/apache/gearpump/streaming/task/SubscriptionSpec.scala b/streaming/src/test/scala/org/apache/gearpump/streaming/task/SubscriptionSpec.scala
index 3635db9..9f02cef 100644
--- a/streaming/src/test/scala/org/apache/gearpump/streaming/task/SubscriptionSpec.scala
+++ b/streaming/src/test/scala/org/apache/gearpump/streaming/task/SubscriptionSpec.scala
@@ -24,7 +24,7 @@
import org.mockito.Mockito._
import org.scalatest.mock.MockitoSugar
import org.scalatest.{FlatSpec, Matchers}
-import org.apache.gearpump.Message
+import org.apache.gearpump.{MAX_TIME_MILLIS, Message}
import org.apache.gearpump.cluster.UserConfig
import org.apache.gearpump.streaming.partitioner.{HashPartitioner, Partitioner}
import org.apache.gearpump.streaming.task.SubscriptionSpec.NextTask
@@ -119,12 +119,12 @@
}
it should "report minClock as Long.MaxValue when there is no pending message" in {
- val (subscription, transport) = prepare
+ val (subscription, _) = prepare
val msg1 = new Message("1", timestamp = Instant.ofEpochMilli(70))
subscription.sendMessage(msg1)
assert(subscription.minClock == 70)
subscription.receiveAck(Ack(TaskId(1, 1), 1, 1, session))
- assert(subscription.minClock == Long.MaxValue)
+ assert(subscription.minClock == MAX_TIME_MILLIS)
}
private def randomMessage: String = new Random().nextInt.toString