[GEARPUMP-23] Add SessionWindows
Author: manuzhang <owenzhang1990@gmail.com>
Closes #140 from manuzhang/sessions.
diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/window/api/WindowFunction.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/window/api/WindowFunction.scala
index 9ef171d..73fef5d 100644
--- a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/window/api/WindowFunction.scala
+++ b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/window/api/WindowFunction.scala
@@ -33,11 +33,19 @@
}
trait WindowFunction[T] {
+
def apply(context: WindowFunction.Context[T]): Array[Window]
+
+ def isNonMerging: Boolean
+}
+
+abstract class NonMergingWindowFunction[T] extends WindowFunction[T] {
+
+ override def isNonMerging: Boolean = true
}
case class SlidingWindowFunction[T](size: Duration, step: Duration)
- extends WindowFunction[T] {
+ extends NonMergingWindowFunction[T] {
def this(size: Duration) = {
this(size, size)
@@ -64,9 +72,19 @@
}
}
-case class CountWindowFunction[T](size: Int) extends WindowFunction[T] {
+case class CountWindowFunction[T](size: Int) extends NonMergingWindowFunction[T] {
override def apply(context: WindowFunction.Context[T]): Array[Window] = {
Array(Window.ofEpochMilli(0, size))
}
}
+
+case class SessionWindowFunction[T](gap: Duration) extends WindowFunction[T] {
+
+ override def apply(context: WindowFunction.Context[T]): Array[Window] = {
+ Array(Window(context.timestamp, context.timestamp.plus(gap)))
+ }
+
+ override def isNonMerging: Boolean = false
+
+}
\ No newline at end of file
diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/window/api/Windows.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/window/api/Windows.scala
index c636a55..5917f09 100644
--- a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/window/api/Windows.scala
+++ b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/window/api/Windows.scala
@@ -21,9 +21,11 @@
/**
*
- * @param windowFn
- * @param trigger
- * @param accumulationMode
+ * Defines how to apply window functions.
+ *
+ * @param windowFn how to divide windows
+ * @param trigger when to trigger window result
+ * @param accumulationMode whether to accumulate results across windows
*/
case class Windows[T](
windowFn: WindowFunction[T],
@@ -54,6 +56,7 @@
/**
* Defines a FixedWindow.
+ *
* @param size window size
* @return a Window definition
*/
@@ -62,10 +65,11 @@
}
}
-object SlidingWindow {
+object SlidingWindows {
/**
- * Defines a SlidingWindow
+ * Defines a SlidingWindow.
+ *
* @param size window size
* @param step window step to slide forward
* @return a Window definition
@@ -75,3 +79,16 @@
}
}
+object SessionWindows {
+
+ /**
+ * Defines a SessionWindow.
+ *
+ * @param gap session gap
+ * @return a Window definition
+ */
+ def apply[T](gap: Duration): Windows[T] = {
+ Windows(SessionWindowFunction(gap))
+ }
+}
+
diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/window/impl/Window.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/window/impl/Window.scala
index fe644af..3e9d8fb 100644
--- a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/window/impl/Window.scala
+++ b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/window/impl/Window.scala
@@ -39,6 +39,22 @@
*/
case class Window(startTime: Instant, endTime: Instant) extends Comparable[Window] {
+ /**
+ * Returns whether this window intersects the given window.
+ */
+ def intersects(other: Window): Boolean = {
+ startTime.isBefore(other.endTime) && endTime.isAfter(other.startTime)
+ }
+
+ /**
+ * Returns the minimal window that includes both this window and
+ * the given window.
+ */
+ def span(other: Window): Window = {
+ Window(Instant.ofEpochMilli(Math.min(startTime.toEpochMilli, other.startTime.toEpochMilli)),
+ Instant.ofEpochMilli(Math.max(endTime.toEpochMilli, other.endTime.toEpochMilli)))
+ }
+
override def compareTo(o: Window): Int = {
val ret = startTime.compareTo(o.startTime)
if (ret != 0) {
@@ -52,14 +68,16 @@
case class WindowAndGroup[GROUP](window: Window, group: GROUP)
extends Comparable[WindowAndGroup[GROUP]] {
+ def intersects(other: WindowAndGroup[GROUP]): Boolean = {
+ window.intersects(other.window) && group.equals(other.group)
+ }
+
override def compareTo(o: WindowAndGroup[GROUP]): Int = {
val ret = window.compareTo(o.window)
if (ret != 0) {
ret
- } else if (group.equals(o.group)) {
- 0
} else {
- -1
+ group.hashCode() - o.group.hashCode()
}
}
}
diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/window/impl/WindowRunner.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/window/impl/WindowRunner.scala
index 7a16100..7013645 100644
--- a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/window/impl/WindowRunner.scala
+++ b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/window/impl/WindowRunner.scala
@@ -20,6 +20,7 @@
import java.time.Instant
import akka.actor.ActorSystem
+import com.gs.collections.api.block.predicate.Predicate
import org.apache.gearpump.Message
import org.apache.gearpump.cluster.UserConfig
import com.gs.collections.api.block.procedure.Procedure
@@ -33,6 +34,7 @@
import org.apache.gearpump.util.LogUtil
import org.slf4j.Logger
+
trait WindowRunner {
def process(message: Message): Unit
@@ -43,8 +45,6 @@
object DefaultWindowRunner {
private val LOG: Logger = LogUtil.getLogger(classOf[DefaultWindowRunner[_, _, _]])
-
- case class InputsAndFn[IN, OUT](inputs: FastList[IN], fn: FunctionRunner[IN, OUT])
}
class DefaultWindowRunner[IN, GROUP, OUT](
@@ -52,17 +52,24 @@
groupBy: GroupAlsoByWindow[IN, GROUP])(implicit system: ActorSystem)
extends WindowRunner {
+ private val windowFn = groupBy.window.windowFn
private val groupedInputs = new TreeSortedMap[WindowAndGroup[GROUP], FastList[IN]]
private val groupedFnRunners = new UnifiedMap[GROUP, FunctionRunner[IN, OUT]]
override def process(message: Message): Unit = {
+ val input = message.msg.asInstanceOf[IN]
val wgs = groupBy.groupBy(message)
wgs.foreach { wg =>
- if (!groupedInputs.containsKey(wg)) {
- val inputs = new FastList[IN](1)
- groupedInputs.put(wg, inputs)
+ if (windowFn.isNonMerging) {
+ if (!groupedInputs.containsKey(wg)) {
+ val inputs = new FastList[IN](1)
+ groupedInputs.put(wg, inputs)
+ }
+ groupedInputs.get(wg).add(input)
+ } else {
+ merge(wg, input)
}
- groupedInputs.get(wg).add(message.msg.asInstanceOf[IN])
+
if (!groupedFnRunners.containsKey(wg.group)) {
val fn = userConfig.getValue[FunctionRunner[IN, OUT]](GEARPUMP_STREAMING_OPERATOR).get
fn.setup()
@@ -70,6 +77,23 @@
}
}
+ def merge(wg: WindowAndGroup[GROUP], input: IN): Unit = {
+ val intersected = groupedInputs.keySet.select(new Predicate[WindowAndGroup[GROUP]] {
+ override def accept(each: WindowAndGroup[GROUP]): Boolean = {
+ wg.intersects(each)
+ }
+ })
+ var mergedWin = wg.window
+ val mergedInputs = FastList.newListWith(input)
+ intersected.forEach(new Procedure[WindowAndGroup[GROUP]] {
+ override def value(each: WindowAndGroup[GROUP]): Unit = {
+ mergedWin = mergedWin.span(each.window)
+ mergedInputs.addAll(groupedInputs.remove(each))
+ }
+ })
+ groupedInputs.put(WindowAndGroup(mergedWin, wg.group), mergedInputs)
+ }
+
}
override def trigger(time: Instant): Unit = {
diff --git a/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/task/EventTimeTriggerTaskSpec.scala b/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/task/EventTimeTriggerTaskSpec.scala
index 07b5544..9414c76 100644
--- a/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/task/EventTimeTriggerTaskSpec.scala
+++ b/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/task/EventTimeTriggerTaskSpec.scala
@@ -22,7 +22,7 @@
import org.apache.gearpump.Message
import org.apache.gearpump.cluster.UserConfig
import org.apache.gearpump.streaming.MockUtil
-import org.apache.gearpump.streaming.dsl.window.api.{EventTimeTrigger, SlidingWindow}
+import org.apache.gearpump.streaming.dsl.window.api.{EventTimeTrigger, SlidingWindows}
import org.apache.gearpump.streaming.dsl.window.impl.{GroupAlsoByWindow, WindowRunner}
import org.mockito.Matchers._
import org.mockito.Mockito._
@@ -43,7 +43,7 @@
forAll(windowSizeGen, windowStepGen, watermarkGen) {
(windowSize: Long, windowStep: Long, watermark: Instant) =>
- val window = SlidingWindow.apply[Any](Duration.ofMillis(windowSize),
+ val window = SlidingWindows.apply[Any](Duration.ofMillis(windowSize),
Duration.ofMillis(windowStep)).triggering(EventTimeTrigger)
val groupBy = mock[GroupAlsoByWindow[Any, Any]]
val windowRunner = mock[WindowRunner]
diff --git a/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/task/ProcessingTimeTriggerTaskSpec.scala b/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/task/ProcessingTimeTriggerTaskSpec.scala
index ef51ab2..cbc9e0c 100644
--- a/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/task/ProcessingTimeTriggerTaskSpec.scala
+++ b/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/task/ProcessingTimeTriggerTaskSpec.scala
@@ -23,7 +23,7 @@
import org.apache.gearpump.cluster.UserConfig
import org.apache.gearpump.streaming.MockUtil
import org.apache.gearpump.streaming.dsl.task.ProcessingTimeTriggerTask.Triggering
-import org.apache.gearpump.streaming.dsl.window.api.{ProcessingTimeTrigger, SlidingWindow}
+import org.apache.gearpump.streaming.dsl.window.api.{ProcessingTimeTrigger, SlidingWindows}
import org.apache.gearpump.streaming.dsl.window.impl.{GroupAlsoByWindow, WindowRunner}
import org.mockito.Matchers._
import org.mockito.Mockito._
@@ -44,7 +44,7 @@
forAll(windowSizeGen, windowStepGen, startTimeGen) {
(windowSize: Long, windowStep: Long, startTime: Instant) =>
- val window = SlidingWindow.apply[Any](Duration.ofMillis(windowSize),
+ val window = SlidingWindows.apply[Any](Duration.ofMillis(windowSize),
Duration.ofMillis(windowStep)).triggering(ProcessingTimeTrigger)
val groupBy = mock[GroupAlsoByWindow[Any, Any]]
val windowRunner = mock[WindowRunner]