run apache-rat on S2GRAPH-219
diff --git a/CHANGES b/CHANGES
index b9bf81b..a68ecef 100644
--- a/CHANGES
+++ b/CHANGES
@@ -37,6 +37,7 @@
     * [S2GRAPH-197] - Provide S2graphSink for non-streaming dataset
     * [S2GRAPH-205] - too many initialize S2Graph when writeBatchMutate on S2GraphSink
     * [S2GRAPH-201] - Provide S2GraphSource
+    * [S2GRAPH-218] - Add operations not supported on SQL
 
 ** Bug
     * [S2GRAPH-159] - Wrong syntax at a bash script under Linux
@@ -55,6 +56,7 @@
     * [S2GRAPH-192] - could not find service column when creating the label
     * [S2GRAPH-195] - could not create indices using S2GraphQL
     * [S2GRAPH-196] - Apply Query Parameter to Label Fetch in S2GraphQL
+    * [S2GRAPH-220] - Filter clause is not working on AnnoyModelFetcher
 
 ** Improvement
     * [S2GRAPH-72] - Support Apache TinkerPop and Gremlin
@@ -78,6 +80,7 @@
     * [S2GRAPH-210] - Rename package `mysqls` to `schema`
     * [S2GRAPH-213] - Abstract Query/Mutation from Storage.
     * [S2GRAPH-214] - Add REAME for movielens examples
+    * [S2GRAPH-216] - Provide a transform directive in the GraphQL query result.
 
 ** New Feature
     * [S2GRAPH-123] - Support different index on out/in direction.
@@ -88,8 +91,10 @@
     * [S2GRAPH-177] - Add support for createServiceColumn/addVertex APIs on GraphQL.
     * [S2GRAPH-185] - Support Spark Structured Streaming to work with data in streaming and batch.
     * [S2GRAPH-183] - Provide batch job to dump data stored in HBase into file.
+    * [S2GRAPH-203] - Support "application/graphql" Content-Type header.
     * [S2GRAPH-206] - Generalize machine learning model serving.
-    * [S2GRAPH-215] - Implement a Storage Backend for JDBC driver, such as H2, MySql using the Mutator and Fetcher interfaces
+    * [S2GRAPH-215] - Implement a Storage Backend for JDBC driver, such as H2, MySql using the Mutator and Fetcher interfaces.
+    * [S2GRAPH-219] - Add query that includes all vertices and associated edges for GraphVisualize.
 
 ** Task
     * [S2GRAPH-162] - Update year in the NOTICE file.
diff --git a/build.sbt b/build.sbt
index 84d17d5..de7c02f 100755
--- a/build.sbt
+++ b/build.sbt
@@ -25,9 +25,9 @@
 
 lazy val commonSettings = Seq(
   organization := "org.apache.s2graph",
-  scalaVersion := "2.11.7",
+  scalaVersion := "2.11.8",
   isSnapshot := version.value.endsWith("-SNAPSHOT"),
-  scalacOptions := Seq("-language:postfixOps", "-unchecked", "-deprecation", "-feature", "-Xlint", "-Xlint:-missing-interpolator"),
+  scalacOptions := Seq("-language:postfixOps", "-unchecked", "-deprecation", "-feature", "-Xlint", "-Xlint:-missing-interpolator", "-target:jvm-1.8"),
   javaOptions ++= collection.JavaConversions.propertiesAsScalaMap(System.getProperties).map { case (key, value) => "-D" + key + "=" + value }.toSeq,
   testOptions in Test += Tests.Argument("-oDF"),
   concurrentRestrictions in Global += Tags.limit(Tags.Test, 1),
diff --git a/s2core/build.sbt b/s2core/build.sbt
index cfc32d6..229bf41 100644
--- a/s2core/build.sbt
+++ b/s2core/build.sbt
@@ -47,7 +47,7 @@
   "org.apache.tinkerpop" % "gremlin-test" % tinkerpopVersion % "test",
   "org.scalatest" %% "scalatest" % "2.2.4" % "test",
   "org.specs2" %% "specs2-core" % specs2Version % "test",
-  "org.apache.hadoop" % "hadoop-hdfs" % hadoopVersion ,
+  "org.apache.hadoop" % "hadoop-hdfs" % hadoopVersion,
   "org.apache.lucene" % "lucene-core" % "6.6.0",
   "org.apache.lucene" % "lucene-queryparser" % "6.6.0",
   "org.rocksdb" % "rocksdbjni" % rocksVersion,
diff --git a/s2core/src/main/scala/org/apache/s2graph/core/TraversalHelper.scala b/s2core/src/main/scala/org/apache/s2graph/core/TraversalHelper.scala
index 0a4a49b..ba18e8d 100644
--- a/s2core/src/main/scala/org/apache/s2graph/core/TraversalHelper.scala
+++ b/s2core/src/main/scala/org/apache/s2graph/core/TraversalHelper.scala
@@ -22,6 +22,7 @@
 import java.util
 
 import org.apache.s2graph.core.S2Graph.{FilterHashKey, HashKey}
+import org.apache.s2graph.core.parsers.WhereParser
 import org.apache.s2graph.core.schema.LabelMeta
 import org.apache.s2graph.core.types.{HBaseType, InnerVal, LabelWithDirection, VertexId}
 import org.apache.s2graph.core.utils.{Extensions, logger}
@@ -39,11 +40,11 @@
     else randomInt(sampleNumber, range, set + Random.nextInt(range))
   }
 
-  def sample(queryRequest: QueryRequest, edges: Seq[EdgeWithScore], n: Int): Seq[EdgeWithScore] = {
+  def sample(edges: Seq[EdgeWithScore], offset: Int, n: Int): Seq[EdgeWithScore] = {
     if (edges.size <= n) {
       edges
     } else {
-      val plainEdges = if (queryRequest.queryParam.offset == 0) {
+      val plainEdges = if (offset == 0) {
         edges.tail
       } else edges
 
@@ -141,6 +142,41 @@
 
     (hashKey, filterHashKey)
   }
+
+  def edgeToEdgeWithScore(queryRequest: QueryRequest,
+                          edge: S2EdgeLike,
+                          parentEdges: Seq[EdgeWithScore]): Seq[EdgeWithScore] = {
+    val prevScore = queryRequest.prevStepScore
+    val queryParam = queryRequest.queryParam
+    val queryOption = queryRequest.query.queryOption
+    val nextStepOpt = queryRequest.nextStepOpt
+    val labelWeight = queryRequest.labelWeight
+    val where = queryParam.where.get
+    val isDefaultTransformer = queryParam.edgeTransformer.isDefault
+
+    if (where != WhereParser.success && !where.filter(edge)) Nil
+    else {
+      val edges = if (isDefaultTransformer) Seq(edge) else convertEdges(queryParam, edge, nextStepOpt)
+      edges.map { e =>
+        if (!queryOption.ignorePrevStepCache) {
+          EdgeWithScore(e, queryParam.rank.score(edge), queryParam.label)
+        } else {
+          val edgeScore = queryParam.rank.score(edge)
+          val score = queryParam.scorePropagateOp match {
+            case "plus" => edgeScore + prevScore
+            case "divide" =>
+              if ((prevScore + queryParam.scorePropagateShrinkage) == 0) 0
+              else edgeScore / (prevScore + queryParam.scorePropagateShrinkage)
+            case _ => edgeScore * prevScore
+          }
+          val tsVal = processTimeDecay(queryParam, edge)
+
+          EdgeWithScore(e.copyParentEdges(parentEdges), score = score * labelWeight * tsVal, label = queryParam.label)
+        }
+      }
+    }
+  }
+
 }
 
 
@@ -481,7 +517,7 @@
       val degreeScore = 0.0
 
       val sampled =
-        if (queryRequest.queryParam.sample >= 0) sample(queryRequest, edgeWithScores, queryRequest.queryParam.sample)
+        if (queryRequest.queryParam.sample >= 0) sample(edgeWithScores, queryParam.offset, queryParam.sample)
         else edgeWithScores
 
       val withScores = for {
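
For reference, a standalone worked sketch (illustrative numbers, not taken from the query path) of the three `scorePropagateOp` modes the new `edgeToEdgeWithScore` helper applies on its score-propagation branch (when `queryOption.ignorePrevStepCache` is set):

```scala
val edgeScore = 0.8 // queryParam.rank.score(edge), illustrative
val prevScore = 0.5 // queryRequest.prevStepScore, illustrative
val shrinkage = 0.1 // queryParam.scorePropagateShrinkage, illustrative

val plus     = edgeScore + prevScore                    // 1.3
val divide   = if ((prevScore + shrinkage) == 0) 0.0
               else edgeScore / (prevScore + shrinkage) // 0.8 / 0.6 ~= 1.33
val multiply = edgeScore * prevScore                    // 0.4 (default case)
```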
diff --git a/s2core/src/main/scala/org/apache/s2graph/core/fetcher/annoy/AnnoyModelFetcher.scala b/s2core/src/main/scala/org/apache/s2graph/core/fetcher/annoy/AnnoyModelFetcher.scala
index cdf3b71..6c00ca0 100644
--- a/s2core/src/main/scala/org/apache/s2graph/core/fetcher/annoy/AnnoyModelFetcher.scala
+++ b/s2core/src/main/scala/org/apache/s2graph/core/fetcher/annoy/AnnoyModelFetcher.scala
@@ -29,74 +29,23 @@
 
 object AnnoyModelFetcher {
   val IndexFilePathKey = "annoyIndexFilePath"
-//  val DictFilePathKey = "annoyDictFilePath"
   val DimensionKey = "annoyIndexDimension"
   val IndexTypeKey = "annoyIndexType"
 
-  //  def loadDictFromLocal(file: File): Map[Int, String] = {
-  //    val files = if (file.isDirectory) {
-  //      file.listFiles()
-  //    } else {
-  //      Array(file)
-  //    }
-  //
-  //    files.flatMap { file =>
-  //      Source.fromFile(file).getLines().zipWithIndex.flatMap { case (line, _idx) =>
-  //        val tokens = line.stripMargin.split(",")
-  //        try {
-  //          val tpl = if (tokens.length < 2) {
-  //            (tokens.head.toInt, tokens.head)
-  //          } else {
-  //            (tokens.head.toInt, tokens.tail.head)
-  //          }
-  //          Seq(tpl)
-  //        } catch {
-  //          case e: Exception => Nil
-  //        }
-  //      }
-  //    }.toMap
-  //  }
-
   def buildAnnoy4s[T](indexPath: String)(implicit converter: KeyConverter[T]): Annoy[T] = {
     Annoy.load[T](indexPath)
   }
-
-  //  def buildIndex(indexPath: String,
-  //                 dictPath: String,
-  //                 dimension: Int,
-  //                 indexType: IndexType): ANNIndexWithDict = {
-  //    val dict = loadDictFromLocal(new File(dictPath))
-  //    val index = new ANNIndex(dimension, indexPath, indexType)
-  //
-  //    ANNIndexWithDict(index, dict)
-  //  }
-  //
-  //  def buildIndex(config: Config): ANNIndexWithDict = {
-  //    val indexPath = config.getString(IndexFilePathKey)
-  //    val dictPath = config.getString(DictFilePathKey)
-  //
-  //    val dimension = config.getInt(DimensionKey)
-  //    val indexType = Try { config.getString(IndexTypeKey) }.toOption.map(IndexType.valueOf).getOrElse(IndexType.ANGULAR)
-  //
-  //    buildIndex(indexPath, dictPath, dimension, indexType)
-  //  }
 }
 
-//
-//case class ANNIndexWithDict(index: ANNIndex, dict: Map[Int, String]) {
-//  val dictRev = dict.map(kv => kv._2 -> kv._1)
-//}
-
 class AnnoyModelFetcher(val graph: S2GraphLike) extends EdgeFetcher {
   import AnnoyModelFetcher._
+  import TraversalHelper._
 
   val builder = graph.elementBuilder
 
-  //  var model: ANNIndexWithDict = _
   var model: Annoy[String] = _
 
   override def init(config: Config)(implicit ec: ExecutionContext): Unit = {
-
     model = AnnoyModelFetcher.buildAnnoy4s(config.getString(IndexFilePathKey))
   }
 
@@ -106,15 +55,17 @@
     val stepResultLs = queryRequests.map { queryRequest =>
       val vertex = queryRequest.vertex
       val queryParam = queryRequest.queryParam
+      val shouldBuildParents = queryRequest.query.queryOption.returnTree || queryParam.whereHasParent
+      val parentEdges = if (shouldBuildParents) prevStepEdges.getOrElse(vertex.id, Nil) else Nil
 
-      val edgeWithScores = model.query(vertex.innerId.toIdString(), queryParam.limit).getOrElse(Nil).map { case (tgtId, score) =>
+      val edgeWithScores = model.query(vertex.innerId.toIdString(), queryParam.limit).getOrElse(Nil).flatMap { case (tgtId, score) =>
         val tgtVertexId = builder.newVertexId(queryParam.label.service,
           queryParam.label.tgtColumnWithDir(queryParam.labelWithDir.dir), tgtId)
 
         val props: Map[String, Any] = if (queryParam.label.metaPropsInvMap.contains("score")) Map("score" -> score) else Map.empty
         val edge = graph.toEdge(vertex.innerId.value, tgtVertexId.innerId.value, queryParam.labelName, queryParam.direction, props = props)
 
-        EdgeWithScore(edge, score, queryParam.label)
+        edgeToEdgeWithScore(queryRequest, edge, parentEdges)
       }
 
       StepResult(edgeWithScores, Nil, Nil)
diff --git a/s2core/src/main/scala/org/apache/s2graph/core/fetcher/fasttext/FastTextFetcher.scala b/s2core/src/main/scala/org/apache/s2graph/core/fetcher/fasttext/FastTextFetcher.scala
index 433b4dc..a07ed07 100644
--- a/s2core/src/main/scala/org/apache/s2graph/core/fetcher/fasttext/FastTextFetcher.scala
+++ b/s2core/src/main/scala/org/apache/s2graph/core/fetcher/fasttext/FastTextFetcher.scala
@@ -28,6 +28,7 @@
 
 
 class FastTextFetcher(val graph: S2GraphLike) extends EdgeFetcher {
+  import org.apache.s2graph.core.TraversalHelper._
   val builder = graph.elementBuilder
   var fastText: FastText = _
 
@@ -49,16 +50,19 @@
     val stepResultLs = queryRequests.map { queryRequest =>
       val vertex = queryRequest.vertex
       val queryParam = queryRequest.queryParam
+      val shouldBuildParents = queryRequest.query.queryOption.returnTree || queryParam.whereHasParent
+      val parentEdges = if (shouldBuildParents) prevStepEdges.getOrElse(vertex.id, Nil) else Nil
+
       val line = fastText.getLine(vertex.innerId.toIdString())
 
-      val edgeWithScores = fastText.predict(line, queryParam.limit).map { case (_label, score) =>
+      val edgeWithScores = fastText.predict(line, queryParam.limit).flatMap { case (_label, score) =>
         val tgtVertexId = builder.newVertexId(queryParam.label.service,
           queryParam.label.tgtColumnWithDir(queryParam.labelWithDir.dir), _label)
 
         val props: Map[String, Any] = if (queryParam.label.metaPropsInvMap.contains("score")) Map("score" -> score) else Map.empty
         val edge = graph.toEdge(vertex.innerId.value, tgtVertexId.innerId.value, queryParam.labelName, queryParam.direction, props = props)
 
-        EdgeWithScore(edge, score, queryParam.label)
+        edgeToEdgeWithScore(queryRequest, edge, parentEdges)
       }
 
       StepResult(edgeWithScores, Nil, Nil)
diff --git a/s2core/src/main/scala/org/apache/s2graph/core/fetcher/tensorflow/InceptionFetcher.scala b/s2core/src/main/scala/org/apache/s2graph/core/fetcher/tensorflow/InceptionFetcher.scala
index aa05a31..83fab98 100644
--- a/s2core/src/main/scala/org/apache/s2graph/core/fetcher/tensorflow/InceptionFetcher.scala
+++ b/s2core/src/main/scala/org/apache/s2graph/core/fetcher/tensorflow/InceptionFetcher.scala
@@ -63,6 +63,7 @@
   import InceptionFetcher._
 
   import scala.collection.JavaConverters._
+  import org.apache.s2graph.core.TraversalHelper._
   val builder = graph.elementBuilder
 
   var graphDef: Array[Byte] = _
@@ -81,16 +82,19 @@
     val stepResultLs = queryRequests.map { queryRequest =>
       val vertex = queryRequest.vertex
       val queryParam = queryRequest.queryParam
+      val shouldBuildParents = queryRequest.query.queryOption.returnTree || queryParam.whereHasParent
+      val parentEdges = if (shouldBuildParents) prevStepEdges.getOrElse(vertex.id, Nil) else Nil
+
       val urlText = vertex.innerId.toIdString()
 
-      val edgeWithScores = predict(graphDef, labels)(getImageBytes(urlText), queryParam.limit).map { case (label, score) =>
+      val edgeWithScores = predict(graphDef, labels)(getImageBytes(urlText), queryParam.limit).flatMap { case (label, score) =>
         val tgtVertexId = builder.newVertexId(queryParam.label.service,
           queryParam.label.tgtColumnWithDir(queryParam.labelWithDir.dir), label)
 
         val props: Map[String, Any] = if (queryParam.label.metaPropsInvMap.contains("score")) Map("score" -> score) else Map.empty
         val edge = graph.toEdge(vertex.innerId.value, tgtVertexId.innerId.value, queryParam.labelName, queryParam.direction, props = props)
 
-        EdgeWithScore(edge, score, queryParam.label)
+        edgeToEdgeWithScore(queryRequest, edge, parentEdges)
       }
 
       StepResult(edgeWithScores, Nil, Nil)
diff --git a/s2core/src/main/scala/org/apache/s2graph/core/storage/StorageIO.scala b/s2core/src/main/scala/org/apache/s2graph/core/storage/StorageIO.scala
index 2b02bdd..9abd80d 100644
--- a/s2core/src/main/scala/org/apache/s2graph/core/storage/StorageIO.scala
+++ b/s2core/src/main/scala/org/apache/s2graph/core/storage/StorageIO.scala
@@ -27,6 +27,8 @@
 import org.apache.s2graph.core.utils.logger
 
 class StorageIO(val graph: S2GraphLike, val serDe: StorageSerDe) {
+  import TraversalHelper._
+
   val dummyCursor: Array[Byte] = Array.empty
 
   /** Parsing Logic: parse from kv from Storage into Edge */
@@ -86,17 +88,15 @@
                                startOffset: Int = 0,
                                len: Int = Int.MaxValue): StepResult = {
 
+
     val toSKeyValue = implicitly[CanSKeyValue[K]].toSKeyValue _
 
     if (kvs.isEmpty) StepResult.Empty.copy(cursors = Seq(dummyCursor))
     else {
       val queryOption = queryRequest.query.queryOption
       val queryParam = queryRequest.queryParam
-      val labelWeight = queryRequest.labelWeight
-      val nextStepOpt = queryRequest.nextStepOpt
-      val where = queryParam.where.get
       val label = queryParam.label
-      val isDefaultTransformer = queryParam.edgeTransformer.isDefault
+
       val first = kvs.head
       val kv = first
       val schemaVer = queryParam.label.schemaVersion
@@ -114,41 +114,19 @@
 
       val lastCursor: Seq[Array[Byte]] = Seq(if (keyValues.nonEmpty) toSKeyValue(keyValues(keyValues.length - 1)).row else dummyCursor)
 
+      val edgeWithScores = for {
+        (kv, idx) <- keyValues.zipWithIndex if idx >= startOffset && idx < startOffset + len
+        edge <- (if (queryParam.isSnapshotEdge) toSnapshotEdge(kv, queryRequest, None, isInnerCall, parentEdges) else toEdge(kv, queryRequest, cacheElementOpt, parentEdges)).toSeq
+        edgeWithScore <- edgeToEdgeWithScore(queryRequest, edge, parentEdges)
+      } yield {
+        edgeWithScore
+      }
+
       if (!queryOption.ignorePrevStepCache) {
-        val edgeWithScores = for {
-          (kv, idx) <- keyValues.zipWithIndex if idx >= startOffset && idx < startOffset + len
-          edge <- (if (queryParam.isSnapshotEdge) toSnapshotEdge(kv, queryRequest, None, isInnerCall, parentEdges) else toEdge(kv, queryRequest, cacheElementOpt, parentEdges)).toSeq
-          if where == WhereParser.success || where.filter(edge)
-          convertedEdge <- if (isDefaultTransformer) Seq(edge) else convertEdges(queryParam, edge, nextStepOpt)
-        } yield {
-          val score = queryParam.rank.score(edge)
-          EdgeWithScore(convertedEdge, score, label)
-        }
         StepResult(edgeWithScores = edgeWithScores, grouped = Nil, degreeEdges = degreeEdges, cursors = lastCursor)
       } else {
-        val degreeScore = 0.0
-
-        val edgeWithScores = for {
-          (kv, idx) <- keyValues.zipWithIndex if idx >= startOffset && idx < startOffset + len
-          edge <- (if (queryParam.isSnapshotEdge) toSnapshotEdge(kv, queryRequest, None, isInnerCall, parentEdges) else toEdge(kv, queryRequest, cacheElementOpt, parentEdges)).toSeq
-          if where == WhereParser.success || where.filter(edge)
-          convertedEdge <- if (isDefaultTransformer) Seq(edge) else convertEdges(queryParam, edge, nextStepOpt)
-        } yield {
-          val edgeScore = queryParam.rank.score(edge)
-          val score = queryParam.scorePropagateOp match {
-            case "plus" => edgeScore + prevScore
-            case "divide" =>
-              if ((prevScore + queryParam.scorePropagateShrinkage) == 0) 0
-              else edgeScore / (prevScore + queryParam.scorePropagateShrinkage)
-            case _ => edgeScore * prevScore
-          }
-          val tsVal = processTimeDecay(queryParam, edge)
-          val newScore = degreeScore + score
-          EdgeWithScore(convertedEdge.copyParentEdges(parentEdges), score = newScore * labelWeight * tsVal, label = label)
-        }
-
         val sampled =
-          if (queryRequest.queryParam.sample >= 0) sample(queryRequest, edgeWithScores, queryRequest.queryParam.sample)
+          if (queryRequest.queryParam.sample >= 0) sample(edgeWithScores, queryParam.offset, queryParam.sample)
           else edgeWithScores
 
         val normalized = if (queryParam.shouldNormalize) normalize(sampled) else sampled
diff --git a/s2graphql/build.sbt b/s2graphql/build.sbt
index a3fbec0..bc71f98 100644
--- a/s2graphql/build.sbt
+++ b/s2graphql/build.sbt
@@ -26,13 +26,15 @@
 scalacOptions ++= Seq("-deprecation", "-feature")
 
 libraryDependencies ++= Seq(
+  "org.scala-lang" % "scala-compiler" % scalaVersion.value,
+  "org.scala-lang" % "scala-reflect" % scalaVersion.value,
+
   "org.sangria-graphql" %% "sangria" % "1.4.0",
   "org.sangria-graphql" %% "sangria-spray-json" % "1.0.0",
   "org.sangria-graphql" %% "sangria-play-json" % "1.0.1" % Test,
 
   "com.typesafe.akka" %% "akka-http" % "10.0.10",
   "com.typesafe.akka" %% "akka-http-spray-json" % "10.0.10",
-
   "com.typesafe.akka" %% "akka-slf4j" % "2.4.6",
 
   "org.scalatest" %% "scalatest" % "3.0.4" % Test
diff --git a/s2graphql/src/main/scala/org/apache/s2graph/graphql/GraphQLServer.scala b/s2graphql/src/main/scala/org/apache/s2graph/graphql/GraphQLServer.scala
index eee7c93..5f1c225 100644
--- a/s2graphql/src/main/scala/org/apache/s2graph/graphql/GraphQLServer.scala
+++ b/s2graphql/src/main/scala/org/apache/s2graph/graphql/GraphQLServer.scala
@@ -26,6 +26,7 @@
 import akka.http.scaladsl.server.Directives._
 import akka.http.scaladsl.server._
 import com.typesafe.config.ConfigFactory
+import org.apache.s2graph.graphql.middleware.{GraphFormatted, Transform}
 import org.apache.s2graph.core.S2Graph
 import org.apache.s2graph.core.utils.SafeUpdateCache
 import org.apache.s2graph.graphql.repository.GraphRepository
@@ -35,12 +36,13 @@
 import sangria.execution._
 import sangria.execution.deferred.DeferredResolver
 import sangria.marshalling.sprayJson._
-import sangria.parser.QueryParser
+import sangria.parser.{QueryParser, SyntaxError}
 import sangria.schema.Schema
-import spray.json.{JsBoolean, JsObject, JsString}
+import spray.json._
 
 import scala.collection.JavaConverters._
 import scala.concurrent.{ExecutionContext, Future}
+import scala.util.control.NonFatal
 import scala.util.{Failure, Success, Try}
 
 object GraphQLServer {
@@ -63,7 +65,7 @@
 
   val schemaCache = new SafeUpdateCache(schemaConfig)
 
-  def updateEdgeFetcher(requestJSON: spray.json.JsValue)(implicit e: ExecutionContext): Route = {
+  def updateEdgeFetcher(requestJSON: spray.json.JsValue)(implicit e: ExecutionContext): Try[Unit] = {
     val ret = Try {
       val spray.json.JsObject(fields) = requestJSON
       val spray.json.JsString(labelName) = fields("label")
@@ -72,33 +74,7 @@
       s2graph.management.updateEdgeFetcher(labelName, jsOptions.compactPrint)
     }
 
-    ret match {
-      case Success(f) => complete(OK -> JsString("start"))
-      case Failure(e) => complete(InternalServerError -> spray.json.JsObject("message" -> JsString(e.toString)))
-    }
-  }
-
-  def endpoint(requestJSON: spray.json.JsValue)(implicit e: ExecutionContext): Route = {
-    val spray.json.JsObject(fields) = requestJSON
-    val spray.json.JsString(query) = fields("query")
-
-    val operation = fields.get("operationName") collect {
-      case spray.json.JsString(op) => op
-    }
-
-    val vars = fields.get("variables") match {
-      case Some(obj: spray.json.JsObject) => obj
-      case _ => spray.json.JsObject.empty
-    }
-
-    QueryParser.parse(query) match {
-      case Success(queryAst) =>
-        logger.info(queryAst.renderCompact)
-        complete(executeGraphQLQuery(queryAst, operation, vars))
-      case Failure(error) =>
-        logger.warn(error.getMessage, error)
-        complete(BadRequest -> spray.json.JsObject("error" -> JsString(error.getMessage)))
-    }
+    ret
   }
 
   /**
@@ -108,35 +84,61 @@
   logger.info(s"schemaCacheTTL: ${schemaCacheTTL}")
 
   private def createNewSchema(): Schema[GraphRepository, Any] = {
-    logger.info(s"Schema updated: ${System.currentTimeMillis()}")
     val newSchema = new SchemaDef(s2Repository).S2GraphSchema
-    logger.info("-" * 80)
+    logger.info(s"Schema updated: ${System.currentTimeMillis()}")
 
     newSchema
   }
 
-  private def executeGraphQLQuery(query: Document, op: Option[String], vars: JsObject)(implicit e: ExecutionContext) = {
-    val cacheKey = className + "s2Schema"
-    val s2schema = schemaCache.withCache(cacheKey, broadcast = false)(createNewSchema())
+  def formatError(error: Throwable): JsValue = error match {
+    case syntaxError: SyntaxError ⇒
+      JsObject("errors" → JsArray(
+        JsObject(
+          "message" → JsString(syntaxError.getMessage),
+          "locations" → JsArray(JsObject(
+            "line" → JsNumber(syntaxError.originalError.position.line),
+            "column" → JsNumber(syntaxError.originalError.position.column))))))
+
+    case NonFatal(e) ⇒ formatError(e.toString)
+    case e ⇒ throw e
+  }
+
+  def formatError(message: String): JsObject =
+    JsObject("errors" → JsArray(JsObject("message" → JsString(message))))
+
+  def onEvictSchema(o: AnyRef): Unit = {
+    logger.info("Schema Evicted")
+  }
+
+  val TransformMiddleWare = List(org.apache.s2graph.graphql.middleware.Transform())
+
+  def executeGraphQLQuery(query: Document, op: Option[String], vars: JsObject)(implicit e: ExecutionContext) = {
     import GraphRepository._
+
+    val cacheKey = className + "s2Schema"
+    val s2schema = schemaCache.withCache(cacheKey, broadcast = false, onEvict = onEvictSchema)(createNewSchema())
     val resolver: DeferredResolver[GraphRepository] = DeferredResolver.fetchers(vertexFetcher, edgeFetcher)
 
+    val includeGraph = vars.fields.get("includeGraph").contains(spray.json.JsBoolean(true))
+    val middleWares = if (includeGraph) GraphFormatted :: TransformMiddleWare else TransformMiddleWare
+
     Executor.execute(
       s2schema,
       query,
       s2Repository,
       variables = vars,
       operationName = op,
-      deferredResolver = resolver
-    )
-      .map((res: spray.json.JsValue) => OK -> res)
+      deferredResolver = resolver,
+      middleware = middleWares
+    ).map((res: spray.json.JsValue) => OK -> res)
       .recover {
         case error: QueryAnalysisError =>
-          logger.warn(error.getMessage, error)
+          logger.error("Error on execute", error)
           BadRequest -> error.resolveError
         case error: ErrorWithResolver =>
-          logger.error(error.getMessage, error)
+          logger.error("Error on execute", error)
           InternalServerError -> error.resolveError
       }
   }
 }
+
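
A hedged usage sketch of the new middleware wiring, assuming a live schema and a hypothetical `services { name }` field: when the `includeGraph` variable is true, `GraphFormatted` is prepended to the middleware list, so the response carries a `graph` extension (nodes/edges) for visualization (S2GRAPH-219).

```scala
import sangria.parser.QueryParser
import spray.json.{JsBoolean, JsObject}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.util.Success

val Success(document) = QueryParser.parse("query { services { name } }") // hypothetical field
val vars = JsObject("includeGraph" -> JsBoolean(true)) // triggers GraphFormatted
val resultFuture = GraphQLServer.executeGraphQLQuery(document, None, vars)
```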
diff --git a/s2graphql/src/main/scala/org/apache/s2graph/graphql/HttpServer.scala b/s2graphql/src/main/scala/org/apache/s2graph/graphql/HttpServer.scala
index 6f57cc4..755d185 100644
--- a/s2graphql/src/main/scala/org/apache/s2graph/graphql/HttpServer.scala
+++ b/s2graphql/src/main/scala/org/apache/s2graph/graphql/HttpServer.scala
@@ -6,9 +6,9 @@
  * to you under the Apache License, Version 2.0 (the
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
- * 
+ *
  *   http://www.apache.org/licenses/LICENSE-2.0
- * 
+ *
  * Unless required by applicable law or agreed to in writing,
  * software distributed under the License is distributed on an
  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -19,18 +19,29 @@
 
 package org.apache.s2graph.graphql
 
+import java.nio.charset.Charset
+
 import akka.actor.ActorSystem
 import akka.http.scaladsl.Http
 import akka.http.scaladsl.marshallers.sprayjson.SprayJsonSupport._
-import akka.http.scaladsl.model.{HttpRequest, HttpResponse}
+import akka.http.scaladsl.model._
 import akka.http.scaladsl.server.Directives._
-import akka.http.scaladsl.server._
+import akka.http.scaladsl.server.{Route, StandardRoute}
 import akka.stream.ActorMaterializer
-import akka.stream.scaladsl.Flow
 import org.slf4j.LoggerFactory
+import sangria.parser.QueryParser
+import spray.json._
 
 import scala.concurrent.Await
 import scala.language.postfixOps
+import scala.util._
+import akka.http.scaladsl.marshalling.{Marshaller, ToEntityMarshaller, ToResponseMarshallable}
+import akka.http.scaladsl.unmarshalling.{FromEntityUnmarshaller, Unmarshaller}
+import akka.util.ByteString
+import sangria.ast.Document
+import sangria.renderer.{QueryRenderer, QueryRendererConfig}
+
+import scala.collection.immutable.Seq
 
 object Server extends App {
   val logger = LoggerFactory.getLogger(this.getClass)
@@ -39,20 +50,57 @@
   implicit val materializer = ActorMaterializer()
 
   import system.dispatcher
-
   import scala.concurrent.duration._
 
-  val route: Flow[HttpRequest, HttpResponse, Any] = (post & path("graphql")) {
-    entity(as[spray.json.JsValue])(GraphQLServer.endpoint)
-  } ~ (post & path("updateEdgeFetcher")) {
-    entity(as[spray.json.JsValue])(GraphQLServer.updateEdgeFetcher)
-  } ~ {
-    getFromResource("assets/graphiql.html")
-  }
+  import spray.json.DefaultJsonProtocol._
+
+  val route: Route =
+    get {
+      getFromResource("assets/graphiql.html")
+    } ~ (post & path("updateEdgeFetcher")) {
+      entity(as[JsValue]) { body =>
+        GraphQLServer.updateEdgeFetcher(body) match {
+          case Success(_) => complete(StatusCodes.OK -> JsString("Update fetcher finished"))
+          case Failure(e) =>
+            logger.error("Error on execute", e)
+            complete(StatusCodes.InternalServerError -> spray.json.JsObject("message" -> JsString(e.toString)))
+        }
+      }
+    } ~ (post & path("graphql")) {
+      parameters('operationName.?, 'variables.?) { (operationNameParam, variablesParam) =>
+        entity(as[Document]) { document ⇒
+          variablesParam.map(parseJson) match {
+            case None ⇒ complete(GraphQLServer.executeGraphQLQuery(document, operationNameParam, JsObject()))
+            case Some(Right(js)) ⇒ complete(GraphQLServer.executeGraphQLQuery(document, operationNameParam, js.asJsObject))
+            case Some(Left(e)) ⇒
+              logger.error("Error on execute", e)
+              complete(StatusCodes.BadRequest -> GraphQLServer.formatError(e))
+          }
+        } ~ entity(as[JsValue]) { body ⇒
+          val fields = body.asJsObject.fields
+
+          val query = fields.get("query").map(js => js.convertTo[String])
+          val operationName = fields.get("operationName").filterNot(_ == JsNull).map(_.convertTo[String])
+          val variables = fields.get("variables").filterNot(_ == JsNull)
+
+          query.map(QueryParser.parse(_)) match {
+            case None ⇒ complete(StatusCodes.BadRequest -> GraphQLServer.formatError("No query to execute"))
+            case Some(Failure(error)) ⇒
+              logger.error("Error on execute", error)
+              complete(StatusCodes.BadRequest -> GraphQLServer.formatError(error))
+            case Some(Success(document)) => variables match {
+              case Some(js) ⇒ complete(GraphQLServer.executeGraphQLQuery(document, operationName, js.asJsObject))
+              case None ⇒ complete(GraphQLServer.executeGraphQLQuery(document, operationName, JsObject()))
+            }
+          }
+        }
+      }
+    }
 
   val port = sys.props.get("http.port").fold(8000)(_.toInt)
 
-  logger.info(s"Starting GraphQL server... ${port}")
+  logger.info(s"Starting GraphQL server... $port")
+
   Http().bindAndHandle(route, "0.0.0.0", port)
 
   def shutdown(): Unit = {
@@ -63,4 +111,40 @@
 
     logger.info("Terminated.")
   }
+
+  // Unmarshaller
+
+  def unmarshallerContentTypes: Seq[ContentTypeRange] = mediaTypes.map(ContentTypeRange.apply)
+
+  def mediaTypes: Seq[MediaType.WithFixedCharset] =
+    Seq(MediaType.applicationWithFixedCharset("graphql", HttpCharsets.`UTF-8`, "graphql"))
+
+  implicit def documentMarshaller(implicit config: QueryRendererConfig = QueryRenderer.Compact): ToEntityMarshaller[Document] = {
+    Marshaller.oneOf(mediaTypes: _*) {
+      mediaType ⇒
+        Marshaller.withFixedContentType(ContentType(mediaType)) {
+          json ⇒ HttpEntity(mediaType, QueryRenderer.render(json, config))
+        }
+    }
+  }
+
+  implicit val documentUnmarshaller: FromEntityUnmarshaller[Document] = {
+    Unmarshaller.byteStringUnmarshaller
+      .forContentTypes(unmarshallerContentTypes: _*)
+      .map {
+        case ByteString.empty ⇒ throw Unmarshaller.NoContentException
+        case data ⇒
+          import sangria.parser.DeliveryScheme.Throw
+          QueryParser.parse(data.decodeString(Charset.forName("UTF-8")))
+      }
+  }
+
+  def parseJson(jsStr: String): Either[Throwable, JsValue] = {
+    val parsed = Try(jsStr.parseJson)
+    parsed match {
+      case Success(js) => Right(js)
+      case Failure(e) => Left(e)
+    }
+  }
+
 }
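
A minimal client-side sketch of the new "application/graphql" Content-Type handling (S2GRAPH-203); the endpoint and query body are illustrative:

```scala
import akka.actor.ActorSystem
import akka.http.scaladsl.Http
import akka.http.scaladsl.model._
import akka.stream.ActorMaterializer

implicit val system = ActorSystem("client")
implicit val materializer = ActorMaterializer()

// The documentUnmarshaller above parses the raw body as a GraphQL document
// when the request carries the "application/graphql" media type.
val graphqlType =
  MediaType.applicationWithFixedCharset("graphql", HttpCharsets.`UTF-8`, "graphql")

val request = HttpRequest(
  method = HttpMethods.POST,
  uri = "http://localhost:8000/graphql", // default port from Server
  entity = HttpEntity(ContentType(graphqlType), "query { services { name } }")
)

val responseFuture = Http().singleRequest(request)
```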
diff --git a/s2graphql/src/main/scala/org/apache/s2graph/graphql/middleware/GraphFormatWriter.scala b/s2graphql/src/main/scala/org/apache/s2graph/graphql/middleware/GraphFormatWriter.scala
new file mode 100644
index 0000000..6dcc368
--- /dev/null
+++ b/s2graphql/src/main/scala/org/apache/s2graph/graphql/middleware/GraphFormatWriter.scala
@@ -0,0 +1,120 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.s2graph.graphql.middleware
+
+import org.apache.s2graph.core.schema.ServiceColumn
+import org.apache.s2graph.core.{GraphElement, S2EdgeLike, S2VertexLike}
+import org.apache.s2graph.graphql.types.PlayJsonScalarType
+import org.slf4s.LoggerFactory
+import play.api.libs.json._
+import sangria.execution._
+import sangria.schema.Context
+
+
+object GraphFormatted extends Middleware[Any] with MiddlewareAfterField[Any] with MiddlewareExtension[Any] {
+  implicit val logger = LoggerFactory.getLogger(this.getClass)
+
+  type QueryVal = java.util.concurrent.ConcurrentHashMap[GraphElement, Unit]
+
+  type FieldVal = Long
+
+
+  def beforeQuery(context: MiddlewareQueryContext[Any, _, _]) = {
+    new java.util.concurrent.ConcurrentHashMap[GraphElement, Unit]()
+  }
+
+  def afterQuery(queryVal: QueryVal, context: MiddlewareQueryContext[Any, _, _]) = ()
+
+  def toVertexId(v: S2VertexLike, c: ServiceColumn): String = {
+    val innerId = v.innerId.toIdString()
+
+    s"${c.service.serviceName}.${c.columnName}.${innerId}"
+  }
+
+  def toVertexJson(v: S2VertexLike, c: ServiceColumn): JsValue = {
+    Json.obj(
+      "id" -> toVertexId(v, c),
+      "label" -> v.innerId.toIdString()
+    )
+  }
+
+  def toEdgeJson(e: S2EdgeLike): JsValue = {
+    Json.obj(
+      "source" -> toVertexId(e.srcVertex, e.innerLabel.srcColumn),
+      "target" -> toVertexId(e.tgtVertex, e.innerLabel.tgtColumn),
+      "id" -> s"${toVertexId(e.srcVertex, e.innerLabel.srcColumn)}.${e.label()}.${toVertexId(e.tgtVertex, e.innerLabel.tgtColumn)}",
+      "label" -> e.label()
+    )
+  }
+
+  def afterQueryExtensions(queryVal: QueryVal,
+                           context: MiddlewareQueryContext[Any, _, _]
+                          ): Vector[Extension[_]] = {
+
+    import scala.collection.JavaConverters._
+    val elements = queryVal.keys().asScala.toVector
+
+    val edges = elements.collect { case e: S2EdgeLike => e }
+    val vertices = elements.collect { case v: S2VertexLike => v -> v.serviceColumn }
+    val verticesFromEdges = edges.flatMap { e =>
+      val label = e.innerLabel
+      Vector((e.srcVertex, label.srcColumn), (e.tgtVertex, label.tgtColumn))
+    }
+
+    val verticesJson = (vertices ++ verticesFromEdges).map { case (v, c) => toVertexJson(v, c) }.distinct
+    val edgeJson = edges.map(toEdgeJson).distinct
+
+    val jsElements = Json.obj(
+      "nodes" -> verticesJson,
+      "edges" -> edgeJson
+    )
+
+    val graph = Json.obj("graph" -> jsElements)
+
+    /**
+      * nodes: [{id, label, x, y, size}, ..],
+      * edges: [{id, source, target, label}]
+      */
+    implicit val iu = PlayJsonScalarType.PlayJsonInputUnmarshaller
+    Vector(Extension[JsValue](graph))
+  }
+
+  def beforeField(queryVal: QueryVal, mctx: MiddlewareQueryContext[Any, _, _], ctx: Context[Any, _]) = {
+    continue(System.currentTimeMillis())
+  }
+
+  def afterField(queryVal: QueryVal, fieldVal: FieldVal, value: Any, mctx: MiddlewareQueryContext[Any, _, _], ctx: Context[Any, _]) = {
+    //    logger.info(s"${ctx.parentType.name}.${ctx.field.name} = ${value.getClass.getName}")
+
+    value match {
+      case ls: Seq[_] => ls.foreach {
+        case e: GraphElement => queryVal.put(e, ())
+        case _ =>
+      }
+      case e: GraphElement => queryVal.put(e, ())
+      case _ =>
+    }
+
+    None
+  }
+}
+
+
+
diff --git a/s2graphql/src/main/scala/org/apache/s2graph/graphql/middleware/Transform.scala b/s2graphql/src/main/scala/org/apache/s2graph/graphql/middleware/Transform.scala
new file mode 100644
index 0000000..9aa86fb
--- /dev/null
+++ b/s2graphql/src/main/scala/org/apache/s2graph/graphql/middleware/Transform.scala
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.s2graph.graphql.middleware
+
+import org.apache.s2graph.graphql.types.S2Directive
+import sangria.ast.StringValue
+import sangria.execution.{Middleware, MiddlewareAfterField, MiddlewareQueryContext}
+import sangria.schema.Context
+
+case class Transform() extends Middleware[Any] with MiddlewareAfterField[Any] {
+  type QueryVal = Unit
+  type FieldVal = Unit
+
+  def beforeQuery(context: MiddlewareQueryContext[Any, _, _]) = ()
+
+  def afterQuery(queryVal: QueryVal, context: MiddlewareQueryContext[Any, _, _]) = ()
+
+  def beforeField(cache: QueryVal, mctx: MiddlewareQueryContext[Any, _, _], ctx: Context[Any, _]) = continue
+
+  def afterField(cache: QueryVal, fromCache: FieldVal, value: Any, mctx: MiddlewareQueryContext[Any, _, _], ctx: Context[Any, _]) = {
+
+    value match {
+      case s: String => {
+        val transformFuncOpt = ctx.astFields.headOption.flatMap(_.directives.find(_.name == S2Directive.Transform.name)).map { directive =>
+          directive.arguments.head.value.asInstanceOf[StringValue].value
+        }
+
+        transformFuncOpt.map(funcString => S2Directive.resolveTransform(funcString, s)).orElse(Option(s))
+      }
+
+      case _ ⇒ None
+    }
+  }
+}
diff --git a/s2graphql/src/main/scala/org/apache/s2graph/graphql/repository/GraphRepository.scala b/s2graphql/src/main/scala/org/apache/s2graph/graphql/repository/GraphRepository.scala
index 0b5a2e9..b5e65dc 100644
--- a/s2graphql/src/main/scala/org/apache/s2graph/graphql/repository/GraphRepository.scala
+++ b/s2graphql/src/main/scala/org/apache/s2graph/graphql/repository/GraphRepository.scala
@@ -123,9 +123,6 @@
   }
 
   def getVertices(queryParam: VertexQueryParam): Future[Seq[S2VertexLike]] = {
-    graph.asInstanceOf[S2Graph].searchVertices(queryParam).map { a =>
-      println(a)
-      a
-    }
+    graph.asInstanceOf[S2Graph].searchVertices(queryParam)
   }
 
diff --git a/s2graphql/src/main/scala/org/apache/s2graph/graphql/types/FieldResolver.scala b/s2graphql/src/main/scala/org/apache/s2graph/graphql/types/FieldResolver.scala
index 478517f..a2b80da 100644
--- a/s2graphql/src/main/scala/org/apache/s2graph/graphql/types/FieldResolver.scala
+++ b/s2graphql/src/main/scala/org/apache/s2graph/graphql/types/FieldResolver.scala
@@ -28,7 +28,6 @@
 import sangria.schema._
 
 object FieldResolver {
-
   def graphElement[A](name: String, cType: String, c: Context[GraphRepository, Any]): A = {
     c.value match {
       case v: S2VertexLike => name match {
@@ -36,6 +35,7 @@
         case _ =>
           val innerVal = v.propertyValue(name).get
           JSONParser.innerValToAny(innerVal, cType).asInstanceOf[A]
+
       }
       case e: S2EdgeLike => name match {
         case "timestamp" => e.ts.asInstanceOf[A]
diff --git a/s2graphql/src/main/scala/org/apache/s2graph/graphql/types/PlayJsonScalarType.scala b/s2graphql/src/main/scala/org/apache/s2graph/graphql/types/PlayJsonScalarType.scala
new file mode 100644
index 0000000..fc7f1ce
--- /dev/null
+++ b/s2graphql/src/main/scala/org/apache/s2graph/graphql/types/PlayJsonScalarType.scala
@@ -0,0 +1,131 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.s2graph.graphql.types
+
+import play.api.libs.json._
+import sangria.ast
+import sangria.execution.Executor
+import sangria.marshalling.{ArrayMapBuilder, InputUnmarshaller, ResultMarshaller, ScalarValueInfo}
+import sangria.schema._
+import sangria.validation.{BigIntCoercionViolation, IntCoercionViolation, ValueCoercionViolation}
+import sangria.macros._
+
+import scala.concurrent.ExecutionContext.Implicits.global
+
+object PlayJsonScalarType {
+
+  implicit object CustomPlayJsonResultMarshaller extends ResultMarshaller {
+    type Node = JsValue
+    type MapBuilder = ArrayMapBuilder[Node]
+
+    def emptyMapNode(keys: Seq[String]) = new ArrayMapBuilder[Node](keys)
+
+    def addMapNodeElem(builder: MapBuilder, key: String, value: Node, optional: Boolean) = builder.add(key, value)
+
+    def mapNode(builder: MapBuilder) = JsObject(builder.toMap)
+
+    def mapNode(keyValues: Seq[(String, JsValue)]) = Json.toJson(keyValues.toMap)
+
+    def arrayNode(values: Vector[JsValue]) = JsArray(values)
+
+    def optionalArrayNodeValue(value: Option[JsValue]) = value match {
+      case Some(v) ⇒ v
+      case None ⇒ nullNode
+    }
+
+    def scalarNode(value: Any, typeName: String, info: Set[ScalarValueInfo]) = value match {
+      case v: String ⇒ JsString(v)
+      case v: Boolean ⇒ JsBoolean(v)
+      case v: Int ⇒ JsNumber(v)
+      case v: Long ⇒ JsNumber(v)
+      case v: Float ⇒ JsNumber(BigDecimal(v))
+      case v: Double ⇒ JsNumber(v)
+      case v: BigInt ⇒ JsNumber(BigDecimal(v))
+      case v: BigDecimal ⇒ JsNumber(v)
+      case v: JsValue ⇒ v
+      case v ⇒ throw new IllegalArgumentException("Unsupported scalar value: " + v)
+    }
+
+    def enumNode(value: String, typeName: String) = JsString(value)
+
+    def nullNode = JsNull
+
+    def renderCompact(node: JsValue) = Json.stringify(node)
+
+    def renderPretty(node: JsValue) = Json.prettyPrint(node)
+  }
+
+  implicit object PlayJsonInputUnmarshaller extends InputUnmarshaller[JsValue] {
+    def getRootMapValue(node: JsValue, key: String) = node.asInstanceOf[JsObject].value get key
+
+    def isListNode(node: JsValue) = node.isInstanceOf[JsArray]
+
+    def getListValue(node: JsValue) = node.asInstanceOf[JsArray].value
+
+    def isMapNode(node: JsValue) = node.isInstanceOf[JsObject]
+
+    def getMapValue(node: JsValue, key: String) = node.asInstanceOf[JsObject].value get key
+
+    def getMapKeys(node: JsValue) = node.asInstanceOf[JsObject].fields.map(_._1)
+
+    def isDefined(node: JsValue) = node != JsNull
+
+    def getScalarValue(node: JsValue) = node match {
+      case JsBoolean(b) ⇒ b
+      case JsNumber(d) ⇒ d.toBigIntExact getOrElse d
+      case JsString(s) ⇒ s
+      case n ⇒ n
+    }
+
+    def getScalaScalarValue(node: JsValue) = getScalarValue(node)
+
+    def isEnumNode(node: JsValue) = node.isInstanceOf[JsString]
+
+    def isScalarNode(node: JsValue) = true
+
+    def isVariableNode(node: JsValue) = false
+
+    def getVariableName(node: JsValue) = throw new IllegalArgumentException("variables are not supported")
+
+    def render(node: JsValue) = Json.stringify(node)
+  }
+
+  case object JsonCoercionViolation extends ValueCoercionViolation("Not valid JSON")
+
+  implicit val JsonType = ScalarType[JsValue]("Json",
+    description = Some("Raw PlayJson value"),
+    coerceOutput = (value, _) ⇒ value,
+    coerceUserInput = {
+      case v: String ⇒ Right(JsString(v))
+      case v: Boolean ⇒ Right(JsBoolean(v))
+      case v: Int ⇒ Right(JsNumber(v))
+      case v: Long ⇒ Right(JsNumber(v))
+      case v: Float ⇒ Right(JsNumber(BigDecimal(v)))
+      case v: Double ⇒ Right(JsNumber(v))
+      case v: BigInt ⇒ Right(JsNumber(BigDecimal(v)))
+      case v: BigDecimal ⇒ Right(JsNumber(v))
+      case v: JsValue ⇒ Right(v)
+    },
+    coerceInput = {
+      case sv: ast.StringValue => Right(Json.parse(sv.value))
+      case _ ⇒
+        Left(JsonCoercionViolation)
+    })
+}
diff --git a/s2graphql/src/main/scala/org/apache/s2graph/graphql/types/S2Directive.scala b/s2graphql/src/main/scala/org/apache/s2graph/graphql/types/S2Directive.scala
new file mode 100644
index 0000000..54512c1
--- /dev/null
+++ b/s2graphql/src/main/scala/org/apache/s2graph/graphql/types/S2Directive.scala
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.s2graph.graphql.types
+
+import sangria.schema.{Argument, Directive, DirectiveLocation, StringType}
+
+object S2Directive {
+
+  object Eval {
+
+    import scala.collection._
+
+    val codeMap = mutable.Map.empty[String, () => Any]
+
+    def compileCode(code: String): () => Any = {
+      import scala.tools.reflect.ToolBox
+      val toolbox = reflect.runtime.currentMirror.mkToolBox()
+
+      toolbox.compile(toolbox.parse(code))
+    }
+
+    def getCompiledCode[T](code: String): T = {
+      val compiledCode = Eval.codeMap.getOrElseUpdate(code, Eval.compileCode(code))
+
+      compiledCode
+        .asInstanceOf[() => Any]
+        .apply()
+        .asInstanceOf[T]
+    }
+  }
+
+  type TransformFunc = String => String
+
+  val funcArg = Argument("func", StringType)
+
+  val Transform =
+    Directive("transform",
+      arguments = List(funcArg),
+      locations = Set(DirectiveLocation.Field),
+      shouldInclude = _ => true)
+
+  def resolveTransform(code: String, input: String): String = {
+    try {
+      val fn = Eval.getCompiledCode[TransformFunc](code)
+
+      fn.apply(input)
+    } catch {
+      case e: Exception => e.toString
+    }
+  }
+}
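
Illustrative only (the field names are hypothetical, not from the real schema): how the `@transform` directive defined above might be attached to a string field; `resolveTransform` compiles the `func` argument with the reflection toolbox and applies it to the field value.

```scala
val queryWithTransform =
  """query {
    |  user {
    |    profileImage @transform(func: "(s: String) => s.toUpperCase")
    |  }
    |}""".stripMargin
```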
diff --git a/s2graphql/src/main/scala/org/apache/s2graph/graphql/types/SchemaDef.scala b/s2graphql/src/main/scala/org/apache/s2graph/graphql/types/SchemaDef.scala
index 27b0c50..7451023 100644
--- a/s2graphql/src/main/scala/org/apache/s2graph/graphql/types/SchemaDef.scala
+++ b/s2graphql/src/main/scala/org/apache/s2graph/graphql/types/SchemaDef.scala
@@ -20,8 +20,6 @@
 package org.apache.s2graph.graphql.types
 
 import org.apache.s2graph.graphql.repository.GraphRepository
-import org.apache.s2graph.graphql.types._
-
 
 /**
   * S2Graph GraphQL schema.
@@ -47,5 +45,13 @@
     fields(s2Type.mutationFields ++ mutateManagementFields: _*)
   )
 
-  val S2GraphSchema = Schema(S2QueryType, Option(S2MutationType))
+  val directives = S2Directive.Transform :: BuiltinDirectives
+
+  private val s2Schema = Schema(
+    S2QueryType,
+    Option(S2MutationType),
+    directives = directives
+  )
+
+  val S2GraphSchema = s2Schema
 }
diff --git a/s2graphql/src/test/scala/org/apache/s2graph/graphql/DirectiveTest.scala b/s2graphql/src/test/scala/org/apache/s2graph/graphql/DirectiveTest.scala
new file mode 100644
index 0000000..699a968
--- /dev/null
+++ b/s2graphql/src/test/scala/org/apache/s2graph/graphql/DirectiveTest.scala
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.s2graph.graphql
+
+import com.typesafe.config.ConfigFactory
+import org.apache.s2graph.graphql.types.S2Directive
+import org.scalatest._
+
+class DirectiveTest extends FunSuite with Matchers with BeforeAndAfterAll {
+  var testGraph: TestGraph = _
+
+  override def beforeAll = {
+    val config = ConfigFactory.load()
+    testGraph = new EmptyGraph(config)
+    testGraph.open()
+  }
+
+  override def afterAll(): Unit = {
+    testGraph.cleanup()
+  }
+
+  test("transform") {
+    val input = "20170601_A0"
+    val code =
+      """ (s: String) => {
+          val date = s.split("_").head
+          s"http://abc.xy.com/IMG_${date}.png"
+      }
+
+      """.stripMargin
+    val actual = S2Directive.resolveTransform(code, input)
+    val expected = "http://abc.xy.com/IMG_20170601.png"
+
+    actual shouldBe expected
+  }
+}
diff --git a/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/Job.scala b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/Job.scala
index 717d087..8f21bc2 100644
--- a/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/Job.scala
+++ b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/Job.scala
@@ -29,7 +29,12 @@
 
   def run() = {
     // source
-    jobDesc.sources.foreach{ source => dfMap.put(source.conf.name, source.toDF(ss))}
+    jobDesc.sources.foreach{ source =>
+      val df = source.toDF(ss)
+      if (source.conf.cache.getOrElse(false) && !df.isStreaming) df.cache()
+
+      dfMap.put(source.conf.name, df)
+    }
     logger.debug(s"valid source DF set : ${dfMap.keySet}")
 
     // process
@@ -64,7 +69,9 @@
     }
     .map { p =>
       val inputMap = p.conf.inputs.map{ input => (input,  dfMap(input)) }.toMap
-      p.conf.name -> p.execute(ss, inputMap)
+      val df = p.execute(ss, inputMap)
+      if (p.conf.cache.getOrElse(false) && !df.isStreaming) df.cache()
+      p.conf.name -> df
     }
   }
 }
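
A sketch of the new per-task caching knob, assuming a hypothetical file source; the runner now caches a task's DataFrame when `cache` is set and the frame is non-streaming.

```scala
// `type` and option keys are illustrative; `cache` is the new optional
// TaskConf field (see the Task.scala hunk below).
val cachedSource = TaskConf(
  name = "events",
  `type` = "file",
  options = Map("paths" -> "/tmp/events", "format" -> "parquet"),
  cache = Some(true)
)
```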
diff --git a/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/task/Process.scala b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/task/Process.scala
index 6bc100f..c077f5b 100644
--- a/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/task/Process.scala
+++ b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/task/Process.scala
@@ -20,6 +20,7 @@
 package org.apache.s2graph.s2jobs.task
 
 import org.apache.spark.sql.{DataFrame, SparkSession}
+import play.api.libs.json.Json
 
 /**
   * Process
@@ -47,7 +48,68 @@
     val sql = conf.options("sql")
     logger.debug(s"${LOG_PREFIX} sql : $sql")
 
-    ss.sql(sql)
+    postProcess(ss.sql(sql))
   }
+
+  /**
+    * extraOperations
+    * @param df
+    * @return
+    */
+  private def postProcess(df: DataFrame): DataFrame = {
+    import org.apache.spark.sql.functions._
+
+    var resultDF = df
+
+    // watermark
+    val timeColumn = conf.options.get("time.column")
+    logger.debug(s">> timeColumn: ${timeColumn}")
+
+    val waterMarkDelayTime = conf.options.get("watermark.delay.time")
+    if (waterMarkDelayTime.isDefined) {
+      logger.debug(s">> waterMarkDelayTime : ${waterMarkDelayTime}")
+      if (timeColumn.isDefined) {
+        resultDF = resultDF.withWatermark(timeColumn.get, waterMarkDelayTime.get)
+      } else logger.warn("time.column does not exist; cannot apply watermark")
+    }
+
+    // drop duplication
+    val dropDuplicateColumns = conf.options.get("drop.duplicate.columns")
+    if (dropDuplicateColumns.isDefined) {
+      logger.debug(s">> dropDuplicates : ${dropDuplicateColumns}")
+      resultDF = resultDF.dropDuplicates(dropDuplicateColumns.get.split(","))
+    }
+
+    // groupBy
+    val groupedKeysOpt = conf.options.get("grouped.keys")
+    if (groupedKeysOpt.isDefined) {
+      var groupedKeys = groupedKeysOpt.get.split(",").map { key =>
+        col(key.trim)
+      }.toSeq
+
+      val windowDurationOpt = conf.options.get("grouped.window.duration")
+      val slideDurationOpt = conf.options.get("grouped.slide.duration")
+      if (windowDurationOpt.isDefined && slideDurationOpt.isDefined) {
+        logger.debug(s">> using window operation : windowDuration ${windowDurationOpt}, slideDuration : ${slideDurationOpt}")
+        groupedKeys = groupedKeys ++ Seq(window(col(timeColumn.get), windowDurationOpt.get, slideDurationOpt.get))
+      }
+      logger.debug(s">> groupedKeys: ${groupedKeys}")
+
+      // aggregate options
+      val aggExprs = Json.parse(conf.options.getOrElse("grouped.dataset.agg", """["count(1)"]""")).as[Seq[String]].map(expr(_))
+      logger.debug(s">> aggr : ${aggExprs}")
+
+      val groupedDF = resultDF.groupBy(groupedKeys: _*)
+
+      resultDF = if (aggExprs.size > 1) {
+        groupedDF.agg(aggExprs.head, aggExprs.tail: _*)
+      } else {
+        groupedDF.agg(aggExprs.head)
+      }
+    }
+
+    resultDF
+  }
+
 }
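
A sketch of the option keys `postProcess` reads, collected in one place; the key names come from the code above, while the task name, SQL, and column names are illustrative.

```scala
val sqlProcessConf = TaskConf(
  name = "aggregate",
  `type` = "sql",
  inputs = Seq("events"),
  options = Map(
    "sql" -> "SELECT user, amount, ts FROM events",
    "time.column" -> "ts",
    "watermark.delay.time" -> "10 minutes",
    "drop.duplicate.columns" -> "user,ts",
    "grouped.keys" -> "user",
    "grouped.window.duration" -> "1 hour",
    "grouped.slide.duration" -> "10 minutes",
    "grouped.dataset.agg" -> """["count(1)", "sum(amount)"]"""
  )
)
```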
 
diff --git a/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/task/Sink.scala b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/task/Sink.scala
index ae88b6d..720c2d7 100644
--- a/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/task/Sink.scala
+++ b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/task/Sink.scala
@@ -69,6 +69,8 @@
     }
     val interval = conf.options.getOrElse("interval", DEFAULT_TRIGGER_INTERVAL)
     val checkpointLocation = conf.options.getOrElse("checkpointLocation", DEFAULT_CHECKPOINT_LOCATION)
+    val isContinuous = conf.options.getOrElse("isContinuous", "false").toBoolean
+    val trigger = if (isContinuous) Trigger.Continuous(interval) else Trigger.ProcessingTime(interval)
 
     val cfg = conf.options ++ Map("checkpointLocation" -> checkpointLocation)
 
@@ -78,7 +80,7 @@
       .queryName(s"${queryName}_${conf.name}")
       .format(FORMAT)
       .options(cfg)
-      .trigger(Trigger.ProcessingTime(interval))
+      .trigger(trigger)
       .outputMode(mode)
       .start()
   }
@@ -93,7 +95,7 @@
       case _ => SaveMode.Overwrite
     }
 
-    val partitionedWriter = if (partitionsOpt.isDefined) writer.partitionBy(partitionsOpt.get) else writer
+    val partitionedWriter = if (partitionsOpt.isDefined) writer.partitionBy(partitionsOpt.get.split(","): _*) else writer
 
     writeBatchInner(partitionedWriter.format(FORMAT).mode(mode))
   }
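
A sketch of streaming-sink options exercising the new continuous trigger (values illustrative); with `isContinuous` set, the writer uses `Trigger.Continuous(interval)` instead of `Trigger.ProcessingTime(interval)`, and the batch path now splits comma-separated partition columns for `partitionBy`.

```scala
val sinkOptions = Map(
  "interval" -> "1 second",             // trigger interval
  "isContinuous" -> "true",             // selects Trigger.Continuous
  "checkpointLocation" -> "/tmp/checkpoints/events"
)
```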
diff --git a/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/task/Source.scala b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/task/Source.scala
index 9c80e58..259cfc0 100644
--- a/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/task/Source.scala
+++ b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/task/Source.scala
@@ -40,19 +40,30 @@
   val DEFAULT_FORMAT = "raw"
   override def mandatoryOptions: Set[String] = Set("kafka.bootstrap.servers", "subscribe")
 
+  def repartition(df: DataFrame, defaultParallelism: Int) = {
+    conf.options.get("numPartitions").map(_.toInt) match {
+      case Some(numOfPartitions) =>
+        logger.info(s"[repartition] $numOfPartitions ($defaultParallelism)")
+        if (numOfPartitions >= defaultParallelism) df.repartition(numOfPartitions)
+        else df.coalesce(numOfPartitions)
+      case None => df
+    }
+  }
+
   override def toDF(ss:SparkSession):DataFrame = {
     logger.info(s"${LOG_PREFIX} options: ${conf.options}")
 
     val format = conf.options.getOrElse("format", "raw")
     val df = ss.readStream.format("kafka").options(conf.options).load()
 
+    val partitionedDF = repartition(df, df.sparkSession.sparkContext.defaultParallelism)
     format match {
-      case "raw" => df
-      case "json" => parseJsonSchema(ss, df)
+      case "raw" => partitionedDF
+      case "json" => parseJsonSchema(ss, partitionedDF)
 //      case "custom" => parseCustomSchema(df)
       case _ =>
         logger.warn(s"${LOG_PREFIX} unsupported format '$format'.. use default schema ")
-        df
+        partitionedDF
     }
   }
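
A sketch of a Kafka source using the new `numPartitions` option (broker and topic names are illustrative); the `repartition` helper scales up via `repartition` or down via `coalesce` relative to the default parallelism.

```scala
val kafkaSourceConf = TaskConf(
  name = "clicks",
  `type` = "kafka",
  options = Map(
    "kafka.bootstrap.servers" -> "localhost:9092",
    "subscribe" -> "click-stream",
    "format" -> "json",
    "numPartitions" -> "8" // repartition(8) or coalesce(8)
  )
)
```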
 
diff --git a/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/task/Task.scala b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/task/Task.scala
index ea42828..1210132 100644
--- a/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/task/Task.scala
+++ b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/task/Task.scala
@@ -42,8 +42,7 @@
     taskConf.options.filterKeys(_.startsWith("cache.")).mapValues(_.toInt)
   }
 }
-
-case class TaskConf(name: String, `type`: String, inputs: Seq[String] = Nil, options: Map[String, String] = Map.empty)
+case class TaskConf(name: String, `type`: String, inputs: Seq[String] = Nil, options: Map[String, String] = Map.empty, cache: Option[Boolean] = None)
 
 trait Task extends Serializable with Logger {
   val conf: TaskConf