clean up WalLogAggregateProcess.
diff --git a/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/wal/WalLog.scala b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/wal/WalLog.scala
index dc628ec..f4a670b 100644
--- a/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/wal/WalLog.scala
+++ b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/wal/WalLog.scala
@@ -1,7 +1,7 @@
 package org.apache.s2graph.s2jobs.wal
 
 import org.apache.s2graph.core.JSONParser
-import org.apache.s2graph.s2jobs.wal.process.AggregateParam
+import org.apache.s2graph.s2jobs.wal.process.params.AggregateParam
 import org.apache.s2graph.s2jobs.wal.utils.BoundedPriorityQueue
 import org.apache.spark.sql.Row
 import org.apache.spark.sql.types.{LongType, StringType, StructField, StructType}
@@ -36,6 +36,8 @@
   }
 }
 
+case class DimVal(dim: String, value: String)
+
 case class WalLogAgg(from: String,
                      logs: Seq[WalLog],
                      maxTs: Long,
diff --git a/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/wal/process/FeatureIndexProcess.scala b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/wal/process/FeatureIndexProcess.scala
new file mode 100644
index 0000000..7d04334
--- /dev/null
+++ b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/wal/process/FeatureIndexProcess.scala
@@ -0,0 +1,146 @@
+package org.apache.s2graph.s2jobs.wal.process
+
+import com.google.common.hash.Hashing
+import org.apache.s2graph.s2jobs.task.TaskConf
+import org.apache.s2graph.s2jobs.wal.process.params.FeatureIndexParam
+import org.apache.s2graph.s2jobs.wal.transformer._
+import org.apache.s2graph.s2jobs.wal.udfs.WalLogUDF
+import org.apache.s2graph.s2jobs.wal.{DimVal, WalLog}
+import org.apache.spark.broadcast.Broadcast
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql._
+import org.apache.spark.sql.functions._
+import play.api.libs.json.{JsObject, Json}
+
+import scala.collection.mutable
+
+object FeatureIndexProcess {
+  def extractDimValues(transformers: Seq[Transformer]) = {
+    udf((rows: Seq[Row]) => {
+      val logs = rows.map(WalLog.fromRow)
+      // TODO: this can be changed into Map to count how many times each dimVal exist in sequence of walLog
+      // then change this to mutable.Map.empty[DimVal, Int], then aggregate.
+      val distinctDimValues = mutable.Set.empty[DimVal]
+
+      logs.foreach { walLog =>
+        walLog.propsKeyValues.foreach { case (propsKey, propsValue) =>
+          transformers.foreach { transformer =>
+            transformer.toDimValLs(walLog, propsKey, propsValue).foreach(distinctDimValues += _)
+          }
+        }
+      }
+
+      distinctDimValues.toSeq
+    })
+  }
+
+  def buildDictionary(ss: SparkSession,
+                      allDimVals: DataFrame,
+                      param: FeatureIndexParam,
+                      dimValColumnName: String = "dimVal"): DataFrame = {
+    import ss.implicits._
+
+    val rawFeatures = allDimVals
+      .select(col(param._countColumnName), col(s"$dimValColumnName.dim").as("dim"), col(s"$dimValColumnName.value").as("value"))
+      .groupBy("dim", "value")
+      .agg(countDistinct(param._countColumnName).as("count"))
+      .filter(s"count > ${param._minUserCount}")
+
+    val ds: Dataset[((String, Long), String)] =
+      rawFeatures.select("dim", "value", "count").as[(String, String, Long)]
+        .map { case (dim, value, uv) =>
+          (dim, uv) -> value
+        }
+
+
+    implicit val ord = Ordering.Tuple2(Ordering.String, Ordering.Long.reverse)
+
+    val rdd: RDD[(Long, (String, Long), String)] = WalLogUDF.appendRank(ds, param.numOfPartitions, param.samplePointsPerPartitionHint)
+
+    rdd.toDF("rank", "dim_count", "value")
+      .withColumn("dim", col("dim_count._1"))
+      .withColumn("count", col("dim_count._2"))
+      .select("dim", "value", "count", "rank")
+  }
+
+  def toFeatureHash(dim: String, value: String): Long = {
+    Hashing.murmur3_128().hashBytes(s"$dim:$value".getBytes("UTF-8")).asLong()
+  }
+
+  def collectDistinctFeatureHashes(ss: SparkSession,
+                                   filteredDict: DataFrame): Array[Long] = {
+    import ss.implicits._
+
+    val featureHashUDF = udf((dim: String, value: String) => toFeatureHash(dim, value))
+
+    filteredDict.withColumn("featureHash", featureHashUDF(col("dim"), col("value")))
+      .select("featureHash")
+      .distinct().as[Long].collect()
+  }
+
+  def filterTopKsPerDim(dict: DataFrame,
+                        maxRankPerDim: Broadcast[Map[String, Int]],
+                        defaultMaxRank: Int): DataFrame = {
+    val filterUDF = udf((dim: String, rank: Long) => {
+      rank < maxRankPerDim.value.getOrElse(dim, defaultMaxRank)
+    })
+
+    dict.filter(filterUDF(col("dim"), col("rank")))
+  }
+}
+
+case class FeatureIndexProcess(taskConf: TaskConf) extends org.apache.s2graph.s2jobs.task.Process(taskConf) {
+
+  import FeatureIndexProcess._
+
+  override def execute(ss: SparkSession, inputMap: Map[String, DataFrame]): DataFrame = {
+    val countColumnName = taskConf.options.getOrElse("countColumnName", "from")
+    val numOfPartitions = taskConf.options.get("numOfPartitions").map(_.toInt)
+    val samplePointsPerPartitionHint = taskConf.options.get("samplePointsPerPartitionHint").map(_.toInt)
+    val minUserCount = taskConf.options.get("minUserCount").map(_.toLong)
+    val maxRankPerDim = taskConf.options.get("maxRankPerDim").map { s =>
+      val json = Json.parse(s).as[JsObject]
+      json.fieldSet.map { case (key, jsValue) => key -> jsValue.as[Int] }.toMap
+    }
+    val defaultMaxRank = taskConf.options.get("defaultMaxRank").map(_.toInt)
+    val dictPath = taskConf.options.get("dictPath")
+
+    numOfPartitions.map { d => ss.sqlContext.setConf("spark.sql.shuffle.partitions", d.toString) }
+
+    //    val maxRankPerDimBCast = ss.sparkContext.broadcast(maxRankPerDim.getOrElse(Map.empty))
+
+    val param = FeatureIndexParam(minUserCount = minUserCount, countColumnName = Option(countColumnName),
+      numOfPartitions = numOfPartitions, samplePointsPerPartitionHint = samplePointsPerPartitionHint,
+      maxRankPerDim = maxRankPerDim, defaultMaxRank = defaultMaxRank, dictPath = dictPath
+    )
+
+    val edges = taskConf.inputs.tail.foldLeft(inputMap(taskConf.inputs.head)) { case (prev, cur) =>
+      prev.union(inputMap(cur))
+    }
+
+    //TODO: user expect to inject transformers that transform (WalLog, propertyKey, propertyValue) to Seq[DimVal].
+    val transformers = Seq(DefaultTransformer())
+    val dimValExtractUDF = extractDimValues(transformers)
+    val dimValColumnName = "dimVal"
+
+    val rawFeatures = edges
+      .withColumn(dimValColumnName, explode(dimValExtractUDF(col("logs"))))
+
+    val dict = buildDictionary(ss, rawFeatures, param, dimValColumnName)
+
+    dict
+    //TODO: filter topKs per dim, then build valid dimValLs.
+    // then broadcast valid dimValLs to original dataframe, and filter out not valid dimVal.
+
+    //    dictPath.foreach { path => dict.write.mode(SaveMode.Overwrite).parquet(path) }
+    //
+    //    val filteredDict = filterTopKsPerDim(dict, maxRankPerDimBCast, defaultMaxRank.getOrElse(Int.MaxValue))
+    //    val distinctFeatureHashes = collectDistinctFeatureHashes(ss, filteredDict)
+    //    val distinctFeatureHashesBCast = ss.sparkContext.broadcast(distinctFeatureHashes)
+
+    //    filteredDict
+  }
+
+
+  override def mandatoryOptions: Set[String] = Set.empty
+}
diff --git a/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/wal/process/WalLogAggregateProcess.scala b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/wal/process/WalLogAggregateProcess.scala
index 0c9829c..2b42e20 100644
--- a/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/wal/process/WalLogAggregateProcess.scala
+++ b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/wal/process/WalLogAggregateProcess.scala
@@ -1,29 +1,10 @@
 package org.apache.s2graph.s2jobs.wal.process
 
 import org.apache.s2graph.s2jobs.task.TaskConf
+import org.apache.s2graph.s2jobs.wal.process.params.AggregateParam
 import org.apache.s2graph.s2jobs.wal.{WalLog, WalLogAgg}
 import org.apache.spark.sql._
 
-object AggregateParam {
-  val defaultGroupByKeys = Seq("from")
-  val defaultTopK = 1000
-  val defaultIsArrayType = false
-  val defaultShouldSortTopItems = true
-}
-
-case class AggregateParam(groupByKeys: Option[Seq[String]],
-                          topK: Option[Int],
-                          isArrayType: Option[Boolean],
-                          shouldSortTopItems: Option[Boolean]) {
-
-  import AggregateParam._
-
-  val groupByColumns = groupByKeys.getOrElse(defaultGroupByKeys)
-  val heapSize = topK.getOrElse(defaultTopK)
-  val arrayType = isArrayType.getOrElse(defaultIsArrayType)
-  val sortTopItems = shouldSortTopItems.getOrElse(defaultShouldSortTopItems)
-}
-
 object WalLogAggregateProcess {
   def aggregate(ss: SparkSession,
                 dataset: Dataset[WalLogAgg],
@@ -64,8 +45,8 @@
     val maxNumOfEdges = taskConf.options.get("maxNumOfEdges").map(_.toInt).getOrElse(1000)
     val arrayType = taskConf.options.get("arrayType").map(_.toBoolean).getOrElse(false)
     val sortTopItems = taskConf.options.get("sortTopItems").map(_.toBoolean).getOrElse(false)
-
-    taskConf.options.get("parallelism").foreach(d => ss.sqlContext.setConf("spark.sql.shuffle.partitions", d))
+    val numOfPartitions = taskConf.options.get("numOfPartitions")
+    numOfPartitions.foreach(d => ss.sqlContext.setConf("spark.sql.shuffle.partitions", d))
 
     implicit val ord = WalLog.orderByTsAsc
     val walLogs = taskConf.inputs.tail.foldLeft(inputMap(taskConf.inputs.head)) { case (prev, cur) =>
diff --git a/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/wal/process/params/AggregateParam.scala b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/wal/process/params/AggregateParam.scala
new file mode 100644
index 0000000..89c36f4
--- /dev/null
+++ b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/wal/process/params/AggregateParam.scala
@@ -0,0 +1,22 @@
+package org.apache.s2graph.s2jobs.wal.process.params
+
+
+object AggregateParam {
+  val defaultGroupByKeys = Seq("from")
+  val defaultTopK = 1000
+  val defaultIsArrayType = false
+  val defaultShouldSortTopItems = true
+}
+
+case class AggregateParam(groupByKeys: Option[Seq[String]],
+                          topK: Option[Int],
+                          isArrayType: Option[Boolean],
+                          shouldSortTopItems: Option[Boolean]) {
+
+  import AggregateParam._
+
+  val groupByColumns = groupByKeys.getOrElse(defaultGroupByKeys)
+  val heapSize = topK.getOrElse(defaultTopK)
+  val arrayType = isArrayType.getOrElse(defaultIsArrayType)
+  val sortTopItems = shouldSortTopItems.getOrElse(defaultShouldSortTopItems)
+}
diff --git a/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/wal/process/params/FeatureIndexParam.scala b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/wal/process/params/FeatureIndexParam.scala
new file mode 100644
index 0000000..ddf7037
--- /dev/null
+++ b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/wal/process/params/FeatureIndexParam.scala
@@ -0,0 +1,20 @@
+package org.apache.s2graph.s2jobs.wal.process.params
+
+object FeatureIndexParam {
+  val defaultMinUserCount = 0L
+  val defaultCountColumnName = "from"
+}
+
+case class FeatureIndexParam(minUserCount: Option[Long],
+                             countColumnName: Option[String],
+                             samplePointsPerPartitionHint: Option[Int],
+                             numOfPartitions: Option[Int],
+                             maxRankPerDim: Option[Map[String, Int]],
+                             defaultMaxRank: Option[Int],
+                             dictPath: Option[String]) {
+  import FeatureIndexParam._
+  val _countColumnName = countColumnName.getOrElse(defaultCountColumnName)
+  val _minUserCount = minUserCount.getOrElse(defaultMinUserCount)
+  val _maxRankPerDim = maxRankPerDim.getOrElse(Map.empty)
+  val _defaultMaxRank = defaultMaxRank.getOrElse(Int.MaxValue)
+}
diff --git a/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/wal/transformer/DefaultTransformer.scala b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/wal/transformer/DefaultTransformer.scala
new file mode 100644
index 0000000..2839d6a
--- /dev/null
+++ b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/wal/transformer/DefaultTransformer.scala
@@ -0,0 +1,3 @@
+package org.apache.s2graph.s2jobs.wal.transformer
+
+case class DefaultTransformer() extends Transformer
diff --git a/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/wal/transformer/ExtractDomain.scala b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/wal/transformer/ExtractDomain.scala
new file mode 100644
index 0000000..9b09d71
--- /dev/null
+++ b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/wal/transformer/ExtractDomain.scala
@@ -0,0 +1,20 @@
+package org.apache.s2graph.s2jobs.wal.transformer
+
+import org.apache.s2graph.s2jobs.wal.utils.UrlUtils
+import org.apache.s2graph.s2jobs.wal.{DimVal, WalLog}
+
+case class ExtractDomain(urlDimensions: Set[String],
+                         hostDimName: String = "host",
+                         domainDimName: String = "domain",
+                         keywordDimName: String = "uri_keywords") extends Transformer {
+  override def toDimValLs(walLog: WalLog, propertyKey: String, propertyValue: String): Seq[DimVal] = {
+    if (!urlDimensions(propertyKey)) Nil
+    else {
+      val (_, domains, kwdOpt) = UrlUtils.extract(propertyValue)
+
+      domains.headOption.toSeq.map(DimVal(hostDimName, _)) ++
+        domains.map(DimVal(domainDimName, _)) ++
+        kwdOpt.toSeq.map(DimVal(keywordDimName, _))
+    }
+  }
+}
diff --git a/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/wal/transformer/ExtractServiceName.scala b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/wal/transformer/ExtractServiceName.scala
new file mode 100644
index 0000000..7ad7dfd
--- /dev/null
+++ b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/wal/transformer/ExtractServiceName.scala
@@ -0,0 +1,17 @@
+package org.apache.s2graph.s2jobs.wal.transformer
+
+import org.apache.s2graph.s2jobs.wal.{DimVal, WalLog}
+
+class ExtractServiceName(serviceDims: Set[String],
+                         domainServiceMap: Map[String, String] = Map.empty,
+                         serviceDimName: String = "serviceName") extends Transformer {
+  override def toDimValLs(walLog: WalLog, propertyKey: String, propertyValue: String): Seq[DimVal] = {
+    if (!serviceDims(propertyKey)) Nil
+    else {
+      val serviceName = domainServiceMap.getOrElse(propertyValue, propertyValue)
+
+      Seq(DimVal(serviceDimName, serviceName))
+    }
+  }
+}
+
diff --git a/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/wal/transformer/Transformer.scala b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/wal/transformer/Transformer.scala
new file mode 100644
index 0000000..523f58d
--- /dev/null
+++ b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/wal/transformer/Transformer.scala
@@ -0,0 +1,15 @@
+package org.apache.s2graph.s2jobs.wal.transformer
+
+import org.apache.s2graph.s2jobs.wal.{DimVal, WalLog}
+
+/**
+  * decide how to transform walLog's each property key value to Seq[DimVal]
+  */
+trait Transformer {
+  def toDimValLs(walLog: WalLog, propertyKey: String, propertyValue: String): Seq[DimVal] = {
+    val dim = s"${walLog.label}:${propertyKey}"
+    val value = propertyValue
+
+    Seq(DimVal(dim, value))
+  }
+}
diff --git a/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/wal/udafs/S2EdgeDataAggregate.scala b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/wal/udafs/WalLogUDAF.scala
similarity index 98%
rename from s2jobs/src/main/scala/org/apache/s2graph/s2jobs/wal/udafs/S2EdgeDataAggregate.scala
rename to s2jobs/src/main/scala/org/apache/s2graph/s2jobs/wal/udafs/WalLogUDAF.scala
index dabfd99..81a1356 100644
--- a/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/wal/udafs/S2EdgeDataAggregate.scala
+++ b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/wal/udafs/WalLogUDAF.scala
@@ -8,7 +8,7 @@
 import scala.annotation.tailrec
 import scala.collection.mutable
 
-object S2EdgeDataAggregate {
+object WalLogUDAF {
   type Element = (Long, String, String, String)
 
   val emptyRow = new GenericRow(Array(-1L, "empty", "empty", "empty"))
@@ -142,7 +142,7 @@
 
 class GroupByAggOptimized(maxNumOfEdges: Int = 1000) extends UserDefinedAggregateFunction {
 
-  import S2EdgeDataAggregate._
+  import WalLogUDAF._
 
   implicit val ord = rowOrdering
 
@@ -195,7 +195,7 @@
 }
 
 class GroupByAgg(maxNumOfEdges: Int = 1000) extends UserDefinedAggregateFunction {
-  import S2EdgeDataAggregate._
+  import WalLogUDAF._
 
   implicit val ord = rowOrderingDesc
 
@@ -249,7 +249,7 @@
 }
 
 class GroupByArrayAgg(maxNumOfEdges: Int = 1000) extends UserDefinedAggregateFunction {
-  import S2EdgeDataAggregate._
+  import WalLogUDAF._
 
   implicit val ord = rowOrdering
 
diff --git a/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/wal/udfs/WalLogUDF.scala b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/wal/udfs/WalLogUDF.scala
new file mode 100644
index 0000000..34f4a2b
--- /dev/null
+++ b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/wal/udfs/WalLogUDF.scala
@@ -0,0 +1,216 @@
+package org.apache.s2graph.s2jobs.wal.udfs
+
+import com.google.common.hash.Hashing
+import org.apache.s2graph.core.JSONParser
+import org.apache.spark.broadcast.Broadcast
+import org.apache.spark.sql.functions._
+import org.apache.spark.sql.{Dataset, Row}
+import play.api.libs.json._
+
+import scala.reflect.ClassTag
+
+object WalLogUDF {
+
+  import scala.collection.mutable
+
+  type MergedProps = Map[String, Seq[String]]
+  type MutableMergedProps = mutable.Map[String, mutable.Map[String, Int]]
+  type MutableMergedPropsInner = mutable.Map[String, Int]
+
+  def initMutableMergedPropsInner = mutable.Map.empty[String, Int]
+
+  def initMutableMergedProps = mutable.Map.empty[String, mutable.Map[String, Int]]
+
+//  //TODO:
+//  def toDimension(rawActivity: RawActivity, propertyKey: String): String = {
+//    //    val (ts, dst, label, _) = rawActivity
+//    //    label + "." + propertyKey
+//    propertyKey
+//  }
+//
+//  def updateMutableMergedProps(mutableMergedProps: MutableMergedProps)(dimension: String,
+//                                                                       dimensionValue: String,
+//                                                                       count: Int = 1): Unit = {
+//    val buffer = mutableMergedProps.getOrElseUpdate(dimension, initMutableMergedPropsInner)
+//    val newCount = buffer.getOrElse(dimensionValue, 0) + count
+//    buffer += (dimensionValue -> newCount)
+//  }
+//
+//  def groupByDimensionValues(rawActivity: RawActivity,
+//                             propsJson: JsObject,
+//                             mergedProps: MutableMergedProps,
+//                             toDimensionFunc: (RawActivity, String) => String,
+//                             excludePropKeys: Set[String] = Set.empty): Unit = {
+//    propsJson.fields.filter(t => !excludePropKeys(t._1)).foreach { case (propertyKey, jsValue) =>
+//      val values = jsValue match {
+//        case JsString(s) => Seq(s)
+//        case JsArray(arr) => arr.map(JSONParser.jsValueToString)
+//        case _ => Seq(jsValue.toString())
+//      }
+//      val dimension = toDimensionFunc(rawActivity, propertyKey)
+//
+//      values.foreach { value =>
+//        updateMutableMergedProps(mergedProps)(dimension, value)
+//      }
+//    }
+//  }
+//
+//  def buildMergedProps(rawActivities: Seq[RawActivity],
+//                       toDimensionFunc: (RawActivity, String) => String,
+//                       defaultTopKs: Int = 100,
+//                       dimTopKs: Map[String, Int] = Map.empty,
+//                       excludePropKeys: Set[String] = Set.empty,
+//                       dimValExtractors: Seq[Extractor] = Nil): MergedProps = {
+//    val mergedProps = initMutableMergedProps
+//
+//    rawActivities.foreach { case rawActivity@(_, _, _, rawProps) =>
+//      val propsJson = Json.parse(rawProps).as[JsObject]
+//      groupByDimensionValues(rawActivity, propsJson, mergedProps, toDimensionFunc, excludePropKeys)
+//    }
+//    // work on extra dimVals.
+//    dimValExtractors.foreach { extractor =>
+//      extractor.extract(rawActivities, mergedProps)
+//    }
+//
+//    mergedProps.map { case (key, values) =>
+//      val topK = dimTopKs.getOrElse(key, defaultTopKs)
+//
+//      key -> values.toSeq.sortBy(-_._2).take(topK).map(_._1)
+//    }.toMap
+//  }
+//
+//  def rowToRawActivity(row: Row): RawActivity = {
+//    (row.getAs[Long](0), row.getAs[String](1), row.getAs[String](2), row.getAs[String](3))
+//  }
+//
+//  def appendMergeProps(toDimensionFunc: (RawActivity, String) => String = toDimension,
+//                       defaultTopKs: Int = 100,
+//                       dimTopKs: Map[String, Int] = Map.empty,
+//                       excludePropKeys: Set[String] = Set.empty,
+//                       dimValExtractors: Seq[Extractor] = Nil,
+//                       minTs: Long = 0,
+//                       maxTs: Long = Long.MaxValue) = udf((acts: Seq[Row]) => {
+//    val rows = acts.map(rowToRawActivity).filter(act => act._1 >= minTs && act._1 < maxTs)
+//
+//    buildMergedProps(rows, toDimensionFunc, defaultTopKs, dimTopKs, excludePropKeys, dimValExtractors)
+//  })
+
+  val extractDimensionValues = {
+    udf((dimensionValues: Map[String, Seq[String]]) => {
+      dimensionValues.toSeq.flatMap { case (dimension, values) =>
+        values.map { value => dimension -> value }
+      }
+    })
+  }
+
+  def toHash(dimension: String, dimensionValue: String): Long = {
+    val key = s"$dimension.$dimensionValue"
+    Hashing.murmur3_128().hashBytes(key.toString.getBytes("UTF-8")).asLong()
+  }
+
+  def filterDimensionValues(validDimValues: Broadcast[Set[Long]]) = {
+    udf((dimensionValues: Map[String, Seq[String]]) => {
+      dimensionValues.map { case (dimension, values) =>
+        val filtered = values.filter { value =>
+          val hash = toHash(dimension, value)
+
+          validDimValues.value(hash)
+        }
+
+        dimension -> filtered
+      }
+    })
+  }
+
+  def appendRank[K1: ClassTag, K2: ClassTag, V: ClassTag](ds: Dataset[((K1, K2), V)],
+                                                          numOfPartitions: Option[Int] = None,
+                                                          samplePointsPerPartitionHint: Option[Int] = None)(implicit ordering: Ordering[(K1, K2)]) = {
+    import org.apache.spark.RangePartitioner
+    val rdd = ds.rdd
+
+    val partitioner = new RangePartitioner(numOfPartitions.getOrElse(rdd.partitions.size),
+      rdd,
+      true,
+      samplePointsPerPartitionHint = samplePointsPerPartitionHint.getOrElse(20)
+    )
+
+    val sorted = rdd.repartitionAndSortWithinPartitions(partitioner)
+
+    def rank(idx: Int, iter: Iterator[((K1, K2), V)]) = {
+      val initialK1: K1 = null.asInstanceOf[K1]
+      val initialK2: K2 = null.asInstanceOf[K2]
+      val initialV: V = null.asInstanceOf[V]
+      val zero = List((1L, initialK1, initialK2, initialV))
+
+      def merge(acc: List[(Long, K1, K2, V)], x: ((K1, K2), V)) =
+        (acc.head, x) match {
+          case ((offset, prevKey1, _, _), ((curKey1: K1, curKey2: K2), curVal: V)) => {
+            val newOffset = if (prevKey1 == curKey1) offset + 1L else 1L
+            (newOffset, curKey1, curKey2, curVal) :: acc
+          }
+        }
+
+      iter.foldLeft(zero)(merge).reverse.drop(1).map { case (offset, key1, key2, value) =>
+        (idx, offset, key1, key2, value)
+      }.toIterator
+    }
+
+    def getOffset(idx: Int, iter: Iterator[((K1, K2), V)]) = {
+      val buffer = mutable.Map.empty[K1, (Int, Long)]
+      if (!iter.hasNext) buffer.toIterator
+      else {
+        val ((k1, k2), v) = iter.next()
+        var prevKey1: K1 = k1
+        var size = 1L
+        iter.foreach { case ((k1, k2), v) =>
+          if (prevKey1 != k1) {
+            buffer += prevKey1 -> (idx, size)
+            prevKey1 = k1
+            size = 0L
+          }
+          size += 1L
+        }
+        if (size > 0) buffer += prevKey1 -> (idx, size)
+        buffer.iterator
+      }
+    }
+
+    val partRanks = sorted.mapPartitionsWithIndex(rank)
+    val _offsets = sorted.mapPartitionsWithIndex(getOffset)
+    val offsets = _offsets.groupBy(_._1).flatMap { case (k1, partitionWithSize) =>
+      val ls = partitionWithSize.toSeq.map(_._2).sortBy(_._1)
+      var sum = ls.head._2
+      val lss = ls.tail.map { case (partition, size) =>
+        val x = (partition, sum)
+        sum += size
+        x
+      }
+      lss.map { case (partition, offset) =>
+        (k1, partition) -> offset
+      }
+    }.collect()
+
+    println(offsets)
+
+    val offsetsBCast = ds.sparkSession.sparkContext.broadcast(offsets)
+
+    def adjust(iter: Iterator[(Int, Long, K1, K2, V)], startOffsets: Map[(K1, Int), Long]) = {
+      iter.map { case (partition, rankInPartition, key1, key2, value) =>
+        val startOffset = startOffsets.getOrElse((key1, partition), 0L)
+        val rank = startOffset + rankInPartition
+
+        (partition, rankInPartition, rank, (key1, key2), value)
+      }
+    }
+
+    val withRanks = partRanks
+      .mapPartitions { iter =>
+        val startOffsets = offsetsBCast.value.toMap
+        adjust(iter, startOffsets)
+      }.map { case (_, _, rank, (key1, key2), value) =>
+      (rank, (key1, key2), value)
+    }
+
+    withRanks
+  }
+}
diff --git a/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/wal/utils/UrlUtils.scala b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/wal/utils/UrlUtils.scala
new file mode 100644
index 0000000..2941357
--- /dev/null
+++ b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/wal/utils/UrlUtils.scala
@@ -0,0 +1,57 @@
+package org.apache.s2graph.s2jobs.wal.utils
+
+import java.net.{URI, URLDecoder}
+
+import scala.util.matching.Regex
+
+object UrlUtils {
+  val pattern = new Regex("""(\\x[0-9A-Fa-f]{2}){3}""")
+  val koreanPattern = new scala.util.matching.Regex("([가-힣]+[\\-_a-zA-Z 0-9]*)+|([\\-_a-zA-Z 0-9]+[가-힣]+)")
+
+
+  // url extraction functions
+  def urlDecode(url: String): (Boolean, String) = {
+    try {
+      val decoded = URLDecoder.decode(url, "UTF-8")
+      (url != decoded, decoded)
+    } catch {
+      case e: Exception => (false, url)
+    }
+  }
+
+  def hex2String(url: String): String = {
+    pattern replaceAllIn(url, m => {
+      new String(m.toString.replaceAll("[^0-9A-Fa-f]", "").sliding(2, 2).toArray.map(Integer.parseInt(_, 16).toByte), "utf-8")
+    })
+  }
+
+  def toDomains(url: String, maxDepth: Int = 3): Seq[String] = {
+    val uri = new URI(url)
+    val domain = uri.getHost
+    if (domain == null) Nil
+    else {
+      val paths = uri.getPath.split("/")
+      if (paths.isEmpty) Seq(domain)
+      else {
+        val depth = Math.min(maxDepth, paths.size)
+        (1 to depth).map { ith =>
+          domain + paths.take(ith).mkString("/")
+        }
+      }
+    }
+  }
+
+  def extract(_url: String): (String, Seq[String], Option[String]) = {
+    try {
+      val url = hex2String(_url)
+      val (encoded, decodedUrl) = urlDecode(url)
+
+      val kwdOpt = koreanPattern.findAllMatchIn(decodedUrl).toList.map(_.group(0)).headOption.map(_.replaceAll("\\s", ""))
+      val domains = toDomains(url.replaceAll(" ", ""))
+      (decodedUrl, domains, kwdOpt)
+    } catch {
+      case e: Exception => (_url, Nil, None)
+    }
+  }
+}
+
diff --git a/s2jobs/src/test/scala/org/apache/s2graph/s2jobs/wal/TransformerTest.scala b/s2jobs/src/test/scala/org/apache/s2graph/s2jobs/wal/TransformerTest.scala
new file mode 100644
index 0000000..5089390
--- /dev/null
+++ b/s2jobs/src/test/scala/org/apache/s2graph/s2jobs/wal/TransformerTest.scala
@@ -0,0 +1,26 @@
+package org.apache.s2graph.s2jobs.wal
+
+import org.apache.s2graph.s2jobs.wal.transformer._
+import org.scalatest.{FunSuite, Matchers}
+
+class TransformerTest extends FunSuite with Matchers {
+  val walLog = WalLog(1L, "insert", "edge", "a", "b", "s2graph", "friends", """{"name": 1, "url": "www.google.com"}""")
+
+  test("test default transformer") {
+    val transformer = new DefaultTransformer
+    val dimVals = transformer.toDimValLs(walLog, "name", "1")
+
+    dimVals shouldBe Seq(DimVal("friends:name", "1"))
+  }
+
+  test("test ExtractDomain from URL") {
+    val transformer = new ExtractDomain(urlDimensions = Set("url"))
+    val dimVals = transformer.toDimValLs(walLog, "url", "http://www.google.com/abc")
+
+    dimVals shouldBe Seq(
+      DimVal("host", "www.google.com"),
+      DimVal("domain", "www.google.com"),
+      DimVal("domain", "www.google.com/abc")
+    )
+  }
+}
diff --git a/s2jobs/src/test/scala/org/apache/s2graph/s2jobs/wal/process/WalLogAggregateProcessTest.scala b/s2jobs/src/test/scala/org/apache/s2graph/s2jobs/wal/process/WalLogAggregateProcessTest.scala
index 5ae595b..5753411 100644
--- a/s2jobs/src/test/scala/org/apache/s2graph/s2jobs/wal/process/WalLogAggregateProcessTest.scala
+++ b/s2jobs/src/test/scala/org/apache/s2graph/s2jobs/wal/process/WalLogAggregateProcessTest.scala
@@ -31,9 +31,8 @@
     val edges = spark.createDataset(walLogsLs).toDF()
     val inputMap = Map("edges" -> edges)
     val taskConf = new TaskConf(name = "test", `type` = "agg", inputs = Seq("edges"),
-      options = Map("maxNumOfEdges" -> "10",
-        "runOrderBy" -> "false",
-        "groupByAggClassName" -> "GroupByAggOptimized"))
+      options = Map("maxNumOfEdges" -> "10")
+    )
 
     val job = new WalLogAggregateProcess(taskConf = taskConf)
     val processed = job.execute(spark, inputMap)
@@ -48,7 +47,7 @@
     val prev: Array[Int] = Array(3, 2, 1)
     val cur: Array[Int] = Array(4, 2, 2)
 
-    val ls = S2EdgeDataAggregate.mergeTwoSeq(prev, cur, 10)
+    val ls = WalLogUDAF.mergeTwoSeq(prev, cur, 10)
     println(ls.size)
 
     ls.foreach { x =>
@@ -57,7 +56,7 @@
   }
 
   test("addToTopK test.") {
-    import S2EdgeDataAggregate._
+    import WalLogUDAF._
     val numOfTest = 100
     val numOfNums = 100
     val maxNum = 10