[GEARPUMP-23] Add SessionWindows

Author: manuzhang <owenzhang1990@gmail.com>

Closes #140 from manuzhang/sessions.
diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/window/api/WindowFunction.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/window/api/WindowFunction.scala
index 9ef171d..73fef5d 100644
--- a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/window/api/WindowFunction.scala
+++ b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/window/api/WindowFunction.scala
@@ -33,11 +33,19 @@
 }
 
 trait WindowFunction[T] {
+
   def apply(context: WindowFunction.Context[T]): Array[Window]
+
+  def isNonMerging: Boolean
+}
+
+abstract class NonMergingWindowFunction[T] extends WindowFunction[T] {
+
+  override def isNonMerging: Boolean = true
 }
 
 case class SlidingWindowFunction[T](size: Duration, step: Duration)
-  extends WindowFunction[T] {
+  extends NonMergingWindowFunction[T] {
 
   def this(size: Duration) = {
     this(size, size)
@@ -64,9 +72,19 @@
   }
 }
 
-case class CountWindowFunction[T](size: Int) extends WindowFunction[T] {
+case class CountWindowFunction[T](size: Int) extends NonMergingWindowFunction[T] {
 
   override def apply(context: WindowFunction.Context[T]): Array[Window] = {
     Array(Window.ofEpochMilli(0, size))
   }
 }
+
+case class SessionWindowFunction[T](gap: Duration) extends WindowFunction[T] {
+
+  override def apply(context: WindowFunction.Context[T]): Array[Window] = {
+    Array(Window(context.timestamp, context.timestamp.plus(gap)))
+  }
+
+  override def isNonMerging: Boolean = false
+
+}
\ No newline at end of file
diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/window/api/Windows.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/window/api/Windows.scala
index c636a55..5917f09 100644
--- a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/window/api/Windows.scala
+++ b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/window/api/Windows.scala
@@ -21,9 +21,11 @@
 
 /**
  *
- * @param windowFn
- * @param trigger
- * @param accumulationMode
+ * Defines how to apply window functions.
+ *
+ * @param windowFn how to divide windows
+ * @param trigger when to trigger window result
+ * @param accumulationMode whether to accumulate results across windows
  */
 case class Windows[T](
     windowFn: WindowFunction[T],
@@ -54,6 +56,7 @@
 
   /**
    * Defines a FixedWindow.
+   *
    * @param size window size
    * @return a Window definition
    */
@@ -62,10 +65,11 @@
   }
 }
 
-object SlidingWindow {
+object SlidingWindows {
 
   /**
-   * Defines a SlidingWindow
+   * Defines a SlidingWindow.
+   *
    * @param size window size
    * @param step window step to slide forward
    * @return a Window definition
@@ -75,3 +79,16 @@
   }
 }
 
+object SessionWindows {
+
+  /**
+   * Defines a SessionWindow.
+   *
+   * @param gap session gap
+   * @return a Window definition
+   */
+  def apply[T](gap: Duration): Windows[T] = {
+    Windows(SessionWindowFunction(gap))
+  }
+}
+
diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/window/impl/Window.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/window/impl/Window.scala
index fe644af..3e9d8fb 100644
--- a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/window/impl/Window.scala
+++ b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/window/impl/Window.scala
@@ -39,6 +39,22 @@
  */
 case class Window(startTime: Instant, endTime: Instant) extends Comparable[Window] {
 
+  /**
+   * Returns whether this window intersects the given window.
+   */
+  def intersects(other: Window): Boolean = {
+    startTime.isBefore(other.endTime) && endTime.isAfter(other.startTime)
+  }
+
+  /**
+   * Returns the minimal window that includes both this window and
+   * the given window.
+   */
+  def span(other: Window): Window = {
+    Window(Instant.ofEpochMilli(Math.min(startTime.toEpochMilli, other.startTime.toEpochMilli)),
+      Instant.ofEpochMilli(Math.max(endTime.toEpochMilli, other.endTime.toEpochMilli)))
+  }
+
   override def compareTo(o: Window): Int = {
     val ret = startTime.compareTo(o.startTime)
     if (ret != 0) {
@@ -52,14 +68,16 @@
 case class WindowAndGroup[GROUP](window: Window, group: GROUP)
   extends Comparable[WindowAndGroup[GROUP]] {
 
+  def intersects(other: WindowAndGroup[GROUP]): Boolean = {
+    window.intersects(other.window) && group.equals(other.group)
+  }
+
   override def compareTo(o: WindowAndGroup[GROUP]): Int = {
     val ret = window.compareTo(o.window)
     if (ret != 0) {
       ret
-    } else if (group.equals(o.group)) {
-      0
     } else {
-      -1
+      group.hashCode() - o.group.hashCode()
     }
   }
 }
diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/window/impl/WindowRunner.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/window/impl/WindowRunner.scala
index 7a16100..7013645 100644
--- a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/window/impl/WindowRunner.scala
+++ b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/window/impl/WindowRunner.scala
@@ -20,6 +20,7 @@
 import java.time.Instant
 
 import akka.actor.ActorSystem
+import com.gs.collections.api.block.predicate.Predicate
 import org.apache.gearpump.Message
 import org.apache.gearpump.cluster.UserConfig
 import com.gs.collections.api.block.procedure.Procedure
@@ -33,6 +34,7 @@
 import org.apache.gearpump.util.LogUtil
 import org.slf4j.Logger
 
+
 trait WindowRunner {
 
   def process(message: Message): Unit
@@ -43,8 +45,6 @@
 object DefaultWindowRunner {
 
   private val LOG: Logger = LogUtil.getLogger(classOf[DefaultWindowRunner[_, _, _]])
-
-  case class InputsAndFn[IN, OUT](inputs: FastList[IN], fn: FunctionRunner[IN, OUT])
 }
 
 class DefaultWindowRunner[IN, GROUP, OUT](
@@ -52,17 +52,24 @@
     groupBy: GroupAlsoByWindow[IN, GROUP])(implicit system: ActorSystem)
   extends WindowRunner {
 
+  private val windowFn = groupBy.window.windowFn
   private val groupedInputs = new TreeSortedMap[WindowAndGroup[GROUP], FastList[IN]]
   private val groupedFnRunners = new UnifiedMap[GROUP, FunctionRunner[IN, OUT]]
 
   override def process(message: Message): Unit = {
+    val input = message.msg.asInstanceOf[IN]
     val wgs = groupBy.groupBy(message)
     wgs.foreach { wg =>
-      if (!groupedInputs.containsKey(wg)) {
-        val inputs = new FastList[IN](1)
-        groupedInputs.put(wg, inputs)
+      if (windowFn.isNonMerging) {
+        if (!groupedInputs.containsKey(wg)) {
+          val inputs = new FastList[IN](1)
+          groupedInputs.put(wg, inputs)
+        }
+        groupedInputs.get(wg).add(input)
+      } else {
+        merge(wg, input)
       }
-      groupedInputs.get(wg).add(message.msg.asInstanceOf[IN])
+
       if (!groupedFnRunners.containsKey(wg.group)) {
         val fn = userConfig.getValue[FunctionRunner[IN, OUT]](GEARPUMP_STREAMING_OPERATOR).get
         fn.setup()
@@ -70,6 +77,23 @@
       }
     }
 
+    def merge(wg: WindowAndGroup[GROUP], input: IN): Unit = {
+      val intersected = groupedInputs.keySet.select(new Predicate[WindowAndGroup[GROUP]] {
+        override def accept(each: WindowAndGroup[GROUP]): Boolean = {
+          wg.intersects(each)
+        }
+      })
+      var mergedWin = wg.window
+      val mergedInputs = FastList.newListWith(input)
+      intersected.forEach(new Procedure[WindowAndGroup[GROUP]] {
+        override def value(each: WindowAndGroup[GROUP]): Unit = {
+          mergedWin = mergedWin.span(each.window)
+          mergedInputs.addAll(groupedInputs.remove(each))
+        }
+      })
+      groupedInputs.put(WindowAndGroup(mergedWin, wg.group), mergedInputs)
+    }
+
   }
 
   override def trigger(time: Instant): Unit = {
diff --git a/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/task/EventTimeTriggerTaskSpec.scala b/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/task/EventTimeTriggerTaskSpec.scala
index 07b5544..9414c76 100644
--- a/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/task/EventTimeTriggerTaskSpec.scala
+++ b/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/task/EventTimeTriggerTaskSpec.scala
@@ -22,7 +22,7 @@
 import org.apache.gearpump.Message
 import org.apache.gearpump.cluster.UserConfig
 import org.apache.gearpump.streaming.MockUtil
-import org.apache.gearpump.streaming.dsl.window.api.{EventTimeTrigger, SlidingWindow}
+import org.apache.gearpump.streaming.dsl.window.api.{EventTimeTrigger, SlidingWindows}
 import org.apache.gearpump.streaming.dsl.window.impl.{GroupAlsoByWindow, WindowRunner}
 import org.mockito.Matchers._
 import org.mockito.Mockito._
@@ -43,7 +43,7 @@
     forAll(windowSizeGen, windowStepGen, watermarkGen) {
       (windowSize: Long, windowStep: Long, watermark: Instant) =>
 
-        val window = SlidingWindow.apply[Any](Duration.ofMillis(windowSize),
+        val window = SlidingWindows.apply[Any](Duration.ofMillis(windowSize),
           Duration.ofMillis(windowStep)).triggering(EventTimeTrigger)
         val groupBy = mock[GroupAlsoByWindow[Any, Any]]
         val windowRunner = mock[WindowRunner]
diff --git a/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/task/ProcessingTimeTriggerTaskSpec.scala b/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/task/ProcessingTimeTriggerTaskSpec.scala
index ef51ab2..cbc9e0c 100644
--- a/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/task/ProcessingTimeTriggerTaskSpec.scala
+++ b/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/task/ProcessingTimeTriggerTaskSpec.scala
@@ -23,7 +23,7 @@
 import org.apache.gearpump.cluster.UserConfig
 import org.apache.gearpump.streaming.MockUtil
 import org.apache.gearpump.streaming.dsl.task.ProcessingTimeTriggerTask.Triggering
-import org.apache.gearpump.streaming.dsl.window.api.{ProcessingTimeTrigger, SlidingWindow}
+import org.apache.gearpump.streaming.dsl.window.api.{ProcessingTimeTrigger, SlidingWindows}
 import org.apache.gearpump.streaming.dsl.window.impl.{GroupAlsoByWindow, WindowRunner}
 import org.mockito.Matchers._
 import org.mockito.Mockito._
@@ -44,7 +44,7 @@
     forAll(windowSizeGen, windowStepGen, startTimeGen) {
       (windowSize: Long, windowStep: Long, startTime: Instant) =>
 
-        val window = SlidingWindow.apply[Any](Duration.ofMillis(windowSize),
+        val window = SlidingWindows.apply[Any](Duration.ofMillis(windowSize),
           Duration.ofMillis(windowStep)).triggering(ProcessingTimeTrigger)
         val groupBy = mock[GroupAlsoByWindow[Any, Any]]
         val windowRunner = mock[WindowRunner]