Merge branch 'S2GRAPH-248' of https://github.com/daewon/incubator-s2graph into daewon-S2GRAPH-248
diff --git a/CHANGES b/CHANGES
index df8fbf2..26388ab 100644
--- a/CHANGES
+++ b/CHANGES
@@ -38,6 +38,8 @@
     * [S2GRAPH-205] - too many initialize S2Graph when writeBatchMutate on S2GraphSink
     * [S2GRAPH-201] - Provide S2GraphSource
     * [S2GRAPH-218] - add operations not supported on sql
+    * [S2GRAPH-225] - support custom udf class
+    * [S2GRAPH-251] - Provide JdbcSource/Sink
 
 ** Bug
     * [S2GRAPH-159] - Wrong syntax at a bash script under Linux
@@ -104,6 +106,7 @@
     * [S2GRAPH-219] - Added query that includes all vertices and associated edges for GraphVisualize.
     * [S2GRAPH-222] - Support Not logical operator in WhereParser.
     * [S2GRAPH-223] - Support WhereParser on Vertex.
+    * [S2GRAPH-226] - Provide example spark jobs to explain how to utilize WAL log.
 
 ** Task
     * [S2GRAPH-162] - Update year in the NOTICE file.
diff --git a/project/Common.scala b/project/Common.scala
index 42628f0..1008b2c 100644
--- a/project/Common.scala
+++ b/project/Common.scala
@@ -30,7 +30,7 @@
   val hadoopVersion = "2.7.3"
   val tinkerpopVersion = "3.2.5"
 
-  val elastic4sVersion = "6.1.1"
+  val elastic4sVersion = "6.2.4"
 
   val KafkaVersion = "0.10.2.1"
 
diff --git a/s2jobs/build.sbt b/s2jobs/build.sbt
index f647040..47ec835 100644
--- a/s2jobs/build.sbt
+++ b/s2jobs/build.sbt
@@ -38,6 +38,7 @@
   "org.apache.hadoop" % "hadoop-distcp" % hadoopVersion,
   "org.elasticsearch" % "elasticsearch-spark-20_2.11" % elastic4sVersion,
   "com.github.scopt" %% "scopt" % "3.7.0",
+  "io.thekraken" % "grok" % "0.1.5",
   "com.holdenkarau" %% "spark-testing-base" % "2.3.0_0.9.0" % Test
 )
 
diff --git a/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/Job.scala b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/Job.scala
index 8f21bc2..026b688 100644
--- a/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/Job.scala
+++ b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/Job.scala
@@ -35,7 +35,7 @@
 
       dfMap.put(source.conf.name, df)
     }
-    logger.debug(s"valid source DF set : ${dfMap.keySet}")
+    logger.info(s"valid source DF set : ${dfMap.keySet}")
 
     // process
     var processRst:Seq[(String, DataFrame)] = Nil
@@ -45,7 +45,7 @@
 
     } while(processRst.nonEmpty)
 
-    logger.debug(s"valid named DF set : ${dfMap.keySet}")
+    logger.info(s"valid named DF set : ${dfMap.keySet}")
 
     // sinks
     jobDesc.sinks.foreach { s =>
@@ -63,8 +63,7 @@
     val dfKeys = dfMap.keySet
 
     processes.filter{ p =>
-        var existAllInput = true
-        p.conf.inputs.foreach { input => existAllInput = dfKeys(input) }
+        val existAllInput = p.conf.inputs.forall{ input => dfKeys(input) }
         !dfKeys(p.conf.name) && existAllInput
     }
     .map { p =>
diff --git a/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/JobDescription.scala b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/JobDescription.scala
index 9a529aa..cfa547b 100644
--- a/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/JobDescription.scala
+++ b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/JobDescription.scala
@@ -21,28 +21,32 @@
 
 import play.api.libs.json.{JsValue, Json}
 import org.apache.s2graph.s2jobs.task._
+import org.apache.s2graph.s2jobs.udfs.UdfOption
 
 case class JobDescription(
                            name:String,
+                           udfs: Seq[UdfOption],
                            sources:Seq[Source],
                            processes:Seq[task.Process],
                            sinks:Seq[Sink]
                          )
 
 object JobDescription extends Logger {
-  val dummy = JobDescription("dummy", Nil, Nil, Nil)
+  val dummy = JobDescription("dummy", Nil, Nil, Nil, Nil)
 
   def apply(jsVal:JsValue):JobDescription = {
     implicit val TaskConfReader = Json.reads[TaskConf]
+    implicit val UdfOptionReader = Json.reads[UdfOption]
 
     logger.debug(s"JobDescription: ${jsVal}")
 
     val jobName = (jsVal \ "name").as[String]
+    val udfs = (jsVal \ "udfs").asOpt[Seq[UdfOption]].getOrElse(Nil)
     val sources = (jsVal \ "source").asOpt[Seq[TaskConf]].getOrElse(Nil).map(conf => getSource(conf))
     val processes = (jsVal \ "process").asOpt[Seq[TaskConf]].getOrElse(Nil).map(conf => getProcess(conf))
     val sinks = (jsVal \ "sink").asOpt[Seq[TaskConf]].getOrElse(Nil).map(conf => getSink(jobName, conf))
 
-    JobDescription(jobName, sources, processes, sinks)
+    JobDescription(jobName, udfs, sources, processes, sinks)
   }
 
   def getSource(conf:TaskConf):Source = {
@@ -50,6 +54,7 @@
       case "kafka" => new KafkaSource(conf)
       case "file"  => new FileSource(conf)
       case "hive" => new HiveSource(conf)
+      case "jdbc" => new JdbcSource(conf)
       case "s2graph" => new S2GraphSource(conf)
       case _ => throw new IllegalArgumentException(s"unsupported source type : ${conf.`type`}")
     }
@@ -82,6 +87,7 @@
       case "file" => new FileSink(jobName, conf)
       case "es" => new ESSink(jobName, conf)
       case "s2graph" => new S2GraphSink(jobName, conf)
+      case "jdbc" => new JdbcSink(jobName, conf)
       case "custom" =>
         val customClassOpt = conf.options.get("class")
         customClassOpt match {
diff --git a/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/JobLauncher.scala b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/JobLauncher.scala
index a64a399..0a76274 100644
--- a/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/JobLauncher.scala
+++ b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/JobLauncher.scala
@@ -19,6 +19,7 @@
 
 package org.apache.s2graph.s2jobs
 
+import org.apache.s2graph.s2jobs.udfs.Udf
 import org.apache.spark.sql.SparkSession
 import play.api.libs.json.{JsValue, Json}
 
@@ -82,6 +83,13 @@
       .enableHiveSupport()
       .getOrCreate()
 
+    // register udfs
+    jobDescription.udfs.foreach{ udfOption =>
+      val udf = Class.forName(udfOption.`class`).newInstance().asInstanceOf[Udf]
+      logger.info((s"[udf register] ${udfOption}"))
+      udf.register(ss, udfOption.name, udfOption.params.getOrElse(Map.empty))
+    }
+
     val job = new Job(ss, jobDescription)
     job.run()
   }
diff --git a/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/Schema.scala b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/Schema.scala
index 58d3368..1c47f13 100644
--- a/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/Schema.scala
+++ b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/Schema.scala
@@ -22,10 +22,19 @@
 import org.apache.spark.sql.types.{LongType, StringType, StructField, StructType}
 
 object Schema {
-  val BulkLoadSchema = StructType(Seq(
-    StructField("timestamp", LongType, false),
-    StructField("operation", StringType, false),
-    StructField("elem", StringType, false),
+  /**
+    * root
+    * |-- timestamp: long (nullable = false)
+    * |-- operation: string (nullable = false)
+    * |-- elem: string (nullable = false)
+    */
+  val CommonFields = Seq(
+    StructField("timestamp", LongType, nullable = false),
+    StructField("operation", StringType, nullable = false),
+    StructField("elem", StringType, nullable = false)
+  )
+
+  val BulkLoadSchema = StructType(CommonFields ++ Seq(
     StructField("from", StringType, false),
     StructField("to", StringType, false),
     StructField("label", StringType, false),
@@ -33,24 +42,63 @@
     StructField("direction", StringType, true)
   ))
 
-  val VertexSchema = StructType(Seq(
-    StructField("timestamp", LongType, false),
-    StructField("operation", StringType, false),
-    StructField("elem", StringType, false),
+  /**
+    * root
+    * |-- timestamp: long (nullable = false)
+    * |-- operation: string (nullable = false)
+    * |-- elem: string (nullable = false)
+    * |-- id: string (nullable = false)
+    * |-- service: string (nullable = false)
+    * |-- column: string (nullable = false)
+    * |-- props: string (nullable = false)
+    */
+  val VertexSchema = StructType(CommonFields ++ Seq(
     StructField("id", StringType, false),
     StructField("service", StringType, false),
     StructField("column", StringType, false),
     StructField("props", StringType, false)
   ))
 
-  val EdgeSchema = StructType(Seq(
-    StructField("timestamp", LongType, false),
-    StructField("operation", StringType, false),
-    StructField("elem", StringType, false),
+
+  /**
+    * root
+    * |-- timestamp: long (nullable = false)
+    * |-- operation: string (nullable = false)
+    * |-- elem: string (nullable = false)
+    * |-- from: string (nullable = false)
+    * |-- to: string (nullable = false)
+    * |-- label: string (nullable = false)
+    * |-- props: string (nullable = false)
+    * |-- direction: string (nullable = true)
+    */
+  val EdgeSchema = StructType(CommonFields ++ Seq(
     StructField("from", StringType, false),
     StructField("to", StringType, false),
     StructField("label", StringType, false),
     StructField("props", StringType, false),
     StructField("direction", StringType, true)
   ))
+
+  /**
+    * root
+    * |-- timestamp: long (nullable = false)
+    * |-- operation: string (nullable = false)
+    * |-- elem: string (nullable = false)
+    * |-- id: string (nullable = true)
+    * |-- service: string (nullable = true)
+    * |-- column: string (nullable = true)
+    * |-- from: string (nullable = true)
+    * |-- to: string (nullable = true)
+    * |-- label: string (nullable = true)
+    * |-- props: string (nullable = true)
+    */
+  val GraphElementSchema = StructType(CommonFields ++ Seq(
+    StructField("id", StringType, nullable = true),
+    StructField("service", StringType, nullable = true),
+    StructField("column", StringType, nullable = true),
+    StructField("from", StringType, nullable = true),
+    StructField("to", StringType, nullable = true),
+    StructField("label", StringType, nullable = true),
+    StructField("props", StringType, nullable = true)
+  ))
 }
diff --git a/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/task/Sink.scala b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/task/Sink.scala
index 4de585c..c5ec8f0 100644
--- a/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/task/Sink.scala
+++ b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/task/Sink.scala
@@ -20,22 +20,17 @@
 package org.apache.s2graph.s2jobs.task
 
 import com.typesafe.config.{Config, ConfigFactory, ConfigRenderOptions}
-import org.apache.hadoop.hbase.HBaseConfiguration
-import org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles
-import org.apache.hadoop.util.ToolRunner
-import org.apache.s2graph.core.{GraphUtil, Management}
+import org.apache.s2graph.core.GraphUtil
 import org.apache.s2graph.s2jobs.S2GraphHelper
-import org.apache.s2graph.s2jobs.loader.{GraphFileOptions, HFileGenerator, SparkBulkLoaderTransformer}
+import org.apache.s2graph.s2jobs.loader.{HFileGenerator, SparkBulkLoaderTransformer}
 import org.apache.s2graph.s2jobs.serde.reader.RowBulkFormatReader
 import org.apache.s2graph.s2jobs.serde.writer.KeyValueWriter
-import org.apache.s2graph.spark.sql.streaming.S2SinkContext
 import org.apache.spark.sql._
 import org.apache.spark.sql.streaming.{DataStreamWriter, OutputMode, Trigger}
 import org.elasticsearch.spark.sql.EsSparkSQL
 
-import scala.collection.mutable.ListBuffer
-import scala.concurrent.{Await, Future}
 import scala.concurrent.duration.Duration
+import scala.concurrent.{Await, Future}
 
 /**
   * Sink
@@ -132,6 +127,8 @@
     logger.debug(s"${LOG_PREFIX} schema: ${df.schema}")
 
     conf.options.getOrElse("format", "json") match {
+      case "raw" =>
+        df
       case "tsv" =>
         val delimiter = conf.options.getOrElse("delimiter", "\t")
 
@@ -183,6 +180,20 @@
 }
 
 /**
+  * JdbcSink
+  * @param queryName
+  * @param conf
+  */
+class JdbcSink(queryName: String, conf: TaskConf) extends Sink(queryName, conf) {
+  override def mandatoryOptions: Set[String] = Set("url", "dbtable")
+  override val FORMAT: String = "jdbc"
+
+  override protected def writeBatchInner(writer: DataFrameWriter[Row]): Unit = {
+    writer.format("jdbc").options(conf.options).save()
+  }
+}
+
+/**
   * ESSink
   *
   * @param queryName
@@ -212,9 +223,10 @@
   * @param conf
   */
 class S2GraphSink(queryName: String, conf: TaskConf) extends Sink(queryName, conf) {
-  import scala.collection.JavaConversions._
-  import org.apache.s2graph.spark.sql.streaming.S2SinkConfigs._
   import org.apache.s2graph.core.S2GraphConfigs._
+  import org.apache.s2graph.spark.sql.streaming.S2SinkConfigs._
+
+  import scala.collection.JavaConversions._
 
   override def mandatoryOptions: Set[String] = Set()
 
@@ -261,6 +273,10 @@
   }
 
   private def writeBatchWithMutate(df:DataFrame):Unit = {
+    import org.apache.s2graph.spark.sql.streaming.S2SinkConfigs._
+
+    import scala.collection.JavaConversions._
+
     // TODO: FIX THIS. overwrite local cache config.
     val mergedOptions = conf.options ++ TaskConf.parseLocalCacheConfigs(conf)
     val graphConfig: Config = ConfigFactory.parseMap(mergedOptions).withFallback(ConfigFactory.load())
diff --git a/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/task/Source.scala b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/task/Source.scala
index bfac62b..d985edc 100644
--- a/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/task/Source.scala
+++ b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/task/Source.scala
@@ -19,12 +19,13 @@
 
 package org.apache.s2graph.s2jobs.task
 
-import org.apache.s2graph.core.Management
+import org.apache.s2graph.core.{JSONParser, Management}
 import org.apache.s2graph.s2jobs.Schema
 import org.apache.s2graph.s2jobs.loader.{HFileGenerator, SparkBulkLoaderTransformer}
 import org.apache.s2graph.s2jobs.serde.reader.S2GraphCellReader
 import org.apache.s2graph.s2jobs.serde.writer.RowDataFrameWriter
 import org.apache.spark.sql.{DataFrame, SparkSession}
+import play.api.libs.json.{JsObject, Json}
 
 
 /**
@@ -98,13 +99,21 @@
     val paths = conf.options("paths").split(",")
     val format = conf.options.getOrElse("format", DEFAULT_FORMAT)
     val columnsOpt = conf.options.get("columns")
+    val readOptions = conf.options.get("read").map { s =>
+      Json.parse(s).as[JsObject].fields.map { case (k, jsValue) =>
+        k -> JSONParser.jsValueToString(jsValue)
+      }.toMap
+    }.getOrElse(Map.empty)
 
     format match {
       case "edgeLog" =>
         ss.read.format("com.databricks.spark.csv").option("delimiter", "\t")
           .schema(BulkLoadSchema).load(paths: _*)
-      case _ => ss.read.format(format).load(paths: _*)
-        val df = ss.read.format(format).load(paths: _*)
+      case _ =>
+        val df =
+          if (readOptions.isEmpty) ss.read.format(format).load(paths: _*)
+          else ss.read.options(readOptions).format(format).load(paths: _*)
+
         if (columnsOpt.isDefined) df.toDF(columnsOpt.get.split(",").map(_.trim): _*) else df
     }
   }
@@ -122,6 +131,14 @@
   }
 }
 
+class JdbcSource(conf:TaskConf) extends Source(conf) {
+  override def mandatoryOptions: Set[String] = Set("url", "dbtable")
+  override def toDF(ss: SparkSession): DataFrame = {
+    ss.read.format("jdbc").options(conf.options).load()
+  }
+
+}
+
 class S2GraphSource(conf: TaskConf) extends Source(conf) {
   import org.apache.s2graph.spark.sql.streaming.S2SourceConfigs._
   override def mandatoryOptions: Set[String] = Set(
diff --git a/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/task/Task.scala b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/task/Task.scala
index ab02900..62081df 100644
--- a/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/task/Task.scala
+++ b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/task/Task.scala
@@ -21,9 +21,13 @@
 
 import org.apache.s2graph.core.S2GraphConfigs
 import org.apache.s2graph.s2jobs.Logger
+import org.apache.s2graph.s2jobs.wal.transformer.Transformer
+import play.api.libs.json.Json
 //import org.apache.s2graph.s2jobs.loader.GraphFileOptions
 
 object TaskConf {
+  val Empty = new TaskConf(name = "empty", `type` = "empty", inputs = Nil, options = Map.empty)
+
 //  def toGraphFileOptions(taskConf: TaskConf): GraphFileOptions = {
 //    val args = taskConf.options.filterKeys(GraphFileOptions.OptionKeys)
 //      .flatMap(kv => Seq(kv._1, kv._2)).toSeq.toArray
@@ -42,6 +46,18 @@
   def parseLocalCacheConfigs(taskConf: TaskConf): Map[String, Any] = {
     taskConf.options.filterKeys(S2GraphConfigs.CacheConfigs.DEFAULTS.keySet).mapValues(_.toInt)
   }
+
+  def parseTransformers(taskConf: TaskConf): Seq[Transformer] = {
+    val classes = Json.parse(taskConf.options.getOrElse("transformClasses",
+      """["org.apache.s2graph.s2jobs.wal.transformer.DefaultTransformer"]""")).as[Seq[String]]
+
+    classes.map { className =>
+      Class.forName(className)
+        .getConstructor(classOf[TaskConf])
+        .newInstance(taskConf)
+        .asInstanceOf[Transformer]
+    }
+  }
 }
 case class TaskConf(name:String, `type`:String, inputs:Seq[String] = Nil, options:Map[String, String] = Map.empty, cache:Option[Boolean]=None)
 
diff --git a/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/udfs/Grok.scala b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/udfs/Grok.scala
new file mode 100644
index 0000000..ebcb41d
--- /dev/null
+++ b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/udfs/Grok.scala
@@ -0,0 +1,36 @@
+package org.apache.s2graph.s2jobs.udfs
+
+import org.apache.s2graph.s2jobs.utils.GrokHelper
+import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.types.{DataType, StructType}
+import play.api.libs.json.{JsValue, Json}
+
+class Grok extends Udf {
+  import org.apache.spark.sql.functions.udf
+
+  /** Builds a Grok from `options` and registers it in `ss` as a UDF called `name`. */
+  def register(ss: SparkSession, name:String, options:Map[String, String]) = {
+    val patternDir = options.getOrElse("patternDir", "/tmp")
+    // "".split(",") yields Array(""), so blank entries are dropped to avoid adding `patternDir/` itself
+    val patternFiles = options.getOrElse("patternFiles", "").split(",").map(_.trim).filter(_.nonEmpty).toSeq
+    val patterns = Json.parse(options.getOrElse("patterns", "{}")).asOpt[Map[String, String]].getOrElse(Map.empty)
+    val compilePattern = options("compilePattern")
+    val schemaOpt = options.get("schema")
+
+    patternFiles.foreach { patternFile =>
+      ss.sparkContext.addFile(s"${patternDir}/${patternFile}")
+    }
+
+    implicit val grok = GrokHelper.getGrok(name, patternFiles, patterns, compilePattern)
+    // with a "schema" option the UDF returns a Row shaped by that schema, otherwise a string map
+    val f = if(schemaOpt.isDefined) {
+      val schema = DataType.fromJson(schemaOpt.get)
+      implicit val keys:Array[String] = schema.asInstanceOf[StructType].fieldNames
+      udf(GrokHelper.grokMatchWithSchema _, schema)
+    } else {
+      udf(GrokHelper.grokMatch _)
+    }
+
+    ss.udf.register(name, f)
+  }
+}
diff --git a/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/udfs/Udf.scala b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/udfs/Udf.scala
new file mode 100644
index 0000000..821527c
--- /dev/null
+++ b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/udfs/Udf.scala
@@ -0,0 +1,14 @@
+package org.apache.s2graph.s2jobs.udfs
+
+import org.apache.s2graph.s2jobs.Logger
+import org.apache.spark.sql.SparkSession
+
+case class UdfOption(name:String, `class`:String, params:Option[Map[String, String]] = None)
+trait Udf extends Serializable with Logger {
+  def register(ss: SparkSession, name:String, options:Map[String, String])
+}
+
+
+
+
+
diff --git a/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/utils/GrokHelper.scala b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/utils/GrokHelper.scala
new file mode 100644
index 0000000..37485c8
--- /dev/null
+++ b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/utils/GrokHelper.scala
@@ -0,0 +1,59 @@
+package org.apache.s2graph.s2jobs.utils
+
+import io.thekraken.grok.api.Grok
+import org.apache.s2graph.s2jobs.Logger
+import org.apache.spark.SparkFiles
+import org.apache.spark.sql.Row
+
+import scala.collection.mutable
+
+object GrokHelper extends Logger {
+  private val grokPool:mutable.Map[String, Grok] = mutable.Map.empty
+
+  /** Returns the Grok cached under `name`, building and compiling it on first request. */
+  def getGrok(name:String, patternFiles:Seq[String], patterns:Map[String, String], compilePattern:String):Grok = {
+    grokPool.getOrElseUpdate(name, {
+      println(s"Grok '$name' initialized..")
+      val grok = new Grok()
+      patternFiles.foreach { patternFile =>
+        val filePath = SparkFiles.get(patternFile)
+        println(s"[Grok][$name] add pattern file : $patternFile  ($filePath)")
+        grok.addPatternFromFile(filePath)
+      }
+      // bound as `patternName` so it does not shadow the grok `name` used in the log prefix
+      patterns.foreach { case (patternName, pattern) =>
+        println(s"[Grok][$name] add pattern : $patternName ($pattern)")
+        grok.addPattern(patternName, pattern)
+      }
+
+      grok.compile(compilePattern)
+      println(s"[Grok][$name] patterns: ${grok.getPatterns}")
+      grok
+    })
+  }
+
+  def grokMatch(text:String)(implicit grok:Grok):Option[Map[String, String]] = {
+    import scala.collection.JavaConverters._
+
+    val m = grok.`match`(text)
+    m.captures()
+    val rstMap = m.toMap.asScala.toMap
+      .filter(_._2 != null)
+      .map{ case (k, v) =>  k -> v.toString}
+    if (rstMap.isEmpty) None else Some(rstMap)
+  }
+
+  def grokMatchWithSchema(text:String)(implicit grok:Grok, keys:Array[String]):Option[Row] = {
+    import scala.collection.JavaConverters._
+
+    val m = grok.`match`(text)
+    m.captures()
+
+    val rstMap = m.toMap.asScala.toMap
+    if (rstMap.isEmpty) None
+    else {
+      val l = keys.map { key => rstMap.getOrElse(key, null)}
+      Some(Row.fromSeq(l))
+    }
+  }
+}
diff --git a/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/wal/WalLog.scala b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/wal/WalLog.scala
new file mode 100644
index 0000000..ad696a9
--- /dev/null
+++ b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/wal/WalLog.scala
@@ -0,0 +1,208 @@
+package org.apache.s2graph.s2jobs.wal
+
+import com.google.common.hash.Hashing
+import org.apache.s2graph.core.{GraphUtil, JSONParser}
+import org.apache.s2graph.s2jobs.wal.process.params.AggregateParam
+import org.apache.s2graph.s2jobs.wal.transformer.Transformer
+import org.apache.s2graph.s2jobs.wal.utils.BoundedPriorityQueue
+import org.apache.spark.sql.Row
+import org.apache.spark.sql.types.{LongType, StringType, StructField, StructType}
+import play.api.libs.json.{JsObject, Json}
+
+import scala.util.Try
+
+object WalLogAgg {
+  val outputColumns = Seq("from", "vertices", "edges")
+
+  def isEdge(walLog: WalLog): Boolean = {
+    walLog.elem == "edge" || walLog.elem == "e"
+  }
+
+  def apply(walLog: WalLog): WalLogAgg = {
+    val (vertices, edges) =
+      if (isEdge(walLog)) (Nil, Seq(walLog))
+      else (Seq(walLog), Nil)
+
+    new WalLogAgg(walLog.from, vertices, edges)
+  }
+
+  def toFeatureHash(dimVal: DimVal): Long = toFeatureHash(dimVal.dim, dimVal.value)
+
+  def toFeatureHash(dim: String, value: String): Long = {
+    Hashing.murmur3_128().hashBytes(s"$dim:$value".getBytes("UTF-8")).asLong()
+  }
+
+  private def addToHeap(walLog: WalLog,
+                        heap: BoundedPriorityQueue[WalLog],
+                        now: Long,
+                        validTimestampDuration: Option[Long]): Unit = {
+    val ts = walLog.timestamp
+    val isValid = validTimestampDuration.map(d => now - ts < d).getOrElse(true)
+
+    if (isValid) {
+      heap += walLog
+    }
+  }
+
+  private def addToHeap(iter: Seq[WalLog],
+                        heap: BoundedPriorityQueue[WalLog],
+                        now: Long,
+                        validTimestampDuration: Option[Long]): Unit = {
+    iter.foreach(walLog => addToHeap(walLog, heap, now, validTimestampDuration))
+  }
+
+  private def toWalLogAgg(edgeHeap: BoundedPriorityQueue[WalLog],
+                          vertexHeap: BoundedPriorityQueue[WalLog],
+                          sortTopItems: Boolean): Option[WalLogAgg] = {
+    val topVertices = if (sortTopItems) vertexHeap.toArray.sortBy(-_.timestamp) else vertexHeap.toArray
+    val topEdges = if (sortTopItems) edgeHeap.toArray.sortBy(-_.timestamp) else edgeHeap.toArray
+
+    topEdges.headOption.map(head => WalLogAgg(head.from, topVertices, topEdges))
+  }
+
+  def mergeWalLogs(iter: Iterator[WalLog],
+                   heapSize: Int,
+                   now: Long,
+                   validTimestampDuration: Option[Long],
+                   sortTopItems: Boolean)(implicit ord: Ordering[WalLog]): Option[WalLogAgg] = {
+    val edgeHeap = new BoundedPriorityQueue[WalLog](heapSize)
+    val vertexHeap = new BoundedPriorityQueue[WalLog](heapSize)
+
+    iter.foreach { walLog =>
+      if (walLog.isVertex) addToHeap(walLog, vertexHeap, now, validTimestampDuration)
+      else addToHeap(walLog, edgeHeap, now, validTimestampDuration)
+    }
+
+    toWalLogAgg(edgeHeap, vertexHeap, sortTopItems)
+  }
+
+  def merge(iter: Iterator[WalLogAgg],
+            heapSize: Int,
+            now: Long,
+            validTimestampDuration: Option[Long],
+            sortTopItems: Boolean)(implicit ord: Ordering[WalLog]): Option[WalLogAgg] = {
+    val edgeHeap = new BoundedPriorityQueue[WalLog](heapSize)
+    val vertexHeap = new BoundedPriorityQueue[WalLog](heapSize)
+
+    iter.foreach { walLogAgg =>
+      addToHeap(walLogAgg.vertices, vertexHeap, now, validTimestampDuration)
+      addToHeap(walLogAgg.edges, edgeHeap, now, validTimestampDuration)
+    }
+
+    toWalLogAgg(edgeHeap, vertexHeap, sortTopItems)
+  }
+
+  def mergeWalLogs(iter: Iterator[WalLog],
+                   param: AggregateParam)(implicit ord: Ordering[WalLog]): Option[WalLogAgg] = {
+    mergeWalLogs(iter, param.heapSize, param.now, param.validTimestampDuration, param.sortTopItems)
+  }
+
+  def merge(iter: Iterator[WalLogAgg],
+            param: AggregateParam)(implicit ord: Ordering[WalLog]): Option[WalLogAgg] = {
+    merge(iter, param.heapSize, param.now, param.validTimestampDuration, param.sortTopItems)
+  }
+
+
+  private def filterPropsInner(walLogs: Seq[WalLog],
+                          transformers: Seq[Transformer],
+                          validFeatureHashKeys: Set[Long]): Seq[WalLog] = {
+    walLogs.map { walLog =>
+      val fields = walLog.propsJson.fields.filter { case (propKey, propValue) =>
+        val filtered = transformers.flatMap { transformer =>
+          transformer.toDimValLs(walLog, propKey, JSONParser.jsValueToString(propValue)).filter(dimVal => validFeatureHashKeys(toFeatureHash(dimVal)))
+        }
+        filtered.nonEmpty
+      }
+
+      walLog.copy(props = Json.toJson(fields.toMap).as[JsObject].toString)
+    }
+  }
+
+  def filterProps(walLogAgg: WalLogAgg,
+                        transformers: Seq[Transformer],
+                        validFeatureHashKeys: Set[Long]) = {
+    val filteredVertices = filterPropsInner(walLogAgg.vertices, transformers, validFeatureHashKeys)
+    val filteredEdges = filterPropsInner(walLogAgg.edges, transformers, validFeatureHashKeys)
+
+    walLogAgg.copy(vertices = filteredVertices, edges = filteredEdges)
+  }
+}
+
+object DimValCountRank {
+  def fromRow(row: Row): DimValCountRank = {
+    val dim = row.getAs[String]("dim")
+    val value = row.getAs[String]("value")
+    val count = row.getAs[Long]("count")
+    val rank = row.getAs[Long]("rank")
+
+    new DimValCountRank(DimVal(dim, value), count, rank)
+  }
+}
+
+case class DimValCountRank(dimVal: DimVal, count: Long, rank: Long)
+
+case class DimValCount(dimVal: DimVal, count: Long)
+
+object DimVal {
+  def fromRow(row: Row): DimVal = {
+    val dim = row.getAs[String]("dim")
+    val value = row.getAs[String]("value")
+
+    new DimVal(dim, value)
+  }
+}
+
+case class DimVal(dim: String, value: String)
+
+case class WalLogAgg(from: String,
+                     vertices: Seq[WalLog],
+                     edges: Seq[WalLog])
+
+case class WalLog(timestamp: Long,
+                  operation: String,
+                  elem: String,
+                  from: String,
+                  to: String,
+                  service: String,
+                  label: String,
+                  props: String) {
+  val isVertex = elem == "v" || elem == "vertex"
+  val id = from
+  val columnName = label
+  val serviceName = to
+  lazy val propsJson = Json.parse(props).as[JsObject]
+  lazy val propsKeyValues = propsJson.fields.map { case (key, jsValue) =>
+    key -> JSONParser.jsValueToString(jsValue)
+  }
+}
+
+object WalLog {
+  val orderByTsAsc = Ordering.by[WalLog, Long](walLog => walLog.timestamp)
+
+  val WalLogSchema = StructType(Seq(
+    StructField("timestamp", LongType, false),
+    StructField("operation", StringType, false),
+    StructField("elem", StringType, false),
+    StructField("from", StringType, false),
+    StructField("to", StringType, false),
+    StructField("service", StringType, true),
+    StructField("label", StringType, false),
+    StructField("props", StringType, false)
+    //    StructField("direction", StringType, true)
+  ))
+
+  def fromRow(row: Row): WalLog = {
+    val timestamp = row.getAs[Long]("timestamp")
+    val operation = Try(row.getAs[String]("operation")).toOption.getOrElse("insert")
+    val elem = Try(row.getAs[String]("elem")).toOption.getOrElse("edge")
+    val from = row.getAs[String]("from")
+    val to = row.getAs[String]("to")
+    val service = row.getAs[String]("service")
+    val label = row.getAs[String]("label")
+    val props = Try(row.getAs[String]("props")).toOption.getOrElse("{}")
+
+    WalLog(timestamp, operation, elem, from, to, service, label, props)
+  }
+
+
+}
diff --git a/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/wal/process/BuildTopFeaturesProcess.scala b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/wal/process/BuildTopFeaturesProcess.scala
new file mode 100644
index 0000000..52b2ff0
--- /dev/null
+++ b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/wal/process/BuildTopFeaturesProcess.scala
@@ -0,0 +1,126 @@
+package org.apache.s2graph.s2jobs.wal.process
+
+import org.apache.s2graph.s2jobs.task.TaskConf
+import org.apache.s2graph.s2jobs.wal.WalLogAgg.toFeatureHash
+import org.apache.s2graph.s2jobs.wal.process.params.BuildTopFeaturesParam
+import org.apache.s2graph.s2jobs.wal.transformer._
+import org.apache.s2graph.s2jobs.wal.udfs.WalLogUDF
+import org.apache.s2graph.s2jobs.wal.{DimVal, DimValCount, WalLog, WalLogAgg}
+import org.apache.spark.broadcast.Broadcast
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql._
+import org.apache.spark.sql.functions._
+import play.api.libs.json.{JsObject, Json}
+
+import scala.collection.mutable
+
+object BuildTopFeaturesProcess {
+  /** Feeds every (property key, value) of every log to every transformer, in log -> property -> transformer order. */
+  private def toDimVals(logs: Seq[WalLog], transformers: Seq[Transformer]): Iterator[DimVal] =
+    for {
+      walLog <- logs.iterator
+      (propsKey, propsValue) <- walLog.propsKeyValues
+      transformer <- transformers
+      dimVal <- transformer.toDimValLs(walLog, propsKey, propsValue)
+    } yield dimVal
+
+  /** UDF over an array of WalLog rows: counts each produced DimVal and returns DimValCounts by descending count. */
+  def extractDimValuesWithCount(transformers: Seq[Transformer]) = {
+    udf((rows: Seq[Row]) => {
+      val dimValCounts = mutable.Map.empty[DimVal, Int]
+
+      toDimVals(rows.map(WalLog.fromRow), transformers).foreach { dimVal =>
+        dimValCounts += (dimVal -> (dimValCounts.getOrElse(dimVal, 0) + 1))
+      }
+
+      dimValCounts.toSeq
+        .sortBy { case (_, count) => -count }
+        .map { case (dimVal, count) => DimValCount(dimVal, count) }
+    })
+  }
+
+  /** UDF over an array of WalLog rows that returns every produced DimVal exactly once. */
+  def extractDimValues(transformers: Seq[Transformer]) = {
+    udf((rows: Seq[Row]) => {
+      // TODO: this can be changed into Map to count how many times each dimVal exist in sequence of walLog
+      // then change this to mutable.Map.empty[DimVal, Int], then aggregate.
+      val distinctDimValues = mutable.Set.empty[DimVal]
+
+      toDimVals(rows.map(WalLog.fromRow), transformers).foreach(distinctDimValues += _)
+
+      distinctDimValues.toSeq
+    })
+  }
+
+  /**
+    * Builds the feature dictionary: the distinct count of the configured count column per
+    * (dim, value), filtered by the minimum count, then passed to WalLogUDF.appendRank with
+    * a (dim ascending, count descending) ordering in implicit scope.
+    *
+    * @return DataFrame with columns dim, value, count, rank
+    */
+  def buildDictionary(ss: SparkSession,
+                      allDimVals: DataFrame,
+                      param: BuildTopFeaturesParam,
+                      dimValColumnName: String = "dimVal"): DataFrame = {
+    import ss.implicits._
+
+    val countColumn = param._countColumnName
+    val featureCounts = allDimVals
+      .select(col(countColumn), col(s"$dimValColumnName.dim").as("dim"), col(s"$dimValColumnName.value").as("value"))
+      .groupBy("dim", "value")
+      .agg(countDistinct(countColumn).as("count"))
+      .filter(s"count > ${param._minUserCount}")
+
+    val keyedByDimAndCount: Dataset[((String, Long), String)] = featureCounts
+      .select("dim", "value", "count").as[(String, String, Long)]
+      .map { case (dim, value, uv) => ((dim, uv), value) }
+
+    implicit val ord = Ordering.Tuple2(Ordering.String, Ordering.Long.reverse)
+
+    val ranked: RDD[(Long, (String, Long), String)] =
+      WalLogUDF.appendRank(keyedByDimAndCount, param.numOfPartitions, param.samplePointsPerPartitionHint)
+
+    ranked.toDF("rank", "dim_count", "value")
+      .withColumn("dim", col("dim_count._1"))
+      .withColumn("count", col("dim_count._2"))
+      .select("dim", "value", "count", "rank")
+  }
+}
+
+case class BuildTopFeaturesProcess(taskConf: TaskConf) extends org.apache.s2graph.s2jobs.task.Process(taskConf) {
+
+  import BuildTopFeaturesProcess._
+
+  /**
+    * Unions every input DataFrame, explodes the DimVals extracted from its "edges" column
+    * and returns the feature dictionary built from them.
+    * Options read: countColumnName, numOfPartitions, samplePointsPerPartitionHint, minUserCount.
+    */
+  override def execute(ss: SparkSession, inputMap: Map[String, DataFrame]): DataFrame = {
+    val options = taskConf.options
+    val numOfPartitions = options.get("numOfPartitions").map(_.toInt)
+    val param = BuildTopFeaturesParam(
+      countColumnName = options.get("countColumnName"), // None lets the param apply its own default
+      samplePointsPerPartitionHint = options.get("samplePointsPerPartitionHint").map(_.toInt),
+      minUserCount = options.get("minUserCount").map(_.toLong),
+      numOfPartitions = numOfPartitions
+    )
+
+    // foreach rather than map: setting the shuffle partitions is a pure side effect.
+    numOfPartitions.foreach { d => ss.sqlContext.setConf("spark.sql.shuffle.partitions", d.toString) }
+
+    val edges = taskConf.inputs.tail.foldLeft(inputMap(taskConf.inputs.head)) { case (prev, cur) =>
+      prev.union(inputMap(cur))
+    }
+
+    //TODO: user expect to inject transformers that transform (WalLog, propertyKey, propertyValue) to Seq[DimVal].
+    val dimValColumnName = "dimVal"
+    val dimValExtractUDF = extractDimValues(TaskConf.parseTransformers(taskConf))
+    val rawFeatures = edges.withColumn(dimValColumnName, explode(dimValExtractUDF(col("edges"))))
+
+    buildDictionary(ss, rawFeatures, param, dimValColumnName)
+  }
+
+  override def mandatoryOptions: Set[String] = Set.empty
+}
diff --git a/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/wal/process/FilterTopFeaturesProcess.scala b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/wal/process/FilterTopFeaturesProcess.scala
new file mode 100644
index 0000000..ec7e645
--- /dev/null
+++ b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/wal/process/FilterTopFeaturesProcess.scala
@@ -0,0 +1,90 @@
+package org.apache.s2graph.s2jobs.wal.process
+
+import org.apache.s2graph.s2jobs.task.TaskConf
+import org.apache.s2graph.s2jobs.wal.WalLogAgg
+import org.apache.s2graph.s2jobs.wal.transformer.{DefaultTransformer, Transformer}
+import org.apache.spark.broadcast.Broadcast
+import org.apache.spark.sql.functions.{col, udf}
+import org.apache.spark.sql.{DataFrame, Dataset, SparkSession}
+import play.api.libs.json.{JsObject, Json}
+
+object FilterTopFeaturesProcess {
+  // Set view of the most recently seen broadcast, tagged with that broadcast's id.
+  @volatile private var cachedFeatureHashKeys: Option[(Long, Set[Long])] = None
+
+  // Returns the broadcast values as a Set. The cache is reused only for the broadcast it was
+  // built from, so a later, different broadcast never receives the keys of an earlier one.
+  def getValidFeatureHashKeys(validFeatureHashKeysBCast: Broadcast[Array[Long]]): Set[Long] =
+    cachedFeatureHashKeys match {
+      case Some((id, keys)) if id == validFeatureHashKeysBCast.id => keys
+      case _ =>
+        val keys = validFeatureHashKeysBCast.value.toSet
+        cachedFeatureHashKeys = Some(validFeatureHashKeysBCast.id -> keys)
+        keys
+    }
+
+  /** Hashes each (dim, value) with WalLogAgg.toFeatureHash and collects the distinct hashes to the driver. */
+  def collectDistinctFeatureHashes(ss: SparkSession, filteredDict: DataFrame): Array[Long] = {
+    import ss.implicits._
+    val featureHashUDF = udf((dim: String, value: String) => WalLogAgg.toFeatureHash(dim, value))
+
+    filteredDict.withColumn("featureHash", featureHashUDF(col("dim"), col("value")))
+      .select("featureHash")
+      .distinct().as[Long].collect()
+  }
+
+  /** Keeps rows whose rank is below the max rank of their dim (defaultMaxRank for unlisted dims). */
+  def filterTopKsPerDim(dict: DataFrame,
+                        maxRankPerDim: Broadcast[Map[String, Int]],
+                        defaultMaxRank: Int): DataFrame = {
+    val filterUDF = udf((dim: String, rank: Long) => rank < maxRankPerDim.value.getOrElse(dim, defaultMaxRank))
+    dict.filter(filterUDF(col("dim"), col("rank")))
+  }
+
+  /** Runs WalLogAgg.filterProps on every element, resolving the valid key Set once per partition. */
+  def filterWalLogAgg(ss: SparkSession,
+                      walLogAgg: Dataset[WalLogAgg],
+                      transformers: Seq[Transformer],
+                      validFeatureHashKeysBCast: Broadcast[Array[Long]]) = {
+    import ss.implicits._
+    walLogAgg.mapPartitions { iter =>
+      val validFeatureHashKeys = getValidFeatureHashKeys(validFeatureHashKeysBCast)
+      iter.map(walLogAgg => WalLogAgg.filterProps(walLogAgg, transformers, validFeatureHashKeys))
+    }
+  }
+}
+
+class FilterTopFeaturesProcess(taskConf: TaskConf) extends org.apache.s2graph.s2jobs.task.Process(taskConf) {
+
+  import FilterTopFeaturesProcess._
+
+  /**
+    * Restricts the feature dictionary to the top ranks of each dim, collects the feature
+    * hashes of the surviving (dim, value) pairs, broadcasts them and filters the
+    * WalLogAgg input through WalLogAgg.filterProps with that set.
+    */
+  override def execute(ss: SparkSession, inputMap: Map[String, DataFrame]): DataFrame = {
+    import ss.implicits._
+
+    val options = taskConf.options
+
+    // "maxRankPerDim" is a JSON object of dim -> max rank; an absent option yields an empty map.
+    val rankLimits: Map[String, Int] = options.get("maxRankPerDim") match {
+      case Some(json) => Json.parse(json).as[JsObject].fields.map { case (dim, limit) => dim -> limit.as[Int] }.toMap
+      case None => Map.empty
+    }
+    val rankLimitsBCast = ss.sparkContext.broadcast(rankLimits)
+    val fallbackRank = options.get("defaultMaxRank").map(_.toInt).getOrElse(Int.MaxValue)
+
+    val dictionary = inputMap(options("featureDict"))
+    val aggregated = inputMap(options("walLogAgg")).as[WalLogAgg]
+    val transformers = TaskConf.parseTransformers(taskConf)
+
+    val topDictionary = filterTopKsPerDim(dictionary, rankLimitsBCast, fallbackRank)
+    val hashKeysBCast = ss.sparkContext.broadcast(collectDistinctFeatureHashes(ss, topDictionary))
+
+    filterWalLogAgg(ss, aggregated, transformers, hashKeysBCast).toDF()
+  }
+
+  override def mandatoryOptions: Set[String] = Set("featureDict", "walLogAgg")
+}
diff --git a/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/wal/process/WalLogAggregateProcess.scala b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/wal/process/WalLogAggregateProcess.scala
new file mode 100644
index 0000000..e4aa4e1
--- /dev/null
+++ b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/wal/process/WalLogAggregateProcess.scala
@@ -0,0 +1,60 @@
+package org.apache.s2graph.s2jobs.wal.process
+
+import org.apache.s2graph.s2jobs.task.TaskConf
+import org.apache.s2graph.s2jobs.wal.process.params.AggregateParam
+import org.apache.s2graph.s2jobs.wal.{WalLog, WalLogAgg}
+import org.apache.spark.sql._
+
+object WalLogAggregateProcess {
+  /** Groups already aggregated WalLogAgg rows by `from` and merges each group with WalLogAgg.merge. */
+  def aggregate(ss: SparkSession,
+                dataset: Dataset[WalLogAgg],
+                aggregateParam: AggregateParam)(implicit ord: Ordering[WalLog]): DataFrame = {
+    import ss.implicits._
+    dataset.groupByKey(_.from).flatMapGroups { case (_, iter) =>
+      WalLogAgg.merge(iter, aggregateParam)
+    }.toDF(WalLogAgg.outputColumns: _*)
+  }
+  /** Groups raw WalLog rows by `from` and merges each group with WalLogAgg.mergeWalLogs. */
+  def aggregateRaw(ss: SparkSession,
+                   dataset: Dataset[WalLog],
+                   aggregateParam: AggregateParam)(implicit ord: Ordering[WalLog]): DataFrame = {
+    import ss.implicits._
+    dataset.groupByKey(_.from).flatMapGroups { case (_, iter) =>
+      WalLogAgg.mergeWalLogs(iter, aggregateParam)
+    }.toDF(WalLogAgg.outputColumns: _*)
+  }
+}
+
+
+/**
+  * Expects a DataFrame of WalLog, then groups the WalLogs by groupByKeys (default: from).
+  * Produces a DataFrame of WalLogAgg, which abstracts a session consisting of a sequence of WalLog ordered by timestamp (desc).
+  *
+  * One way to visualize this is as transforming (row, column, value) matrix entries into (row, SparseVector(column:value)).
+  * Note that we only keep track of at most topK latest WalLogs per groupByKeys.
+  */
+class WalLogAggregateProcess(taskConf: TaskConf) extends org.apache.s2graph.s2jobs.task.Process(taskConf) {
+
+  import WalLogAggregateProcess._
+
+  /** Unions all inputs and aggregates them as WalLogAgg rows (arrayType) or as raw WalLog rows. */
+  override def execute(ss: SparkSession, inputMap: Map[String, DataFrame]): DataFrame = {
+    import ss.implicits._
+    //TODO: Current implementation only expects taskConf.options as Map[String, String].
+    //Once we change taskConf.options to JsObject, we can simply parse the input parameters as follows.
+    //implicit val paramReads = Json.reads[AggregateParam]
+    val param = AggregateParam.fromTaskConf(taskConf)
+    param.numOfPartitions.foreach { n => ss.sqlContext.setConf("spark.sql.shuffle.partitions", n.toString) }
+
+    implicit val ord = WalLog.orderByTsAsc
+    val unioned = taskConf.inputs.tail.foldLeft(inputMap(taskConf.inputs.head)) { (acc, name) =>
+      acc.union(inputMap(name))
+    }
+
+    if (param.arrayType) aggregate(ss, unioned.as[WalLogAgg], param)
+    else aggregateRaw(ss, unioned.as[WalLog], param)
+  }
+
+  override def mandatoryOptions: Set[String] = Set.empty
+}
diff --git a/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/wal/process/params/AggregateParam.scala b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/wal/process/params/AggregateParam.scala
new file mode 100644
index 0000000..523ec8e
--- /dev/null
+++ b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/wal/process/params/AggregateParam.scala
@@ -0,0 +1,46 @@
+package org.apache.s2graph.s2jobs.wal.process.params
+
+import org.apache.s2graph.s2jobs.task.TaskConf
+
+object AggregateParam {
+  val defaultGroupByKeys = Seq("from")
+  val defaultTopK = 1000
+  val defaultIsArrayType = false
+  val defaultShouldSortTopItems = true
+
+  /**
+    * Reads groupByKeys (comma separated), maxNumOfEdges, arrayType, sortTopItems,
+    * numOfPartitions, validTimestampDuration and now from taskConf.options.
+    * A value that toInt / toLong / toBoolean cannot parse makes that call throw.
+    */
+  def fromTaskConf(taskConf: TaskConf): AggregateParam = {
+    val options = taskConf.options
+
+    AggregateParam(
+      groupByKeys = options.get("groupByKeys").map(_.split(",").filter(_.nonEmpty).toSeq),
+      topK = options.get("maxNumOfEdges").map(_.toInt).orElse(Some(defaultTopK)),
+      isArrayType = options.get("arrayType").map(_.toBoolean).orElse(Some(defaultIsArrayType)),
+      shouldSortTopItems = options.get("sortTopItems").map(_.toBoolean).orElse(Some(defaultShouldSortTopItems)),
+      numOfPartitions = options.get("numOfPartitions").map(_.toInt),
+      validTimestampDuration = options.get("validTimestampDuration").map(_.toLong).orElse(Some(Long.MaxValue)),
+      nowOpt = options.get("now").map(_.toLong)
+    )
+  }
+}
+
+case class AggregateParam(groupByKeys: Option[Seq[String]],
+                          topK: Option[Int],
+                          isArrayType: Option[Boolean],
+                          shouldSortTopItems: Option[Boolean],
+                          numOfPartitions: Option[Int],
+                          validTimestampDuration: Option[Long],
+                          nowOpt: Option[Long]) {
+  // Resolved views of the optional settings: each falls back to a default when unset.
+  import AggregateParam._
+
+  val groupByColumns: Seq[String] = groupByKeys.getOrElse(defaultGroupByKeys)
+  val heapSize: Int = topK.getOrElse(defaultTopK)
+  val arrayType: Boolean = isArrayType.getOrElse(defaultIsArrayType)
+  val sortTopItems: Boolean = shouldSortTopItems.getOrElse(defaultShouldSortTopItems)
+  val now: Long = nowOpt.getOrElse(System.currentTimeMillis()) // wall clock at construction when unset
+}
diff --git a/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/wal/process/params/BuildTopFeaturesParam.scala b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/wal/process/params/BuildTopFeaturesParam.scala
new file mode 100644
index 0000000..3e8bae5
--- /dev/null
+++ b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/wal/process/params/BuildTopFeaturesParam.scala
@@ -0,0 +1,17 @@
+package org.apache.s2graph.s2jobs.wal.process.params
+
+object BuildTopFeaturesParam {
+  val defaultMinUserCount: Long = 0L
+  val defaultCountColumnName: String = "from"
+}
+
+case class BuildTopFeaturesParam(minUserCount: Option[Long],
+                                 countColumnName: Option[String],
+                                 samplePointsPerPartitionHint: Option[Int],
+                                 numOfPartitions: Option[Int]) {
+  // Effective values: the supplied option, or the companion object's default when it is None.
+  import BuildTopFeaturesParam._
+
+  val _countColumnName: String = countColumnName.getOrElse(defaultCountColumnName)
+  val _minUserCount: Long = minUserCount.getOrElse(defaultMinUserCount)
+}
diff --git a/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/wal/process/params/FilterTopFeaturesParam.scala b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/wal/process/params/FilterTopFeaturesParam.scala
new file mode 100644
index 0000000..3b9d868
--- /dev/null
+++ b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/wal/process/params/FilterTopFeaturesParam.scala
@@ -0,0 +1,7 @@
+package org.apache.s2graph.s2jobs.wal.process.params
+
+class FilterTopFeaturesParam(maxRankPerDim: Option[Map[String, Int]],
+                             defaultMaxRank: Option[Int]) {
+  val _maxRankPerDim: Map[String, Int] = maxRankPerDim.getOrElse(Map.empty) // no per-dim entries when unset
+  val _defaultMaxRank: Int = defaultMaxRank.getOrElse(Int.MaxValue) // unset means Int.MaxValue
+}
diff --git a/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/wal/transformer/DefaultTransformer.scala b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/wal/transformer/DefaultTransformer.scala
new file mode 100644
index 0000000..de328e5
--- /dev/null
+++ b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/wal/transformer/DefaultTransformer.scala
@@ -0,0 +1,5 @@
+package org.apache.s2graph.s2jobs.wal.transformer
+
+import org.apache.s2graph.s2jobs.task.TaskConf
+
+case class DefaultTransformer(taskConf: TaskConf) extends Transformer(taskConf) // no overrides: uses Transformer.toDimValLs as is
diff --git a/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/wal/transformer/ExtractDomain.scala b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/wal/transformer/ExtractDomain.scala
new file mode 100644
index 0000000..45bbe66
--- /dev/null
+++ b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/wal/transformer/ExtractDomain.scala
@@ -0,0 +1,23 @@
+package org.apache.s2graph.s2jobs.wal.transformer
+
+import org.apache.s2graph.s2jobs.task.TaskConf
+import org.apache.s2graph.s2jobs.wal.utils.UrlUtils
+import org.apache.s2graph.s2jobs.wal.{DimVal, WalLog}
+import play.api.libs.json.Json
+
+case class ExtractDomain(taskConf: TaskConf) extends Transformer(taskConf) {
+  val urlDimensions = Json.parse(taskConf.options.getOrElse("urlDimensions", "[]")).as[Set[String]]
+  val hostDimName = taskConf.options.getOrElse("hostDimName", "host")
+  val domainDimName = taskConf.options.getOrElse("domainDimName", "domain")
+  val keywordDimName = taskConf.options.getOrElse("keywordDimName", "uri_keywords")
+
+  /** For keys listed in urlDimensions, emits host, domain and keyword DimVals from UrlUtils.extract; Nil otherwise. */
+  override def toDimValLs(walLog: WalLog, propertyKey: String, propertyValue: String): Seq[DimVal] =
+    if (urlDimensions.contains(propertyKey)) {
+      val (_, domains, keywordOpt) = UrlUtils.extract(propertyValue)
+      val hostDimVals = domains.headOption.toSeq.map(host => DimVal(hostDimName, host))
+      val domainDimVals = domains.map(domain => DimVal(domainDimName, domain))
+      val keywordDimVals = keywordOpt.toSeq.map(keyword => DimVal(keywordDimName, keyword))
+      hostDimVals ++ domainDimVals ++ keywordDimVals
+    } else Nil
+}
diff --git a/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/wal/transformer/ExtractServiceName.scala b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/wal/transformer/ExtractServiceName.scala
new file mode 100644
index 0000000..15efe9f
--- /dev/null
+++ b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/wal/transformer/ExtractServiceName.scala
@@ -0,0 +1,24 @@
+package org.apache.s2graph.s2jobs.wal.transformer
+
+import org.apache.s2graph.core.JSONParser
+import org.apache.s2graph.s2jobs.task.TaskConf
+import org.apache.s2graph.s2jobs.wal.{DimVal, WalLog}
+import play.api.libs.json.{JsObject, Json}
+
+class ExtractServiceName(taskConf: TaskConf) extends Transformer(taskConf) {
+  val serviceDims = Json.parse(taskConf.options.getOrElse("serviceDims", "[]")).as[Set[String]]
+  val domainServiceMap = Json.parse(taskConf.options.getOrElse("domainServiceMap", "{}")).as[JsObject]
+    .fields.map { case (domain, service) => domain -> JSONParser.jsValueToString(service) }
+    .toMap
+  val serviceDimName = taskConf.options.getOrElse("serviceDimName", "serviceDimName")
+
+  /**
+    * For keys listed in serviceDims, emits one DimVal whose value is the mapped entry of
+    * domainServiceMap, or the property value itself when it has no mapping; Nil otherwise.
+    */
+  override def toDimValLs(walLog: WalLog, propertyKey: String, propertyValue: String): Seq[DimVal] =
+    if (serviceDims.contains(propertyKey)) {
+      Seq(DimVal(serviceDimName, domainServiceMap.getOrElse(propertyValue, propertyValue)))
+    } else Nil
+}
+
diff --git a/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/wal/transformer/Transformer.scala b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/wal/transformer/Transformer.scala
new file mode 100644
index 0000000..68be2ad
--- /dev/null
+++ b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/wal/transformer/Transformer.scala
@@ -0,0 +1,16 @@
+package org.apache.s2graph.s2jobs.wal.transformer
+
+import org.apache.s2graph.s2jobs.task.TaskConf
+import org.apache.s2graph.s2jobs.wal.{DimVal, WalLog}
+
+/**
+  * Decides how each property key/value pair of a WalLog is turned into Seq[DimVal].
+  *
+  * The default implementation emits exactly one DimVal whose dim is
+  * "<walLog.label>:<propertyKey>" and whose value is the property value unchanged.
+  */
+abstract class Transformer(taskConf: TaskConf) extends Serializable {
+  def toDimValLs(walLog: WalLog, propertyKey: String, propertyValue: String): Seq[DimVal] = {
+    Seq(DimVal(s"${walLog.label}:${propertyKey}", propertyValue))
+  }
+}
diff --git a/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/wal/udafs/WalLogUDAF.scala b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/wal/udafs/WalLogUDAF.scala
new file mode 100644
index 0000000..81a1356
--- /dev/null
+++ b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/wal/udafs/WalLogUDAF.scala
@@ -0,0 +1,290 @@
+package org.apache.s2graph.s2jobs.wal.udafs
+
+import org.apache.spark.sql.Row
+import org.apache.spark.sql.catalyst.expressions.GenericRow
+import org.apache.spark.sql.expressions.{MutableAggregationBuffer, UserDefinedAggregateFunction}
+import org.apache.spark.sql.types._
+
+import scala.annotation.tailrec
+import scala.collection.mutable
+
+object WalLogUDAF {
+  // Presumably (timestamp, to, label, props), going by the types of `fields` below.
+  type Element = (Long, String, String, String)
+
+  val emptyRow = new GenericRow(Array(-1L, "empty", "empty", "empty"))
+
+  val elementOrd = Ordering.by[Element, Long](_._1)
+
+  // Ascending by column 0, read as a Long.
+  val rowOrdering: Ordering[Row] = new Ordering[Row] {
+    override def compare(x: Row, y: Row): Int = x.getAs[Long](0).compareTo(y.getAs[Long](0))
+  }
+
+  // Descending by column 0: the comparison of rowOrdering with its operands swapped.
+  val rowOrderingDesc: Ordering[Row] = new Ordering[Row] {
+    override def compare(x: Row, y: Row): Int = y.getAs[Long](0).compareTo(x.getAs[Long](0))
+  }
+
+  // Columns of one aggregated element.
+  val fields = Seq(
+    StructField(name = "timestamp", LongType),
+    StructField(name = "to", StringType),
+    StructField(name = "label", StringType),
+    StructField(name = "props", StringType)
+  )
+
+  val arrayType = ArrayType(elementType = StructType(fields))
+
+  /** Creates a GroupByAggOptimized aggregator configured with `maxNumOfEdges`. */
+  def apply(maxNumOfEdges: Int = 1000): GroupByAggOptimized =
+    new GroupByAggOptimized(maxNumOfEdges)
+
+  def swap[T](array: mutable.Seq[T], i: Int, j: Int) = { // exchanges positions i and j in place
+    val held = array(j)
+    array(j) = array(i)
+    array(i) = held
+  }
+
+  /**
+    * Restores the min-heap property below `idx`: while a child is smaller than the element
+    * at `idx` (the right child winning only when strictly smaller than the left one),
+    * swaps it with the smallest child and continues from that child's position.
+    */
+  @tailrec
+  def percolateDown[T](array: mutable.Seq[T], idx: Int)(implicit ordering: Ordering[T]): Unit = {
+    val length = array.size
+
+    // Index of the smaller element between `candidate` (when inside the array) and `current`.
+    def smallerOf(candidate: Int, current: Int): Int =
+      if (candidate < length && ordering.compare(array(candidate), array(current)) < 0) candidate else current
+
+    val smallest = smallerOf(2 * idx + 2, smallerOf(2 * idx + 1, idx))
+    if (smallest != idx) {
+      swap(array, idx, smallest)
+      percolateDown(array, smallest)
+    }
+  }
+
+  /** Moves array(idx) towards the root for as long as it is smaller than its parent. */
+  @tailrec
+  def percolateUp[T](array: mutable.Seq[T],
+                     idx: Int)(implicit ordering: Ordering[T]): Unit = {
+    val parent = (idx - 1) / 2
+    // At the root (idx 0) the parent index is 0 too; an element is not smaller than itself, so the walk ends.
+    if (parent >= 0 && ordering.compare(array(idx), array(parent)) < 0) {
+      swap(array, parent, idx)
+      percolateUp(array, parent)
+    }
+  }
+
+  /**
+    * Offers `newData` to `array`, which is treated as a binary min-heap holding at most
+    * `size` elements (parent = (i - 1) / 2, left child = 2 * i + 1, right child = 2 * i + 2).
+    *
+    * While the heap has room the element is appended to a copy and sifted up. Once full,
+    * an element ordered before the current minimum is dropped; any other element replaces
+    * the minimum in place and is sifted down.
+    *
+    * @return the heap to keep using: `array` itself when full, otherwise a new sequence
+    */
+  def addToTopK[T](array: mutable.Seq[T],
+                   size: Int,
+                   newData: T)(implicit ordering: Ordering[T]): mutable.Seq[T] = {
+    if (size <= 0) {
+      // A non-positive capacity keeps nothing; without this guard array(0) throws on an empty heap.
+      array
+    } else if (array.size >= size) {
+      // Full: the root array(0) is the current minimum.
+      if (ordering.compare(newData, array(0)) >= 0) {
+        array(0) = newData
+        percolateDown(array, 0)
+      }
+      array
+    } else {
+      // Room left: `:+` copies, so the caller must use the returned sequence.
+      val grown = array :+ newData
+      percolateUp(grown, grown.size - 1)
+      grown
+    }
+  }
+
+  /**
+    * Merges two sequences that are each already sorted by `ordering` into one sorted
+    * sequence of at most `size` elements. On ties the element of `prev` is taken first.
+    *
+    * The limit is checked against the output length on every step, so it also holds
+    * while the remainder of one input is drained after the other is exhausted.
+    */
+  def mergeTwoSeq[T](prev: Seq[T], cur: Seq[T], size: Int)(implicit ordering: Ordering[T]): Seq[T] = {
+    import scala.collection.mutable
+    val (n, m) = (cur.size, prev.size)
+
+    var (i, j) = (0, 0)
+    // `size` is only the initial capacity here; the loop condition below enforces the limit.
+    val arr = new mutable.ArrayBuffer[T](size)
+
+    while (arr.size < size && (i < n || j < m)) {
+      // cur is taken when prev is exhausted, or when its head is strictly smaller than prev's.
+      val takeCur = j >= m || (i < n && ordering.compare(cur(i), prev(j)) < 0)
+      if (takeCur) {
+        arr += cur(i)
+        i += 1
+      } else {
+        arr += prev(j)
+        j += 1
+      }
+    }
+
+    arr
+  }
+}
+
+class GroupByAggOptimized(maxNumOfEdges: Int = 1000) extends UserDefinedAggregateFunction {
+
+  import WalLogUDAF._
+
+  // Ascending by column 0; addToTopK relies on it to keep the buffer a min-heap.
+  implicit val ord = rowOrdering
+
+  val arrayType = ArrayType(elementType = StructType(fields))
+
+  type ROWS = mutable.Seq[Row]
+
+  override def inputSchema: StructType = StructType(fields)
+
+  // The aggregation buffer is a single array column that stores the heap.
+  override def bufferSchema: StructType =
+    StructType(Seq(StructField(name = "edges", dataType = arrayType)))
+
+  override def dataType: DataType = arrayType
+
+  override def deterministic: Boolean = true
+
+  override def initialize(buffer: MutableAggregationBuffer): Unit =
+    buffer.update(0, mutable.ArrayBuffer.empty[Row])
+
+  /** Offers one input row to the heap held in the buffer. */
+  override def update(buffer: MutableAggregationBuffer, input: Row): Unit = {
+    val heap = buffer.getAs[ROWS](0)
+
+    buffer.update(0, addToTopK(heap, maxNumOfEdges, input))
+  }
+
+  /** Offers every non-null row of `buffer2` to the heap held in `buffer1`. */
+  override def merge(buffer1: MutableAggregationBuffer, buffer2: Row): Unit = {
+    val target = buffer1.getAs[ROWS](0)
+    val incoming = buffer2.getAs[ROWS](0).filter(_ != null)
+
+    // addToTopK may return a new sequence, so the result is threaded through the fold.
+    val merged = incoming.foldLeft(target) { (heap, row) =>
+      addToTopK(heap, maxNumOfEdges, row)
+    }
+
+    buffer1.update(0, merged)
+  }
+
+  /** Returns the retained rows sorted by `ord`, at most maxNumOfEdges of them. */
+  override def evaluate(buffer: Row): Any = {
+    val heap = buffer.getAs[ROWS](0)
+    takeTopK(heap, maxNumOfEdges)
+  }
+
+  // take(k) returns the whole sequence when it holds k or fewer rows, so no size check is needed.
+  private def takeTopK(ls: Seq[Row], k: Int) = ls.sorted.take(k)
+}
+
+class GroupByAgg(maxNumOfEdges: Int = 1000) extends UserDefinedAggregateFunction {
+  import WalLogUDAF._
+
+  // Descending by column 0, so sorting puts the largest values first.
+  implicit val ord = rowOrderingDesc
+
+  val arrayType = ArrayType(elementType = StructType(fields))
+
+  override def inputSchema: StructType = StructType(fields)
+
+  // edges: rows collected so far; buffered: true once merge has sorted and truncated edges.
+  override def bufferSchema: StructType = StructType(Seq(
+    StructField(name = "edges", dataType = arrayType),
+    StructField(name = "buffered", dataType = BooleanType)
+  ))
+
+  override def dataType: DataType = arrayType
+
+  override def deterministic: Boolean = true
+
+  // Initializes both slots: an empty row sequence (matching bufferSchema) and buffered = false,
+  // so evaluate never reads an unset slot.
+  override def initialize(buffer: MutableAggregationBuffer): Unit = {
+    buffer.update(0, Seq.empty[Row])
+    buffer.update(1, false)
+  }
+
+  /* not optimized: appends without bound; truncation happens in merge/evaluate */
+  override def update(buffer: MutableAggregationBuffer, input: Row): Unit = {
+    val prev = buffer.getAs[Seq[Row]](0)
+
+    buffer.update(0, prev :+ input)
+    buffer.update(1, false)
+  }
+
+  // Sorts by `ord` and keeps the first k rows (take returns everything when there are fewer).
+  private def takeTopK(ls: Seq[Row], k: Int) = ls.sorted.take(k)
+
+  /* not optimized: re-sorts the concatenation of both buffers on every merge */
+  override def merge(buffer1: MutableAggregationBuffer, buffer2: Row): Unit = {
+    val cur = buffer2.getAs[Seq[Row]](0)
+    val prev = buffer1.getAs[Seq[Row]](0)
+
+    buffer1.update(0, takeTopK(prev ++ cur, maxNumOfEdges))
+    buffer1.update(1, true)
+  }
+
+  /** Returns the top rows; the buffer is used as is when merge already sorted and truncated it. */
+  override def evaluate(buffer: Row): Any = {
+    val ls = buffer.getAs[Seq[Row]](0)
+    if (buffer.getAs[Boolean](1)) ls else takeTopK(ls, maxNumOfEdges)
+  }
+}
+
+class GroupByArrayAgg(maxNumOfEdges: Int = 1000) extends UserDefinedAggregateFunction {
+  import WalLogUDAF._
+
+  implicit val ord = rowOrdering
+
+  import scala.collection.mutable
+
+  // Input and buffer have the same shape: a single "edges" array column.
+  override def inputSchema: StructType =
+    StructType(Seq(StructField(name = "edges", dataType = arrayType)))
+
+  override def bufferSchema: StructType =
+    StructType(Seq(StructField(name = "edges", dataType = arrayType)))
+
+  override def dataType: DataType = arrayType
+
+  override def deterministic: Boolean = true
+
+  override def initialize(buffer: MutableAggregationBuffer): Unit = {
+    buffer.update(0, mutable.ListBuffer.empty[Row])
+  }
+
+  /** Merges the incoming "edges" array into the buffered rows. */
+  override def update(buffer: MutableAggregationBuffer, input: Row): Unit =
+    buffer.update(0, mergeEdges(input, buffer))
+
+  /** Merges the rows buffered in `buffer2` into `buffer1`. */
+  override def merge(buffer1: MutableAggregationBuffer, buffer2: Row): Unit =
+    buffer1.update(0, mergeEdges(buffer2, buffer1))
+
+  /**
+    * Reads the row array in column 0 of both arguments and merges them with mergeTwoSeq,
+    * passing the incoming rows first and the buffered rows second.
+    */
+  private def mergeEdges(incoming: Row, buffered: Row): Seq[Row] =
+    mergeTwoSeq(incoming.getAs[Seq[Row]](0), buffered.getAs[Seq[Row]](0), maxNumOfEdges)
+
+  /** Returns the buffered rows as they are. */
+  override def evaluate(buffer: Row): Any = buffer.getAs[Seq[Row]](0)
+}
diff --git a/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/wal/udfs/WalLogUDF.scala b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/wal/udfs/WalLogUDF.scala
new file mode 100644
index 0000000..d213d6c
--- /dev/null
+++ b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/wal/udfs/WalLogUDF.scala
@@ -0,0 +1,210 @@
+package org.apache.s2graph.s2jobs.wal.udfs
+
+import com.google.common.hash.Hashing
+import org.apache.s2graph.core.JSONParser
+import org.apache.spark.broadcast.Broadcast
+import org.apache.spark.sql.functions._
+import org.apache.spark.sql.{Dataset, Row}
+import play.api.libs.json._
+
+import scala.reflect.ClassTag
+
+object WalLogUDF {
+
+  import scala.collection.mutable
+
+  type MergedProps = Map[String, Seq[String]]
+  type MutableMergedPropsInner = mutable.Map[String, Int]
+  type MutableMergedProps = mutable.Map[String, MutableMergedPropsInner]
+
+  def initMutableMergedPropsInner: MutableMergedPropsInner = mutable.Map.empty[String, Int]
+
+  def initMutableMergedProps: MutableMergedProps = mutable.Map.empty[String, MutableMergedPropsInner]
+
+//  //TODO:
+//  def toDimension(rawActivity: RawActivity, propertyKey: String): String = {
+//    //    val (ts, dst, label, _) = rawActivity
+//    //    label + "." + propertyKey
+//    propertyKey
+//  }
+//
+//  def updateMutableMergedProps(mutableMergedProps: MutableMergedProps)(dimension: String,
+//                                                                       dimensionValue: String,
+//                                                                       count: Int = 1): Unit = {
+//    val buffer = mutableMergedProps.getOrElseUpdate(dimension, initMutableMergedPropsInner)
+//    val newCount = buffer.getOrElse(dimensionValue, 0) + count
+//    buffer += (dimensionValue -> newCount)
+//  }
+//
+//  def groupByDimensionValues(rawActivity: RawActivity,
+//                             propsJson: JsObject,
+//                             mergedProps: MutableMergedProps,
+//                             toDimensionFunc: (RawActivity, String) => String,
+//                             excludePropKeys: Set[String] = Set.empty): Unit = {
+//    propsJson.fields.filter(t => !excludePropKeys(t._1)).foreach { case (propertyKey, jsValue) =>
+//      val values = jsValue match {
+//        case JsString(s) => Seq(s)
+//        case JsArray(arr) => arr.map(JSONParser.jsValueToString)
+//        case _ => Seq(jsValue.toString())
+//      }
+//      val dimension = toDimensionFunc(rawActivity, propertyKey)
+//
+//      values.foreach { value =>
+//        updateMutableMergedProps(mergedProps)(dimension, value)
+//      }
+//    }
+//  }
+//
+//  def buildMergedProps(rawActivities: Seq[RawActivity],
+//                       toDimensionFunc: (RawActivity, String) => String,
+//                       defaultTopKs: Int = 100,
+//                       dimTopKs: Map[String, Int] = Map.empty,
+//                       excludePropKeys: Set[String] = Set.empty,
+//                       dimValExtractors: Seq[Extractor] = Nil): MergedProps = {
+//    val mergedProps = initMutableMergedProps
+//
+//    rawActivities.foreach { case rawActivity@(_, _, _, rawProps) =>
+//      val propsJson = Json.parse(rawProps).as[JsObject]
+//      groupByDimensionValues(rawActivity, propsJson, mergedProps, toDimensionFunc, excludePropKeys)
+//    }
+//    // work on extra dimVals.
+//    dimValExtractors.foreach { extractor =>
+//      extractor.extract(rawActivities, mergedProps)
+//    }
+//
+//    mergedProps.map { case (key, values) =>
+//      val topK = dimTopKs.getOrElse(key, defaultTopKs)
+//
+//      key -> values.toSeq.sortBy(-_._2).take(topK).map(_._1)
+//    }.toMap
+//  }
+//
+//  def rowToRawActivity(row: Row): RawActivity = {
+//    (row.getAs[Long](0), row.getAs[String](1), row.getAs[String](2), row.getAs[String](3))
+//  }
+//
+//  def appendMergeProps(toDimensionFunc: (RawActivity, String) => String = toDimension,
+//                       defaultTopKs: Int = 100,
+//                       dimTopKs: Map[String, Int] = Map.empty,
+//                       excludePropKeys: Set[String] = Set.empty,
+//                       dimValExtractors: Seq[Extractor] = Nil,
+//                       minTs: Long = 0,
+//                       maxTs: Long = Long.MaxValue) = udf((acts: Seq[Row]) => {
+//    val rows = acts.map(rowToRawActivity).filter(act => act._1 >= minTs && act._1 < maxTs)
+//
+//    buildMergedProps(rows, toDimensionFunc, defaultTopKs, dimTopKs, excludePropKeys, dimValExtractors)
+//  })
+
+  // UDF that flattens a dimension -> values map into a sequence of (dimension, value) pairs.
+  val extractDimensionValues = udf { (dimensionValues: Map[String, Seq[String]]) =>
+    for {
+      (dimension, values) <- dimensionValues.toSeq
+      value <- values
+    } yield dimension -> value
+  }
+
+  // Hashes the UTF-8 bytes of "<dimension>.<dimensionValue>" with murmur3_128 and returns HashCode.asLong().
+  def toHash(dimension: String, dimensionValue: String): Long =
+    Hashing.murmur3_128()
+      .hashBytes(s"$dimension.$dimensionValue".getBytes(java.nio.charset.StandardCharsets.UTF_8)).asLong()
+
+  /**
+    * Builds a UDF that, for every dimension, keeps only the values whose
+    * `toHash(dimension, value)` is contained in the broadcast set.
+    * A dimension stays in the result map even when all of its values are removed.
+    */
+  def filterDimensionValues(validDimValues: Broadcast[Set[Long]]) = {
+    udf((dimensionValues: Map[String, Seq[String]]) => {
+      dimensionValues.map { case (dimension, values) =>
+        val kept = values.filter(value => validDimValues.value(toHash(dimension, value)))
+        dimension -> kept
+      }
+    })
+  }
+
+  /**
+    * Ranks every row within its `K1` group following the implicit `(K1, K2)` ordering.
+    * Rows are range-partitioned and sorted, ranked locally per partition, and then shifted
+    * by the number of rows of the same `K1` held by lower-numbered partitions.
+    *
+    * @return RDD of `(rank, (key1, key2), value)`
+    */
+  def appendRank[K1: ClassTag, K2: ClassTag, V: ClassTag](ds: Dataset[((K1, K2), V)],
+                                                          numOfPartitions: Option[Int] = None,
+                                                          samplePointsPerPartitionHint: Option[Int] = None)(implicit ordering: Ordering[(K1, K2)]) = {
+    import org.apache.spark.RangePartitioner
+    val rdd = ds.rdd
+
+    val partitioner = new RangePartitioner(numOfPartitions.getOrElse(rdd.partitions.length),
+      rdd,
+      true,
+      samplePointsPerPartitionHint = samplePointsPerPartitionHint.getOrElse(20)
+    )
+    val sorted = rdd.repartitionAndSortWithinPartitions(partitioner)
+
+    // Position of each row inside its run of equal key1, local to one partition (restarts at 1 per run).
+    def rank(idx: Int, iter: Iterator[((K1, K2), V)]) = {
+      var curOffset = 1L
+      var curK1 = null.asInstanceOf[K1]
+
+      iter.map{ case ((key1, key2), value) =>
+        val newOffset = if (curK1 == key1) curOffset + 1L  else 1L
+        curOffset = newOffset
+        curK1 = key1
+        (idx, newOffset, key1, key2, value)
+      }
+    }
+
+    // Number of rows per key1 in one partition, emitted as key1 -> (partition index, count).
+    def getOffset(idx: Int, iter: Iterator[((K1, K2), V)]) = {
+      val buffer = mutable.Map.empty[K1, (Int, Long)]
+      if (!iter.hasNext) buffer.toIterator
+      else {
+        val ((firstKey1, _), _) = iter.next()
+        var prevKey1: K1 = firstKey1
+        var size = 1L
+        iter.foreach { case ((k1, _), _) =>
+          if (prevKey1 != k1) {
+            buffer += prevKey1 -> (idx, size)
+            prevKey1 = k1
+            size = 0L
+          }
+          size += 1L
+        }
+        if (size > 0) buffer += prevKey1 -> (idx, size)
+        buffer.iterator
+      }
+    }
+
+    val partRanks = sorted.mapPartitionsWithIndex(rank)
+    val _offsets = sorted.mapPartitionsWithIndex(getOffset)
+    // For a key1 spanning several partitions, every partition after the first starts at the
+    // running total of that key1's rows in the preceding partitions.
+    val offsets = _offsets.groupBy(_._1).flatMap { case (k1, partitionWithSize) =>
+      val ls = partitionWithSize.toSeq.map(_._2).sortBy(_._1)
+      var sum = ls.head._2
+      val lss = ls.tail.map { case (partition, size) =>
+        val x = (partition, sum)
+        sum += size
+        x
+      }
+      lss.map { case (partition, offset) =>
+        (k1, partition) -> offset
+      }
+    }.collect()
+
+    val offsetsBCast = ds.sparkSession.sparkContext.broadcast(offsets)
+
+    def adjust(iter: Iterator[(Int, Long, K1, K2, V)], startOffsets: Map[(K1, Int), Long]) = {
+      iter.map { case (partition, rankInPartition, key1, key2, value) =>
+        // No entry means this is the first partition holding key1, so the local rank is final.
+        val rank = startOffsets.getOrElse((key1, partition), 0L) + rankInPartition
+        (partition, rankInPartition, rank, (key1, key2), value)
+      }
+    }
+
+    partRanks
+      .mapPartitions(iter => adjust(iter, offsetsBCast.value.toMap))
+      .map { case (_, _, rank, key, value) => (rank, key, value) }
+  }
+}
diff --git a/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/wal/utils/BoundedPriorityQueue.scala b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/wal/utils/BoundedPriorityQueue.scala
new file mode 100644
index 0000000..c146452
--- /dev/null
+++ b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/wal/utils/BoundedPriorityQueue.scala
@@ -0,0 +1,52 @@
+package org.apache.s2graph.s2jobs.wal.utils
+
+import java.util.{PriorityQueue => JPriorityQueue}
+
+import scala.collection.JavaConverters._
+import scala.collection.generic.Growable
+
+/**
+  * Keeps at most `maxSize` elements, replacing the lowest one (per `ord`) when a greater element arrives; copied from org.apache.spark.util.BoundedPriorityQueue since it is package private.
+  * @param maxSize maximum number of elements retained (also passed as the backing queue's initial capacity)
+  * @param ord ordering used to compare elements
+  * @tparam A element type
+  */
+class BoundedPriorityQueue[A](maxSize: Int)(implicit ord: Ordering[A])
+  extends Iterable[A] with Growable[A] with Serializable {
+
+  private val underlying = new JPriorityQueue[A](maxSize, ord)
+
+  override def iterator: Iterator[A] = underlying.iterator.asScala
+
+  override def size: Int = underlying.size
+
+  override def ++=(xs: TraversableOnce[A]): this.type = {
+    xs.foreach { this += _ }
+    this
+  }
+
+  override def +=(elem: A): this.type = {
+    if (size < maxSize) {
+      underlying.offer(elem)
+    } else {
+      maybeReplaceLowest(elem)
+    }
+    this
+  }
+
+  override def +=(elem1: A, elem2: A, elems: A*): this.type = {
+    this += elem1 += elem2 ++= elems
+  }
+
+  override def clear(): Unit = underlying.clear()
+  // Once full, a new element only enters by displacing the current lowest element (the queue head).
+  private def maybeReplaceLowest(a: A): Boolean = {
+    val head = underlying.peek()
+    if (head != null && ord.gt(a, head)) {
+      underlying.poll()
+      underlying.offer(a)
+    } else {
+      false
+    }
+  }
+}
\ No newline at end of file
diff --git a/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/wal/utils/UrlUtils.scala b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/wal/utils/UrlUtils.scala
new file mode 100644
index 0000000..2941357
--- /dev/null
+++ b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/wal/utils/UrlUtils.scala
@@ -0,0 +1,57 @@
+package org.apache.s2graph.s2jobs.wal.utils
+
+import java.net.{URI, URLDecoder}
+
+import scala.util.matching.Regex
+
+object UrlUtils {
+  // Three consecutive literal "\xHH" escapes (backslash, 'x', two hex digits).
+  val pattern = new Regex("""(\\x[0-9A-Fa-f]{2}){3}""")
+  val koreanPattern = new scala.util.matching.Regex("([가-힣]+[\\-_a-zA-Z 0-9]*)+|([\\-_a-zA-Z 0-9]+[가-힣]+)")
+
+
+  // url extraction functions
+  /** Percent-decodes `url` as UTF-8; returns (whether decoding changed it, decoded text), or (false, url) on failure. */
+  def urlDecode(url: String): (Boolean, String) = {
+    try {
+      val decoded = URLDecoder.decode(url, "UTF-8")
+      (url != decoded, decoded)
+    } catch {
+      case _: Exception => (false, url)
+    }
+  }
+
+  /** Replaces every run matched by `pattern` with the UTF-8 string encoded by its three hex bytes. */
+  def hex2String(url: String): String = {
+    pattern replaceAllIn(url, m => {
+      val decoded = new String(m.toString.replaceAll("[^0-9A-Fa-f]", "").sliding(2, 2).toArray.map(Integer.parseInt(_, 16).toByte), "utf-8")
+      // replaceAllIn interprets '$' and '\' in the replacement; quote them so decoded text is inserted literally.
+      Regex.quoteReplacement(decoded)
+    })
+  }
+
+  /** Returns the host plus cumulative path prefixes (starting with the bare host), up to `maxDepth` levels; Nil when the URI has no host. */
+  def toDomains(url: String, maxDepth: Int = 3): Seq[String] = {
+    val uri = new URI(url)
+    Option(uri.getHost).fold[Seq[String]](Nil) { domain =>
+      val paths = uri.getPath.split("/")
+      if (paths.isEmpty) Seq(domain)
+      else (1 to Math.min(maxDepth, paths.length)).map(ith => domain + paths.take(ith).mkString("/"))
+    }
+  }
+
+  /** Returns (decoded url, result of `toDomains`, first `koreanPattern` match with whitespace removed); (_url, Nil, None) on any Exception. */
+  def extract(_url: String): (String, Seq[String], Option[String]) = {
+    try {
+      val url = hex2String(_url)
+      val (_, decodedUrl) = urlDecode(url)
+
+      val kwdOpt = koreanPattern.findFirstIn(decodedUrl).map(_.replaceAll("\\s", ""))
+      val domains = toDomains(url.replaceAll(" ", ""))
+      (decodedUrl, domains, kwdOpt)
+    } catch {
+      case _: Exception => (_url, Nil, None)
+    }
+  }
+}
+
diff --git a/s2jobs/src/test/scala/org/apache/s2graph/s2jobs/task/ProcessTest.scala b/s2jobs/src/test/scala/org/apache/s2graph/s2jobs/task/ProcessTest.scala
index d9bbb5b..5539e43 100644
--- a/s2jobs/src/test/scala/org/apache/s2graph/s2jobs/task/ProcessTest.scala
+++ b/s2jobs/src/test/scala/org/apache/s2graph/s2jobs/task/ProcessTest.scala
@@ -26,7 +26,7 @@
 
   test("SqlProcess execute sql") {
     import spark.implicits._
-
+    
     val inputDF = Seq(
       ("a", "b", "friend"),
       ("a", "c", "friend"),
diff --git a/s2jobs/src/test/scala/org/apache/s2graph/s2jobs/wal/TestData.scala b/s2jobs/src/test/scala/org/apache/s2graph/s2jobs/wal/TestData.scala
new file mode 100644
index 0000000..12385c0
--- /dev/null
+++ b/s2jobs/src/test/scala/org/apache/s2graph/s2jobs/wal/TestData.scala
@@ -0,0 +1,54 @@
+package org.apache.s2graph.s2jobs.wal
+
+object TestData {
+  val testServiceName = "s2graph"
+  val walLogsLs = Seq(
+    WalLog(1L, "insert", "edge", "u1", "i1", testServiceName, "click", """{"item_name":"awesome item"}"""),
+    WalLog(2L, "insert", "edge", "u1", "i1", testServiceName, "purchase", """{"price":2}"""),
+    WalLog(3L, "insert", "edge", "u1", "q1", testServiceName, "search", """{"referrer":"www.google.com"}"""),
+    WalLog(4L, "insert", "edge", "u2", "i1", testServiceName, "click", """{"item_name":"awesome item"}"""),
+    WalLog(5L, "insert", "edge", "u2", "q2", testServiceName, "search", """{"referrer":"www.bing.com"}"""),
+    WalLog(6L, "insert", "edge", "u3", "i2", testServiceName, "click", """{"item_name":"bad item"}"""),
+    WalLog(7L, "insert", "edge", "u4", "q1", testServiceName, "search", """{"referrer":"www.google.com"}""")
+  )
+
+  // one aggregate per `from`, ordered by `from`; edges are listed with the largest first argument (1L..7L) first
+  val aggExpected = Array(
+    WalLogAgg("u1",
+      vertices = Nil,
+      edges = Seq(
+        WalLog(3L, "insert", "edge", "u1", "q1", testServiceName, "search", """{"referrer":"www.google.com"}"""),
+        WalLog(2L, "insert", "edge", "u1", "i1", testServiceName, "purchase", """{"price":2}"""),
+        WalLog(1L, "insert", "edge", "u1", "i1", testServiceName, "click", """{"item_name":"awesome item"}""")
+      )
+    ),
+    WalLogAgg("u2",
+      vertices = Nil,
+      edges = Seq(
+        WalLog(5L, "insert", "edge", "u2", "q2", testServiceName, "search", """{"referrer":"www.bing.com"}"""),
+        WalLog(4L, "insert", "edge", "u2", "i1", testServiceName, "click", """{"item_name":"awesome item"}""")
+      )
+    ),
+    WalLogAgg("u3",
+      vertices = Nil,
+      edges = Seq(
+        WalLog(6L, "insert", "edge", "u3", "i2", testServiceName, "click", """{"item_name":"bad item"}""")
+      )
+    ),
+    WalLogAgg("u4",
+      vertices = Nil,
+      edges = Seq(
+        WalLog(7L, "insert", "edge", "u4", "q1", testServiceName, "search", """{"referrer":"www.google.com"}""")
+      )
+    )
+  )
+
+  // ordered by dim, then by rank within each dim
+  val featureDictExpected = Array(
+    DimValCountRank(DimVal("click:item_name", "awesome item"), 2, 1),
+    DimValCountRank(DimVal("click:item_name", "bad item"), 1, 2),
+    DimValCountRank(DimVal("purchase:price", "2"), 1, 1),
+    DimValCountRank(DimVal("search:referrer", "www.google.com"), 2, 1),
+    DimValCountRank(DimVal("search:referrer", "www.bing.com"), 1, 2)
+  )
+}
diff --git a/s2jobs/src/test/scala/org/apache/s2graph/s2jobs/wal/TransformerTest.scala b/s2jobs/src/test/scala/org/apache/s2graph/s2jobs/wal/TransformerTest.scala
new file mode 100644
index 0000000..974002b
--- /dev/null
+++ b/s2jobs/src/test/scala/org/apache/s2graph/s2jobs/wal/TransformerTest.scala
@@ -0,0 +1,32 @@
+package org.apache.s2graph.s2jobs.wal
+
+import org.apache.s2graph.s2jobs.task.TaskConf
+import org.apache.s2graph.s2jobs.wal.transformer._
+import org.scalatest.{FunSuite, Matchers}
+import play.api.libs.json.Json
+
+class TransformerTest extends FunSuite with Matchers {
+  val walLog = WalLog(1L, "insert", "edge", "a", "b", "s2graph", "friends", """{"name": 1, "url": "www.google.com"}""")
+
+  test("test default transformer") {
+    // Expected dim is "friends:name": "friends" is the 7th WalLog argument, "name" the property key.
+    val transformer = new DefaultTransformer(TaskConf.Empty)
+    val expected = Seq(DimVal("friends:name", "1"))
+
+    transformer.toDimValLs(walLog, "name", "1") shouldBe expected
+  }
+
+  test("test ExtractDomain from URL") {
+    // "urlDimensions" is passed as a JSON-encoded array of property keys.
+    val options = Map("urlDimensions" -> Json.toJson(Seq("url")).toString())
+    val transformer = new ExtractDomain(TaskConf.Empty.copy(options = options))
+
+    val expected = Seq(
+      DimVal("host", "www.google.com"),
+      DimVal("domain", "www.google.com"),
+      DimVal("domain", "www.google.com/abc")
+    )
+
+    transformer.toDimValLs(walLog, "url", "http://www.google.com/abc") shouldBe expected
+  }
+}
diff --git a/s2jobs/src/test/scala/org/apache/s2graph/s2jobs/wal/process/BuildTopFeaturesProcessTest.scala b/s2jobs/src/test/scala/org/apache/s2graph/s2jobs/wal/process/BuildTopFeaturesProcessTest.scala
new file mode 100644
index 0000000..4d2f079
--- /dev/null
+++ b/s2jobs/src/test/scala/org/apache/s2graph/s2jobs/wal/process/BuildTopFeaturesProcessTest.scala
@@ -0,0 +1,31 @@
+package org.apache.s2graph.s2jobs.wal.process
+
+import com.holdenkarau.spark.testing.DataFrameSuiteBase
+import org.apache.s2graph.s2jobs.task.TaskConf
+import org.apache.s2graph.s2jobs.wal.DimValCountRank
+import org.scalatest.{BeforeAndAfterAll, FunSuite, Matchers}
+
+class BuildTopFeaturesProcessTest extends FunSuite with Matchers with BeforeAndAfterAll with DataFrameSuiteBase {
+
+  import org.apache.s2graph.s2jobs.wal.TestData._
+
+  test("test entire process.") {
+    import spark.implicits._
+    // NOTE(review): minUserCount = 0 presumably disables the user-count filter -- confirm in BuildTopFeaturesProcess.
+    val taskConf = new TaskConf(
+      name = "test",
+      `type` = "test",
+      inputs = Seq("input"),
+      options = Map("minUserCount" -> "0")
+    )
+    val inputMap = Map("input" -> spark.createDataset(aggExpected).toDF())
+    // Run the process on the aggregated wal logs and read the result back ordered by (dim, rank).
+    val featureDicts = new BuildTopFeaturesProcess(taskConf)
+      .execute(spark, inputMap)
+      .orderBy("dim", "rank")
+      .map(DimValCountRank.fromRow)
+      .collect()
+
+    featureDicts shouldBe featureDictExpected
+  }
+}
diff --git a/s2jobs/src/test/scala/org/apache/s2graph/s2jobs/wal/process/FilterTopFeaturesProcessTest.scala b/s2jobs/src/test/scala/org/apache/s2graph/s2jobs/wal/process/FilterTopFeaturesProcessTest.scala
new file mode 100644
index 0000000..cd8295a
--- /dev/null
+++ b/s2jobs/src/test/scala/org/apache/s2graph/s2jobs/wal/process/FilterTopFeaturesProcessTest.scala
@@ -0,0 +1,84 @@
+package org.apache.s2graph.s2jobs.wal.process
+
+import com.holdenkarau.spark.testing.DataFrameSuiteBase
+import org.apache.s2graph.s2jobs.task.TaskConf
+import org.apache.s2graph.s2jobs.wal.transformer.DefaultTransformer
+import org.apache.s2graph.s2jobs.wal.{DimValCountRank, WalLogAgg}
+import org.scalatest.{BeforeAndAfterAll, FunSuite, Matchers}
+
+class FilterTopFeaturesProcessTest extends FunSuite with Matchers with BeforeAndAfterAll with DataFrameSuiteBase {
+  import org.apache.s2graph.s2jobs.wal.TestData._
+
+  test("test filterTopKsPerDim.") {
+    import spark.implicits._
+    val featureDf = spark.createDataset(featureDictExpected).map { x =>
+      (x.dimVal.dim, x.dimVal.value, x.count, x.rank)
+    }.toDF("dim", "value", "count", "rank")
+
+    val maxRankPerDim = spark.sparkContext.broadcast(Map.empty[String, Int])
+
+    // nothing is filtered because every feature has rank < 10
+    val filtered = FilterTopFeaturesProcess.filterTopKsPerDim(featureDf, maxRankPerDim, 10)
+    val real = filtered.orderBy("dim", "rank").map(DimValCountRank.fromRow).collect()
+    // zip truncates to the shorter side, so pin the length before comparing element-wise
+    real.length shouldBe featureDictExpected.length
+    real.zip(featureDictExpected).foreach { case (actual, expected) => actual shouldBe expected }
+
+    // filter rank >= 2
+    val filtered2 = FilterTopFeaturesProcess.filterTopKsPerDim(featureDf, maxRankPerDim, 2)
+    val real2 = filtered2.orderBy("dim", "rank").map(DimValCountRank.fromRow).collect()
+    real2 shouldBe featureDictExpected.filter(_.rank < 2)
+  }
+
+
+  test("test filterWalLogAgg.") {
+    import spark.implicits._
+    val walLogAgg = spark.createDataset(aggExpected)
+    val featureDf = spark.createDataset(featureDictExpected).map { x =>
+      (x.dimVal.dim, x.dimVal.value, x.count, x.rank)
+    }.toDF("dim", "value", "count", "rank")
+    val maxRankPerDim = spark.sparkContext.broadcast(Map.empty[String, Int])
+
+    val transformers = Seq(DefaultTransformer(TaskConf.Empty))
+    // filter nothing. so input, output should be same.
+    val featureFiltered = FilterTopFeaturesProcess.filterTopKsPerDim(featureDf, maxRankPerDim, 10)
+    val validFeatureHashKeys = FilterTopFeaturesProcess.collectDistinctFeatureHashes(spark, featureFiltered)
+    val validFeatureHashKeysBCast = spark.sparkContext.broadcast(validFeatureHashKeys)
+    val real = FilterTopFeaturesProcess.filterWalLogAgg(spark, walLogAgg, transformers, validFeatureHashKeysBCast)
+      .collect().sortBy(_.from)
+
+    // zip truncates to the shorter side, so pin the length before comparing element-wise
+    real.length shouldBe aggExpected.length
+    real.zip(aggExpected).foreach { case (actual, expected) => actual shouldBe expected }
+  }
+
+  test("test entire process. filter nothing.") {
+    import spark.implicits._
+    val df = spark.createDataset(aggExpected).toDF()
+    val featureDf = spark.createDataset(featureDictExpected).map { x =>
+      (x.dimVal.dim, x.dimVal.value, x.count, x.rank)
+    }.toDF("dim", "value", "count", "rank")
+
+    val inputKey = "input"
+    val featureDictKey = "feature"
+    // filter nothing since we did not specify maxRankPerDim and defaultMaxRank.
+    val taskConf = new TaskConf(name = "test", `type` = "test",
+      inputs = Seq(inputKey, featureDictKey),
+      options = Map(
+        "featureDict" -> featureDictKey,
+        "walLogAgg" -> inputKey
+      )
+    )
+    val inputMap = Map(inputKey -> df, featureDictKey -> featureDf)
+    val job = new FilterTopFeaturesProcess(taskConf)
+    val filtered = job.execute(spark, inputMap)
+      .orderBy("from")
+      .as[WalLogAgg]
+      .collect()
+
+    // zip truncates to the shorter side; without the length check an empty
+    // result would pass this test without comparing anything
+    filtered.length shouldBe aggExpected.length
+    filtered.zip(aggExpected).foreach { case (actual, expected) => actual shouldBe expected }
+  }
+}
diff --git a/s2jobs/src/test/scala/org/apache/s2graph/s2jobs/wal/process/WalLogAggregateProcessTest.scala b/s2jobs/src/test/scala/org/apache/s2graph/s2jobs/wal/process/WalLogAggregateProcessTest.scala
new file mode 100644
index 0000000..1bb7426
--- /dev/null
+++ b/s2jobs/src/test/scala/org/apache/s2graph/s2jobs/wal/process/WalLogAggregateProcessTest.scala
@@ -0,0 +1,31 @@
+package org.apache.s2graph.s2jobs.wal.process
+
+import com.holdenkarau.spark.testing.DataFrameSuiteBase
+import org.apache.s2graph.s2jobs.task.TaskConf
+import org.apache.s2graph.s2jobs.wal._
+import org.scalatest.{BeforeAndAfterAll, FunSuite, Matchers}
+
+class WalLogAggregateProcessTest extends FunSuite with Matchers with BeforeAndAfterAll with DataFrameSuiteBase {
+  import org.apache.s2graph.s2jobs.wal.TestData._
+
+  test("test entire process") {
+    import spark.sqlContext.implicits._
+
+    val edges = spark.createDataset(walLogsLs).toDF()
+    val processKey = "agg"
+    val inputMap = Map(processKey -> edges)
+
+    val taskConf = new TaskConf(name = "test", `type` = "agg", inputs = Seq(processKey),
+      options = Map("maxNumOfEdges" -> "10")
+    )
+
+    val job = new WalLogAggregateProcess(taskConf = taskConf)
+    val processed = job.execute(spark, inputMap)
+    val real = processed.orderBy("from").as[WalLogAgg].collect()
+    // zip truncates to the shorter side, so pin the length before comparing element-wise
+    real.length shouldBe aggExpected.length
+    real.zip(aggExpected).foreach { case (actual, expected) =>
+      actual shouldBe expected
+    }
+  }
+}
\ No newline at end of file
diff --git a/s2jobs/src/test/scala/org/apache/s2graph/s2jobs/wal/udafs/WalLogUDAFTest.scala b/s2jobs/src/test/scala/org/apache/s2graph/s2jobs/wal/udafs/WalLogUDAFTest.scala
new file mode 100644
index 0000000..aded56d
--- /dev/null
+++ b/s2jobs/src/test/scala/org/apache/s2graph/s2jobs/wal/udafs/WalLogUDAFTest.scala
@@ -0,0 +1,42 @@
+package org.apache.s2graph.s2jobs.wal.udafs
+
+import org.apache.s2graph.s2jobs.wal.utils.BoundedPriorityQueue
+import org.scalatest._
+
+import scala.collection.mutable
+import scala.util.Random
+
+class WalLogUDAFTest extends FunSuite with Matchers {
+
+  test("mergeTwoSeq") {
+    val prev: Array[Int] = Array(3, 2, 1)
+    val cur: Array[Int] = Array(4, 2, 2)
+
+    // NOTE(review): this test only prints the merged result and asserts nothing;
+    // add an expectation once the intended output of mergeTwoSeq is confirmed.
+    val ls = WalLogUDAF.mergeTwoSeq(prev, cur, 10)
+    println(ls.size)
+
+    ls.foreach(x => println(x))
+  }
+
+  test("addToTopK test.") {
+    import WalLogUDAF._
+    val numOfTest = 100
+    val numOfNums = 100
+    val maxNum = 10
+    // Feed the same random input to BoundedPriorityQueue and to addToTopK; both must keep the same elements.
+    for (_ <- 0 until numOfTest) {
+      val maxSize = 1 + Random.nextInt(numOfNums)
+      val pq = new BoundedPriorityQueue[Int](maxSize)
+      val arr = (0 until numOfNums).map(_ => Random.nextInt(maxNum))
+      var result: mutable.Seq[Int] = mutable.ArrayBuffer.empty[Int]
+
+      arr.foreach { i =>
+        pq += i
+        result = addToTopK(result, maxSize, i)
+      }
+      result.sorted shouldBe pq.toSeq.sorted
+    }
+  }
+}