add wal package: WalLog case class, GroupByAgg/GroupByArrayAgg UserDefinedAggregateFunctions, and S2EdgeData aggregate processes.
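
GroupByAgg keeps, per group, the newest rows (up to maxNumOfEdges,
ordered by timestamp descending) as an array of (timestamp, to, label,
props) structs; GroupByArrayAgg merges already-aggregated arrays so
partial results can be combined in a second pass (see
S2EdgeDataArrayAggregateProcess and the tests).

Illustrative usage sketch only, not part of this patch: the object name
and sample rows below are made up, and the column names follow the new
WalLogSchema.

    import org.apache.spark.sql.SparkSession
    import org.apache.spark.sql.functions._
    import org.apache.s2graph.s2jobs.wal.WalLog
    import org.apache.s2graph.s2jobs.wal.udafs.GroupByAgg

    object WalLogAggregateExample {
      def main(args: Array[String]): Unit = {
        val spark = SparkSession.builder()
          .master("local[*]")
          .appName("wal-agg-example")
          .getOrCreate()
        import spark.implicits._

        // props is a JSON string, as in the test fixtures.
        val walLogs = Seq(
          WalLog(1L, "insert", "edge", "a", "b", "s2graph", "friends", """{"name": 1}"""),
          WalLog(2L, "insert", "edge", "a", "c", "s2graph", "friends", """{"name": 2}""")
        ).toDS().toDF()

        // Keep at most 10 edges per `from` vertex, newest first.
        val topEdges = new GroupByAgg(maxNumOfEdges = 10)

        walLogs
          .groupBy(col("from"))
          .agg(
            topEdges(col("timestamp"), col("to"), col("label"), col("props")).as("edges"),
            max(col("timestamp")).as("max_ts"),
            min(col("timestamp")).as("min_ts")
          )
          .show(truncate = false)

        spark.stop()
      }
    }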
diff --git a/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/Schema.scala b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/Schema.scala
index 58d3368..1c47f13 100644
--- a/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/Schema.scala
+++ b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/Schema.scala
@@ -22,10 +22,19 @@
 import org.apache.spark.sql.types.{LongType, StringType, StructField, StructType}
 
 object Schema {
-  val BulkLoadSchema = StructType(Seq(
-    StructField("timestamp", LongType, false),
-    StructField("operation", StringType, false),
-    StructField("elem", StringType, false),
+  /**
+    * root
+    * |-- timestamp: long (nullable = false)
+    * |-- operation: string (nullable = false)
+    * |-- elem: string (nullable = false)
+    */
+  val CommonFields = Seq(
+    StructField("timestamp", LongType, nullable = false),
+    StructField("operation", StringType, nullable = false),
+    StructField("elem", StringType, nullable = false)
+  )
+
+  val BulkLoadSchema = StructType(CommonFields ++ Seq(
     StructField("from", StringType, false),
     StructField("to", StringType, false),
     StructField("label", StringType, false),
@@ -33,24 +42,63 @@
     StructField("direction", StringType, true)
   ))
 
-  val VertexSchema = StructType(Seq(
-    StructField("timestamp", LongType, false),
-    StructField("operation", StringType, false),
-    StructField("elem", StringType, false),
+  /**
+    * root
+    * |-- timestamp: long (nullable = false)
+    * |-- operation: string (nullable = false)
+    * |-- elem: string (nullable = false)
+    * |-- id: string (nullable = false)
+    * |-- service: string (nullable = false)
+    * |-- column: string (nullable = false)
+    * |-- props: string (nullable = false)
+    */
+  val VertexSchema = StructType(CommonFields ++ Seq(
     StructField("id", StringType, false),
     StructField("service", StringType, false),
     StructField("column", StringType, false),
     StructField("props", StringType, false)
   ))
 
-  val EdgeSchema = StructType(Seq(
-    StructField("timestamp", LongType, false),
-    StructField("operation", StringType, false),
-    StructField("elem", StringType, false),
+
+  /**
+    * root
+    * |-- timestamp: long (nullable = false)
+    * |-- operation: string (nullable = false)
+    * |-- elem: string (nullable = false)
+    * |-- from: string (nullable = false)
+    * |-- to: string (nullable = false)
+    * |-- label: string (nullable = false)
+    * |-- props: string (nullable = false)
+    * |-- direction: string (nullable = true)
+    */
+  val EdgeSchema = StructType(CommonFields ++ Seq(
     StructField("from", StringType, false),
     StructField("to", StringType, false),
     StructField("label", StringType, false),
     StructField("props", StringType, false),
     StructField("direction", StringType, true)
   ))
+
+  /**
+    * root
+    * |-- timestamp: long (nullable = false)
+    * |-- operation: string (nullable = false)
+    * |-- elem: string (nullable = false)
+    * |-- id: string (nullable = true)
+    * |-- service: string (nullable = true)
+    * |-- column: string (nullable = true)
+    * |-- from: string (nullable = true)
+    * |-- to: string (nullable = true)
+    * |-- label: string (nullable = true)
+    * |-- props: string (nullable = true)
+    */
+  val GraphElementSchema = StructType(CommonFields ++ Seq(
+    StructField("id", StringType, nullable = true),
+    StructField("service", StringType, nullable = true),
+    StructField("column", StringType, nullable = true),
+    StructField("from", StringType, nullable = true),
+    StructField("to", StringType, nullable = true),
+    StructField("label", StringType, nullable = true),
+    StructField("props", StringType, nullable = true)
+  ))
 }
diff --git a/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/task/Source.scala b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/task/Source.scala
index bfac62b..5bbf166 100644
--- a/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/task/Source.scala
+++ b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/task/Source.scala
@@ -19,12 +19,13 @@
 
 package org.apache.s2graph.s2jobs.task
 
-import org.apache.s2graph.core.Management
+import org.apache.s2graph.core.{JSONParser, Management}
 import org.apache.s2graph.s2jobs.Schema
 import org.apache.s2graph.s2jobs.loader.{HFileGenerator, SparkBulkLoaderTransformer}
 import org.apache.s2graph.s2jobs.serde.reader.S2GraphCellReader
 import org.apache.s2graph.s2jobs.serde.writer.RowDataFrameWriter
 import org.apache.spark.sql.{DataFrame, SparkSession}
+import play.api.libs.json.{JsObject, Json}
 
 
 /**
@@ -103,8 +104,9 @@
       case "edgeLog" =>
         ss.read.format("com.databricks.spark.csv").option("delimiter", "\t")
           .schema(BulkLoadSchema).load(paths: _*)
-      case _ => ss.read.format(format).load(paths: _*)
+      case _ =>
         val df = ss.read.format(format).load(paths: _*)
+
         if (columnsOpt.isDefined) df.toDF(columnsOpt.get.split(",").map(_.trim): _*) else df
     }
   }
diff --git a/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/wal/WalLog.scala b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/wal/WalLog.scala
new file mode 100644
index 0000000..1b70a8a
--- /dev/null
+++ b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/wal/WalLog.scala
@@ -0,0 +1,31 @@
+package org.apache.s2graph.s2jobs.wal
+
+import org.apache.spark.sql.types.{LongType, StringType, StructField, StructType}
+
+case class WalLog(timestamp: Long,
+                  operation: String,
+                  elem: String,
+                  from: String,
+                  to: String,
+                  service: String,
+                  label: String,
+                  props: String) {
+  val id = from          // for vertex logs, `from` appears to carry the vertex id
+  val columnName = label // ... `label` the column name
+  val serviceName = to   // ... and `to` the service name
+}
+
+object WalLog {
+  val WalLogSchema = StructType(Seq(
+    StructField("timestamp", LongType, false),
+    StructField("operation", StringType, false),
+    StructField("elem", StringType, false),
+    StructField("from", StringType, false),
+    StructField("to", StringType, false),
+    StructField("service", StringType, true),
+    StructField("label", StringType, false),
+    StructField("props", StringType, false)
+    //    StructField("direction", StringType, true)
+  ))
+}
+
diff --git a/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/wal/process/S2EdgeDataAggregateProcess.scala b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/wal/process/S2EdgeDataAggregateProcess.scala
new file mode 100644
index 0000000..a1d17f1
--- /dev/null
+++ b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/wal/process/S2EdgeDataAggregateProcess.scala
@@ -0,0 +1,33 @@
+package org.apache.s2graph.s2jobs.wal.process
+
+import org.apache.s2graph.s2jobs.task.TaskConf
+import org.apache.s2graph.s2jobs.wal.udafs._
+import org.apache.spark.sql.functions._
+import org.apache.spark.sql.{DataFrame, SparkSession}
+
+/**
+  * Expects an S2EdgeData DataFrame as input and aggregates, per group, the newest rows (up to maxNumOfEdges) into an `edges` array.
+  * @param taskConf
+  */
+class S2EdgeDataAggregateProcess(taskConf: TaskConf) extends org.apache.s2graph.s2jobs.task.Process(taskConf) {
+  override def execute(ss: SparkSession, inputMap: Map[String, DataFrame]): DataFrame = {
+    val maxNumOfEdges = taskConf.options.get("maxNumOfEdges").map(_.toInt).getOrElse(1000)
+    val groupByColumns = taskConf.options.get("groupByColumns").getOrElse("from").split(",").map(col(_))
+    val aggregateColumns = taskConf.options.get("aggregateColumns").getOrElse("timestamp,to,label,props").split(",").map(col(_))
+    taskConf.options.get("parallelism").map(ss.sqlContext.setConf("spark.sql.shuffle.partitions", _))
+
+    val aggregator = new GroupByAgg(maxNumOfEdges)
+
+    val edges = inputMap(taskConf.inputs.head)
+
+    edges
+      .groupBy(groupByColumns: _*)
+      .agg(
+        aggregator(aggregateColumns: _*).as("edges"),
+        max(col("timestamp")).as("max_ts"),
+        min(col("timestamp")).as("min_ts")
+      )
+  }
+
+  override def mandatoryOptions: Set[String] = Set.empty
+}
diff --git a/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/wal/process/S2EdgeDataArrayAggregateProcess.scala b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/wal/process/S2EdgeDataArrayAggregateProcess.scala
new file mode 100644
index 0000000..08df269
--- /dev/null
+++ b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/wal/process/S2EdgeDataArrayAggregateProcess.scala
@@ -0,0 +1,29 @@
+package org.apache.s2graph.s2jobs.wal.process
+
+import org.apache.s2graph.s2jobs.task.TaskConf
+import org.apache.s2graph.s2jobs.wal.udafs.GroupByArrayAgg
+import org.apache.spark.sql.{DataFrame, SparkSession}
+import org.apache.spark.sql.functions._
+
+class S2EdgeDataArrayAggregateProcess(taskConf: TaskConf) extends org.apache.s2graph.s2jobs.task.Process(taskConf) {
+  override def execute(ss: SparkSession, inputMap: Map[String, DataFrame]): DataFrame = {
+    import ss.sqlContext.implicits._
+    val maxNumOfEdges = taskConf.options.get("maxNumOfEdges").map(_.toInt).getOrElse(1000)
+    val groupByColumns = taskConf.options.get("groupByColumns").getOrElse("from").split(",").map(col(_))
+    val aggregateColumns = taskConf.options.get("aggregateColumns").getOrElse("edges").split(",").map(col(_))
+    taskConf.options.get("parallelism").map(ss.sqlContext.setConf("spark.sql.shuffle.partitions", _))
+    val aggregator = new GroupByArrayAgg(maxNumOfEdges)
+
+    val edges = inputMap(taskConf.inputs.head)
+
+    edges
+      .groupBy(groupByColumns: _*)
+      .agg(
+        aggregator(aggregateColumns: _*).as("edges"),
+        max(col("max_ts")).as("max_ts"),
+        min(col("min_ts")).as("min_ts")
+      )
+  }
+
+  override def mandatoryOptions: Set[String] = Set.empty
+}
diff --git a/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/wal/udafs/S2EdgeDataAggregate.scala b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/wal/udafs/S2EdgeDataAggregate.scala
new file mode 100644
index 0000000..1b12235
--- /dev/null
+++ b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/wal/udafs/S2EdgeDataAggregate.scala
@@ -0,0 +1,166 @@
+package org.apache.s2graph.s2jobs.wal.udafs
+
+import org.apache.spark.sql.Row
+import org.apache.spark.sql.catalyst.expressions.GenericRow
+import org.apache.spark.sql.expressions.{MutableAggregationBuffer, UserDefinedAggregateFunction}
+import org.apache.spark.sql.types._
+
+object S2EdgeDataAggregate {
+  type Element = (Long, String, String, String)
+
+  val emptyRow = new GenericRow(Array(-1L, "empty", "empty", "empty"))
+
+  val elementOrd = Ordering.by[Element, Long](_._1)
+
+  val rowOrdering = new Ordering[Row] {
+    override def compare(x: Row, y: Row): Int = {
+      x.getAs[Long](0).compareTo(y.getAs[Long](0))
+    }
+  }
+
+  val rowOrderingDesc = new Ordering[Row] {
+    override def compare(x: Row, y: Row): Int = {
+      -x.getAs[Long](0).compareTo(y.getAs[Long](0))
+    }
+  }
+
+  val fields = Seq(
+    StructField(name = "timestamp", LongType),
+    StructField(name = "to", StringType),
+    StructField(name = "label", StringType),
+    StructField(name = "props", StringType)
+  )
+
+  val arrayType = ArrayType(elementType = StructType(fields))
+
+  def apply(maxNumOfEdges: Int = 1000): GroupByAgg = {
+    new GroupByAgg(maxNumOfEdges)
+  }
+
+  def mergeTwoSeq[T](prev: Seq[T], cur: Seq[T], size: Int)(implicit ordering: Ordering[T]): Seq[T] = {
+    import scala.collection.mutable
+    val (n, m) = (cur.size, prev.size)
+
+    var (i, j) = (0, 0)
+    var idx = 0
+    val arr = new mutable.ArrayBuffer[T](size)
+
+    while (idx < size && i < n && j < m) {
+      if (ordering.compare(cur(i), prev(j)) > 0) {
+        arr += cur(i)
+        i += 1
+      } else {
+        arr += prev(j)
+        j += 1
+      }
+      idx += 1
+    }
+    while (idx < size && i < n) {
+      arr += cur(i)
+      i += 1
+    }
+    while (idx < size && j < m) {
+      arr += prev(j)
+      j += 1
+    }
+
+    arr
+  }
+}
+
+class GroupByAgg(maxNumOfEdges: Int = 1000) extends UserDefinedAggregateFunction {
+
+  import S2EdgeDataAggregate._
+
+  implicit val ord = rowOrderingDesc
+
+  val arrayType = ArrayType(elementType = StructType(fields))
+
+  override def inputSchema: StructType = StructType(fields)
+
+  override def bufferSchema: StructType = StructType(Seq(
+    StructField(name = "edges", dataType = arrayType),
+    StructField(name = "buffered", dataType = BooleanType)
+  ))
+
+  override def dataType: DataType = arrayType
+
+  override def deterministic: Boolean = true
+
+  override def initialize(buffer: MutableAggregationBuffer): Unit = {
+    buffer.update(0, scala.collection.mutable.ListBuffer.empty[Element])
+  }
+
+  /* not optimized */
+  override def update(buffer: MutableAggregationBuffer, input: Row): Unit = {
+    val element = input
+
+    val prev = buffer.getAs[Seq[Row]](0)
+    val appended = prev :+ element
+
+    buffer.update(0, appended)
+    buffer.update(1, false)
+  }
+
+  private def takeTopK(ls: Seq[Row], k: Int) = {
+    val sorted = ls.sorted
+    if (sorted.size <= k) sorted else sorted.take(k)
+  }
+
+  /* not optimized */
+  override def merge(buffer1: MutableAggregationBuffer, buffer2: Row): Unit = {
+    val cur = buffer2.getAs[Seq[Row]](0)
+    val prev = buffer1.getAs[Seq[Row]](0)
+
+    buffer1.update(0, takeTopK(prev ++ cur, maxNumOfEdges))
+    buffer1.update(1, true)
+  }
+
+  override def evaluate(buffer: Row): Any = {
+    val ls = buffer.getAs[Seq[Row]](0)
+    val buffered = buffer.getAs[Boolean](1)
+    if (buffered) ls
+    else takeTopK(ls, maxNumOfEdges)
+  }
+}
+
+class GroupByArrayAgg(maxNumOfEdges: Int = 1000) extends UserDefinedAggregateFunction {
+
+  import S2EdgeDataAggregate._
+
+  implicit val ord = rowOrdering
+
+  import scala.collection.mutable
+
+  override def inputSchema: StructType = StructType(Seq(
+    StructField(name = "edges", dataType = arrayType)
+  ))
+
+  override def bufferSchema: StructType = StructType(Seq(
+    StructField(name = "edges", dataType = arrayType)
+  ))
+
+  override def dataType: DataType = arrayType
+
+  override def deterministic: Boolean = true
+
+  override def initialize(buffer: MutableAggregationBuffer): Unit =
+    buffer.update(0, mutable.ListBuffer.empty[Row])
+
+  override def update(buffer: MutableAggregationBuffer, input: Row): Unit = {
+    val cur = input.getAs[Seq[Row]](0)
+    val prev = buffer.getAs[Seq[Row]](0)
+    val merged = mergeTwoSeq(cur, prev, maxNumOfEdges)
+
+    buffer.update(0, merged)
+  }
+
+  override def merge(buffer1: MutableAggregationBuffer, buffer2: Row): Unit = {
+    val cur = buffer2.getAs[Seq[Row]](0)
+    val prev = buffer1.getAs[Seq[Row]](0)
+
+    buffer1.update(0, mergeTwoSeq(cur, prev, maxNumOfEdges))
+  }
+
+  override def evaluate(buffer: Row): Any = buffer.getAs[Seq[Row]](0)
+}
diff --git a/s2jobs/src/test/scala/org/apache/s2graph/s2jobs/wal/process/S2EdgeDataAggregateProcessTest.scala b/s2jobs/src/test/scala/org/apache/s2graph/s2jobs/wal/process/S2EdgeDataAggregateProcessTest.scala
new file mode 100644
index 0000000..72c8cf5
--- /dev/null
+++ b/s2jobs/src/test/scala/org/apache/s2graph/s2jobs/wal/process/S2EdgeDataAggregateProcessTest.scala
@@ -0,0 +1,87 @@
+package org.apache.s2graph.s2jobs.wal.process
+
+import com.holdenkarau.spark.testing.DataFrameSuiteBase
+import org.apache.s2graph.s2jobs.task.TaskConf
+import org.apache.s2graph.s2jobs.wal.WalLog
+import org.apache.s2graph.s2jobs.wal.udafs._
+import org.scalatest.{BeforeAndAfterAll, FunSuite, Matchers}
+
+class S2EdgeDataAggregateProcessTest extends FunSuite with Matchers with BeforeAndAfterAll with DataFrameSuiteBase {
+  val walLogsLs = Seq(
+    WalLog(1L, "insert", "edge", "a", "b", "s2graph", "friends", """{"name": 1}"""),
+    WalLog(2L, "insert", "edge", "a", "c", "s2graph", "friends", """{"name": 2}"""),
+    WalLog(3L, "insert", "edge", "a", "d", "s2graph", "friends", """{"name": 3}"""),
+    WalLog(4L, "insert", "edge", "a", "b", "s2graph", "friends", """{"name": 4}""")
+  )
+  val walLogsLs2 = Seq(
+    WalLog(5L, "insert", "edge", "a", "b", "s2graph", "friends", """{"name": 1}"""),
+    WalLog(6L, "insert", "edge", "a", "c", "s2graph", "friends", """{"name": 2}"""),
+    WalLog(7L, "insert", "edge", "a", "d", "s2graph", "friends", """{"name": 3}"""),
+    WalLog(8L, "insert", "edge", "a", "b", "s2graph", "friends", """{"name": 4}""")
+  )
+
+
+  test("test S2EdgeDataAggregateProcess") {
+    import spark.sqlContext.implicits._
+
+    val edges = spark.createDataset((0 until 10).flatMap(_ => walLogsLs)).toDF()
+    val inputMap = Map("edges" -> edges)
+    val taskConf = new TaskConf(name = "test", `type` = "agg", inputs = Seq("edges"),
+      options = Map("maxNumOfEdges" -> "10",
+        "groupByAggClassName" -> "GroupByAgg"))
+
+    val job = new S2EdgeDataAggregateProcess(taskConf = taskConf)
+    val processed = job.execute(spark, inputMap)
+
+    processed.printSchema()
+    processed.collect().foreach { row =>
+      println(row)
+    }
+  }
+
+  test("test S2EdgeDataArrayAggregateProcess") {
+    import spark.sqlContext.implicits._
+
+    val edges = spark.createDataset(walLogsLs).toDF()
+    val edges2 = spark.createDataset(walLogsLs2).toDF()
+
+    val firstConf = new TaskConf(name = "test", `type` = "agg", inputs = Seq("edges"),
+      options = Map("maxNumOfEdges" -> "10"))
+
+    val firstJob = new S2EdgeDataAggregateProcess(firstConf)
+    val firstJob2 = new S2EdgeDataAggregateProcess(firstConf)
+
+    val first = firstJob.execute(spark, Map("edges" -> edges))
+    val first2 = firstJob2.execute(spark, Map("edges" -> edges2))
+
+    val secondInputMap = Map(
+      "aggregated" -> first.union(first2)
+    )
+
+    val secondConf = new TaskConf(name = "testarray", `type` = "agg",
+      inputs = Seq("aggregated"),
+      options = Map("maxNumOfEdges" -> "10"))
+
+    val secondJob = new S2EdgeDataArrayAggregateProcess(secondConf)
+
+
+    val processed = secondJob.execute(spark, secondInputMap)
+
+    processed.printSchema()
+    processed.collect().foreach { row =>
+      println(row)
+    }
+  }
+
+  test("mergeTwoSeq") {
+    val prev: Array[Int] = Array(3, 2, 1)
+    val cur: Array[Int] = Array(4, 2, 2)
+
+    val ls = S2EdgeDataAggregate.mergeTwoSeq(prev, cur, 10)
+    println(ls.size)
+
+    ls.foreach { x =>
+      println(x)
+    }
+  }
+}
\ No newline at end of file