Merge branch 'master' into S2GRAPH-203
* master:
[S2GRAPH-219] Added query that includes all vertices and associated edges for GraphVisualize.
[S2GRAPH-218] add operations not supported on sql
[S2GRAPH-216] Provide a transform directive in the GraphQL query result
Extract Where and EdgeTransformer to TraversalHelper.
make middleware for GraphFormatWriter
add extension for SigmaJs
add continuous trigger option
add partition options for source df
add dataframe cache option
support multiple partitions
add operations not supported on sql
add simple test
implement @transform directive
diff --git a/s2graphql/src/main/scala/org/apache/s2graph/graphql/GraphQLServer.scala b/s2graphql/src/main/scala/org/apache/s2graph/graphql/GraphQLServer.scala
index b7c0381..5f1c225 100644
--- a/s2graphql/src/main/scala/org/apache/s2graph/graphql/GraphQLServer.scala
+++ b/s2graphql/src/main/scala/org/apache/s2graph/graphql/GraphQLServer.scala
@@ -26,9 +26,9 @@
import akka.http.scaladsl.server.Directives._
import akka.http.scaladsl.server._
import com.typesafe.config.ConfigFactory
+import org.apache.s2graph.graphql.middleware.{GraphFormatted, Transform}
import org.apache.s2graph.core.S2Graph
import org.apache.s2graph.core.utils.SafeUpdateCache
-import org.apache.s2graph.graphql.middleware.{GraphFormatted}
import org.apache.s2graph.graphql.repository.GraphRepository
import org.apache.s2graph.graphql.types.SchemaDef
import org.slf4j.LoggerFactory
@@ -36,12 +36,13 @@
import sangria.execution._
import sangria.execution.deferred.DeferredResolver
import sangria.marshalling.sprayJson._
-import sangria.parser.QueryParser
+import sangria.parser.{QueryParser, SyntaxError}
import sangria.schema.Schema
-import spray.json.{JsBoolean, JsObject, JsString}
+import spray.json._
import scala.collection.JavaConverters._
import scala.concurrent.{ExecutionContext, Future}
+import scala.util.control.NonFatal
import scala.util.{Failure, Success, Try}
object GraphQLServer {
@@ -64,7 +65,7 @@
val schemaCache = new SafeUpdateCache(schemaConfig)
- def updateEdgeFetcher(requestJSON: spray.json.JsValue)(implicit e: ExecutionContext): Route = {
+ def updateEdgeFetcher(requestJSON: spray.json.JsValue)(implicit e: ExecutionContext): Try[Unit] = {
val ret = Try {
val spray.json.JsObject(fields) = requestJSON
val spray.json.JsString(labelName) = fields("label")
@@ -73,33 +74,7 @@
s2graph.management.updateEdgeFetcher(labelName, jsOptions.compactPrint)
}
- ret match {
- case Success(f) => complete(OK -> JsString("start"))
- case Failure(e) => complete(InternalServerError -> spray.json.JsObject("message" -> JsString(e.toString)))
- }
- }
-
- def endpoint(requestJSON: spray.json.JsValue)(implicit e: ExecutionContext): Route = {
- val spray.json.JsObject(fields) = requestJSON
- val spray.json.JsString(query) = fields("query")
-
- val operation = fields.get("operationName") collect {
- case spray.json.JsString(op) => op
- }
-
- val vars = fields.get("variables") match {
- case Some(obj: spray.json.JsObject) => obj
- case _ => spray.json.JsObject.empty
- }
-
- QueryParser.parse(query) match {
- case Success(queryAst) =>
- logger.info(queryAst.renderCompact)
- complete(executeGraphQLQuery(queryAst, operation, vars))
- case Failure(error) =>
- logger.warn(error.getMessage, error)
- complete(BadRequest -> spray.json.JsObject("error" -> JsString(error.getMessage)))
- }
+ ret
}
/**
@@ -109,28 +84,43 @@
logger.info(s"schemaCacheTTL: ${schemaCacheTTL}")
private def createNewSchema(): Schema[GraphRepository, Any] = {
- logger.info(s"Schema updated: ${System.currentTimeMillis()}")
val newSchema = new SchemaDef(s2Repository).S2GraphSchema
- logger.info("-" * 80)
+ logger.info(s"Schema updated: ${System.currentTimeMillis()}")
newSchema
}
+ /**
+  * Renders a failure as a JSON error payload of the form `{"errors": [ ... ]}`.
+  *
+  * A parser `SyntaxError` yields one entry carrying its message plus a single
+  * `locations` element with the `line` and `column` of the original parse error.
+  * Any other non-fatal throwable is rendered through the `String` overload using
+  * its `toString`. Fatal throwables are rethrown unchanged.
+  *
+  * @param error the failure to render
+  * @return the JSON error payload
+  */
+ def formatError(error: Throwable): JsValue = error match {
+   case syntaxError: SyntaxError =>
+     val position = syntaxError.originalError.position
+     val location = JsObject(
+       "line" -> JsNumber(position.line),
+       "column" -> JsNumber(position.column))
+     val entry = JsObject(
+       "message" -> JsString(syntaxError.getMessage),
+       "locations" -> JsArray(location))
+     JsObject("errors" -> JsArray(entry))
+
+   case NonFatal(nonFatal) => formatError(nonFatal.toString)
+   case fatal => throw fatal
+ }
+
+ /**
+  * Wraps a plain message into a JSON error payload: `{"errors": [{"message": message}]}`.
+  *
+  * @param message the text to report
+  * @return the JSON error payload
+  */
+ def formatError(message: String): JsObject = {
+   val entry = JsObject("message" -> JsString(message))
+   JsObject("errors" -> JsArray(entry))
+ }
+
+ /**
+  * Eviction callback handed to the schema cache as `onEvict`; it only logs that
+  * the eviction happened.
+  *
+  * @param o the evicted cache value (not inspected)
+  */
+ def onEvictSchema(o: AnyRef): Unit =
+   logger.info("Schema Evicted")
+
val TransformMiddleWare = List(org.apache.s2graph.graphql.middleware.Transform())
- private def executeGraphQLQuery(query: Document, op: Option[String], vars: JsObject)(implicit e: ExecutionContext) = {
- val cacheKey = className + "s2Schema"
- val s2schema = schemaCache.withCache(cacheKey, broadcast = false)(createNewSchema())
-
+ def executeGraphQLQuery(query: Document, op: Option[String], vars: JsObject)(implicit e: ExecutionContext) = {
import GraphRepository._
+
+ val cacheKey = className + "s2Schema"
+ val s2schema = schemaCache.withCache(cacheKey, broadcast = false, onEvict = onEvictSchema)(createNewSchema())
val resolver: DeferredResolver[GraphRepository] = DeferredResolver.fetchers(vertexFetcher, edgeFetcher)
val includeGrpaph = vars.fields.get("includeGraph").contains(spray.json.JsBoolean(true))
- val middleWares = if (includeGrpaph) {
- GraphFormatted :: TransformMiddleWare
- } else {
- TransformMiddleWare
- }
+ val middleWares = if (includeGrpaph) GraphFormatted :: TransformMiddleWare else TransformMiddleWare
Executor.execute(
s2schema,
@@ -140,15 +130,15 @@
operationName = op,
deferredResolver = resolver,
middleware = middleWares
- )
- .map((res: spray.json.JsValue) => OK -> res)
+ ).map((res: spray.json.JsValue) => OK -> res)
.recover {
case error: QueryAnalysisError =>
- logger.warn(error.getMessage, error)
+ logger.error("Error on execute", error)
BadRequest -> error.resolveError
case error: ErrorWithResolver =>
- logger.error(error.getMessage, error)
+ logger.error("Error on execute", error)
InternalServerError -> error.resolveError
}
}
}
+
diff --git a/s2graphql/src/main/scala/org/apache/s2graph/graphql/HttpServer.scala b/s2graphql/src/main/scala/org/apache/s2graph/graphql/HttpServer.scala
index 6f57cc4..755d185 100644
--- a/s2graphql/src/main/scala/org/apache/s2graph/graphql/HttpServer.scala
+++ b/s2graphql/src/main/scala/org/apache/s2graph/graphql/HttpServer.scala
@@ -6,9 +6,9 @@
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -19,18 +19,29 @@
package org.apache.s2graph.graphql
+import java.nio.charset.Charset
+
import akka.actor.ActorSystem
import akka.http.scaladsl.Http
import akka.http.scaladsl.marshallers.sprayjson.SprayJsonSupport._
-import akka.http.scaladsl.model.{HttpRequest, HttpResponse}
+import akka.http.scaladsl.model._
import akka.http.scaladsl.server.Directives._
-import akka.http.scaladsl.server._
+import akka.http.scaladsl.server.{Route, StandardRoute}
import akka.stream.ActorMaterializer
-import akka.stream.scaladsl.Flow
import org.slf4j.LoggerFactory
+import sangria.parser.QueryParser
+import spray.json._
import scala.concurrent.Await
import scala.language.postfixOps
+import scala.util._
+import akka.http.scaladsl.marshalling.{Marshaller, ToEntityMarshaller, ToResponseMarshallable}
+import akka.http.scaladsl.unmarshalling.{FromEntityUnmarshaller, Unmarshaller}
+import akka.util.ByteString
+import sangria.ast.Document
+import sangria.renderer.{QueryRenderer, QueryRendererConfig}
+
+import scala.collection.immutable.Seq
object Server extends App {
val logger = LoggerFactory.getLogger(this.getClass)
@@ -39,20 +50,57 @@
implicit val materializer = ActorMaterializer()
import system.dispatcher
-
import scala.concurrent.duration._
- val route: Flow[HttpRequest, HttpResponse, Any] = (post & path("graphql")) {
- entity(as[spray.json.JsValue])(GraphQLServer.endpoint)
- } ~ (post & path("updateEdgeFetcher")) {
- entity(as[spray.json.JsValue])(GraphQLServer.updateEdgeFetcher)
- } ~ {
- getFromResource("assets/graphiql.html")
- }
+ import spray.json.DefaultJsonProtocol._
+
+ /**
+  * HTTP routes served by this application:
+  *
+  *  - `GET` (any path): serves the bundled page `assets/graphiql.html`.
+  *  - `POST /updateEdgeFetcher`: passes the JSON body to `GraphQLServer.updateEdgeFetcher`
+  *    and answers 200 on success or 500 with the failure's `toString` otherwise.
+  *  - `POST /graphql`: executes a GraphQL query. The query is taken either from a
+  *    `Document` entity (with `operationName` / `variables` read from request
+  *    parameters) or from a JSON object with `query`, `operationName` and
+  *    `variables` fields. Unparseable queries or variables answer 400.
+  */
+ val route: Route =
+   get {
+     getFromResource("assets/graphiql.html")
+   } ~ (post & path("updateEdgeFetcher")) {
+     entity(as[JsValue]) { body =>
+       GraphQLServer.updateEdgeFetcher(body) match {
+         case Success(_) => complete(StatusCodes.OK -> JsString("Update fetcher finished"))
+         case Failure(e) =>
+           logger.error("Error on execute", e)
+           complete(StatusCodes.InternalServerError -> spray.json.JsObject("message" -> JsString(e.toString)))
+       }
+     }
+   } ~ (post & path("graphql")) {
+     parameters('operationName.?, 'variables.?) { (operationNameParam, variablesParam) =>
+       entity(as[Document]) { document =>
+         variablesParam.map(parseJson) match {
+           case None => complete(GraphQLServer.executeGraphQLQuery(document, operationNameParam, JsObject()))
+           case Some(Right(js)) => complete(GraphQLServer.executeGraphQLQuery(document, operationNameParam, js.asJsObject))
+           case Some(Left(e)) =>
+             logger.error("Error on execute", e)
+             complete(StatusCodes.BadRequest -> GraphQLServer.formatError(e))
+         }
+       } ~ entity(as[JsValue]) { body =>
+         val fields = body.asJsObject.fields
+
+         // spray-json models a JSON `null` as the `JsNull` object, never as a Scala
+         // `null`, so explicit nulls (e.g. `"variables": null`) have to be dropped by
+         // comparing against `JsNull`; otherwise `convertTo` / `asJsObject` would throw.
+         def nonNullField(name: String): Option[JsValue] =
+           fields.get(name).filterNot(_ == JsNull)
+
+         // A null `query` is treated like a missing one and reported as a 400 below.
+         val query = nonNullField("query").map(_.convertTo[String])
+         val operationName = nonNullField("operationName").map(_.convertTo[String])
+         val variables = nonNullField("variables")
+
+         query.map(QueryParser.parse(_)) match {
+           case None => complete(StatusCodes.BadRequest -> GraphQLServer.formatError("No query to execute"))
+           case Some(Failure(error)) =>
+             logger.error("Error on execute", error)
+             complete(StatusCodes.BadRequest -> GraphQLServer.formatError(error))
+           case Some(Success(document)) => variables match {
+             case Some(js) => complete(GraphQLServer.executeGraphQLQuery(document, operationName, js.asJsObject))
+             case None => complete(GraphQLServer.executeGraphQLQuery(document, operationName, JsObject()))
+           }
+         }
+       }
+     }
+   }
val port = sys.props.get("http.port").fold(8000)(_.toInt)
- logger.info(s"Starting GraphQL server... ${port}")
+ logger.info(s"Starting GraphQL server... $port")
+
Http().bindAndHandle(route, "0.0.0.0", port)
def shutdown(): Unit = {
@@ -63,4 +111,40 @@
logger.info("Terminated.")
}
+
+ // Unmarshaller
+
+ /** Content-type ranges built from `mediaTypes`, one range per media type. */
+ def unmarshallerContentTypes: Seq[ContentTypeRange] = mediaTypes.map(ContentTypeRange.apply)
+
+ /** Media types used for GraphQL documents: `application/graphql` with its charset fixed to UTF-8. */
+ def mediaTypes: Seq[MediaType.WithFixedCharset] = {
+   val graphqlMediaType =
+     MediaType.applicationWithFixedCharset("graphql", HttpCharsets.`UTF-8`, "graphql")
+   Seq(graphqlMediaType)
+ }
+
+ /**
+  * Marshaller that renders a GraphQL `Document` into an HTTP entity, offering one
+  * fixed content type per entry of `mediaTypes`.
+  *
+  * @param config rendering configuration passed to `QueryRenderer.render`
+  *               (defaults to `QueryRenderer.Compact`)
+  */
+ implicit def documentMarshaller(implicit config: QueryRendererConfig = QueryRenderer.Compact): ToEntityMarshaller[Document] =
+   Marshaller.oneOf(mediaTypes: _*) { graphqlType =>
+     Marshaller.withFixedContentType(ContentType(graphqlType)) { document =>
+       HttpEntity(graphqlType, QueryRenderer.render(document, config))
+     }
+   }
+
+ /**
+  * Unmarshaller that reads a request entity as a GraphQL `Document`.
+  *
+  * Only entities whose content type matches `unmarshallerContentTypes` are handled.
+  * An empty body fails with `Unmarshaller.NoContentException`; otherwise the bytes
+  * are decoded as UTF-8 and handed to `QueryParser.parse`. The `Throw` delivery
+  * scheme is imported locally so that `parse` yields the `Document` directly
+  * (presumably throwing on a syntax error instead of returning a `Try`).
+  */
+ implicit val documentUnmarshaller: FromEntityUnmarshaller[Document] =
+   Unmarshaller.byteStringUnmarshaller
+     .forContentTypes(unmarshallerContentTypes: _*)
+     .map { payload =>
+       if (payload.isEmpty) throw Unmarshaller.NoContentException
+       else {
+         import sangria.parser.DeliveryScheme.Throw
+         QueryParser.parse(payload.utf8String)
+       }
+     }
+
+ /**
+  * Parses a JSON string without letting parse failures escape as exceptions.
+  *
+  * @param jsStr the raw JSON text
+  * @return `Right` with the parsed value, or `Left` with the non-fatal error
+  *         raised while parsing
+  */
+ def parseJson(jsStr: String): Either[Throwable, JsValue] =
+   try Right(jsStr.parseJson)
+   catch {
+     // Same set of throwables that `scala.util.Try` would capture as a `Failure`.
+     case scala.util.control.NonFatal(cause) => Left(cause)
+   }
+
}