refactor WalLogAgg class.
  - separate edges and vertices.
diff --git a/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/Job.scala b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/Job.scala
index 8f21bc2..6d9f509 100644
--- a/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/Job.scala
+++ b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/Job.scala
@@ -63,8 +63,7 @@
     val dfKeys = dfMap.keySet
 
     processes.filter{ p =>
-        var existAllInput = true
-        p.conf.inputs.foreach { input => existAllInput = dfKeys(input) }
+        val existAllInput = p.conf.inputs.forall { input => dfKeys(input) }
         !dfKeys(p.conf.name) && existAllInput
     }
     .map { p =>
diff --git a/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/wal/WalLog.scala b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/wal/WalLog.scala
index 9617ca1..4080045 100644
--- a/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/wal/WalLog.scala
+++ b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/wal/WalLog.scala
@@ -1,7 +1,7 @@
 package org.apache.s2graph.s2jobs.wal
 
 import com.google.common.hash.Hashing
-import org.apache.s2graph.core.JSONParser
+import org.apache.s2graph.core.{GraphUtil, JSONParser}
 import org.apache.s2graph.s2jobs.wal.process.params.AggregateParam
 import org.apache.s2graph.s2jobs.wal.transformer.Transformer
 import org.apache.s2graph.s2jobs.wal.utils.BoundedPriorityQueue
@@ -12,10 +12,18 @@
 import scala.util.Try
 
 object WalLogAgg {
-  val outputColumns = Seq("from", "logs", "maxTs", "minTs")
+  val outputColumns = Seq("from", "vertices", "edges")
+
+  def isEdge(walLog: WalLog): Boolean = {
+    walLog.elem == "edge" || walLog.elem == "e"
+  }
 
   def apply(walLog: WalLog): WalLogAgg = {
-    new WalLogAgg(walLog.from, Seq(walLog), walLog.timestamp, walLog.timestamp)
+    val (vertices, edges) =
+      if (isEdge(walLog)) (Nil, Seq(walLog))
+      else (Seq(walLog), Nil)
+
+    new WalLogAgg(walLog.from, vertices, edges)
   }
 
   def toFeatureHash(dimVal: DimVal): Long = toFeatureHash(dimVal.dim, dimVal.value)
@@ -24,29 +32,43 @@
     Hashing.murmur3_128().hashBytes(s"$dim:$value".getBytes("UTF-8")).asLong()
   }
 
-  def merge(iter: Iterator[WalLogAgg],
-            param: AggregateParam)(implicit ord: Ordering[WalLog]) = {
-    val heap = new BoundedPriorityQueue[WalLog](param.heapSize)
-    var minTs = Long.MaxValue
-    var maxTs = Long.MinValue
+  private def addToHeap(iter: Seq[WalLog],
+                        heap: BoundedPriorityQueue[WalLog],
+                        validTimestampDuration: Option[Long]): Unit = {
+    val now = System.currentTimeMillis()
 
-    iter.foreach { walLogAgg =>
-      minTs = Math.min(walLogAgg.minTs, minTs)
-      maxTs = Math.max(walLogAgg.maxTs, maxTs)
+    iter.foreach { walLog =>
+      val ts = walLog.timestamp
+      val isValid = validTimestampDuration.map(d => now - ts < d).getOrElse(true)
 
-      walLogAgg.logs.foreach { walLog =>
+      if (isValid) {
         heap += walLog
       }
     }
-    val topItems = if (param.sortTopItems) heap.toArray.sortBy(-_.timestamp) else heap.toArray
-
-    WalLogAgg(topItems.head.from, topItems, maxTs, minTs)
   }
 
-  def filterProps(walLogAgg: WalLogAgg,
-                  transformers: Seq[Transformer],
-                  validFeatureHashKeys: Set[Long]) = {
-    val filtered = walLogAgg.logs.map { walLog =>
+  def merge(iter: Iterator[WalLogAgg],
+            param: AggregateParam)(implicit ord: Ordering[WalLog]): Option[WalLogAgg] = {
+    val edgeHeap = new BoundedPriorityQueue[WalLog](param.heapSize)
+    val vertexHeap = new BoundedPriorityQueue[WalLog](param.heapSize)
+
+    val validTimestampDuration = param.validTimestampDuration
+
+    iter.foreach { walLogAgg =>
+      addToHeap(walLogAgg.vertices, vertexHeap, validTimestampDuration)
+      addToHeap(walLogAgg.edges, edgeHeap, validTimestampDuration)
+    }
+
+    val topVertices = if (param.sortTopItems) vertexHeap.toArray.sortBy(-_.timestamp) else vertexHeap.toArray
+    val topEdges = if (param.sortTopItems) edgeHeap.toArray.sortBy(-_.timestamp) else edgeHeap.toArray
+
+    topEdges.headOption.map(head => WalLogAgg(head.from, topVertices, topEdges))
+  }
+
+  private def filterPropsInner(walLogs: Seq[WalLog],
+                          transformers: Seq[Transformer],
+                          validFeatureHashKeys: Set[Long]): Seq[WalLog] = {
+    walLogs.map { walLog =>
       val fields = walLog.propsJson.fields.filter { case (propKey, propValue) =>
         val filtered = transformers.flatMap { transformer =>
           transformer.toDimValLs(walLog, propKey, JSONParser.jsValueToString(propValue)).filter(dimVal => validFeatureHashKeys(toFeatureHash(dimVal)))
@@ -56,8 +78,15 @@
 
       walLog.copy(props = Json.toJson(fields.toMap).as[JsObject].toString)
     }
+  }
 
-    walLogAgg.copy(logs = filtered)
+  def filterProps(walLogAgg: WalLogAgg,
+                        transformers: Seq[Transformer],
+                        validFeatureHashKeys: Set[Long]) = {
+    val filteredVertices = filterPropsInner(walLogAgg.vertices, transformers, validFeatureHashKeys)
+    val filteredEdges = filterPropsInner(walLogAgg.edges, transformers, validFeatureHashKeys)
+
+    walLogAgg.copy(vertices = filteredVertices, edges = filteredEdges)
   }
 }
 
@@ -88,9 +117,8 @@
 case class DimVal(dim: String, value: String)
 
 case class WalLogAgg(from: String,
-                     logs: Seq[WalLog],
-                     maxTs: Long,
-                     minTs: Long)
+                     vertices: Seq[WalLog],
+                     edges: Seq[WalLog])
 
 case class WalLog(timestamp: Long,
                   operation: String,
diff --git a/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/wal/process/WalLogAggregateProcess.scala b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/wal/process/WalLogAggregateProcess.scala
index ee4debd..aebb1cc 100644
--- a/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/wal/process/WalLogAggregateProcess.scala
+++ b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/wal/process/WalLogAggregateProcess.scala
@@ -10,7 +10,7 @@
                 dataset: Dataset[WalLogAgg],
                 aggregateParam: AggregateParam)(implicit ord: Ordering[WalLog]) = {
     import ss.implicits._
-    dataset.groupByKey(_.from).mapGroups { case (_, iter) =>
+    dataset.groupByKey(_.from).flatMapGroups { case (_, iter) =>
       WalLogAgg.merge(iter, aggregateParam)
     }.toDF(WalLogAgg.outputColumns: _*)
   }
@@ -20,7 +20,7 @@
                    aggregateParam: AggregateParam)(implicit ord: Ordering[WalLog]): DataFrame = {
     import ss.implicits._
 
-    dataset.groupByKey(walLog => walLog.from).mapGroups { case (key, iter) =>
+    dataset.groupByKey(walLog => walLog.from).flatMapGroups { case (key, iter) =>
       WalLogAgg.merge(iter.map(WalLogAgg(_)), aggregateParam)
     }.toDF(WalLogAgg.outputColumns: _*)
   }
diff --git a/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/wal/process/params/AggregateParam.scala b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/wal/process/params/AggregateParam.scala
index 4cb1377..ecc12e3 100644
--- a/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/wal/process/params/AggregateParam.scala
+++ b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/wal/process/params/AggregateParam.scala
@@ -14,12 +14,14 @@
     val arrayType = taskConf.options.get("arrayType").map(_.toBoolean).getOrElse(defaultIsArrayType)
     val sortTopItems = taskConf.options.get("sortTopItems").map(_.toBoolean).getOrElse(defaultShouldSortTopItems)
     val numOfPartitions = taskConf.options.get("numOfPartitions").map(_.toInt)
+    val validTimestampDuration = taskConf.options.get("validTimestampDuration").map(_.toLong).getOrElse(Long.MaxValue)
 
     new AggregateParam(groupByKeys = groupByKeys,
       topK = Option(maxNumOfEdges),
       isArrayType = Option(arrayType),
       shouldSortTopItems = Option(sortTopItems),
-      numOfPartitions = numOfPartitions
+      numOfPartitions = numOfPartitions,
+      validTimestampDuration = Option(validTimestampDuration)
     )
   }
 }
@@ -28,7 +30,8 @@
                           topK: Option[Int],
                           isArrayType: Option[Boolean],
                           shouldSortTopItems: Option[Boolean],
-                          numOfPartitions: Option[Int]) {
+                          numOfPartitions: Option[Int],
+                          validTimestampDuration: Option[Long]) {
 
   import AggregateParam._