run apache-rat on S2GRAPH-219
diff --git a/CHANGES b/CHANGES
index b9bf81b..a68ecef 100644
--- a/CHANGES
+++ b/CHANGES
@@ -37,6 +37,7 @@
* [S2GRAPH-197] - Provide S2graphSink for non-streaming dataset
* [S2GRAPH-205] - too many initialize S2Graph when writeBatchMutate on S2GraphSink
* [S2GRAPH-201] - Provide S2GraphSource
+ * [S2GRAPH-218] - add operations not supported on sql
** Bug
* [S2GRAPH-159] - Wrong syntax at a bash script under Linux
@@ -55,6 +56,7 @@
* [S2GRAPH-192] - could not find service column when creating the label
* [S2GRAPH-195] - could not create indices using S2GraphQL
* [S2GRAPH-196] - Apply Query Parameter to Label Fetch in S2GraphQL
+ * [S2GRAPH-220] - Filter clause is not working on AnnoyModelFetcher
** Improvement
* [S2GRAPH-72] - Support Apache TinkerPop and Gremlin
@@ -78,6 +80,7 @@
* [S2GRAPH-210] - Rename package `mysqls` to `schema`
* [S2GRAPH-213] - Abstract Query/Mutation from Storage.
* [S2GRAPH-214] - Add REAME for movielens examples
+ * [S2GRAPH-216] - Provide a transform directive in the GraphQL query result.
** New Feature
* [S2GRAPH-123] - Support different index on out/in direction.
@@ -88,8 +91,10 @@
* [S2GRAPH-177] - Add support for createServiceColumn/addVertex APIs on GraphQL.
* [S2GRAPH-185] - Support Spark Structured Streaming to work with data in streaming and batch.
* [S2GRAPH-183] - Provide batch job to dump data stored in HBase into file.
+ * [S2GRAPH-203] - Support "application/graphql" Content-Type header.
* [S2GRAPH-206] - Generalize machine learning model serving.
- * [S2GRAPH-215] - Implement a Storage Backend for JDBC driver, such as H2, MySql using the Mutator and Fetcher interfaces
+ * [S2GRAPH-215] - Implement a Storage Backend for JDBC driver, such as H2, MySql using the Mutator and Fetcher interfaces.
+ * [S2GRAPH-219] - Added query that includes all vertices and associated edges for GraphVisualize.
** Task
* [S2GRAPH-162] - Update year in the NOTICE file.
diff --git a/build.sbt b/build.sbt
index 84d17d5..de7c02f 100755
--- a/build.sbt
+++ b/build.sbt
@@ -25,9 +25,9 @@
lazy val commonSettings = Seq(
organization := "org.apache.s2graph",
- scalaVersion := "2.11.7",
+ scalaVersion := "2.11.8",
isSnapshot := version.value.endsWith("-SNAPSHOT"),
- scalacOptions := Seq("-language:postfixOps", "-unchecked", "-deprecation", "-feature", "-Xlint", "-Xlint:-missing-interpolator"),
+ scalacOptions := Seq("-language:postfixOps", "-unchecked", "-deprecation", "-feature", "-Xlint", "-Xlint:-missing-interpolator", "-target:jvm-1.8"),
javaOptions ++= collection.JavaConversions.propertiesAsScalaMap(System.getProperties).map { case (key, value) => "-D" + key + "=" + value }.toSeq,
testOptions in Test += Tests.Argument("-oDF"),
concurrentRestrictions in Global += Tags.limit(Tags.Test, 1),
diff --git a/s2core/build.sbt b/s2core/build.sbt
index cfc32d6..229bf41 100644
--- a/s2core/build.sbt
+++ b/s2core/build.sbt
@@ -47,7 +47,7 @@
"org.apache.tinkerpop" % "gremlin-test" % tinkerpopVersion % "test",
"org.scalatest" %% "scalatest" % "2.2.4" % "test",
"org.specs2" %% "specs2-core" % specs2Version % "test",
- "org.apache.hadoop" % "hadoop-hdfs" % hadoopVersion ,
+ "org.apache.hadoop" % "hadoop-hdfs" % hadoopVersion,
"org.apache.lucene" % "lucene-core" % "6.6.0",
"org.apache.lucene" % "lucene-queryparser" % "6.6.0",
"org.rocksdb" % "rocksdbjni" % rocksVersion,
diff --git a/s2core/src/main/scala/org/apache/s2graph/core/TraversalHelper.scala b/s2core/src/main/scala/org/apache/s2graph/core/TraversalHelper.scala
index 0a4a49b..ba18e8d 100644
--- a/s2core/src/main/scala/org/apache/s2graph/core/TraversalHelper.scala
+++ b/s2core/src/main/scala/org/apache/s2graph/core/TraversalHelper.scala
@@ -22,6 +22,7 @@
import java.util
import org.apache.s2graph.core.S2Graph.{FilterHashKey, HashKey}
+import org.apache.s2graph.core.parsers.WhereParser
import org.apache.s2graph.core.schema.LabelMeta
import org.apache.s2graph.core.types.{HBaseType, InnerVal, LabelWithDirection, VertexId}
import org.apache.s2graph.core.utils.{Extensions, logger}
@@ -39,11 +40,11 @@
else randomInt(sampleNumber, range, set + Random.nextInt(range))
}
- def sample(queryRequest: QueryRequest, edges: Seq[EdgeWithScore], n: Int): Seq[EdgeWithScore] = {
+ def sample(edges: Seq[EdgeWithScore], offset: Int, n: Int): Seq[EdgeWithScore] = {
if (edges.size <= n) {
edges
} else {
- val plainEdges = if (queryRequest.queryParam.offset == 0) {
+ val plainEdges = if (offset == 0) {
edges.tail
} else edges
@@ -141,6 +142,41 @@
(hashKey, filterHashKey)
}
+
+  /** Applies the query's `where` filter and edge transformer to a fetched edge and scores every result.
+    * Returns Nil when the edge is filtered out; StorageIO and the model fetchers share this scoring path.
+    */
+  def edgeToEdgeWithScore(queryRequest: QueryRequest,
+                          edge: S2EdgeLike,
+                          parentEdges: Seq[EdgeWithScore]): Seq[EdgeWithScore] = {
+    val queryParam = queryRequest.queryParam
+    val where = queryParam.where.get
+
+    if (where != WhereParser.success && !where.filter(edge)) Nil
+    else {
+      val edges =
+        if (queryParam.edgeTransformer.isDefault) Seq(edge)
+        else convertEdges(queryParam, edge, queryRequest.nextStepOpt)
+      // Ranking uses the fetched edge, not the converted ones, so compute it once per fetched edge.
+      val edgeScore = queryParam.rank.score(edge)
+
+      if (!queryRequest.query.queryOption.ignorePrevStepCache) {
+        edges.map(e => EdgeWithScore(e, edgeScore, queryParam.label))
+      } else {
+        val prevScore = queryRequest.prevStepScore
+        val propagated = queryParam.scorePropagateOp match {
+          case "plus" => edgeScore + prevScore
+          case "divide" =>
+            val denominator = prevScore + queryParam.scorePropagateShrinkage
+            if (denominator == 0) 0 else edgeScore / denominator
+          case _ => edgeScore * prevScore
+        }
+        val score = propagated * queryRequest.labelWeight * processTimeDecay(queryParam, edge)
+        edges.map(e => EdgeWithScore(e.copyParentEdges(parentEdges), score = score, label = queryParam.label))
+      }
+    }
+  }
+
}
@@ -481,7 +517,7 @@
val degreeScore = 0.0
val sampled =
- if (queryRequest.queryParam.sample >= 0) sample(queryRequest, edgeWithScores, queryRequest.queryParam.sample)
+ if (queryRequest.queryParam.sample >= 0) sample(edgeWithScores, queryParam.offset, queryParam.sample)
else edgeWithScores
val withScores = for {
diff --git a/s2core/src/main/scala/org/apache/s2graph/core/fetcher/annoy/AnnoyModelFetcher.scala b/s2core/src/main/scala/org/apache/s2graph/core/fetcher/annoy/AnnoyModelFetcher.scala
index cdf3b71..6c00ca0 100644
--- a/s2core/src/main/scala/org/apache/s2graph/core/fetcher/annoy/AnnoyModelFetcher.scala
+++ b/s2core/src/main/scala/org/apache/s2graph/core/fetcher/annoy/AnnoyModelFetcher.scala
@@ -29,74 +29,23 @@
object AnnoyModelFetcher {
val IndexFilePathKey = "annoyIndexFilePath"
-// val DictFilePathKey = "annoyDictFilePath"
val DimensionKey = "annoyIndexDimension"
val IndexTypeKey = "annoyIndexType"
- // def loadDictFromLocal(file: File): Map[Int, String] = {
- // val files = if (file.isDirectory) {
- // file.listFiles()
- // } else {
- // Array(file)
- // }
- //
- // files.flatMap { file =>
- // Source.fromFile(file).getLines().zipWithIndex.flatMap { case (line, _idx) =>
- // val tokens = line.stripMargin.split(",")
- // try {
- // val tpl = if (tokens.length < 2) {
- // (tokens.head.toInt, tokens.head)
- // } else {
- // (tokens.head.toInt, tokens.tail.head)
- // }
- // Seq(tpl)
- // } catch {
- // case e: Exception => Nil
- // }
- // }
- // }.toMap
- // }
-
def buildAnnoy4s[T](indexPath: String)(implicit converter: KeyConverter[T]): Annoy[T] = {
Annoy.load[T](indexPath)
}
-
- // def buildIndex(indexPath: String,
- // dictPath: String,
- // dimension: Int,
- // indexType: IndexType): ANNIndexWithDict = {
- // val dict = loadDictFromLocal(new File(dictPath))
- // val index = new ANNIndex(dimension, indexPath, indexType)
- //
- // ANNIndexWithDict(index, dict)
- // }
- //
- // def buildIndex(config: Config): ANNIndexWithDict = {
- // val indexPath = config.getString(IndexFilePathKey)
- // val dictPath = config.getString(DictFilePathKey)
- //
- // val dimension = config.getInt(DimensionKey)
- // val indexType = Try { config.getString(IndexTypeKey) }.toOption.map(IndexType.valueOf).getOrElse(IndexType.ANGULAR)
- //
- // buildIndex(indexPath, dictPath, dimension, indexType)
- // }
}
-//
-//case class ANNIndexWithDict(index: ANNIndex, dict: Map[Int, String]) {
-// val dictRev = dict.map(kv => kv._2 -> kv._1)
-//}
-
class AnnoyModelFetcher(val graph: S2GraphLike) extends EdgeFetcher {
import AnnoyModelFetcher._
+ import TraversalHelper._
val builder = graph.elementBuilder
- // var model: ANNIndexWithDict = _
var model: Annoy[String] = _
override def init(config: Config)(implicit ec: ExecutionContext): Unit = {
-
model = AnnoyModelFetcher.buildAnnoy4s(config.getString(IndexFilePathKey))
}
@@ -106,15 +55,17 @@
val stepResultLs = queryRequests.map { queryRequest =>
val vertex = queryRequest.vertex
val queryParam = queryRequest.queryParam
+ val shouldBuildParents = queryRequest.query.queryOption.returnTree || queryParam.whereHasParent
+ val parentEdges = if (shouldBuildParents) prevStepEdges.getOrElse(queryRequest.vertex.id, Nil) else Nil
- val edgeWithScores = model.query(vertex.innerId.toIdString(), queryParam.limit).getOrElse(Nil).map { case (tgtId, score) =>
+ val edgeWithScores = model.query(vertex.innerId.toIdString(), queryParam.limit).getOrElse(Nil).flatMap { case (tgtId, score) =>
val tgtVertexId = builder.newVertexId(queryParam.label.service,
queryParam.label.tgtColumnWithDir(queryParam.labelWithDir.dir), tgtId)
val props: Map[String, Any] = if (queryParam.label.metaPropsInvMap.contains("score")) Map("score" -> score) else Map.empty
val edge = graph.toEdge(vertex.innerId.value, tgtVertexId.innerId.value, queryParam.labelName, queryParam.direction, props = props)
- EdgeWithScore(edge, score, queryParam.label)
+ edgeToEdgeWithScore(queryRequest, edge, parentEdges)
}
StepResult(edgeWithScores, Nil, Nil)
diff --git a/s2core/src/main/scala/org/apache/s2graph/core/fetcher/fasttext/FastTextFetcher.scala b/s2core/src/main/scala/org/apache/s2graph/core/fetcher/fasttext/FastTextFetcher.scala
index 433b4dc..a07ed07 100644
--- a/s2core/src/main/scala/org/apache/s2graph/core/fetcher/fasttext/FastTextFetcher.scala
+++ b/s2core/src/main/scala/org/apache/s2graph/core/fetcher/fasttext/FastTextFetcher.scala
@@ -28,6 +28,7 @@
class FastTextFetcher(val graph: S2GraphLike) extends EdgeFetcher {
+ import org.apache.s2graph.core.TraversalHelper._
val builder = graph.elementBuilder
var fastText: FastText = _
@@ -49,16 +50,19 @@
val stepResultLs = queryRequests.map { queryRequest =>
val vertex = queryRequest.vertex
val queryParam = queryRequest.queryParam
+ val shouldBuildParents = queryRequest.query.queryOption.returnTree || queryParam.whereHasParent
+ val parentEdges = if (shouldBuildParents) prevStepEdges.getOrElse(queryRequest.vertex.id, Nil) else Nil
+
val line = fastText.getLine(vertex.innerId.toIdString())
- val edgeWithScores = fastText.predict(line, queryParam.limit).map { case (_label, score) =>
+ val edgeWithScores = fastText.predict(line, queryParam.limit).flatMap { case (_label, score) =>
val tgtVertexId = builder.newVertexId(queryParam.label.service,
queryParam.label.tgtColumnWithDir(queryParam.labelWithDir.dir), _label)
val props: Map[String, Any] = if (queryParam.label.metaPropsInvMap.contains("score")) Map("score" -> score) else Map.empty
val edge = graph.toEdge(vertex.innerId.value, tgtVertexId.innerId.value, queryParam.labelName, queryParam.direction, props = props)
- EdgeWithScore(edge, score, queryParam.label)
+ edgeToEdgeWithScore(queryRequest, edge, parentEdges)
}
StepResult(edgeWithScores, Nil, Nil)
diff --git a/s2core/src/main/scala/org/apache/s2graph/core/fetcher/tensorflow/InceptionFetcher.scala b/s2core/src/main/scala/org/apache/s2graph/core/fetcher/tensorflow/InceptionFetcher.scala
index aa05a31..83fab98 100644
--- a/s2core/src/main/scala/org/apache/s2graph/core/fetcher/tensorflow/InceptionFetcher.scala
+++ b/s2core/src/main/scala/org/apache/s2graph/core/fetcher/tensorflow/InceptionFetcher.scala
@@ -63,6 +63,7 @@
import InceptionFetcher._
import scala.collection.JavaConverters._
+ import org.apache.s2graph.core.TraversalHelper._
val builder = graph.elementBuilder
var graphDef: Array[Byte] = _
@@ -81,16 +82,19 @@
val stepResultLs = queryRequests.map { queryRequest =>
val vertex = queryRequest.vertex
val queryParam = queryRequest.queryParam
+ val shouldBuildParents = queryRequest.query.queryOption.returnTree || queryParam.whereHasParent
+ val parentEdges = if (shouldBuildParents) prevStepEdges.getOrElse(queryRequest.vertex.id, Nil) else Nil
+
val urlText = vertex.innerId.toIdString()
- val edgeWithScores = predict(graphDef, labels)(getImageBytes(urlText), queryParam.limit).map { case (label, score) =>
+ val edgeWithScores = predict(graphDef, labels)(getImageBytes(urlText), queryParam.limit).flatMap { case (label, score) =>
val tgtVertexId = builder.newVertexId(queryParam.label.service,
queryParam.label.tgtColumnWithDir(queryParam.labelWithDir.dir), label)
val props: Map[String, Any] = if (queryParam.label.metaPropsInvMap.contains("score")) Map("score" -> score) else Map.empty
val edge = graph.toEdge(vertex.innerId.value, tgtVertexId.innerId.value, queryParam.labelName, queryParam.direction, props = props)
- EdgeWithScore(edge, score, queryParam.label)
+ edgeToEdgeWithScore(queryRequest, edge, parentEdges)
}
StepResult(edgeWithScores, Nil, Nil)
diff --git a/s2core/src/main/scala/org/apache/s2graph/core/storage/StorageIO.scala b/s2core/src/main/scala/org/apache/s2graph/core/storage/StorageIO.scala
index 2b02bdd..9abd80d 100644
--- a/s2core/src/main/scala/org/apache/s2graph/core/storage/StorageIO.scala
+++ b/s2core/src/main/scala/org/apache/s2graph/core/storage/StorageIO.scala
@@ -27,6 +27,8 @@
import org.apache.s2graph.core.utils.logger
class StorageIO(val graph: S2GraphLike, val serDe: StorageSerDe) {
+ import TraversalHelper._
+
val dummyCursor: Array[Byte] = Array.empty
/** Parsing Logic: parse from kv from Storage into Edge */
@@ -86,17 +88,15 @@
startOffset: Int = 0,
len: Int = Int.MaxValue): StepResult = {
+
val toSKeyValue = implicitly[CanSKeyValue[K]].toSKeyValue _
if (kvs.isEmpty) StepResult.Empty.copy(cursors = Seq(dummyCursor))
else {
val queryOption = queryRequest.query.queryOption
val queryParam = queryRequest.queryParam
- val labelWeight = queryRequest.labelWeight
- val nextStepOpt = queryRequest.nextStepOpt
- val where = queryParam.where.get
val label = queryParam.label
- val isDefaultTransformer = queryParam.edgeTransformer.isDefault
+
val first = kvs.head
val kv = first
val schemaVer = queryParam.label.schemaVersion
@@ -114,41 +114,19 @@
val lastCursor: Seq[Array[Byte]] = Seq(if (keyValues.nonEmpty) toSKeyValue(keyValues(keyValues.length - 1)).row else dummyCursor)
+ val edgeWithScores = for {
+ (kv, idx) <- keyValues.zipWithIndex if idx >= startOffset && idx < startOffset + len
+ edge <- (if (queryParam.isSnapshotEdge) toSnapshotEdge(kv, queryRequest, None, isInnerCall, parentEdges) else toEdge(kv, queryRequest, cacheElementOpt, parentEdges)).toSeq
+ edgeWithScore <- edgeToEdgeWithScore(queryRequest, edge, parentEdges)
+ } yield {
+ edgeWithScore
+ }
+
if (!queryOption.ignorePrevStepCache) {
- val edgeWithScores = for {
- (kv, idx) <- keyValues.zipWithIndex if idx >= startOffset && idx < startOffset + len
- edge <- (if (queryParam.isSnapshotEdge) toSnapshotEdge(kv, queryRequest, None, isInnerCall, parentEdges) else toEdge(kv, queryRequest, cacheElementOpt, parentEdges)).toSeq
- if where == WhereParser.success || where.filter(edge)
- convertedEdge <- if (isDefaultTransformer) Seq(edge) else convertEdges(queryParam, edge, nextStepOpt)
- } yield {
- val score = queryParam.rank.score(edge)
- EdgeWithScore(convertedEdge, score, label)
- }
StepResult(edgeWithScores = edgeWithScores, grouped = Nil, degreeEdges = degreeEdges, cursors = lastCursor)
} else {
- val degreeScore = 0.0
-
- val edgeWithScores = for {
- (kv, idx) <- keyValues.zipWithIndex if idx >= startOffset && idx < startOffset + len
- edge <- (if (queryParam.isSnapshotEdge) toSnapshotEdge(kv, queryRequest, None, isInnerCall, parentEdges) else toEdge(kv, queryRequest, cacheElementOpt, parentEdges)).toSeq
- if where == WhereParser.success || where.filter(edge)
- convertedEdge <- if (isDefaultTransformer) Seq(edge) else convertEdges(queryParam, edge, nextStepOpt)
- } yield {
- val edgeScore = queryParam.rank.score(edge)
- val score = queryParam.scorePropagateOp match {
- case "plus" => edgeScore + prevScore
- case "divide" =>
- if ((prevScore + queryParam.scorePropagateShrinkage) == 0) 0
- else edgeScore / (prevScore + queryParam.scorePropagateShrinkage)
- case _ => edgeScore * prevScore
- }
- val tsVal = processTimeDecay(queryParam, edge)
- val newScore = degreeScore + score
- EdgeWithScore(convertedEdge.copyParentEdges(parentEdges), score = newScore * labelWeight * tsVal, label = label)
- }
-
val sampled =
- if (queryRequest.queryParam.sample >= 0) sample(queryRequest, edgeWithScores, queryRequest.queryParam.sample)
+ if (queryRequest.queryParam.sample >= 0) sample(edgeWithScores, queryParam.offset, queryParam.sample)
else edgeWithScores
val normalized = if (queryParam.shouldNormalize) normalize(sampled) else sampled
diff --git a/s2graphql/build.sbt b/s2graphql/build.sbt
index a3fbec0..bc71f98 100644
--- a/s2graphql/build.sbt
+++ b/s2graphql/build.sbt
@@ -26,13 +26,15 @@
scalacOptions ++= Seq("-deprecation", "-feature")
libraryDependencies ++= Seq(
+ "org.scala-lang" % "scala-compiler" % scalaVersion.value,
+ "org.scala-lang" % "scala-reflect" % scalaVersion.value,
+
"org.sangria-graphql" %% "sangria" % "1.4.0",
"org.sangria-graphql" %% "sangria-spray-json" % "1.0.0",
"org.sangria-graphql" %% "sangria-play-json" % "1.0.1" % Test,
"com.typesafe.akka" %% "akka-http" % "10.0.10",
"com.typesafe.akka" %% "akka-http-spray-json" % "10.0.10",
-
"com.typesafe.akka" %% "akka-slf4j" % "2.4.6",
"org.scalatest" %% "scalatest" % "3.0.4" % Test
diff --git a/s2graphql/src/main/scala/org/apache/s2graph/graphql/GraphQLServer.scala b/s2graphql/src/main/scala/org/apache/s2graph/graphql/GraphQLServer.scala
index eee7c93..5f1c225 100644
--- a/s2graphql/src/main/scala/org/apache/s2graph/graphql/GraphQLServer.scala
+++ b/s2graphql/src/main/scala/org/apache/s2graph/graphql/GraphQLServer.scala
@@ -26,6 +26,7 @@
import akka.http.scaladsl.server.Directives._
import akka.http.scaladsl.server._
import com.typesafe.config.ConfigFactory
+import org.apache.s2graph.graphql.middleware.{GraphFormatted, Transform}
import org.apache.s2graph.core.S2Graph
import org.apache.s2graph.core.utils.SafeUpdateCache
import org.apache.s2graph.graphql.repository.GraphRepository
@@ -35,12 +36,13 @@
import sangria.execution._
import sangria.execution.deferred.DeferredResolver
import sangria.marshalling.sprayJson._
-import sangria.parser.QueryParser
+import sangria.parser.{QueryParser, SyntaxError}
import sangria.schema.Schema
-import spray.json.{JsBoolean, JsObject, JsString}
+import spray.json._
import scala.collection.JavaConverters._
import scala.concurrent.{ExecutionContext, Future}
+import scala.util.control.NonFatal
import scala.util.{Failure, Success, Try}
object GraphQLServer {
@@ -63,7 +65,7 @@
val schemaCache = new SafeUpdateCache(schemaConfig)
- def updateEdgeFetcher(requestJSON: spray.json.JsValue)(implicit e: ExecutionContext): Route = {
+ def updateEdgeFetcher(requestJSON: spray.json.JsValue)(implicit e: ExecutionContext): Try[Unit] = {
val ret = Try {
val spray.json.JsObject(fields) = requestJSON
val spray.json.JsString(labelName) = fields("label")
@@ -72,33 +74,7 @@
s2graph.management.updateEdgeFetcher(labelName, jsOptions.compactPrint)
}
- ret match {
- case Success(f) => complete(OK -> JsString("start"))
- case Failure(e) => complete(InternalServerError -> spray.json.JsObject("message" -> JsString(e.toString)))
- }
- }
-
- def endpoint(requestJSON: spray.json.JsValue)(implicit e: ExecutionContext): Route = {
- val spray.json.JsObject(fields) = requestJSON
- val spray.json.JsString(query) = fields("query")
-
- val operation = fields.get("operationName") collect {
- case spray.json.JsString(op) => op
- }
-
- val vars = fields.get("variables") match {
- case Some(obj: spray.json.JsObject) => obj
- case _ => spray.json.JsObject.empty
- }
-
- QueryParser.parse(query) match {
- case Success(queryAst) =>
- logger.info(queryAst.renderCompact)
- complete(executeGraphQLQuery(queryAst, operation, vars))
- case Failure(error) =>
- logger.warn(error.getMessage, error)
- complete(BadRequest -> spray.json.JsObject("error" -> JsString(error.getMessage)))
- }
+ ret
}
/**
@@ -108,35 +84,61 @@
logger.info(s"schemaCacheTTL: ${schemaCacheTTL}")
private def createNewSchema(): Schema[GraphRepository, Any] = {
- logger.info(s"Schema updated: ${System.currentTimeMillis()}")
val newSchema = new SchemaDef(s2Repository).S2GraphSchema
- logger.info("-" * 80)
+ logger.info(s"Schema updated: ${System.currentTimeMillis()}")
newSchema
}
- private def executeGraphQLQuery(query: Document, op: Option[String], vars: JsObject)(implicit e: ExecutionContext) = {
- val cacheKey = className + "s2Schema"
- val s2schema = schemaCache.withCache(cacheKey, broadcast = false)(createNewSchema())
+ def formatError(error: Throwable): JsValue = error match {
+ case syntaxError: SyntaxError ⇒
+ JsObject("errors" → JsArray(
+ JsObject(
+ "message" → JsString(syntaxError.getMessage),
+ "locations" → JsArray(JsObject(
+ "line" → JsNumber(syntaxError.originalError.position.line),
+ "column" → JsNumber(syntaxError.originalError.position.column))))))
+
+ case NonFatal(e) ⇒ formatError(e.toString)
+ case e ⇒ throw e
+ }
+
+ def formatError(message: String): JsObject =
+ JsObject("errors" → JsArray(JsObject("message" → JsString(message))))
+
+ def onEvictSchema(o: AnyRef): Unit = {
+ logger.info("Schema Evicted")
+ }
+
+ val TransformMiddleWare = List(org.apache.s2graph.graphql.middleware.Transform())
+
+  def executeGraphQLQuery(query: Document, op: Option[String], vars: JsObject)(implicit e: ExecutionContext) = {
     import GraphRepository._
+    // The schema is cached under cacheKey; onEvictSchema only logs when the cached schema is evicted.
+    val cacheKey = className + "s2Schema"
+    val s2schema = schemaCache.withCache(cacheKey, broadcast = false, onEvict = onEvictSchema)(createNewSchema())
     val resolver: DeferredResolver[GraphRepository] = DeferredResolver.fetchers(vertexFetcher, edgeFetcher)
+    val includeGraph = vars.fields.get("includeGraph").contains(spray.json.JsBoolean(true))
+    val middleWares = if (includeGraph) GraphFormatted :: TransformMiddleWare else TransformMiddleWare
+    // A variable "includeGraph": true additionally attaches the GraphFormatted "graph" extension.
     Executor.execute(
       s2schema,
       query,
       s2Repository,
       variables = vars,
       operationName = op,
-      deferredResolver = resolver
-    )
-      .map((res: spray.json.JsValue) => OK -> res)
+      deferredResolver = resolver,
+      middleware = middleWares
+    ).map((res: spray.json.JsValue) => OK -> res)
       .recover {
       case error: QueryAnalysisError =>
-        logger.warn(error.getMessage, error)
+        logger.error("Error on execute", error)
         BadRequest -> error.resolveError
       case error: ErrorWithResolver =>
-        logger.error(error.getMessage, error)
+        logger.error("Error on execute", error)
         InternalServerError -> error.resolveError
     }
   }
}
+
diff --git a/s2graphql/src/main/scala/org/apache/s2graph/graphql/HttpServer.scala b/s2graphql/src/main/scala/org/apache/s2graph/graphql/HttpServer.scala
index 6f57cc4..755d185 100644
--- a/s2graphql/src/main/scala/org/apache/s2graph/graphql/HttpServer.scala
+++ b/s2graphql/src/main/scala/org/apache/s2graph/graphql/HttpServer.scala
@@ -6,9 +6,9 @@
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -19,18 +19,29 @@
package org.apache.s2graph.graphql
+import java.nio.charset.Charset
+
import akka.actor.ActorSystem
import akka.http.scaladsl.Http
import akka.http.scaladsl.marshallers.sprayjson.SprayJsonSupport._
-import akka.http.scaladsl.model.{HttpRequest, HttpResponse}
+import akka.http.scaladsl.model._
import akka.http.scaladsl.server.Directives._
-import akka.http.scaladsl.server._
+import akka.http.scaladsl.server.{Route, StandardRoute}
import akka.stream.ActorMaterializer
-import akka.stream.scaladsl.Flow
import org.slf4j.LoggerFactory
+import sangria.parser.QueryParser
+import spray.json._
import scala.concurrent.Await
import scala.language.postfixOps
+import scala.util._
+import akka.http.scaladsl.marshalling.{Marshaller, ToEntityMarshaller, ToResponseMarshallable}
+import akka.http.scaladsl.unmarshalling.{FromEntityUnmarshaller, Unmarshaller}
+import akka.util.ByteString
+import sangria.ast.Document
+import sangria.renderer.{QueryRenderer, QueryRendererConfig}
+
+import scala.collection.immutable.Seq
object Server extends App {
val logger = LoggerFactory.getLogger(this.getClass)
@@ -39,20 +50,57 @@
implicit val materializer = ActorMaterializer()
import system.dispatcher
-
import scala.concurrent.duration._
- val route: Flow[HttpRequest, HttpResponse, Any] = (post & path("graphql")) {
- entity(as[spray.json.JsValue])(GraphQLServer.endpoint)
- } ~ (post & path("updateEdgeFetcher")) {
- entity(as[spray.json.JsValue])(GraphQLServer.updateEdgeFetcher)
- } ~ {
- getFromResource("assets/graphiql.html")
- }
+ import spray.json.DefaultJsonProtocol._
+
+  val route: Route =
+    get {
+      getFromResource("assets/graphiql.html")
+    } ~ (post & path("updateEdgeFetcher")) {
+      entity(as[JsValue]) { body =>
+        GraphQLServer.updateEdgeFetcher(body) match {
+          case Success(_) => complete(StatusCodes.OK -> JsString("Update fetcher finished"))
+          case Failure(e) =>
+            logger.error("Error on execute", e)
+            complete(StatusCodes.InternalServerError -> spray.json.JsObject("message" -> JsString(e.toString)))
+        }
+      }
+    } ~ (post & path("graphql")) {
+      parameters('operationName.?, 'variables.?) { (operationNameParam, variablesParam) =>
+        entity(as[Document]) { document =>
+          variablesParam.map(parseJson) match {
+            case None => complete(GraphQLServer.executeGraphQLQuery(document, operationNameParam, JsObject()))
+            case Some(Right(js)) => complete(GraphQLServer.executeGraphQLQuery(document, operationNameParam, js.asJsObject))
+            case Some(Left(e)) =>
+              logger.error("Error on execute", e)
+              complete(StatusCodes.BadRequest -> GraphQLServer.formatError(e))
+          }
+        } ~ entity(as[JsValue]) { body =>
+          val fields = body.asJsObject.fields
+          // Clients may send explicit nulls (e.g. "variables": null); a JsNull is never `== null`, so drop it here.
+          def member(name: String): Option[JsValue] = fields.get(name).filterNot(_ == JsNull)
+          val query = member("query").map(_.convertTo[String])
+          val operationName = member("operationName").map(_.convertTo[String])
+          val variables = member("variables")
+          query.map(QueryParser.parse(_)) match {
+            case None => complete(StatusCodes.BadRequest -> GraphQLServer.formatError("No query to execute"))
+            case Some(Failure(error)) =>
+              logger.error("Error on execute", error)
+              complete(StatusCodes.BadRequest -> GraphQLServer.formatError(error))
+            case Some(Success(document)) => variables match {
+              case Some(js) => complete(GraphQLServer.executeGraphQLQuery(document, operationName, js.asJsObject))
+              case None => complete(GraphQLServer.executeGraphQLQuery(document, operationName, JsObject()))
+            }
+          }
+        }
+      }
+    }
val port = sys.props.get("http.port").fold(8000)(_.toInt)
- logger.info(s"Starting GraphQL server... ${port}")
+ logger.info(s"Starting GraphQL server... $port")
+
Http().bindAndHandle(route, "0.0.0.0", port)
def shutdown(): Unit = {
@@ -63,4 +111,40 @@
logger.info("Terminated.")
}
+
+ // Unmarshaller
+
+ def unmarshallerContentTypes: Seq[ContentTypeRange] = mediaTypes.map(ContentTypeRange.apply)
+
+ def mediaTypes: Seq[MediaType.WithFixedCharset] =
+ Seq(MediaType.applicationWithFixedCharset("graphql", HttpCharsets.`UTF-8`, "graphql"))
+
+ implicit def documentMarshaller(implicit config: QueryRendererConfig = QueryRenderer.Compact): ToEntityMarshaller[Document] = {
+ Marshaller.oneOf(mediaTypes: _*) {
+ mediaType ⇒
+ Marshaller.withFixedContentType(ContentType(mediaType)) {
+ json ⇒ HttpEntity(mediaType, QueryRenderer.render(json, config))
+ }
+ }
+ }
+
+  // lazy: the server is bound earlier in this App body, so a strict val may still be unset for early requests.
+  implicit lazy val documentUnmarshaller: FromEntityUnmarshaller[Document] =
+    Unmarshaller.byteStringUnmarshaller
+      .forContentTypes(unmarshallerContentTypes: _*)
+      .map {
+        case ByteString.empty => throw Unmarshaller.NoContentException
+        case data =>
+          import sangria.parser.DeliveryScheme.Throw
+          QueryParser.parse(data.decodeString(Charset.forName("UTF-8")))
+      }
+
+  /** Parses a JSON string without throwing for malformed input.
+    * @return the parsed value on the Right, or the parse failure on the Left */
+  def parseJson(jsStr: String): Either[Throwable, JsValue] =
+    Try(jsStr.parseJson) match {
+      case Failure(parseError) => Left(parseError)
+      case Success(json) => Right(json)
+    }
+
}
diff --git a/s2graphql/src/main/scala/org/apache/s2graph/graphql/middleware/GraphFormatWriter.scala b/s2graphql/src/main/scala/org/apache/s2graph/graphql/middleware/GraphFormatWriter.scala
new file mode 100644
index 0000000..6dcc368
--- /dev/null
+++ b/s2graphql/src/main/scala/org/apache/s2graph/graphql/middleware/GraphFormatWriter.scala
@@ -0,0 +1,120 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.s2graph.graphql.middleware
+
+import org.apache.s2graph.core.schema.ServiceColumn
+import org.apache.s2graph.core.{GraphElement, S2EdgeLike, S2VertexLike}
+import org.apache.s2graph.graphql.types.PlayJsonScalarType
+import org.slf4s.LoggerFactory
+import play.api.libs.json._
+import sangria.execution._
+import sangria.schema.Context
+
+
+/**
+ * Middleware that collects every GraphElement returned by a field during a query and exposes them
+ * as a "graph" query extension: {nodes: [{id, label}, ..], edges: [{id, source, target, label}, ..]}.
+ */
+object GraphFormatted extends Middleware[Any] with MiddlewareAfterField[Any] with MiddlewareExtension[Any] {
+  implicit val logger = LoggerFactory.getLogger(this.getClass)
+
+  // Per-query collection of resolved elements; the map is used as a concurrent set (values are ignored).
+  type QueryVal = java.util.concurrent.ConcurrentHashMap[GraphElement, Unit]
+
+  // Field start time recorded by beforeField; not read anywhere in this middleware.
+  type FieldVal = Long
+
+  def beforeQuery(context: MiddlewareQueryContext[Any, _, _]) = {
+    new java.util.concurrent.ConcurrentHashMap[GraphElement, Unit]()
+  }
+
+  def afterQuery(queryVal: QueryVal, context: MiddlewareQueryContext[Any, _, _]) = ()
+
+  /** Vertex id in the form "<serviceName>.<columnName>.<innerId>". */
+  def toVertexId(v: S2VertexLike, c: ServiceColumn): String = {
+    val innerId = v.innerId.toIdString()
+    s"${c.service.serviceName}.${c.columnName}.${innerId}"
+  }
+
+  def toVertexJson(v: S2VertexLike, c: ServiceColumn): JsValue = {
+    Json.obj(
+      "id" -> toVertexId(v, c),
+      "label" -> v.innerId.toIdString()
+    )
+  }
+
+  /** Edge json whose source/target ids match toVertexId for the label's src/tgt columns. */
+  def toEdgeJson(e: S2EdgeLike): JsValue = {
+    val source = toVertexId(e.srcVertex, e.innerLabel.srcColumn)
+    val target = toVertexId(e.tgtVertex, e.innerLabel.tgtColumn)
+    Json.obj(
+      "source" -> source,
+      "target" -> target,
+      "id" -> s"${source}.${e.label()}.${target}",
+      "label" -> e.label()
+    )
+  }
+
+  def afterQueryExtensions(queryVal: QueryVal,
+                           context: MiddlewareQueryContext[Any, _, _]
+                          ): Vector[Extension[_]] = {
+    import scala.collection.JavaConverters._
+    val elements = queryVal.keys().asScala.toVector
+    val edges = elements.collect { case e: S2EdgeLike => e }
+    val vertices = elements.collect { case v: S2VertexLike => v -> v.serviceColumn }
+    // Endpoint columns must match toEdgeJson, otherwise edges reference node ids that were never emitted.
+    val verticesFromEdges = edges.flatMap { e =>
+      val label = e.innerLabel
+      Vector((e.srcVertex, label.srcColumn), (e.tgtVertex, label.tgtColumn))
+    }
+
+    val verticesJson = (vertices ++ verticesFromEdges).map { case (v, c) => toVertexJson(v, c) }.distinct
+    val edgeJson = edges.map(toEdgeJson).distinct
+
+    val jsElements = Json.obj(
+      "nodes" -> verticesJson,
+      "edges" -> edgeJson
+    )
+    val graph = Json.obj("graph" -> jsElements)
+
+    implicit val iu = PlayJsonScalarType.PlayJsonInputUnmarshaller
+    Vector(Extension[JsValue](graph))
+  }
+
+  def beforeField(queryVal: QueryVal, mctx: MiddlewareQueryContext[Any, _, _], ctx: Context[Any, _]) = {
+    continue(System.currentTimeMillis())
+  }
+
+  /** Records GraphElements returned by a field, directly or inside a Seq; returns None so the value is unchanged. */
+  def afterField(queryVal: QueryVal, fieldVal: FieldVal, value: Any, mctx: MiddlewareQueryContext[Any, _, _], ctx: Context[Any, _]) = {
+    def record(v: Any): Unit = v match {
+      case e: GraphElement => queryVal.put(e, ())
+      case _ =>
+    }
+    value match {
+      case ls: Seq[_] => ls.foreach(record)
+      case other => record(other)
+    }
+    None
+  }
+}
+
+
+
diff --git a/s2graphql/src/main/scala/org/apache/s2graph/graphql/middleware/Transform.scala b/s2graphql/src/main/scala/org/apache/s2graph/graphql/middleware/Transform.scala
new file mode 100644
index 0000000..9aa86fb
--- /dev/null
+++ b/s2graphql/src/main/scala/org/apache/s2graph/graphql/middleware/Transform.scala
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.s2graph.graphql.middleware
+
+import org.apache.s2graph.graphql.types.S2Directive
+import sangria.ast.StringValue
+import sangria.execution.{Middleware, MiddlewareAfterField, MiddlewareQueryContext}
+import sangria.schema.Context
+
+/**
+ * Middleware that rewrites String field values annotated with `@transform(func: "...")`.
+ *
+ * `func` must be a string literal holding Scala source for a `String => String` function; it is
+ * compiled and applied by [[S2Directive.resolveTransform]], i.e. client-supplied code runs on the
+ * server. Non-String values, fields without the directive and non-literal `func` arguments
+ * (e.g. variables) are left unchanged.
+ */
+case class Transform() extends Middleware[Any] with MiddlewareAfterField[Any] {
+  type QueryVal = Unit
+  type FieldVal = Unit
+
+  def beforeQuery(context: MiddlewareQueryContext[Any, _, _]) = ()
+
+  def afterQuery(queryVal: QueryVal, context: MiddlewareQueryContext[Any, _, _]) = ()
+
+  def beforeField(cache: QueryVal, mctx: MiddlewareQueryContext[Any, _, _], ctx: Context[Any, _]) = continue
+
+  /** Returns the transformed value for annotated String fields; None keeps the resolved value. */
+  def afterField(cache: QueryVal, fromCache: FieldVal, value: Any, mctx: MiddlewareQueryContext[Any, _, _], ctx: Context[Any, _]): Option[Any] =
+    value match {
+      case s: String => transformCode(ctx).map(code => S2Directive.resolveTransform(code, s))
+      case _ => None
+    }
+
+  /** Source of the literal `func` argument of `@transform` on the current field, if any. */
+  private def transformCode(ctx: Context[Any, _]): Option[String] =
+    for {
+      astField <- ctx.astFields.headOption
+      directive <- astField.directives.find(_.name == S2Directive.Transform.name)
+      argument <- directive.arguments.find(_.name == S2Directive.funcArg.name)
+      code <- Some(argument.value).collect { case sv: StringValue => sv.value }
+    } yield code
+}
diff --git a/s2graphql/src/main/scala/org/apache/s2graph/graphql/repository/GraphRepository.scala b/s2graphql/src/main/scala/org/apache/s2graph/graphql/repository/GraphRepository.scala
index 0b5a2e9..b5e65dc 100644
--- a/s2graphql/src/main/scala/org/apache/s2graph/graphql/repository/GraphRepository.scala
+++ b/s2graphql/src/main/scala/org/apache/s2graph/graphql/repository/GraphRepository.scala
@@ -123,9 +123,8 @@
}
def getVertices(queryParam: VertexQueryParam): Future[Seq[S2VertexLike]] = {
- graph.asInstanceOf[S2Graph].searchVertices(queryParam).map { a =>
- println(a)
- a
+ graph.asInstanceOf[S2Graph].searchVertices(queryParam).map { v =>
+ v
}
}
diff --git a/s2graphql/src/main/scala/org/apache/s2graph/graphql/types/FieldResolver.scala b/s2graphql/src/main/scala/org/apache/s2graph/graphql/types/FieldResolver.scala
index 478517f..a2b80da 100644
--- a/s2graphql/src/main/scala/org/apache/s2graph/graphql/types/FieldResolver.scala
+++ b/s2graphql/src/main/scala/org/apache/s2graph/graphql/types/FieldResolver.scala
@@ -28,7 +28,6 @@
import sangria.schema._
object FieldResolver {
-
def graphElement[A](name: String, cType: String, c: Context[GraphRepository, Any]): A = {
c.value match {
case v: S2VertexLike => name match {
@@ -36,6 +35,7 @@
case _ =>
val innerVal = v.propertyValue(name).get
JSONParser.innerValToAny(innerVal, cType).asInstanceOf[A]
+
}
case e: S2EdgeLike => name match {
case "timestamp" => e.ts.asInstanceOf[A]
diff --git a/s2graphql/src/main/scala/org/apache/s2graph/graphql/types/PlayJsonScalarType.scala b/s2graphql/src/main/scala/org/apache/s2graph/graphql/types/PlayJsonScalarType.scala
new file mode 100644
index 0000000..fc7f1ce
--- /dev/null
+++ b/s2graphql/src/main/scala/org/apache/s2graph/graphql/types/PlayJsonScalarType.scala
@@ -0,0 +1,147 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.s2graph.graphql.types
+
+import play.api.libs.json._
+import sangria.ast
+import sangria.execution.Executor
+import sangria.marshalling.{ArrayMapBuilder, InputUnmarshaller, ResultMarshaller, ScalarValueInfo}
+import sangria.schema._
+import sangria.validation.{BigIntCoercionViolation, IntCoercionViolation, ValueCoercionViolation}
+import sangria.macros._
+
+import scala.concurrent.ExecutionContext.Implicits.global
+
+/**
+ * Play-JSON integration for S2GraphQL: a result marshaller, an input unmarshaller and a `Json`
+ * scalar type whose values are raw Play `JsValue`s.
+ */
+object PlayJsonScalarType {
+
+  /** Marshals GraphQL results into Play `JsValue`s; `JsValue` scalars are emitted unchanged. */
+  implicit object CustomPlayJsonResultMarshaller extends ResultMarshaller {
+    type Node = JsValue
+    type MapBuilder = ArrayMapBuilder[Node]
+
+    def emptyMapNode(keys: Seq[String]) = new ArrayMapBuilder[Node](keys)
+
+    def addMapNodeElem(builder: MapBuilder, key: String, value: Node, optional: Boolean) = builder.add(key, value)
+
+    def mapNode(builder: MapBuilder) = JsObject(builder.toMap)
+
+    // Build the object from the pairs directly so their order is kept
+    // (Json.toJson(keyValues.toMap) went through an unordered Map).
+    def mapNode(keyValues: Seq[(String, JsValue)]): JsValue = JsObject(keyValues)
+
+    def arrayNode(values: Vector[JsValue]) = JsArray(values)
+
+    def optionalArrayNodeValue(value: Option[JsValue]) = value match {
+      case Some(v) => v
+      case None => nullNode
+    }
+
+    def scalarNode(value: Any, typeName: String, info: Set[ScalarValueInfo]) = value match {
+      case v: String => JsString(v)
+      case v: Boolean => JsBoolean(v)
+      case v: Int => JsNumber(v)
+      case v: Long => JsNumber(v)
+      case v: Float => JsNumber(BigDecimal(v))
+      case v: Double => JsNumber(v)
+      case v: BigInt => JsNumber(BigDecimal(v))
+      case v: BigDecimal => JsNumber(v)
+      case v: JsValue => v
+      case v => throw new IllegalArgumentException("Unsupported scalar value: " + v)
+    }
+
+    def enumNode(value: String, typeName: String) = JsString(value)
+
+    def nullNode = JsNull
+
+    def renderCompact(node: JsValue) = Json.stringify(node)
+
+    def renderPretty(node: JsValue) = Json.prettyPrint(node)
+  }
+
+  /** Reads GraphQL input values supplied as Play `JsValue`s. */
+  implicit object PlayJsonInputUnmarshaller extends InputUnmarshaller[JsValue] {
+    def getRootMapValue(node: JsValue, key: String) = node.asInstanceOf[JsObject].value get key
+
+    def isListNode(node: JsValue) = node.isInstanceOf[JsArray]
+
+    def getListValue(node: JsValue) = node.asInstanceOf[JsArray].value
+
+    def isMapNode(node: JsValue) = node.isInstanceOf[JsObject]
+
+    def getMapValue(node: JsValue, key: String) = node.asInstanceOf[JsObject].value get key
+
+    def getMapKeys(node: JsValue) = node.asInstanceOf[JsObject].fields.map(_._1)
+
+    def isDefined(node: JsValue) = node != JsNull
+
+    // Integral numbers are unmarshalled as BigInt, all other numbers as BigDecimal.
+    def getScalarValue(node: JsValue) = node match {
+      case JsBoolean(b) => b
+      case JsNumber(d) => d.toBigIntExact getOrElse d
+      case JsString(s) => s
+      case n => n
+    }
+
+    def getScalaScalarValue(node: JsValue) = getScalarValue(node)
+
+    def isEnumNode(node: JsValue) = node.isInstanceOf[JsString]
+
+    def isScalarNode(node: JsValue) = true
+
+    def isVariableNode(node: JsValue) = false
+
+    def getVariableName(node: JsValue) = throw new IllegalArgumentException("variables are not supported")
+
+    def render(node: JsValue) = Json.stringify(node)
+  }
+
+  case object JsonCoercionViolation extends ValueCoercionViolation("Not valid JSON")
+
+  /**
+   * `Json` scalar holding an arbitrary Play `JsValue`. Output is emitted unchanged; variable
+   * values may be JSON values or Scala primitives; inline literals must be strings containing
+   * a JSON document. Any other input is rejected with [[JsonCoercionViolation]].
+   */
+  implicit val JsonType: ScalarType[JsValue] = ScalarType[JsValue]("Json",
+    description = Some("Raw PlayJson value"),
+    coerceOutput = (value, _) => value,
+    coerceUserInput = {
+      case v: String => Right(JsString(v))
+      case v: Boolean => Right(JsBoolean(v))
+      case v: Int => Right(JsNumber(v))
+      case v: Long => Right(JsNumber(v))
+      case v: Float => Right(JsNumber(BigDecimal(v)))
+      case v: Double => Right(JsNumber(v))
+      case v: BigInt => Right(JsNumber(BigDecimal(v)))
+      case v: BigDecimal => Right(JsNumber(v))
+      case v: JsValue => Right(v)
+      // Unsupported input types become a coercion violation instead of a MatchError.
+      case _ => Left(JsonCoercionViolation)
+    },
+    coerceInput = {
+      // Malformed JSON in the literal is reported as a violation rather than thrown.
+      case sv: ast.StringValue => scala.util.Try(Json.parse(sv.value)).toOption.toRight(JsonCoercionViolation)
+      case _ => Left(JsonCoercionViolation)
+    })
+}
diff --git a/s2graphql/src/main/scala/org/apache/s2graph/graphql/types/S2Directive.scala b/s2graphql/src/main/scala/org/apache/s2graph/graphql/types/S2Directive.scala
new file mode 100644
index 0000000..54512c1
--- /dev/null
+++ b/s2graphql/src/main/scala/org/apache/s2graph/graphql/types/S2Directive.scala
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.s2graph.graphql.types
+
+import sangria.schema.{Argument, Directive, DirectiveLocation, StringType}
+
+/**
+ * Custom GraphQL directives of S2GraphQL and the runtime evaluation they rely on.
+ */
+object S2Directive {
+
+  /**
+   * Runtime compilation of Scala source, with compiled thunks cached per source string.
+   *
+   * SECURITY: the source evaluated here comes from the `func` argument of client queries, so this
+   * executes arbitrary client-supplied code on the server. Only expose it to trusted clients.
+   */
+  object Eval {
+
+    import scala.collection._
+
+    // Concurrent map so that queries running at the same time cannot corrupt the cache
+    // (under a race a snippet may, at worst, be compiled more than once).
+    val codeMap: mutable.Map[String, () => Any] = scala.collection.concurrent.TrieMap.empty[String, () => Any]
+
+    /** Parses and compiles `code` with a fresh runtime ToolBox, returning a thunk that evaluates it. */
+    def compileCode(code: String): () => Any = {
+      import scala.tools.reflect.ToolBox
+      val toolbox = reflect.runtime.currentMirror.mkToolBox()
+
+      toolbox.compile(toolbox.parse(code))
+    }
+
+    /** Evaluates `code` (compiling it on first use) and casts the result to `T` without checking. */
+    def getCompiledCode[T](code: String): T =
+      codeMap.getOrElseUpdate(code, compileCode(code)).apply().asInstanceOf[T]
+  }
+
+  /** Type the `func` argument of `@transform` must evaluate to. */
+  type TransformFunc = String => String
+
+  val funcArg = Argument("func", StringType)
+
+  /** `@transform(func: "<scala source>")`, applicable to fields. */
+  val Transform =
+    Directive("transform",
+      arguments = List(funcArg),
+      locations = Set(DirectiveLocation.Field),
+      shouldInclude = _ => true)
+
+  /**
+   * Applies the `String => String` function given as Scala source in `code` to `input`.
+   *
+   * @return the function's result, or the error's `toString` when the code fails to parse or
+   *         compile (ToolBoxError is a Throwable, not an Exception), does not evaluate to a
+   *         `String => String`, or throws while running. Fatal errors propagate.
+   */
+  def resolveTransform(code: String, input: String): String =
+    try {
+      Eval.getCompiledCode[TransformFunc](code)(input)
+    } catch {
+      case scala.util.control.NonFatal(e) => e.toString
+    }
+}
diff --git a/s2graphql/src/main/scala/org/apache/s2graph/graphql/types/SchemaDef.scala b/s2graphql/src/main/scala/org/apache/s2graph/graphql/types/SchemaDef.scala
index 27b0c50..7451023 100644
--- a/s2graphql/src/main/scala/org/apache/s2graph/graphql/types/SchemaDef.scala
+++ b/s2graphql/src/main/scala/org/apache/s2graph/graphql/types/SchemaDef.scala
@@ -20,8 +20,6 @@
package org.apache.s2graph.graphql.types
import org.apache.s2graph.graphql.repository.GraphRepository
-import org.apache.s2graph.graphql.types._
-
/**
* S2Graph GraphQL schema.
@@ -47,5 +45,13 @@
fields(s2Type.mutationFields ++ mutateManagementFields: _*)
)
- val S2GraphSchema = Schema(S2QueryType, Option(S2MutationType))
+  /** Directives accepted in queries: the custom `@transform` plus Sangria's built-ins. */
+  val directives = S2Directive.Transform :: BuiltinDirectives
+
+  // Query and mutation root types, registered together with the directives above.
+  val S2GraphSchema = Schema(
+    query = S2QueryType,
+    mutation = Option(S2MutationType),
+    directives = directives
+  )
}
diff --git a/s2graphql/src/test/scala/org/apache/s2graph/graphql/DirectiveTest.scala b/s2graphql/src/test/scala/org/apache/s2graph/graphql/DirectiveTest.scala
new file mode 100644
index 0000000..699a968
--- /dev/null
+++ b/s2graphql/src/test/scala/org/apache/s2graph/graphql/DirectiveTest.scala
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.s2graph.graphql
+
+import com.typesafe.config.ConfigFactory
+import org.apache.s2graph.graphql.types.S2Directive
+import org.scalatest._
+
+/** Tests for evaluating `@transform` function source via [[S2Directive.resolveTransform]]. */
+class DirectiveTest extends FunSuite with Matchers with BeforeAndAfterAll {
+  var testGraph: TestGraph = _
+
+  override def beforeAll = {
+    val config = ConfigFactory.load()
+    testGraph = new EmptyGraph(config)
+    testGraph.open()
+  }
+
+  override def afterAll(): Unit = {
+    testGraph.cleanup()
+  }
+
+  test("transform") {
+    val input = "20170601_A0"
+    val code =
+      """ (s: String) => {
+        val date = s.split("_").head
+        s"http://abc.xy.com/IMG_${date}.png"
+      }
+      """
+    val actual = S2Directive.resolveTransform(code, input)
+    val expected = "http://abc.xy.com/IMG_20170601.png"
+
+    actual shouldBe expected
+  }
+
+  test("transform returns the error text when the function throws") {
+    // "not-a-number".toInt throws NumberFormatException inside the compiled function.
+    val actual = S2Directive.resolveTransform("(s: String) => s.toInt.toString", "not-a-number")
+
+    actual should startWith("java.lang.NumberFormatException")
+  }
+}
diff --git a/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/Job.scala b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/Job.scala
index 717d087..8f21bc2 100644
--- a/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/Job.scala
+++ b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/Job.scala
@@ -29,7 +29,12 @@
def run() = {
// source
- jobDesc.sources.foreach{ source => dfMap.put(source.conf.name, source.toDF(ss))}
+ jobDesc.sources.foreach{ source =>
+ val df = source.toDF(ss)
+ if (source.conf.cache.getOrElse(false) && !df.isStreaming) df.cache()
+
+ dfMap.put(source.conf.name, df)
+ }
logger.debug(s"valid source DF set : ${dfMap.keySet}")
// process
@@ -64,7 +69,9 @@
}
.map { p =>
val inputMap = p.conf.inputs.map{ input => (input, dfMap(input)) }.toMap
- p.conf.name -> p.execute(ss, inputMap)
+ val df = p.execute(ss, inputMap)
+ if (p.conf.cache.getOrElse(false) && !df.isStreaming) df.cache()
+ p.conf.name -> df
}
}
}
diff --git a/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/task/Process.scala b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/task/Process.scala
index 6bc100f..c077f5b 100644
--- a/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/task/Process.scala
+++ b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/task/Process.scala
@@ -20,6 +20,7 @@
package org.apache.s2graph.s2jobs.task
import org.apache.spark.sql.{DataFrame, SparkSession}
+import play.api.libs.json.Json
/**
* Process
@@ -47,7 +48,68 @@
val sql = conf.options("sql")
logger.debug(s"${LOG_PREFIX} sql : $sql")
- ss.sql(sql)
+ postProcess(ss.sql(sql))
}
+
+  /**
+   * Applies the optional operations configured in `conf.options` to the SQL result, in order:
+   *  - watermark: `watermark.delay.time` on `time.column`
+   *  - de-duplication: `drop.duplicate.columns` (comma-separated)
+   *  - aggregation: `grouped.keys` (comma-separated), plus a sliding window over `time.column`
+   *    when `grouped.window.duration` and `grouped.slide.duration` are both set, aggregated with
+   *    the JSON array of SQL expressions in `grouped.dataset.agg` (default `["count(1)"]`).
+   * @param df result of the configured SQL statement
+   * @return `df` with the configured operations applied; unchanged when none are configured
+   */
+  private def postProcess(df: DataFrame): DataFrame = {
+    import org.apache.spark.sql.functions._
+    // Comma-separated names are trimmed so that "a, b" and "a,b" are equivalent.
+    def splitNames(value: String): Seq[String] = value.split(",").map(_.trim).toSeq
+
+    val timeColumn = conf.options.get("time.column")
+    logger.debug(s">> timeColumn: ${timeColumn}")
+
+    // watermark
+    val watermarked = (conf.options.get("watermark.delay.time"), timeColumn) match {
+      case (Some(delay), Some(column)) =>
+        logger.debug(s">> waterMarkDelayTime : ${delay}")
+        df.withWatermark(column, delay)
+      case (Some(_), None) =>
+        logger.warn("time.column does not exists.. cannot apply watermark")
+        df
+      case _ => df
+    }
+
+    // drop duplication
+    val deduplicated = conf.options.get("drop.duplicate.columns").map(splitNames) match {
+      case Some(columns) =>
+        logger.debug(s">> dropDuplicates : ${columns}")
+        watermarked.dropDuplicates(columns)
+      case None => watermarked
+    }
+
+    // groupBy
+    conf.options.get("grouped.keys").map(splitNames).fold(deduplicated) { keys =>
+      val windowOpt = for {
+        windowDuration <- conf.options.get("grouped.window.duration")
+        slideDuration <- conf.options.get("grouped.slide.duration")
+      } yield {
+        // A window needs an event-time column; fail with a descriptive error if it is missing.
+        val column = timeColumn.getOrElse(
+          throw new IllegalArgumentException("time.column is required for grouped window operations"))
+        logger.debug(s">> using window operation : Duration ${windowDuration}, slideDuration : ${slideDuration}")
+        window(col(column), windowDuration, slideDuration)
+      }
+      val groupedKeys = keys.map(col) ++ windowOpt.toSeq
+      logger.debug(s">> groupedKeys: ${groupedKeys}")
+
+      // aggregate options
+      val aggExprs = Json.parse(conf.options.getOrElse("grouped.dataset.agg", "[\"count(1)\"]")).as[Seq[String]].map(expr)
+      require(aggExprs.nonEmpty, "grouped.dataset.agg must contain at least one aggregate expression")
+      logger.debug(s">> aggr : ${aggExprs}")
+      deduplicated.groupBy(groupedKeys: _*).agg(aggExprs.head, aggExprs.tail: _*)
+    }
+  }
+
}
diff --git a/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/task/Sink.scala b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/task/Sink.scala
index ae88b6d..720c2d7 100644
--- a/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/task/Sink.scala
+++ b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/task/Sink.scala
@@ -69,6 +69,8 @@
}
val interval = conf.options.getOrElse("interval", DEFAULT_TRIGGER_INTERVAL)
val checkpointLocation = conf.options.getOrElse("checkpointLocation", DEFAULT_CHECKPOINT_LOCATION)
+ val isContinuous = conf.options.getOrElse("isContinuous", "false").toBoolean
+ val trigger = if (isContinuous) Trigger.Continuous(interval) else Trigger.ProcessingTime(interval)
val cfg = conf.options ++ Map("checkpointLocation" -> checkpointLocation)
@@ -78,7 +80,7 @@
.queryName(s"${queryName}_${conf.name}")
.format(FORMAT)
.options(cfg)
- .trigger(Trigger.ProcessingTime(interval))
+ .trigger(trigger)
.outputMode(mode)
.start()
}
@@ -93,7 +95,7 @@
case _ => SaveMode.Overwrite
}
- val partitionedWriter = if (partitionsOpt.isDefined) writer.partitionBy(partitionsOpt.get) else writer
+ val partitionedWriter = if (partitionsOpt.isDefined) writer.partitionBy(partitionsOpt.get.split(","): _*) else writer
writeBatchInner(partitionedWriter.format(FORMAT).mode(mode))
}
diff --git a/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/task/Source.scala b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/task/Source.scala
index 9c80e58..259cfc0 100644
--- a/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/task/Source.scala
+++ b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/task/Source.scala
@@ -40,19 +40,30 @@
val DEFAULT_FORMAT = "raw"
override def mandatoryOptions: Set[String] = Set("kafka.bootstrap.servers", "subscribe")
+  /** Applies the optional `numPartitions` option: repartition when >= defaultParallelism, else coalesce. */
+  def repartition(df: DataFrame, defaultParallelism: Int): DataFrame =
+    conf.options.get("numPartitions").map(_.trim.toInt) match {
+      case Some(numOfPartitions) =>
+        require(numOfPartitions > 0, s"numPartitions must be positive but was $numOfPartitions")
+        logger.info(s"[repartition] $numOfPartitions ($defaultParallelism)")
+        if (numOfPartitions >= defaultParallelism) df.repartition(numOfPartitions) else df.coalesce(numOfPartitions)
+      case None => df
+    }
+
 override def toDF(ss:SparkSession):DataFrame = {
 logger.info(s"${LOG_PREFIX} options: ${conf.options}")
 val format = conf.options.getOrElse("format", "raw")
 val df = ss.readStream.format("kafka").options(conf.options).load()
+ val partitionedDF = repartition(df, df.sparkSession.sparkContext.defaultParallelism) // apply the optional "numPartitions" option once; every format branch below reads partitionedDF
 format match {
- case "raw" => df
- case "json" => parseJsonSchema(ss, df)
+ case "raw" => partitionedDF // Kafka rows as loaded (after optional repartitioning)
+ case "json" => parseJsonSchema(ss, partitionedDF)
 // case "custom" => parseCustomSchema(df)
 case _ =>
 logger.warn(s"${LOG_PREFIX} unsupported format '$format'.. use default schema ")
- df
+ partitionedDF // unsupported format: same result as "raw" (default schema)
 }
 }
diff --git a/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/task/Task.scala b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/task/Task.scala
index ea42828..1210132 100644
--- a/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/task/Task.scala
+++ b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/task/Task.scala
@@ -42,8 +42,7 @@
taskConf.options.filterKeys(_.startsWith("cache.")).mapValues(_.toInt)
}
}
-
-case class TaskConf(name: String, `type`: String, inputs: Seq[String] = Nil, options: Map[String, String] = Map.empty)
+case class TaskConf(name: String, `type`: String, inputs: Seq[String] = Nil, options: Map[String, String] = Map.empty, cache: Option[Boolean] = None) // cache = Some(true): Job caches this task's DataFrame when it is not streaming
trait Task extends Serializable with Logger {
val conf: TaskConf