Merge branch 'master' into S2GRAPH-203

* master:
  [S2GRAPH-219] Added query that includes all vertices and associated edges for GraphVisualize.
  [S2GRAPH-218] add operations not supported on sql
  [S2GRAPH-216] Provide a transform directive in the GraphQL query result
  Extract Where and EdgeTransformer to TraversalHelper.
  make middleware for GraphFormatWriter
  add extension for SigmaJs
  add continuous trigger option
  add partition options for source df
  add dataframe cache option
  support multiple partitions
  add operations not supported on sql
  add simple test
  implement @transform directive
diff --git a/CHANGES b/CHANGES
index b9bf81b..cded3e5 100644
--- a/CHANGES
+++ b/CHANGES
@@ -37,6 +37,7 @@
     * [S2GRAPH-197] - Provide S2graphSink for non-streaming dataset
     * [S2GRAPH-205] - too many initialize S2Graph when writeBatchMutate on S2GraphSink
     * [S2GRAPH-201] - Provide S2GraphSource
+    * [S2GRAPH-218] - Add operations not supported on SQL
 
 ** Bug
     * [S2GRAPH-159] - Wrong syntax at a bash script under Linux
@@ -55,6 +56,7 @@
     * [S2GRAPH-192] - could not find service column when creating the label
     * [S2GRAPH-195] - could not create indices using S2GraphQL
     * [S2GRAPH-196] - Apply Query Parameter to Label Fetch in S2GraphQL
+    * [S2GRAPH-220] - Filter clause is not working on AnnoyModelFetcher
 
 ** Improvement
     * [S2GRAPH-72] - Support Apache TinkerPop and Gremlin
@@ -78,6 +80,7 @@
     * [S2GRAPH-210] - Rename package `mysqls` to `schema`
     * [S2GRAPH-213] - Abstract Query/Mutation from Storage.
     * [S2GRAPH-214] - Add REAME for movielens examples
+    * [S2GRAPH-216] - Provide a transform directive in the GraphQL query result.
 
 ** New Feature
     * [S2GRAPH-123] - Support different index on out/in direction.
@@ -90,6 +93,7 @@
     * [S2GRAPH-183] - Provide batch job to dump data stored in HBase into file.
     * [S2GRAPH-206] - Generalize machine learning model serving.
     * [S2GRAPH-215] - Implement a Storage Backend for JDBC driver, such as H2, MySql using the Mutator and Fetcher interfaces
+    * [S2GRAPH-219] - Added query that includes all vertices and associated edges for GraphVisualize.
 
 ** Task
     * [S2GRAPH-162] - Update year in the NOTICE file.
diff --git a/build.sbt b/build.sbt
index 84d17d5..de7c02f 100755
--- a/build.sbt
+++ b/build.sbt
@@ -25,9 +25,9 @@
 
 lazy val commonSettings = Seq(
   organization := "org.apache.s2graph",
-  scalaVersion := "2.11.7",
+  scalaVersion := "2.11.8",
   isSnapshot := version.value.endsWith("-SNAPSHOT"),
-  scalacOptions := Seq("-language:postfixOps", "-unchecked", "-deprecation", "-feature", "-Xlint", "-Xlint:-missing-interpolator"),
+  scalacOptions := Seq("-language:postfixOps", "-unchecked", "-deprecation", "-feature", "-Xlint", "-Xlint:-missing-interpolator", "-target:jvm-1.8"),
   javaOptions ++= collection.JavaConversions.propertiesAsScalaMap(System.getProperties).map { case (key, value) => "-D" + key + "=" + value }.toSeq,
   testOptions in Test += Tests.Argument("-oDF"),
   concurrentRestrictions in Global += Tags.limit(Tags.Test, 1),
diff --git a/s2core/build.sbt b/s2core/build.sbt
index cfc32d6..229bf41 100644
--- a/s2core/build.sbt
+++ b/s2core/build.sbt
@@ -47,7 +47,7 @@
   "org.apache.tinkerpop" % "gremlin-test" % tinkerpopVersion % "test",
   "org.scalatest" %% "scalatest" % "2.2.4" % "test",
   "org.specs2" %% "specs2-core" % specs2Version % "test",
-  "org.apache.hadoop" % "hadoop-hdfs" % hadoopVersion ,
+  "org.apache.hadoop" % "hadoop-hdfs" % hadoopVersion,
   "org.apache.lucene" % "lucene-core" % "6.6.0",
   "org.apache.lucene" % "lucene-queryparser" % "6.6.0",
   "org.rocksdb" % "rocksdbjni" % rocksVersion,
diff --git a/s2core/src/main/scala/org/apache/s2graph/core/TraversalHelper.scala b/s2core/src/main/scala/org/apache/s2graph/core/TraversalHelper.scala
index 0a4a49b..ba18e8d 100644
--- a/s2core/src/main/scala/org/apache/s2graph/core/TraversalHelper.scala
+++ b/s2core/src/main/scala/org/apache/s2graph/core/TraversalHelper.scala
@@ -22,6 +22,7 @@
 import java.util
 
 import org.apache.s2graph.core.S2Graph.{FilterHashKey, HashKey}
+import org.apache.s2graph.core.parsers.WhereParser
 import org.apache.s2graph.core.schema.LabelMeta
 import org.apache.s2graph.core.types.{HBaseType, InnerVal, LabelWithDirection, VertexId}
 import org.apache.s2graph.core.utils.{Extensions, logger}
@@ -39,11 +40,11 @@
     else randomInt(sampleNumber, range, set + Random.nextInt(range))
   }
 
-  def sample(queryRequest: QueryRequest, edges: Seq[EdgeWithScore], n: Int): Seq[EdgeWithScore] = {
+  def sample(edges: Seq[EdgeWithScore], offset: Int, n: Int): Seq[EdgeWithScore] = {
     if (edges.size <= n) {
       edges
     } else {
-      val plainEdges = if (queryRequest.queryParam.offset == 0) {
+      val plainEdges = if (offset == 0) {
         edges.tail
       } else edges
 
@@ -141,6 +142,41 @@
 
     (hashKey, filterHashKey)
   }
+
+  def edgeToEdgeWithScore(queryRequest: QueryRequest,
+                          edge: S2EdgeLike,
+                          parentEdges: Seq[EdgeWithScore]): Seq[EdgeWithScore] = {
+    val prevScore = queryRequest.prevStepScore
+    val queryParam = queryRequest.queryParam
+    val queryOption = queryRequest.query.queryOption
+    val nextStepOpt = queryRequest.nextStepOpt
+    val labelWeight = queryRequest.labelWeight
+    val where = queryParam.where.get
+    val isDefaultTransformer = queryParam.edgeTransformer.isDefault
+
+    if (where != WhereParser.success && !where.filter(edge)) Nil
+    else {
+      val edges = if (isDefaultTransformer) Seq(edge) else convertEdges(queryParam, edge, nextStepOpt)
+      edges.map { e =>
+        if (!queryOption.ignorePrevStepCache) {
+          EdgeWithScore(e, queryParam.rank.score(edge), queryParam.label)
+        } else {
+          val edgeScore = queryParam.rank.score(edge)
+          val score = queryParam.scorePropagateOp match {
+            case "plus" => edgeScore + prevScore
+            case "divide" =>
+              if ((prevScore + queryParam.scorePropagateShrinkage) == 0) 0
+              else edgeScore / (prevScore + queryParam.scorePropagateShrinkage)
+            case _ => edgeScore * prevScore
+          }
+          val tsVal = processTimeDecay(queryParam, edge)
+
+          EdgeWithScore(e.copyParentEdges(parentEdges), score = score * labelWeight * tsVal, label = queryParam.label)
+        }
+      }
+    }
+  }
+
 }
 
 
@@ -481,7 +517,7 @@
       val degreeScore = 0.0
 
       val sampled =
-        if (queryRequest.queryParam.sample >= 0) sample(queryRequest, edgeWithScores, queryRequest.queryParam.sample)
+        if (queryRequest.queryParam.sample >= 0) sample(edgeWithScores, queryParam.offset, queryParam.sample)
         else edgeWithScores
 
       val withScores = for {
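
Aside: the extracted edgeToEdgeWithScore centralizes the where-clause filtering, edge transform, and score propagation that StorageIO previously inlined. A minimal sketch of the propagation arithmetic, using hypothetical numbers (not values from this patch):

    // hypothetical inputs, for illustration only
    val edgeScore = 2.0
    val prevScore = 3.0
    val shrinkage = 0.1  // stands in for queryParam.scorePropagateShrinkage

    def propagate(op: String): Double = op match {
      case "plus"   => edgeScore + prevScore                 // 5.0
      case "divide" =>
        if (prevScore + shrinkage == 0) 0
        else edgeScore / (prevScore + shrinkage)             // 2.0 / 3.1 ~= 0.645
      case _        => edgeScore * prevScore                 // 6.0 (default: multiply)
    }
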
diff --git a/s2core/src/main/scala/org/apache/s2graph/core/fetcher/annoy/AnnoyModelFetcher.scala b/s2core/src/main/scala/org/apache/s2graph/core/fetcher/annoy/AnnoyModelFetcher.scala
index cdf3b71..6c00ca0 100644
--- a/s2core/src/main/scala/org/apache/s2graph/core/fetcher/annoy/AnnoyModelFetcher.scala
+++ b/s2core/src/main/scala/org/apache/s2graph/core/fetcher/annoy/AnnoyModelFetcher.scala
@@ -29,74 +29,23 @@
 
 object AnnoyModelFetcher {
   val IndexFilePathKey = "annoyIndexFilePath"
-//  val DictFilePathKey = "annoyDictFilePath"
   val DimensionKey = "annoyIndexDimension"
   val IndexTypeKey = "annoyIndexType"
 
-  //  def loadDictFromLocal(file: File): Map[Int, String] = {
-  //    val files = if (file.isDirectory) {
-  //      file.listFiles()
-  //    } else {
-  //      Array(file)
-  //    }
-  //
-  //    files.flatMap { file =>
-  //      Source.fromFile(file).getLines().zipWithIndex.flatMap { case (line, _idx) =>
-  //        val tokens = line.stripMargin.split(",")
-  //        try {
-  //          val tpl = if (tokens.length < 2) {
-  //            (tokens.head.toInt, tokens.head)
-  //          } else {
-  //            (tokens.head.toInt, tokens.tail.head)
-  //          }
-  //          Seq(tpl)
-  //        } catch {
-  //          case e: Exception => Nil
-  //        }
-  //      }
-  //    }.toMap
-  //  }
-
   def buildAnnoy4s[T](indexPath: String)(implicit converter: KeyConverter[T]): Annoy[T] = {
     Annoy.load[T](indexPath)
   }
-
-  //  def buildIndex(indexPath: String,
-  //                 dictPath: String,
-  //                 dimension: Int,
-  //                 indexType: IndexType): ANNIndexWithDict = {
-  //    val dict = loadDictFromLocal(new File(dictPath))
-  //    val index = new ANNIndex(dimension, indexPath, indexType)
-  //
-  //    ANNIndexWithDict(index, dict)
-  //  }
-  //
-  //  def buildIndex(config: Config): ANNIndexWithDict = {
-  //    val indexPath = config.getString(IndexFilePathKey)
-  //    val dictPath = config.getString(DictFilePathKey)
-  //
-  //    val dimension = config.getInt(DimensionKey)
-  //    val indexType = Try { config.getString(IndexTypeKey) }.toOption.map(IndexType.valueOf).getOrElse(IndexType.ANGULAR)
-  //
-  //    buildIndex(indexPath, dictPath, dimension, indexType)
-  //  }
 }
 
-//
-//case class ANNIndexWithDict(index: ANNIndex, dict: Map[Int, String]) {
-//  val dictRev = dict.map(kv => kv._2 -> kv._1)
-//}
-
 class AnnoyModelFetcher(val graph: S2GraphLike) extends EdgeFetcher {
   import AnnoyModelFetcher._
+  import TraversalHelper._
 
   val builder = graph.elementBuilder
 
-  //  var model: ANNIndexWithDict = _
   var model: Annoy[String] = _
 
   override def init(config: Config)(implicit ec: ExecutionContext): Unit = {
-
     model = AnnoyModelFetcher.buildAnnoy4s(config.getString(IndexFilePathKey))
   }
 
@@ -106,15 +55,17 @@
     val stepResultLs = queryRequests.map { queryRequest =>
       val vertex = queryRequest.vertex
       val queryParam = queryRequest.queryParam
+      val shouldBuildParents = queryRequest.query.queryOption.returnTree || queryParam.whereHasParent
+      val parentEdges = if (shouldBuildParents) prevStepEdges.getOrElse(queryRequest.vertex.id, Nil) else Nil
 
-      val edgeWithScores = model.query(vertex.innerId.toIdString(), queryParam.limit).getOrElse(Nil).map { case (tgtId, score) =>
+      val edgeWithScores = model.query(vertex.innerId.toIdString(), queryParam.limit).getOrElse(Nil).flatMap { case (tgtId, score) =>
         val tgtVertexId = builder.newVertexId(queryParam.label.service,
           queryParam.label.tgtColumnWithDir(queryParam.labelWithDir.dir), tgtId)
 
         val props: Map[String, Any] = if (queryParam.label.metaPropsInvMap.contains("score")) Map("score" -> score) else Map.empty
         val edge = graph.toEdge(vertex.innerId.value, tgtVertexId.innerId.value, queryParam.labelName, queryParam.direction, props = props)
 
-        EdgeWithScore(edge, score, queryParam.label)
+        edgeToEdgeWithScore(queryRequest, edge, parentEdges)
       }
 
       StepResult(edgeWithScores, Nil, Nil)
diff --git a/s2core/src/main/scala/org/apache/s2graph/core/fetcher/fasttext/FastTextFetcher.scala b/s2core/src/main/scala/org/apache/s2graph/core/fetcher/fasttext/FastTextFetcher.scala
index 433b4dc..a07ed07 100644
--- a/s2core/src/main/scala/org/apache/s2graph/core/fetcher/fasttext/FastTextFetcher.scala
+++ b/s2core/src/main/scala/org/apache/s2graph/core/fetcher/fasttext/FastTextFetcher.scala
@@ -28,6 +28,7 @@
 
 
 class FastTextFetcher(val graph: S2GraphLike) extends EdgeFetcher {
+  import org.apache.s2graph.core.TraversalHelper._
   val builder = graph.elementBuilder
   var fastText: FastText = _
 
@@ -49,16 +50,19 @@
     val stepResultLs = queryRequests.map { queryRequest =>
       val vertex = queryRequest.vertex
       val queryParam = queryRequest.queryParam
+      val shouldBuildParents = queryRequest.query.queryOption.returnTree || queryParam.whereHasParent
+      val parentEdges = if (shouldBuildParents) prevStepEdges.getOrElse(queryRequest.vertex.id, Nil) else Nil
+
       val line = fastText.getLine(vertex.innerId.toIdString())
 
-      val edgeWithScores = fastText.predict(line, queryParam.limit).map { case (_label, score) =>
+      val edgeWithScores = fastText.predict(line, queryParam.limit).flatMap { case (_label, score) =>
         val tgtVertexId = builder.newVertexId(queryParam.label.service,
           queryParam.label.tgtColumnWithDir(queryParam.labelWithDir.dir), _label)
 
         val props: Map[String, Any] = if (queryParam.label.metaPropsInvMap.contains("score")) Map("score" -> score) else Map.empty
         val edge = graph.toEdge(vertex.innerId.value, tgtVertexId.innerId.value, queryParam.labelName, queryParam.direction, props = props)
 
-        EdgeWithScore(edge, score, queryParam.label)
+        edgeToEdgeWithScore(queryRequest, edge, parentEdges)
       }
 
       StepResult(edgeWithScores, Nil, Nil)
diff --git a/s2core/src/main/scala/org/apache/s2graph/core/fetcher/tensorflow/InceptionFetcher.scala b/s2core/src/main/scala/org/apache/s2graph/core/fetcher/tensorflow/InceptionFetcher.scala
index aa05a31..83fab98 100644
--- a/s2core/src/main/scala/org/apache/s2graph/core/fetcher/tensorflow/InceptionFetcher.scala
+++ b/s2core/src/main/scala/org/apache/s2graph/core/fetcher/tensorflow/InceptionFetcher.scala
@@ -63,6 +63,7 @@
   import InceptionFetcher._
 
   import scala.collection.JavaConverters._
+  import org.apache.s2graph.core.TraversalHelper._
   val builder = graph.elementBuilder
 
   var graphDef: Array[Byte] = _
@@ -81,16 +82,19 @@
     val stepResultLs = queryRequests.map { queryRequest =>
       val vertex = queryRequest.vertex
       val queryParam = queryRequest.queryParam
+      val shouldBuildParents = queryRequest.query.queryOption.returnTree || queryParam.whereHasParent
+      val parentEdges = if (shouldBuildParents) prevStepEdges.getOrElse(queryRequest.vertex.id, Nil) else Nil
+
       val urlText = vertex.innerId.toIdString()
 
-      val edgeWithScores = predict(graphDef, labels)(getImageBytes(urlText), queryParam.limit).map { case (label, score) =>
+      val edgeWithScores = predict(graphDef, labels)(getImageBytes(urlText), queryParam.limit).flatMap { case (label, score) =>
         val tgtVertexId = builder.newVertexId(queryParam.label.service,
           queryParam.label.tgtColumnWithDir(queryParam.labelWithDir.dir), label)
 
         val props: Map[String, Any] = if (queryParam.label.metaPropsInvMap.contains("score")) Map("score" -> score) else Map.empty
         val edge = graph.toEdge(vertex.innerId.value, tgtVertexId.innerId.value, queryParam.labelName, queryParam.direction, props = props)
 
-        EdgeWithScore(edge, score, queryParam.label)
+        edgeToEdgeWithScore(queryRequest, edge, parentEdges)
       }
 
       StepResult(edgeWithScores, Nil, Nil)
diff --git a/s2core/src/main/scala/org/apache/s2graph/core/storage/StorageIO.scala b/s2core/src/main/scala/org/apache/s2graph/core/storage/StorageIO.scala
index 2b02bdd..9abd80d 100644
--- a/s2core/src/main/scala/org/apache/s2graph/core/storage/StorageIO.scala
+++ b/s2core/src/main/scala/org/apache/s2graph/core/storage/StorageIO.scala
@@ -27,6 +27,8 @@
 import org.apache.s2graph.core.utils.logger
 
 class StorageIO(val graph: S2GraphLike, val serDe: StorageSerDe) {
+  import TraversalHelper._
+
   val dummyCursor: Array[Byte] = Array.empty
 
   /** Parsing Logic: parse from kv from Storage into Edge */
@@ -86,17 +88,15 @@
                                startOffset: Int = 0,
                                len: Int = Int.MaxValue): StepResult = {
 
     val toSKeyValue = implicitly[CanSKeyValue[K]].toSKeyValue _
 
     if (kvs.isEmpty) StepResult.Empty.copy(cursors = Seq(dummyCursor))
     else {
       val queryOption = queryRequest.query.queryOption
       val queryParam = queryRequest.queryParam
-      val labelWeight = queryRequest.labelWeight
-      val nextStepOpt = queryRequest.nextStepOpt
-      val where = queryParam.where.get
       val label = queryParam.label
-      val isDefaultTransformer = queryParam.edgeTransformer.isDefault
+
       val first = kvs.head
       val kv = first
       val schemaVer = queryParam.label.schemaVersion
@@ -114,41 +114,19 @@
 
       val lastCursor: Seq[Array[Byte]] = Seq(if (keyValues.nonEmpty) toSKeyValue(keyValues(keyValues.length - 1)).row else dummyCursor)
 
+      val edgeWithScores = for {
+        (kv, idx) <- keyValues.zipWithIndex if idx >= startOffset && idx < startOffset + len
+        edge <- (if (queryParam.isSnapshotEdge) toSnapshotEdge(kv, queryRequest, None, isInnerCall, parentEdges) else toEdge(kv, queryRequest, cacheElementOpt, parentEdges)).toSeq
+        edgeWithScore <- edgeToEdgeWithScore(queryRequest, edge, parentEdges)
+      } yield {
+        edgeWithScore
+      }
+
       if (!queryOption.ignorePrevStepCache) {
-        val edgeWithScores = for {
-          (kv, idx) <- keyValues.zipWithIndex if idx >= startOffset && idx < startOffset + len
-          edge <- (if (queryParam.isSnapshotEdge) toSnapshotEdge(kv, queryRequest, None, isInnerCall, parentEdges) else toEdge(kv, queryRequest, cacheElementOpt, parentEdges)).toSeq
-          if where == WhereParser.success || where.filter(edge)
-          convertedEdge <- if (isDefaultTransformer) Seq(edge) else convertEdges(queryParam, edge, nextStepOpt)
-        } yield {
-          val score = queryParam.rank.score(edge)
-          EdgeWithScore(convertedEdge, score, label)
-        }
         StepResult(edgeWithScores = edgeWithScores, grouped = Nil, degreeEdges = degreeEdges, cursors = lastCursor)
       } else {
-        val degreeScore = 0.0
-
-        val edgeWithScores = for {
-          (kv, idx) <- keyValues.zipWithIndex if idx >= startOffset && idx < startOffset + len
-          edge <- (if (queryParam.isSnapshotEdge) toSnapshotEdge(kv, queryRequest, None, isInnerCall, parentEdges) else toEdge(kv, queryRequest, cacheElementOpt, parentEdges)).toSeq
-          if where == WhereParser.success || where.filter(edge)
-          convertedEdge <- if (isDefaultTransformer) Seq(edge) else convertEdges(queryParam, edge, nextStepOpt)
-        } yield {
-          val edgeScore = queryParam.rank.score(edge)
-          val score = queryParam.scorePropagateOp match {
-            case "plus" => edgeScore + prevScore
-            case "divide" =>
-              if ((prevScore + queryParam.scorePropagateShrinkage) == 0) 0
-              else edgeScore / (prevScore + queryParam.scorePropagateShrinkage)
-            case _ => edgeScore * prevScore
-          }
-          val tsVal = processTimeDecay(queryParam, edge)
-          val newScore = degreeScore + score
-          EdgeWithScore(convertedEdge.copyParentEdges(parentEdges), score = newScore * labelWeight * tsVal, label = label)
-        }
-
         val sampled =
-          if (queryRequest.queryParam.sample >= 0) sample(queryRequest, edgeWithScores, queryRequest.queryParam.sample)
+          if (queryRequest.queryParam.sample >= 0) sample(edgeWithScores, queryParam.offset, queryParam.sample)
           else edgeWithScores
 
         val normalized = if (queryParam.shouldNormalize) normalize(sampled) else sampled
diff --git a/s2graphql/build.sbt b/s2graphql/build.sbt
index a3fbec0..bc71f98 100644
--- a/s2graphql/build.sbt
+++ b/s2graphql/build.sbt
@@ -26,13 +26,15 @@
 scalacOptions ++= Seq("-deprecation", "-feature")
 
 libraryDependencies ++= Seq(
+  "org.scala-lang" % "scala-compiler" % scalaVersion.value,
+  "org.scala-lang" % "scala-reflect" % scalaVersion.value,
+
   "org.sangria-graphql" %% "sangria" % "1.4.0",
   "org.sangria-graphql" %% "sangria-spray-json" % "1.0.0",
   "org.sangria-graphql" %% "sangria-play-json" % "1.0.1" % Test,
 
   "com.typesafe.akka" %% "akka-http" % "10.0.10",
   "com.typesafe.akka" %% "akka-http-spray-json" % "10.0.10",
-
   "com.typesafe.akka" %% "akka-slf4j" % "2.4.6",
 
   "org.scalatest" %% "scalatest" % "3.0.4" % Test
diff --git a/s2graphql/src/main/scala/org/apache/s2graph/graphql/GraphQLServer.scala b/s2graphql/src/main/scala/org/apache/s2graph/graphql/GraphQLServer.scala
index f71e16f..5f1c225 100644
--- a/s2graphql/src/main/scala/org/apache/s2graph/graphql/GraphQLServer.scala
+++ b/s2graphql/src/main/scala/org/apache/s2graph/graphql/GraphQLServer.scala
@@ -26,6 +26,7 @@
 import akka.http.scaladsl.server.Directives._
 import akka.http.scaladsl.server._
 import com.typesafe.config.ConfigFactory
+import org.apache.s2graph.graphql.middleware.{GraphFormatted, Transform}
 import org.apache.s2graph.core.S2Graph
 import org.apache.s2graph.core.utils.SafeUpdateCache
 import org.apache.s2graph.graphql.repository.GraphRepository
@@ -109,6 +110,8 @@
     logger.info("Schema Evicted")
   }
 
+  val TransformMiddleWare = List(org.apache.s2graph.graphql.middleware.Transform())
+
   def executeGraphQLQuery(query: Document, op: Option[String], vars: JsObject)(implicit e: ExecutionContext) = {
     import GraphRepository._
 
@@ -116,13 +119,17 @@
     val s2schema = schemaCache.withCache(cacheKey, broadcast = false, onEvict = onEvictSchema)(createNewSchema())
     val resolver: DeferredResolver[GraphRepository] = DeferredResolver.fetchers(vertexFetcher, edgeFetcher)
 
+    val includeGraph = vars.fields.get("includeGraph").contains(spray.json.JsBoolean(true))
+    val middleWares = if (includeGraph) GraphFormatted :: TransformMiddleWare else TransformMiddleWare
+
     Executor.execute(
       s2schema,
       query,
       s2Repository,
       variables = vars,
       operationName = op,
-      deferredResolver = resolver
+      deferredResolver = resolver,
+      middleware = middleWares
     ).map((res: spray.json.JsValue) => OK -> res)
       .recover {
         case error: QueryAnalysisError =>
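
With this change a client opts in to the graph-formatted extension by passing a boolean variable; a minimal sketch of the variables object (hypothetical client code, spray-json API):

    import spray.json._

    // when includeGraph is true, GraphFormatted is prepended to the
    // middleware list and the response carries a "graph" extension
    val vars = JsObject("includeGraph" -> JsBoolean(true))
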
diff --git a/s2graphql/src/main/scala/org/apache/s2graph/graphql/HttpServer.scala b/s2graphql/src/main/scala/org/apache/s2graph/graphql/HttpServer.scala
index 1620f30..755d185 100644
--- a/s2graphql/src/main/scala/org/apache/s2graph/graphql/HttpServer.scala
+++ b/s2graphql/src/main/scala/org/apache/s2graph/graphql/HttpServer.scala
@@ -80,8 +80,8 @@
           val fields = body.asJsObject.fields
 
           val query = fields.get("query").map(js => js.convertTo[String])
-          val operationName = fields.get("operationName").filter(_ == null).map(_.convertTo[String])
-          val variables = fields.get("variables").filter(_ == null)
+          val operationName = fields.get("operationName").filterNot(_ == null).map(_.convertTo[String])
+          val variables = fields.get("variables").filterNot(_ == null)
 
           query.map(QueryParser.parse(_)) match {
             case None ⇒ complete(StatusCodes.BadRequest -> GraphQLServer.formatError("No query to execute"))
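
The filter -> filterNot change fixes an inverted predicate: filter(_ == null) kept only null fields, so operationName and variables were always dropped. A quick illustration:

    val fields = Map("operationName" -> "q")
    fields.get("operationName").filter(_ == null)     // None: old code dropped every non-null value
    fields.get("operationName").filterNot(_ == null)  // Some("q"): keep the field unless it is null
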
diff --git a/s2graphql/src/main/scala/org/apache/s2graph/graphql/middleware/GraphFormatWriter.scala b/s2graphql/src/main/scala/org/apache/s2graph/graphql/middleware/GraphFormatWriter.scala
new file mode 100644
index 0000000..91fea62
--- /dev/null
+++ b/s2graphql/src/main/scala/org/apache/s2graph/graphql/middleware/GraphFormatWriter.scala
@@ -0,0 +1,101 @@
+package org.apache.s2graph.graphql.middleware
+
+import org.apache.s2graph.core.schema.ServiceColumn
+import org.apache.s2graph.core.{GraphElement, S2EdgeLike, S2VertexLike}
+import org.apache.s2graph.graphql.types.PlayJsonScalarType
+import org.slf4s.LoggerFactory
+import play.api.libs.json._
+import sangria.execution._
+import sangria.schema.Context
+
+
+object GraphFormatted extends Middleware[Any] with MiddlewareAfterField[Any] with MiddlewareExtension[Any] {
+  implicit val logger = LoggerFactory.getLogger(this.getClass)
+
+  type QueryVal = java.util.concurrent.ConcurrentHashMap[GraphElement, Unit]
+
+  type FieldVal = Long
+
+
+  def beforeQuery(context: MiddlewareQueryContext[Any, _, _]) = {
+    new java.util.concurrent.ConcurrentHashMap[GraphElement, Unit]()
+  }
+
+  def afterQuery(queryVal: QueryVal, context: MiddlewareQueryContext[Any, _, _]) = ()
+
+  def toVertexId(v: S2VertexLike, c: ServiceColumn): String = {
+    val innerId = v.innerId.toIdString()
+
+    s"${c.service.serviceName}.${c.columnName}.${innerId}"
+  }
+
+  def toVertexJson(v: S2VertexLike, c: ServiceColumn): JsValue = {
+    Json.obj(
+      "id" -> toVertexId(v, c),
+      "label" -> v.innerId.toIdString()
+    )
+  }
+
+  def toEdgeJson(e: S2EdgeLike): JsValue = {
+    Json.obj(
+      "source" -> toVertexId(e.srcVertex, e.innerLabel.srcColumn),
+      "target" -> toVertexId(e.tgtVertex, e.innerLabel.tgtColumn),
+      "id" -> s"${toVertexId(e.srcVertex, e.innerLabel.srcColumn)}.${e.label()}.${toVertexId(e.tgtVertex, e.innerLabel.tgtColumn)}",
+      "label" -> e.label()
+    )
+  }
+
+  def afterQueryExtensions(queryVal: QueryVal,
+                           context: MiddlewareQueryContext[Any, _, _]
+                          ): Vector[Extension[_]] = {
+
+    import scala.collection.JavaConverters._
+    val elements = queryVal.keys().asScala.toVector
+
+    val edges = elements.collect { case e: S2EdgeLike => e }
+    val vertices = elements.collect { case v: S2VertexLike => v -> v.serviceColumn }
+    val verticesFromEdges = edges.flatMap { e =>
+      val label = e.innerLabel
+      Vector((e.srcVertex, label.srcColumn), (e.tgtVertex, label.tgtColumn))
+    }
+
+    val verticesJson = (vertices ++ verticesFromEdges).map { case (v, c) => toVertexJson(v, c) }.distinct
+    val edgeJson = edges.map(toEdgeJson).distinct
+
+    val jsElements = Json.obj(
+      "nodes" -> verticesJson,
+      "edges" -> edgeJson
+    )
+
+    val graph = Json.obj("graph" -> jsElements)
+
+    /**
+      * nodes: [{id, label, x, y, size}, ..],
+      * edges: [{id, source, target, label}]
+      */
+    implicit val iu = PlayJsonScalarType.PlayJsonInputUnmarshaller
+    Vector(Extension[JsValue](graph))
+  }
+
+  def beforeField(queryVal: QueryVal, mctx: MiddlewareQueryContext[Any, _, _], ctx: Context[Any, _]) = {
+    continue(System.currentTimeMillis())
+  }
+
+  def afterField(queryVal: QueryVal, fieldVal: FieldVal, value: Any, mctx: MiddlewareQueryContext[Any, _, _], ctx: Context[Any, _]) = {
+
+    value match {
+      case ls: Seq[_] => ls.foreach {
+        case e: GraphElement => queryVal.put(e, ())
+        case _ =>
+      }
+      case e: GraphElement => queryVal.put(e, ())
+      case _ =>
+    }
+
+    None
+  }
+}
diff --git a/s2graphql/src/main/scala/org/apache/s2graph/graphql/middleware/Transform.scala b/s2graphql/src/main/scala/org/apache/s2graph/graphql/middleware/Transform.scala
new file mode 100644
index 0000000..9aa86fb
--- /dev/null
+++ b/s2graphql/src/main/scala/org/apache/s2graph/graphql/middleware/Transform.scala
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.s2graph.graphql.middleware
+
+import org.apache.s2graph.graphql.types.S2Directive
+import sangria.ast.StringValue
+import sangria.execution.{Middleware, MiddlewareAfterField, MiddlewareQueryContext}
+import sangria.schema.Context
+
+case class Transform() extends Middleware[Any] with MiddlewareAfterField[Any] {
+  type QueryVal = Unit
+  type FieldVal = Unit
+
+  def beforeQuery(context: MiddlewareQueryContext[Any, _, _]) = ()
+
+  def afterQuery(queryVal: QueryVal, context: MiddlewareQueryContext[Any, _, _]) = ()
+
+  def beforeField(cache: QueryVal, mctx: MiddlewareQueryContext[Any, _, _], ctx: Context[Any, _]) = continue
+
+  def afterField(cache: QueryVal, fromCache: FieldVal, value: Any, mctx: MiddlewareQueryContext[Any, _, _], ctx: Context[Any, _]) = {
+
+    value match {
+      case s: String => {
+        val transformFuncOpt = ctx.astFields.headOption.flatMap(_.directives.find(_.name == S2Directive.Transform.name)).map { directive =>
+          directive.arguments.head.value.asInstanceOf[StringValue].value
+        }
+
+        transformFuncOpt.map(funcString => S2Directive.resolveTransform(funcString, s)).orElse(Option(s))
+      }
+
+      case _ ⇒ None
+    }
+  }
+}
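
For reference, the directive is applied per field in the query document; a hypothetical query (schema and field names are illustrative, only the @transform syntax is defined by this patch):

    val query =
      """
        |query {
        |  Management {
        |    Services {
        |      name @transform(func: "(s: String) => s.toUpperCase")
        |    }
        |  }
        |}
      """.stripMargin
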
diff --git a/s2graphql/src/main/scala/org/apache/s2graph/graphql/repository/GraphRepository.scala b/s2graphql/src/main/scala/org/apache/s2graph/graphql/repository/GraphRepository.scala
index 0b5a2e9..b5e65dc 100644
--- a/s2graphql/src/main/scala/org/apache/s2graph/graphql/repository/GraphRepository.scala
+++ b/s2graphql/src/main/scala/org/apache/s2graph/graphql/repository/GraphRepository.scala
@@ -123,9 +123,8 @@
   }
 
   def getVertices(queryParam: VertexQueryParam): Future[Seq[S2VertexLike]] = {
-    graph.asInstanceOf[S2Graph].searchVertices(queryParam).map { a =>
-      println(a)
-      a
-    }
+    graph.asInstanceOf[S2Graph].searchVertices(queryParam)
   }
 
diff --git a/s2graphql/src/main/scala/org/apache/s2graph/graphql/types/FieldResolver.scala b/s2graphql/src/main/scala/org/apache/s2graph/graphql/types/FieldResolver.scala
index 478517f..a2b80da 100644
--- a/s2graphql/src/main/scala/org/apache/s2graph/graphql/types/FieldResolver.scala
+++ b/s2graphql/src/main/scala/org/apache/s2graph/graphql/types/FieldResolver.scala
@@ -28,7 +28,6 @@
 import sangria.schema._
 
 object FieldResolver {
-
   def graphElement[A](name: String, cType: String, c: Context[GraphRepository, Any]): A = {
     c.value match {
       case v: S2VertexLike => name match {
@@ -36,6 +35,7 @@
         case _ =>
           val innerVal = v.propertyValue(name).get
           JSONParser.innerValToAny(innerVal, cType).asInstanceOf[A]
+
       }
       case e: S2EdgeLike => name match {
         case "timestamp" => e.ts.asInstanceOf[A]
diff --git a/s2graphql/src/main/scala/org/apache/s2graph/graphql/types/PlayJsonScalarType.scala b/s2graphql/src/main/scala/org/apache/s2graph/graphql/types/PlayJsonScalarType.scala
new file mode 100644
index 0000000..5149d36
--- /dev/null
+++ b/s2graphql/src/main/scala/org/apache/s2graph/graphql/types/PlayJsonScalarType.scala
@@ -0,0 +1,112 @@
+package org.apache.s2graph.graphql.types
+
+import play.api.libs.json._
+import sangria.ast
+import sangria.execution.Executor
+import sangria.marshalling.{ArrayMapBuilder, InputUnmarshaller, ResultMarshaller, ScalarValueInfo}
+import sangria.schema._
+import sangria.validation.{BigIntCoercionViolation, IntCoercionViolation, ValueCoercionViolation}
+import sangria.macros._
+
+import scala.concurrent.ExecutionContext.Implicits.global
+
+object PlayJsonScalarType {
+
+  implicit object CustomPlayJsonResultMarshaller extends ResultMarshaller {
+    type Node = JsValue
+    type MapBuilder = ArrayMapBuilder[Node]
+
+    def emptyMapNode(keys: Seq[String]) = new ArrayMapBuilder[Node](keys)
+
+    def addMapNodeElem(builder: MapBuilder, key: String, value: Node, optional: Boolean) = builder.add(key, value)
+
+    def mapNode(builder: MapBuilder) = JsObject(builder.toMap)
+
+    def mapNode(keyValues: Seq[(String, JsValue)]) = Json.toJson(keyValues.toMap)
+
+    def arrayNode(values: Vector[JsValue]) = JsArray(values)
+
+    def optionalArrayNodeValue(value: Option[JsValue]) = value match {
+      case Some(v) ⇒ v
+      case None ⇒ nullNode
+    }
+
+    def scalarNode(value: Any, typeName: String, info: Set[ScalarValueInfo]) = value match {
+      case v: String ⇒ JsString(v)
+      case v: Boolean ⇒ JsBoolean(v)
+      case v: Int ⇒ JsNumber(v)
+      case v: Long ⇒ JsNumber(v)
+      case v: Float ⇒ JsNumber(BigDecimal(v))
+      case v: Double ⇒ JsNumber(v)
+      case v: BigInt ⇒ JsNumber(BigDecimal(v))
+      case v: BigDecimal ⇒ JsNumber(v)
+      case v: JsValue ⇒ v
+      case v ⇒ throw new IllegalArgumentException("Unsupported scalar value: " + v)
+    }
+
+    def enumNode(value: String, typeName: String) = JsString(value)
+
+    def nullNode = JsNull
+
+    def renderCompact(node: JsValue) = Json.stringify(node)
+
+    def renderPretty(node: JsValue) = Json.prettyPrint(node)
+  }
+
+  implicit object PlayJsonInputUnmarshaller extends InputUnmarshaller[JsValue] {
+    def getRootMapValue(node: JsValue, key: String) = node.asInstanceOf[JsObject].value get key
+
+    def isListNode(node: JsValue) = node.isInstanceOf[JsArray]
+
+    def getListValue(node: JsValue) = node.asInstanceOf[JsArray].value
+
+    def isMapNode(node: JsValue) = node.isInstanceOf[JsObject]
+
+    def getMapValue(node: JsValue, key: String) = node.asInstanceOf[JsObject].value get key
+
+    def getMapKeys(node: JsValue) = node.asInstanceOf[JsObject].fields.map(_._1)
+
+    def isDefined(node: JsValue) = node != JsNull
+
+    def getScalarValue(node: JsValue) = node match {
+      case JsBoolean(b) ⇒ b
+      case JsNumber(d) ⇒ d.toBigIntExact getOrElse d
+      case JsString(s) ⇒ s
+      case n ⇒ n
+    }
+
+    def getScalaScalarValue(node: JsValue) = getScalarValue(node)
+
+    def isEnumNode(node: JsValue) = node.isInstanceOf[JsString]
+
+    def isScalarNode(node: JsValue) = true
+
+    def isVariableNode(node: JsValue) = false
+
+    def getVariableName(node: JsValue) = throw new IllegalArgumentException("variables are not supported")
+
+    def render(node: JsValue) = Json.stringify(node)
+  }
+
+  case object JsonCoercionViolation extends ValueCoercionViolation("Not valid JSON")
+
+  implicit val JsonType = ScalarType[JsValue]("Json",
+    description = Some("Raw PlayJson value"),
+    coerceOutput = (value, _) ⇒ value,
+    coerceUserInput = {
+      case v: String ⇒ Right(JsString(v))
+      case v: Boolean ⇒ Right(JsBoolean(v))
+      case v: Int ⇒ Right(JsNumber(v))
+      case v: Long ⇒ Right(JsNumber(v))
+      case v: Float ⇒ Right(JsNumber(BigDecimal(v)))
+      case v: Double ⇒ Right(JsNumber(v))
+      case v: BigInt ⇒ Right(JsNumber(BigDecimal(v)))
+      case v: BigDecimal ⇒ Right(JsNumber(v))
+      case v: JsValue ⇒ Right(v)
+    },
+    coerceInput = {
+      case sv: ast.StringValue => Right(Json.parse(sv.value))
+      case _ ⇒
+        Left(JsonCoercionViolation)
+    })
+}
diff --git a/s2graphql/src/main/scala/org/apache/s2graph/graphql/types/S2Directive.scala b/s2graphql/src/main/scala/org/apache/s2graph/graphql/types/S2Directive.scala
new file mode 100644
index 0000000..54512c1
--- /dev/null
+++ b/s2graphql/src/main/scala/org/apache/s2graph/graphql/types/S2Directive.scala
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.s2graph.graphql.types
+
+import sangria.schema.{Argument, Directive, DirectiveLocation, StringType}
+
+object S2Directive {
+
+  object Eval {
+
+    import scala.collection._
+
+    val codeMap = mutable.Map.empty[String, () => Any]
+
+    def compileCode(code: String): () => Any = {
+      import scala.tools.reflect.ToolBox
+      val toolbox = reflect.runtime.currentMirror.mkToolBox()
+
+      toolbox.compile(toolbox.parse(code))
+    }
+
+    def getCompiledCode[T](code: String): T = {
+      val compiledCode = Eval.codeMap.getOrElseUpdate(code, Eval.compileCode(code))
+
+      compiledCode
+        .asInstanceOf[() => Any]
+        .apply()
+        .asInstanceOf[T]
+    }
+  }
+
+  type TransformFunc = String => String
+
+  val funcArg = Argument("func", StringType)
+
+  val Transform =
+    Directive("transform",
+      arguments = List(funcArg),
+      locations = Set(DirectiveLocation.Field),
+      shouldInclude = _ => true)
+
+  def resolveTransform(code: String, input: String): String = {
+    try {
+      val fn = Eval.getCompiledCode[TransformFunc](code)
+
+      fn.apply(input)
+    } catch {
+      case e: Exception => e.toString
+    }
+  }
+}
diff --git a/s2graphql/src/main/scala/org/apache/s2graph/graphql/types/SchemaDef.scala b/s2graphql/src/main/scala/org/apache/s2graph/graphql/types/SchemaDef.scala
index 27b0c50..7451023 100644
--- a/s2graphql/src/main/scala/org/apache/s2graph/graphql/types/SchemaDef.scala
+++ b/s2graphql/src/main/scala/org/apache/s2graph/graphql/types/SchemaDef.scala
@@ -20,8 +20,6 @@
 package org.apache.s2graph.graphql.types
 
 import org.apache.s2graph.graphql.repository.GraphRepository
-import org.apache.s2graph.graphql.types._
-
 
 /**
   * S2Graph GraphQL schema.
@@ -47,5 +45,13 @@
     fields(s2Type.mutationFields ++ mutateManagementFields: _*)
   )
 
-  val S2GraphSchema = Schema(S2QueryType, Option(S2MutationType))
+  val directives = S2Directive.Transform :: BuiltinDirectives
+
+  private val s2Schema = Schema(
+    S2QueryType,
+    Option(S2MutationType),
+    directives = directives
+  )
+
+  val S2GraphSchema = s2Schema
 }
diff --git a/s2graphql/src/test/scala/org/apache/s2graph/graphql/DirectiveTest.scala b/s2graphql/src/test/scala/org/apache/s2graph/graphql/DirectiveTest.scala
new file mode 100644
index 0000000..699a968
--- /dev/null
+++ b/s2graphql/src/test/scala/org/apache/s2graph/graphql/DirectiveTest.scala
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.s2graph.graphql
+
+import com.typesafe.config.ConfigFactory
+import org.apache.s2graph.graphql.types.S2Directive
+import org.scalatest._
+
+class DirectiveTest extends FunSuite with Matchers with BeforeAndAfterAll {
+  var testGraph: TestGraph = _
+
+  override def beforeAll = {
+    val config = ConfigFactory.load()
+    testGraph = new EmptyGraph(config)
+    testGraph.open()
+  }
+
+  override def afterAll(): Unit = {
+    testGraph.cleanup()
+  }
+
+  test("transform") {
+    val input = "20170601_A0"
+    val code =
+      """ (s: String) => {
+          val date = s.split("_").head
+          s"http://abc.xy.com/IMG_${date}.png"
+      }
+
+      """.stripMargin
+    val actual = S2Directive.resolveTransform(code, input)
+    val expected = "http://abc.xy.com/IMG_20170601.png"
+
+    actual shouldBe expected
+  }
+}
diff --git a/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/Job.scala b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/Job.scala
index 717d087..8f21bc2 100644
--- a/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/Job.scala
+++ b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/Job.scala
@@ -29,7 +29,12 @@
 
   def run() = {
     // source
-    jobDesc.sources.foreach{ source => dfMap.put(source.conf.name, source.toDF(ss))}
+    jobDesc.sources.foreach{ source =>
+      val df = source.toDF(ss)
+      if (source.conf.cache.getOrElse(false) && !df.isStreaming) df.cache()
+
+      dfMap.put(source.conf.name, df)
+    }
     logger.debug(s"valid source DF set : ${dfMap.keySet}")
 
     // process
@@ -64,7 +69,9 @@
     }
     .map { p =>
       val inputMap = p.conf.inputs.map{ input => (input,  dfMap(input)) }.toMap
-      p.conf.name -> p.execute(ss, inputMap)
+      val df = p.execute(ss, inputMap)
+      if (p.conf.cache.getOrElse(false) && !df.isStreaming) df.cache()
+      p.conf.name -> df
     }
   }
 }
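
The cache flag read here comes from the new TaskConf.cache field (see the Task.scala change below); a sketch of a source task that opts in (hypothetical task name, type, and options):

    // cache = Some(true) caches the DataFrame, but only when it is not streaming
    val src = TaskConf(
      name = "movies",
      `type` = "file",
      options = Map("paths" -> "/tmp/movies", "format" -> "csv"),
      cache = Some(true)
    )
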
diff --git a/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/task/Process.scala b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/task/Process.scala
index 6bc100f..c077f5b 100644
--- a/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/task/Process.scala
+++ b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/task/Process.scala
@@ -20,6 +20,7 @@
 package org.apache.s2graph.s2jobs.task
 
 import org.apache.spark.sql.{DataFrame, SparkSession}
+import play.api.libs.json.Json
 
 /**
   * Process
@@ -47,7 +48,68 @@
     val sql = conf.options("sql")
     logger.debug(s"${LOG_PREFIX} sql : $sql")
 
-    ss.sql(sql)
+    postProcess(ss.sql(sql))
   }
+
+  /**
+    * extraOperations
+    * @param df
+    * @return
+    */
+  private def postProcess(df: DataFrame): DataFrame = {
+    import org.apache.spark.sql.functions._
+
+    var resultDF = df
+
+    // watermark
+    val timeColumn = conf.options.get(s"time.column")
+    logger.debug(s">> timeColumn:  ${timeColumn}")
+
+    val waterMarkDelayTime = conf.options.get(s"watermark.delay.time")
+    if (waterMarkDelayTime.isDefined){
+      logger.debug(s">> waterMarkDelayTime : ${waterMarkDelayTime}")
+      if (timeColumn.isDefined) {
+        resultDF = resultDF.withWatermark(timeColumn.get, waterMarkDelayTime.get)
+      } else logger.warn("time.column does not exist; cannot apply watermark")
+    }
+
+    // drop duplication
+    val dropDuplicateColumns = conf.options.get("drop.duplicate.columns")
+    if (dropDuplicateColumns.isDefined) {
+      logger.debug(s">> dropDuplicates : ${dropDuplicateColumns}")
+      resultDF = resultDF.dropDuplicates(dropDuplicateColumns.get.split(","))
+    }
+
+    // groupBy
+    val groupedKeysOpt = conf.options.get(s"grouped.keys")
+    if (groupedKeysOpt.isDefined) {
+      var groupedKeys = groupedKeysOpt.get.split(",").map{ key =>
+        col(key.trim)
+      }.toSeq
+
+      val windowDurationOpt = conf.options.get(s"grouped.window.duration")
+      val slideDurationOpt = conf.options.get(s"grouped.slide.duration")
+      if (windowDurationOpt.isDefined && slideDurationOpt.isDefined && timeColumn.isDefined) {
+        logger.debug(s">> using window operation : Duration ${windowDurationOpt}, slideDuration : ${slideDurationOpt}")
+        groupedKeys = groupedKeys ++ Seq(window(col(timeColumn.get), windowDurationOpt.get, slideDurationOpt.get))
+      }
+      logger.debug(s">> groupedKeys: ${groupedKeys}")
+
+      // aggregate options
+      val aggExprs = Json.parse(conf.options.getOrElse(s"grouped.dataset.agg", "[\"count(1)\"]")).as[Seq[String]].map(expr(_))
+      logger.debug(s">> aggr : ${aggExprs}")
+
+      val groupedDF = resultDF.groupBy(groupedKeys: _*)
+
+      resultDF = if (aggExprs.size > 1) {
+        groupedDF.agg(aggExprs.head, aggExprs.tail: _*)
+      } else {
+        groupedDF.agg(aggExprs.head)
+      }
+    }
+
+    resultDF
+  }
+
 }
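
Taken together, postProcess is driven entirely by task options; a sketch of a conf exercising watermark, dedup, and windowed aggregation (option keys are from this patch, values are illustrative):

    val options = Map(
      "sql"                     -> "select * from events",
      "time.column"             -> "ts",
      "watermark.delay.time"    -> "10 minutes",
      "drop.duplicate.columns"  -> "userId,itemId",
      "grouped.keys"            -> "userId",
      "grouped.window.duration" -> "1 hour",
      "grouped.slide.duration"  -> "10 minutes",
      "grouped.dataset.agg"     -> """["count(1)", "max(ts)"]"""
    )
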
 
diff --git a/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/task/Sink.scala b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/task/Sink.scala
index ae88b6d..720c2d7 100644
--- a/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/task/Sink.scala
+++ b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/task/Sink.scala
@@ -69,6 +69,8 @@
     }
     val interval = conf.options.getOrElse("interval", DEFAULT_TRIGGER_INTERVAL)
     val checkpointLocation = conf.options.getOrElse("checkpointLocation", DEFAULT_CHECKPOINT_LOCATION)
+    val isContinuous = conf.options.getOrElse("isContinuous", "false").toBoolean
+    val trigger = if (isContinuous) Trigger.Continuous(interval) else Trigger.ProcessingTime(interval)
 
     val cfg = conf.options ++ Map("checkpointLocation" -> checkpointLocation)
 
@@ -78,7 +80,7 @@
       .queryName(s"${queryName}_${conf.name}")
       .format(FORMAT)
       .options(cfg)
-      .trigger(Trigger.ProcessingTime(interval))
+      .trigger(trigger)
       .outputMode(mode)
       .start()
   }
@@ -93,7 +95,7 @@
       case _ => SaveMode.Overwrite
     }
 
-    val partitionedWriter = if (partitionsOpt.isDefined) writer.partitionBy(partitionsOpt.get) else writer
+    val partitionedWriter = if (partitionsOpt.isDefined) writer.partitionBy(partitionsOpt.get.split(","): _*) else writer
 
     writeBatchInner(partitionedWriter.format(FORMAT).mode(mode))
   }
diff --git a/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/task/Source.scala b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/task/Source.scala
index 9c80e58..259cfc0 100644
--- a/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/task/Source.scala
+++ b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/task/Source.scala
@@ -40,19 +40,30 @@
   val DEFAULT_FORMAT = "raw"
   override def mandatoryOptions: Set[String] = Set("kafka.bootstrap.servers", "subscribe")
 
+  def repartition(df: DataFrame, defaultParallelism: Int) = {
+    conf.options.get("numPartitions").map(n => Integer.parseInt(n)) match {
+      case Some(numOfPartitions: Int) =>
+        logger.info(s"[repartition] $numOfPartitions ($defaultParallelism)")
+        if (numOfPartitions >= defaultParallelism) df.repartition(numOfPartitions)
+        else df.coalesce(numOfPartitions)
+      case None => df
+    }
+  }
+
   override def toDF(ss:SparkSession):DataFrame = {
     logger.info(s"${LOG_PREFIX} options: ${conf.options}")
 
     val format = conf.options.getOrElse("format", "raw")
     val df = ss.readStream.format("kafka").options(conf.options).load()
 
+    val partitionedDF = repartition(df, df.sparkSession.sparkContext.defaultParallelism)
     format match {
-      case "raw" => df
-      case "json" => parseJsonSchema(ss, df)
+      case "raw" => partitionedDF
+      case "json" => parseJsonSchema(ss, partitionedDF)
 //      case "custom" => parseCustomSchema(df)
       case _ =>
         logger.warn(s"${LOG_PREFIX} unsupported format '$format'.. use default schema ")
-        df
+        partitionedDF
     }
   }
 
diff --git a/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/task/Task.scala b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/task/Task.scala
index ea42828..1210132 100644
--- a/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/task/Task.scala
+++ b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/task/Task.scala
@@ -42,8 +42,7 @@
     taskConf.options.filterKeys(_.startsWith("cache.")).mapValues(_.toInt)
   }
 }
 
-case class TaskConf(name: String, `type`: String, inputs: Seq[String] = Nil, options: Map[String, String] = Map.empty)
+case class TaskConf(name: String, `type`: String, inputs: Seq[String] = Nil, options: Map[String, String] = Map.empty, cache: Option[Boolean] = None)
 
 trait Task extends Serializable with Logger {
   val conf: TaskConf