[GEARPUMP-274] Set lower bound of time to Long.MinValue milliseconds
Author: manuzhang <owenzhang1990@gmail.com>
Closes #151 from manuzhang/time.
diff --git a/core/src/main/scala/org/apache/gearpump/package.scala b/core/src/main/scala/org/apache/gearpump/package.scala
index b1118d3..2f74ac4 100644
--- a/core/src/main/scala/org/apache/gearpump/package.scala
+++ b/core/src/main/scala/org/apache/gearpump/package.scala
@@ -20,5 +20,10 @@
package object gearpump {
type TimeStamp = Long
- val LatestTime = -1
+
+ // maximum time won't overflow when converted to milli-seconds
+ val MAX_TIME_MILLIS: Long = Long.MaxValue
+
+ // minimum time won't overflow when converted to milli-seconds
+ val MIN_TIME_MILLIS: Long = Long.MinValue
}
diff --git a/experiments/storm/src/main/scala/org/apache/gearpump/experiments/storm/util/StormOutputCollector.scala b/experiments/storm/src/main/scala/org/apache/gearpump/experiments/storm/util/StormOutputCollector.scala
index 1b239b5..fd023a9 100644
--- a/experiments/storm/src/main/scala/org/apache/gearpump/experiments/storm/util/StormOutputCollector.scala
+++ b/experiments/storm/src/main/scala/org/apache/gearpump/experiments/storm/util/StormOutputCollector.scala
@@ -28,7 +28,7 @@
import backtype.storm.tuple.Fields
import backtype.storm.utils.Utils
import org.slf4j.Logger
-import org.apache.gearpump._
+import org.apache.gearpump.{MIN_TIME_MILLIS, Message, TimeStamp}
import org.apache.gearpump.experiments.storm.topology.GearpumpTuple
import org.apache.gearpump.experiments.storm.util.StormUtil._
import org.apache.gearpump.streaming.ProcessorId
@@ -56,7 +56,7 @@
streamGroupers, componentToProcessorId, values)
}
new StormOutputCollector(stormTaskId, taskToComponent, targets, getTargetPartitionsFn,
- taskContext, LatestTime)
+ taskContext, MIN_TIME_MILLIS)
}
/**
diff --git a/experiments/storm/src/test/scala/org/apache/gearpump/experiments/storm/util/StormOutputCollectorSpec.scala b/experiments/storm/src/test/scala/org/apache/gearpump/experiments/storm/util/StormOutputCollectorSpec.scala
index e0e9e61..6b894da 100644
--- a/experiments/storm/src/test/scala/org/apache/gearpump/experiments/storm/util/StormOutputCollectorSpec.scala
+++ b/experiments/storm/src/test/scala/org/apache/gearpump/experiments/storm/util/StormOutputCollectorSpec.scala
@@ -28,17 +28,17 @@
import org.scalatest.prop.PropertyChecks
import org.scalatest.{Matchers, PropSpec}
-import org.apache.gearpump._
+import org.apache.gearpump.{Message, MIN_TIME_MILLIS, TimeStamp}
import org.apache.gearpump.experiments.storm.topology.GearpumpTuple
import org.apache.gearpump.streaming.MockUtil
class StormOutputCollectorSpec
extends PropSpec with PropertyChecks with Matchers with MockitoSugar {
- val stormTaskId = 0
- val streamIdGen = Gen.alphaStr
- val valuesGen = Gen.listOf[String](Gen.alphaStr).map(_.asJava.asInstanceOf[JList[AnyRef]])
- val timestampGen = Gen.chooseNum[Long](0L, 1000L)
+ private val stormTaskId = 0
+ private val streamIdGen = Gen.alphaStr
+ private val valuesGen = Gen.listOf[String](Gen.alphaStr).map(_.asJava.asInstanceOf[JList[AnyRef]])
+ private val timestampGen = Gen.chooseNum[Long](0L, 1000L)
property("StormOutputCollector emits tuple values into a stream") {
forAll(timestampGen, streamIdGen, valuesGen) {
@@ -53,7 +53,7 @@
targetStormTaskIds))
val taskContext = MockUtil.mockTaskContext
val stormOutputCollector = new StormOutputCollector(stormTaskId, taskToComponent,
- targets, getTargetPartitionsFn, taskContext, LatestTime)
+ targets, getTargetPartitionsFn, taskContext, MIN_TIME_MILLIS)
when(targets.containsKey(streamId)).thenReturn(false)
stormOutputCollector.emit(streamId, values) shouldBe StormOutputCollector.EMPTY_LIST
@@ -86,7 +86,7 @@
targetStormTaskIds))
val taskContext = MockUtil.mockTaskContext
val stormOutputCollector = new StormOutputCollector(stormTaskId, taskToComponent,
- targets, getTargetPartitionsFn, taskContext, LatestTime)
+ targets, getTargetPartitionsFn, taskContext, MIN_TIME_MILLIS)
when(targets.containsKey(streamId)).thenReturn(false)
verify(taskContext, times(0)).output(anyObject[Message])
diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/StreamApplication.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/StreamApplication.scala
index d4b3719..23350dd 100644
--- a/streaming/src/main/scala/org/apache/gearpump/streaming/StreamApplication.scala
+++ b/streaming/src/main/scala/org/apache/gearpump/streaming/StreamApplication.scala
@@ -20,10 +20,8 @@
import scala.language.implicitConversions
import scala.reflect.ClassTag
-
import akka.actor.ActorSystem
-
-import org.apache.gearpump.TimeStamp
+import org.apache.gearpump.{MAX_TIME_MILLIS, MIN_TIME_MILLIS, TimeStamp}
import org.apache.gearpump.cluster._
import org.apache.gearpump.streaming.partitioner.{HashPartitioner, Partitioner, PartitionerDescription, PartitionerObject}
import org.apache.gearpump.streaming.appmaster.AppMaster
@@ -115,7 +113,7 @@
}
object LifeTime {
- val Immortal = LifeTime(0L, Long.MaxValue)
+ val Immortal = LifeTime(MIN_TIME_MILLIS, MAX_TIME_MILLIS)
}
/**
diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/appmaster/ClockService.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/appmaster/ClockService.scala
index 0a2999d..2a24b66 100644
--- a/streaming/src/main/scala/org/apache/gearpump/streaming/appmaster/ClockService.scala
+++ b/streaming/src/main/scala/org/apache/gearpump/streaming/appmaster/ClockService.scala
@@ -25,7 +25,7 @@
import akka.actor.{Actor, ActorRef, Cancellable, Stash}
import com.google.common.primitives.Longs
-import org.apache.gearpump.TimeStamp
+import org.apache.gearpump.{MIN_TIME_MILLIS, TimeStamp}
import org.apache.gearpump.cluster.ClientToMaster.GetStallingTasks
import org.apache.gearpump.streaming.AppMasterToMaster.StallingTasks
import org.apache.gearpump.streaming._
@@ -53,8 +53,8 @@
import context.dispatcher
private val healthChecker = new HealthChecker(stallingThresholdSeconds = 60)
- private var healthCheckScheduler: Cancellable = null
- private var snapshotScheduler: Cancellable = null
+ private var healthCheckScheduler: Cancellable = _
+ private var snapshotScheduler: Cancellable = _
override def receive: Receive = null
@@ -88,7 +88,7 @@
// We use Array instead of List for Performance consideration
private var processorClocks = Array.empty[ProcessorClock]
- private var checkpointClocks: Map[TaskId, Vector[TimeStamp]] = null
+ private var checkpointClocks: Map[TaskId, Vector[TimeStamp]] = _
private var minCheckpointClock: Option[TimeStamp] = None
@@ -337,7 +337,8 @@
case object HealthCheck
class ProcessorClock(val processorId: ProcessorId, val life: LifeTime, val parallelism: Int,
- private var _min: TimeStamp = 0L, private var _taskClocks: Array[TimeStamp] = null) {
+ private var _min: TimeStamp = MIN_TIME_MILLIS,
+ private var _taskClocks: Array[TimeStamp] = null) {
def copy(life: LifeTime): ProcessorClock = {
new ProcessorClock(processorId, life, parallelism, _min, _taskClocks)
@@ -370,7 +371,7 @@
class HealthChecker(stallingThresholdSeconds: Int) {
private val LOG: Logger = LogUtil.getLogger(getClass)
- private var minClock: ClockValue = null
+ private var minClock: ClockValue = _
private val stallingThresholdMilliseconds = stallingThresholdSeconds * 1000
// 60 seconds
private var stallingTasks = Array.empty[TaskId]
diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/source/Watermark.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/source/Watermark.scala
index 1f8d3a1..9c27bde 100644
--- a/streaming/src/main/scala/org/apache/gearpump/streaming/source/Watermark.scala
+++ b/streaming/src/main/scala/org/apache/gearpump/streaming/source/Watermark.scala
@@ -20,6 +20,7 @@
import java.time.Instant
import org.apache.gearpump.Message
+import org.apache.gearpump.{MAX_TIME_MILLIS, MIN_TIME_MILLIS}
/**
* message used by source task to report source watermark.
@@ -31,5 +32,8 @@
object Watermark {
// maximum time won't overflow when converted to milli-seconds
- val MAX = Instant.ofEpochMilli(Long.MaxValue)
+ val MAX: Instant = Instant.ofEpochMilli(MAX_TIME_MILLIS)
+
+ // minimum time won't overflow when converted to milli-seconds
+ val MIN: Instant = Instant.ofEpochMilli(MIN_TIME_MILLIS)
}
diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/task/Subscription.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/task/Subscription.scala
index 3e68580..c6817f5 100644
--- a/streaming/src/main/scala/org/apache/gearpump/streaming/task/Subscription.scala
+++ b/streaming/src/main/scala/org/apache/gearpump/streaming/task/Subscription.scala
@@ -26,7 +26,7 @@
import org.apache.gearpump.streaming.LifeTime
import org.apache.gearpump.streaming.task.Subscription._
import org.apache.gearpump.util.LogUtil
-import org.apache.gearpump.{Message, TimeStamp}
+import org.apache.gearpump.{MAX_TIME_MILLIS, Message, MIN_TIME_MILLIS, TimeStamp}
/**
* Manges the output and message clock for single downstream processor
@@ -58,8 +58,8 @@
private val pendingMessageCount: Array[Short] = new Array[Short](parallelism)
private val candidateMinClockSince: Array[Short] = new Array[Short](parallelism)
- private val minClockValue: Array[TimeStamp] = Array.fill(parallelism)(Long.MaxValue)
- private val candidateMinClock: Array[TimeStamp] = Array.fill(parallelism)(Long.MaxValue)
+ private val minClockValue: Array[TimeStamp] = Array.fill(parallelism)(MAX_TIME_MILLIS)
+ private val candidateMinClock: Array[TimeStamp] = Array.fill(parallelism)(MAX_TIME_MILLIS)
private var maxPendingCount: Short = 0
@@ -133,7 +133,7 @@
}
}
- private var lastFlushTime: Long = 0L
+ private var lastFlushTime: Long = MIN_TIME_MILLIS
private val FLUSH_INTERVAL = 5 * 1000 // ms
private def needFlush: Boolean = {
System.currentTimeMillis() - lastFlushTime > FLUSH_INTERVAL &&
diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/task/TaskActor.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/task/TaskActor.scala
index 14c2b59..318ebf8 100644
--- a/streaming/src/main/scala/org/apache/gearpump/streaming/task/TaskActor.scala
+++ b/streaming/src/main/scala/org/apache/gearpump/streaming/task/TaskActor.scala
@@ -33,7 +33,7 @@
import org.apache.gearpump.streaming.ExecutorToAppMaster._
import org.apache.gearpump.streaming.ProcessorId
import org.apache.gearpump.util.{LogUtil, TimeOutScheduler}
-import org.apache.gearpump.{Message, TimeStamp}
+import org.apache.gearpump.{MAX_TIME_MILLIS, Message, MIN_TIME_MILLIS, TimeStamp}
/**
*
@@ -46,8 +46,8 @@
val task: TaskWrapper,
inputSerializerPool: SerializationFramework)
extends Actor with ExpressTransport with TimeOutScheduler {
- var upstreamMinClock: TimeStamp = 0L
- private var _minClock: TimeStamp = 0L
+ private var upstreamMinClock: TimeStamp = MIN_TIME_MILLIS
+ private var _minClock: TimeStamp = MIN_TIME_MILLIS
private var minClockReported: Boolean = true
def serializerPool: SerializationFramework = inputSerializerPool
@@ -246,7 +246,7 @@
receiveMessage(watermark.toMessage, sender)
- case upstream@UpstreamMinClock(upstreamClock) =>
+ case UpstreamMinClock(upstreamClock) =>
updateUpstreamMinClock(upstreamClock)
case ChangeTask(_, dagVersion, life, subscribers) =>
@@ -316,7 +316,7 @@
task.onWatermarkProgress(Instant.ofEpochMilli(this.upstreamMinClock))
}
- val subMinClock = subscriptions.foldLeft(Long.MaxValue) { (min, sub) =>
+ val subMinClock = subscriptions.foldLeft(MAX_TIME_MILLIS) { (min, sub) =>
val subMin = sub._2.minClock
// A subscription is holding back the _minClock;
// we send AckRequest to its tasks to push _minClock forward