Merge branch 'master' into S2GRAPH-203

* master:
  [S2GRAPH-219] Added query that includes all vertices and associated edges for GraphVisualize.
  [S2GRAPH-218] add operations not supported on sql
  [S2GRAPH-216] Provide a transform directive in the GraphQL query result
  Extract Where and EdgeTransformer to TraversalHelper.
  make middleware for GraphFormatWriter
  add extension for SigmaJs
  add continuous trigger option
  add partition options for source df
  add dataframe cache option
  support multiple partitions
  add operations not supported on sql
  add simple test
  implement @transform directive
diff --git a/s2graphql/src/main/scala/org/apache/s2graph/graphql/GraphQLServer.scala b/s2graphql/src/main/scala/org/apache/s2graph/graphql/GraphQLServer.scala
index b7c0381..5f1c225 100644
--- a/s2graphql/src/main/scala/org/apache/s2graph/graphql/GraphQLServer.scala
+++ b/s2graphql/src/main/scala/org/apache/s2graph/graphql/GraphQLServer.scala
@@ -26,9 +26,9 @@
 import akka.http.scaladsl.server.Directives._
 import akka.http.scaladsl.server._
 import com.typesafe.config.ConfigFactory
+import org.apache.s2graph.graphql.middleware.{GraphFormatted, Transform}
 import org.apache.s2graph.core.S2Graph
 import org.apache.s2graph.core.utils.SafeUpdateCache
-import org.apache.s2graph.graphql.middleware.{GraphFormatted}
 import org.apache.s2graph.graphql.repository.GraphRepository
 import org.apache.s2graph.graphql.types.SchemaDef
 import org.slf4j.LoggerFactory
@@ -36,12 +36,13 @@
 import sangria.execution._
 import sangria.execution.deferred.DeferredResolver
 import sangria.marshalling.sprayJson._
-import sangria.parser.QueryParser
+import sangria.parser.{QueryParser, SyntaxError}
 import sangria.schema.Schema
-import spray.json.{JsBoolean, JsObject, JsString}
+import spray.json._
 
 import scala.collection.JavaConverters._
 import scala.concurrent.{ExecutionContext, Future}
+import scala.util.control.NonFatal
 import scala.util.{Failure, Success, Try}
 
 object GraphQLServer {
@@ -64,7 +65,7 @@
 
   val schemaCache = new SafeUpdateCache(schemaConfig)
 
-  def updateEdgeFetcher(requestJSON: spray.json.JsValue)(implicit e: ExecutionContext): Route = {
+  def updateEdgeFetcher(requestJSON: spray.json.JsValue)(implicit e: ExecutionContext): Try[Unit] = {
     val ret = Try {
       val spray.json.JsObject(fields) = requestJSON
       val spray.json.JsString(labelName) = fields("label")
@@ -73,33 +74,7 @@
       s2graph.management.updateEdgeFetcher(labelName, jsOptions.compactPrint)
     }
 
-    ret match {
-      case Success(f) => complete(OK -> JsString("start"))
-      case Failure(e) => complete(InternalServerError -> spray.json.JsObject("message" -> JsString(e.toString)))
-    }
-  }
-
-  def endpoint(requestJSON: spray.json.JsValue)(implicit e: ExecutionContext): Route = {
-    val spray.json.JsObject(fields) = requestJSON
-    val spray.json.JsString(query) = fields("query")
-
-    val operation = fields.get("operationName") collect {
-      case spray.json.JsString(op) => op
-    }
-
-    val vars = fields.get("variables") match {
-      case Some(obj: spray.json.JsObject) => obj
-      case _ => spray.json.JsObject.empty
-    }
-
-    QueryParser.parse(query) match {
-      case Success(queryAst) =>
-        logger.info(queryAst.renderCompact)
-        complete(executeGraphQLQuery(queryAst, operation, vars))
-      case Failure(error) =>
-        logger.warn(error.getMessage, error)
-        complete(BadRequest -> spray.json.JsObject("error" -> JsString(error.getMessage)))
-    }
+    ret
   }
 
   /**
@@ -109,28 +84,43 @@
   logger.info(s"schemaCacheTTL: ${schemaCacheTTL}")
 
   private def createNewSchema(): Schema[GraphRepository, Any] = {
-    logger.info(s"Schema updated: ${System.currentTimeMillis()}")
     val newSchema = new SchemaDef(s2Repository).S2GraphSchema
-    logger.info("-" * 80)
+    logger.info(s"Schema updated: ${System.currentTimeMillis()}")
 
     newSchema
   }
 
+  def formatError(error: Throwable): JsValue = error match {
+    case syntaxError: SyntaxError ⇒
+      JsObject("errors" → JsArray(
+        JsObject(
+          "message" → JsString(syntaxError.getMessage),
+          "locations" → JsArray(JsObject(
+            "line" → JsNumber(syntaxError.originalError.position.line),
+            "column" → JsNumber(syntaxError.originalError.position.column))))))
+
+    case NonFatal(e) ⇒ formatError(e.toString)
+    case e ⇒ throw e
+  }
+
+  def formatError(message: String): JsObject =
+    JsObject("errors" → JsArray(JsObject("message" → JsString(message))))
+
+  def onEvictSchema(o: AnyRef): Unit = {
+    logger.info("Schema Evicted")
+  }
+
   val TransformMiddleWare = List(org.apache.s2graph.graphql.middleware.Transform())
 
-  private def executeGraphQLQuery(query: Document, op: Option[String], vars: JsObject)(implicit e: ExecutionContext) = {
-    val cacheKey = className + "s2Schema"
-    val s2schema = schemaCache.withCache(cacheKey, broadcast = false)(createNewSchema())
-
+  def executeGraphQLQuery(query: Document, op: Option[String], vars: JsObject)(implicit e: ExecutionContext) = {
     import GraphRepository._
+
+    val cacheKey = className + "s2Schema"
+    val s2schema = schemaCache.withCache(cacheKey, broadcast = false, onEvict = onEvictSchema)(createNewSchema())
     val resolver: DeferredResolver[GraphRepository] = DeferredResolver.fetchers(vertexFetcher, edgeFetcher)
 
     val includeGrpaph = vars.fields.get("includeGraph").contains(spray.json.JsBoolean(true))
-    val middleWares = if (includeGrpaph) {
-      GraphFormatted :: TransformMiddleWare
-    } else {
-      TransformMiddleWare
-    }
+    val middleWares = if (includeGrpaph) GraphFormatted :: TransformMiddleWare else TransformMiddleWare
 
     Executor.execute(
       s2schema,
@@ -140,15 +130,15 @@
       operationName = op,
       deferredResolver = resolver,
       middleware = middleWares
-    )
-      .map((res: spray.json.JsValue) => OK -> res)
+    ).map((res: spray.json.JsValue) => OK -> res)
       .recover {
         case error: QueryAnalysisError =>
-          logger.warn(error.getMessage, error)
+          logger.error("Error on execute", error)
           BadRequest -> error.resolveError
         case error: ErrorWithResolver =>
-          logger.error(error.getMessage, error)
+          logger.error("Error on execute", error)
           InternalServerError -> error.resolveError
       }
   }
 }
+
diff --git a/s2graphql/src/main/scala/org/apache/s2graph/graphql/HttpServer.scala b/s2graphql/src/main/scala/org/apache/s2graph/graphql/HttpServer.scala
index 6f57cc4..755d185 100644
--- a/s2graphql/src/main/scala/org/apache/s2graph/graphql/HttpServer.scala
+++ b/s2graphql/src/main/scala/org/apache/s2graph/graphql/HttpServer.scala
@@ -6,9 +6,9 @@
  * to you under the Apache License, Version 2.0 (the
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
- * 
+ *
  *   http://www.apache.org/licenses/LICENSE-2.0
- * 
+ *
  * Unless required by applicable law or agreed to in writing,
  * software distributed under the License is distributed on an
  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -19,18 +19,29 @@
 
 package org.apache.s2graph.graphql
 
+import java.nio.charset.Charset
+
 import akka.actor.ActorSystem
 import akka.http.scaladsl.Http
 import akka.http.scaladsl.marshallers.sprayjson.SprayJsonSupport._
-import akka.http.scaladsl.model.{HttpRequest, HttpResponse}
+import akka.http.scaladsl.model._
 import akka.http.scaladsl.server.Directives._
-import akka.http.scaladsl.server._
+import akka.http.scaladsl.server.{Route, StandardRoute}
 import akka.stream.ActorMaterializer
-import akka.stream.scaladsl.Flow
 import org.slf4j.LoggerFactory
+import sangria.parser.QueryParser
+import spray.json._
 
 import scala.concurrent.Await
 import scala.language.postfixOps
+import scala.util._
+import akka.http.scaladsl.marshalling.{Marshaller, ToEntityMarshaller, ToResponseMarshallable}
+import akka.http.scaladsl.unmarshalling.{FromEntityUnmarshaller, Unmarshaller}
+import akka.util.ByteString
+import sangria.ast.Document
+import sangria.renderer.{QueryRenderer, QueryRendererConfig}
+
+import scala.collection.immutable.Seq
 
 object Server extends App {
   val logger = LoggerFactory.getLogger(this.getClass)
@@ -39,20 +50,57 @@
   implicit val materializer = ActorMaterializer()
 
   import system.dispatcher
-
   import scala.concurrent.duration._
 
-  val route: Flow[HttpRequest, HttpResponse, Any] = (post & path("graphql")) {
-    entity(as[spray.json.JsValue])(GraphQLServer.endpoint)
-  } ~ (post & path("updateEdgeFetcher")) {
-    entity(as[spray.json.JsValue])(GraphQLServer.updateEdgeFetcher)
-  } ~ {
-    getFromResource("assets/graphiql.html")
-  }
+  import spray.json.DefaultJsonProtocol._
+
+  val route: Route =
+    get {
+      getFromResource("assets/graphiql.html")
+    } ~ (post & path("updateEdgeFetcher")) {
+      entity(as[JsValue]) { body =>
+        GraphQLServer.updateEdgeFetcher(body) match {
+          case Success(_) => complete(StatusCodes.OK -> JsString("Update fetcher finished"))
+          case Failure(e) =>
+            logger.error("Error on execute", e)
+            complete(StatusCodes.InternalServerError -> spray.json.JsObject("message" -> JsString(e.toString)))
+        }
+      }
+    } ~ (post & path("graphql")) {
+      parameters('operationName.?, 'variables.?) { (operationNameParam, variablesParam) =>
+        entity(as[Document]) { document ⇒
+          variablesParam.map(parseJson) match {
+            case None ⇒ complete(GraphQLServer.executeGraphQLQuery(document, operationNameParam, JsObject()))
+            case Some(Right(js)) ⇒ complete(GraphQLServer.executeGraphQLQuery(document, operationNameParam, js.asJsObject))
+            case Some(Left(e)) ⇒
+              logger.error("Error on execute", e)
+              complete(StatusCodes.BadRequest -> GraphQLServer.formatError(e))
+          }
+        } ~ entity(as[JsValue]) { body ⇒
+          val fields = body.asJsObject.fields
+
+          val query = fields.get("query").map(js => js.convertTo[String])
+          val operationName = fields.get("operationName").filterNot(_ == null).map(_.convertTo[String])
+          val variables = fields.get("variables").filterNot(_ == null)
+
+          query.map(QueryParser.parse(_)) match {
+            case None ⇒ complete(StatusCodes.BadRequest -> GraphQLServer.formatError("No query to execute"))
+            case Some(Failure(error)) ⇒
+              logger.error("Error on execute", error)
+              complete(StatusCodes.BadRequest -> GraphQLServer.formatError(error))
+            case Some(Success(document)) => variables match {
+              case Some(js) ⇒ complete(GraphQLServer.executeGraphQLQuery(document, operationName, js.asJsObject))
+              case None ⇒ complete(GraphQLServer.executeGraphQLQuery(document, operationName, JsObject()))
+            }
+          }
+        }
+      }
+    }
 
   val port = sys.props.get("http.port").fold(8000)(_.toInt)
 
-  logger.info(s"Starting GraphQL server... ${port}")
+  logger.info(s"Starting GraphQL server... $port")
+
   Http().bindAndHandle(route, "0.0.0.0", port)
 
   def shutdown(): Unit = {
@@ -63,4 +111,40 @@
 
     logger.info("Terminated.")
   }
+
+  // Unmarshaller
+
+  def unmarshallerContentTypes: Seq[ContentTypeRange] = mediaTypes.map(ContentTypeRange.apply)
+
+  def mediaTypes: Seq[MediaType.WithFixedCharset] =
+    Seq(MediaType.applicationWithFixedCharset("graphql", HttpCharsets.`UTF-8`, "graphql"))
+
+  implicit def documentMarshaller(implicit config: QueryRendererConfig = QueryRenderer.Compact): ToEntityMarshaller[Document] = {
+    Marshaller.oneOf(mediaTypes: _*) {
+      mediaType ⇒
+        Marshaller.withFixedContentType(ContentType(mediaType)) {
+          json ⇒ HttpEntity(mediaType, QueryRenderer.render(json, config))
+        }
+    }
+  }
+
+  implicit val documentUnmarshaller: FromEntityUnmarshaller[Document] = {
+    Unmarshaller.byteStringUnmarshaller
+      .forContentTypes(unmarshallerContentTypes: _*)
+      .map {
+        case ByteString.empty ⇒ throw Unmarshaller.NoContentException
+        case data ⇒
+          import sangria.parser.DeliveryScheme.Throw
+          QueryParser.parse(data.decodeString(Charset.forName("UTF-8")))
+      }
+  }
+
+  def parseJson(jsStr: String): Either[Throwable, JsValue] = {
+    val parsed = Try(jsStr.parseJson)
+    parsed match {
+      case Success(js) => Right(js)
+      case Failure(e) => Left(e)
+    }
+  }
+
 }