merge S2GRAPH-249
diff --git a/s2http/src/main/scala/org/apache/s2graph/http/PlayJsonSupport.scala b/s2http/src/main/scala/org/apache/s2graph/http/PlayJsonSupport.scala
index 8b4e91c..244e588 100644
--- a/s2http/src/main/scala/org/apache/s2graph/http/PlayJsonSupport.scala
+++ b/s2http/src/main/scala/org/apache/s2graph/http/PlayJsonSupport.scala
@@ -6,7 +6,7 @@
import akka.http.scaladsl.model._
import akka.http.scaladsl.unmarshalling.{FromEntityUnmarshaller, Unmarshaller}
import akka.util.ByteString
-import play.api.libs.json.{JsValue, Json}
+import play.api.libs.json._
trait PlayJsonSupport {
@@ -31,4 +31,44 @@
case data => Json.parse(data.decodeString(Charset.forName("UTF-8")))
}
}
+
+ trait ToPlayJson[T] {
+ def toJson(msg: T): JsValue
+ }
+
+ import scala.language.reflectiveCalls
+
+ object ToPlayJson {
+ type ToPlayJsonReflective = {
+ def toJson: JsValue
+ }
+
+ implicit def forToJson[A <: ToPlayJsonReflective] = new ToPlayJson[A] {
+ def toJson(js: A) = js.toJson
+ }
+
+ implicit def forPlayJson[A <: JsValue] = new ToPlayJson[A] {
+ def toJson(js: A) = js
+ }
+ }
+
+ implicit object JsErrorJsonWriter extends Writes[JsError] {
+ def writes(o: JsError): JsValue = Json.obj(
+ "errors" -> JsArray(
+ o.errors.map {
+ case (path, validationErrors) => Json.obj(
+ "path" -> Json.toJson(path.toString()),
+ "validationErrors" -> JsArray(validationErrors.map(validationError => Json.obj(
+ "message" -> JsString(validationError.message),
+ "args" -> JsArray(validationError.args.map {
+ case x: Int => JsNumber(x)
+ case x => JsString(x.toString)
+ })
+ )))
+ )
+ }
+ )
+ )
+ }
+
}
diff --git a/s2http/src/main/scala/org/apache/s2graph/http/S2GraphAdminRoute.scala b/s2http/src/main/scala/org/apache/s2graph/http/S2GraphAdminRoute.scala
index 9bf7eb4..47ac86a 100644
--- a/s2http/src/main/scala/org/apache/s2graph/http/S2GraphAdminRoute.scala
+++ b/s2http/src/main/scala/org/apache/s2graph/http/S2GraphAdminRoute.scala
@@ -12,30 +12,9 @@
import scala.util._
-object S2GraphAdminRoute {
-
- trait AdminMessageFormatter[T] {
- def toJson(msg: T): JsValue
- }
-
- import scala.language.reflectiveCalls
-
- object AdminMessageFormatter {
- type ToPlayJson = {
- def toJson: JsValue
- }
-
- implicit def toPlayJson[A <: ToPlayJson] = new AdminMessageFormatter[A] {
- def toJson(js: A) = js.toJson
- }
-
- implicit def fromPlayJson[T <: JsValue] = new AdminMessageFormatter[T] {
- def toJson(js: T) = js
- }
- }
-
- def toHttpEntity[A: AdminMessageFormatter](opt: Option[A], status: StatusCode = StatusCodes.OK, message: String = ""): HttpResponse = {
- val ev = implicitly[AdminMessageFormatter[A]]
+object S2GraphAdminRoute extends PlayJsonSupport {
+ def toHttpEntity[A: ToPlayJson](opt: Option[A], status: StatusCode = StatusCodes.OK, message: String = ""): HttpResponse = {
+ val ev = implicitly[ToPlayJson[A]]
val res = opt.map(ev.toJson).getOrElse(Json.obj("message" -> message))
HttpResponse(
@@ -44,9 +23,9 @@
)
}
- def toHttpEntity[A: AdminMessageFormatter](opt: Try[A]): HttpResponse = {
- val ev = implicitly[AdminMessageFormatter[A]]
- val (status, res) = opt match {
+ def toHttpEntity[A: ToPlayJson](_try: Try[A]): HttpResponse = {
+ val ev = implicitly[ToPlayJson[A]]
+ val (status, res) = _try match {
case Success(m) => StatusCodes.Created -> Json.obj("status" -> "ok", "message" -> ev.toJson(m))
case Failure(e) => StatusCodes.OK -> Json.obj("status" -> "failure", "message" -> e.toString)
}
@@ -54,9 +33,9 @@
toHttpEntity(Option(res), status = status)
}
- def toHttpEntity[A: AdminMessageFormatter](ls: Seq[A], status: StatusCode = StatusCodes.OK): HttpResponse = {
- val ev = implicitly[AdminMessageFormatter[A]]
- val res = ls.map(ev.toJson)
+ def toHttpEntity[A: ToPlayJson](ls: Seq[A], status: StatusCode): HttpResponse = {
+ val ev = implicitly[ToPlayJson[A]]
+ val res = JsArray(ls.map(ev.toJson))
HttpResponse(
status = status,
@@ -85,11 +64,7 @@
}
// GET /graphs/getServiceColumn/:serviceName/:columnName
- lazy val getServiceColumn = path("getServiceColumn" / Segments) { params =>
- val (serviceName, columnName) = params match {
- case s :: c :: Nil => (s, c)
- }
-
+ lazy val getServiceColumn = path("getServiceColumn" / Segment / Segment) { (serviceName, columnName) =>
val ret = Management.findServiceColumn(serviceName, columnName)
complete(toHttpEntity(ret, message = s"ServiceColumn not found: ${serviceName}, ${columnName}"))
}
@@ -105,7 +80,7 @@
lazy val getLabels = path("getLabels" / Segment) { serviceName =>
val ret = Management.findLabels(serviceName)
- complete(toHttpEntity(ret))
+ complete(toHttpEntity(ret, StatusCodes.OK))
}
/* POST */
@@ -175,6 +150,7 @@
val (serviceName, columnName, storeInGlobalIndex) = params match {
case s :: c :: Nil => (s, c, false)
case s :: c :: i :: Nil => (s, c, i.toBoolean)
+ case _ => throw new RuntimeException("Invalid Params")
}
entity(as[JsValue]) { params =>
@@ -199,45 +175,33 @@
hTableParams.preSplitSize, hTableParams.hTableTTL,
hTableParams.compressionAlgorithm.getOrElse(Management.DefaultCompressionAlgorithm))
- complete(toHttpEntity(None, status = StatusCodes.OK, message = "created"))
+ complete(toHttpEntity(None: Option[JsValue], status = StatusCodes.OK, message = "created"))
}
- case err@JsError(_) => complete(toHttpEntity(None, status = StatusCodes.BadRequest, message = Json.toJson(err).toString))
+ case err@JsError(_) => complete(toHttpEntity(None: Option[JsValue], status = StatusCodes.BadRequest, message = Json.toJson(err).toString))
}
}
}
// POST /graphs/copyLabel/:oldLabelName/:newLabelName
- lazy val copyLabel = path("copyLabel" / Segments) { params =>
- val (oldLabelName, newLabelName) = params match {
- case oldLabel :: newLabel :: Nil => (oldLabel, newLabel)
- }
-
+ lazy val copyLabel = path("copyLabel" / Segment / Segment) { (oldLabelName, newLabelName) =>
val copyTry = management.copyLabel(oldLabelName, newLabelName, Some(newLabelName))
complete(toHttpEntity(copyTry))
}
// POST /graphs/renameLabel/:oldLabelName/:newLabelName
- lazy val renameLabel = path("renameLabel" / Segments) { params =>
- val (oldLabelName, newLabelName) = params match {
- case oldLabel :: newLabel :: Nil => (oldLabel, newLabel)
- }
-
+ lazy val renameLabel = path("renameLabel" / Segment / Segment) { (oldLabelName, newLabelName) =>
Label.findByName(oldLabelName) match {
- case None => complete(toHttpEntity(None, status = StatusCodes.NotFound, message = s"Label $oldLabelName not found."))
+ case None => complete(toHttpEntity(None: Option[JsValue], status = StatusCodes.NotFound, message = s"Label $oldLabelName not found."))
case Some(label) =>
Management.updateLabelName(oldLabelName, newLabelName)
- complete(toHttpEntity(None, message = s"${label} was updated."))
+ complete(toHttpEntity(None: Option[JsValue], message = s"${label} was updated."))
}
}
// POST /graphs/swapLabels/:leftLabelName/:rightLabelName
- lazy val swapLabel = path("swapLabel" / Segments) { params =>
- val (leftLabelName, rightLabelName) = params match {
- case left :: right :: Nil => (left, right)
- }
-
+ lazy val swapLabel = path("swapLabel" / Segment / Segment) { (leftLabelName, rightLabelName) =>
val left = Label.findByName(leftLabelName, useCache = false)
val right = Label.findByName(rightLabelName, useCache = false)
// verify same schema
@@ -245,20 +209,15 @@
(left, right) match {
case (Some(l), Some(r)) =>
Management.swapLabelNames(leftLabelName, rightLabelName)
-
- complete(toHttpEntity(None, message = s"Labels were swapped."))
+ complete(toHttpEntity(None: Option[JsValue], message = s"Labels were swapped."))
case _ =>
- complete(toHttpEntity(None, status = StatusCodes.NotFound, message = s"Label ${leftLabelName} or ${rightLabelName} not found."))
+ complete(toHttpEntity(None: Option[JsValue], status = StatusCodes.NotFound, message = s"Label ${leftLabelName} or ${rightLabelName} not found."))
}
}
// POST /graphs/updateHTable/:labelName/:newHTableName
- lazy val updateHTable = path("updateHTable" / Segments) { params =>
- val (labelName, newHTableName) = params match {
- case l :: h :: Nil => (l, h)
- }
-
- val updateTry = Management.updateHTable(labelName, newHTableName)
+ lazy val updateHTable = path("updateHTable" / Segment / Segment) { (labelName, newHTableName) =>
+ val updateTry = Management.updateHTable(labelName, newHTableName).map(Json.toJson(_))
complete(toHttpEntity(updateTry))
}
@@ -273,17 +232,13 @@
// PUT /graphs/markDeletedLabel/:labelName
lazy val markDeletedLabel = path("markDeletedLabel" / Segment) { labelName =>
- val ret = Management.markDeletedLabel(labelName).toOption
+ val ret = Management.markDeletedLabel(labelName).toOption.map(Json.toJson(_))
complete(toHttpEntity(ret, message = s"Label not found: ${labelName}"))
}
// PUT /graphs/deleteServiceColumn/:serviceName/:columnName
- lazy val deleteServiceColumn = path("deleteServiceColumn" / Segments) { params =>
- val (serviceName, columnName) = params match {
- case s :: c :: Nil => (s, c)
- }
-
+ lazy val deleteServiceColumn = path("deleteServiceColumn" / Segment / Segment) { (serviceName, columnName) =>
val ret = Management.deleteColumn(serviceName, columnName).toOption
complete(toHttpEntity(ret, message = s"ServiceColumn not found: ${serviceName}, ${columnName}"))
diff --git a/s2http/src/main/scala/org/apache/s2graph/http/S2GraphMutateRoute.scala b/s2http/src/main/scala/org/apache/s2graph/http/S2GraphMutateRoute.scala
index fc3b768..a65db5a 100644
--- a/s2http/src/main/scala/org/apache/s2graph/http/S2GraphMutateRoute.scala
+++ b/s2http/src/main/scala/org/apache/s2graph/http/S2GraphMutateRoute.scala
@@ -12,56 +12,40 @@
import scala.concurrent.{ExecutionContext, Future}
-trait S2GraphMutateRoute {
+trait S2GraphMutateRoute extends PlayJsonSupport {
val s2graph: S2Graph
val logger = LoggerFactory.getLogger(this.getClass)
+
lazy val parser = new RequestParser(s2graph)
// lazy val requestParser = new RequestParser(s2graph)
lazy val exceptionHandler = ExceptionHandler {
- case ex: JsonParseException =>
- complete(StatusCodes.BadRequest -> ex.getMessage)
- case ex: java.lang.IllegalArgumentException =>
- complete(StatusCodes.BadRequest -> ex.getMessage)
+ case ex: JsonParseException => complete(StatusCodes.BadRequest -> ex.getMessage)
+ case ex: java.lang.IllegalArgumentException => complete(StatusCodes.BadRequest -> ex.getMessage)
}
lazy val mutateVertex = path("vertex" / Segments) { params =>
+ implicit val ec = s2graph.ec
+
val (operation, serviceNameOpt, columnNameOpt) = params match {
- case operation :: serviceName :: columnName :: Nil =>
- (operation, Option(serviceName), Option(columnName))
- case operation :: Nil =>
- (operation, None, None)
+ case operation :: serviceName :: columnName :: Nil => (operation, Option(serviceName), Option(columnName))
+ case operation :: Nil => (operation, None, None)
+ case _ => throw new RuntimeException("invalid params")
}
- entity(as[String]) { body =>
- val payload = Json.parse(body)
-
- implicit val ec = s2graph.ec
-
- val future = vertexMutate(payload, operation, serviceNameOpt, columnNameOpt).map { mutateResponses =>
- HttpResponse(
- status = StatusCodes.OK,
- entity = HttpEntity(ContentTypes.`application/json`, Json.toJson(mutateResponses).toString)
- )
- }
+ entity(as[JsValue]) { payload =>
+ val future = vertexMutate(payload, operation, serviceNameOpt, columnNameOpt).map(Json.toJson(_))
complete(future)
}
}
lazy val mutateEdge = path("edge" / Segment) { operation =>
- entity(as[String]) { body =>
- val payload = Json.parse(body)
+ implicit val ec = s2graph.ec
- implicit val ec = s2graph.ec
-
- val future = edgeMutate(payload, operation, withWait = true).map { mutateResponses =>
- HttpResponse(
- status = StatusCodes.OK,
- entity = HttpEntity(ContentTypes.`application/json`, Json.toJson(mutateResponses).toString)
- )
- }
+ entity(as[JsValue]) { payload =>
+ val future = edgeMutate(payload, operation, withWait = true).map(Json.toJson(_))
complete(future)
}
@@ -79,13 +63,10 @@
s2graph.mutateVertices(verticesToStore, withWait).map(_.map(_.isSuccess))
}
- def edgeMutate(elementsWithTsv: Seq[(GraphElement, String)],
- withWait: Boolean)(implicit ec: ExecutionContext): Future[Seq[Boolean]] = {
+ def edgeMutate(elementsWithTsv: Seq[(GraphElement, String)], withWait: Boolean)(implicit ec: ExecutionContext): Future[Seq[Boolean]] = {
val elementWithIdxs = elementsWithTsv.zipWithIndex
+ val (elementSync, elementAsync) = elementWithIdxs.partition { case ((element, tsv), idx) => !element.isAsync }
- val (elementSync, elementAsync) = elementWithIdxs.partition { case ((element, tsv), idx) =>
- !element.isAsync
- }
val retToSkip = elementAsync.map(_._2 -> MutateResponse.Success)
val (elementsToStore, _) = elementSync.map(_._1).unzip
val elementsIdxToStore = elementSync.map(_._2)
@@ -95,9 +76,7 @@
}.map(_.sortBy(_._1).map(_._2.isSuccess))
}
- def edgeMutate(jsValue: JsValue,
- operation: String,
- withWait: Boolean)(implicit ec: ExecutionContext): Future[Seq[Boolean]] = {
+ def edgeMutate(jsValue: JsValue, operation: String, withWait: Boolean)(implicit ec: ExecutionContext): Future[Seq[Boolean]] = {
val edgesWithTsv = parser.parseJsonFormat(jsValue, operation)
edgeMutate(edgesWithTsv, withWait)
}
diff --git a/s2http/src/test/scala/org/apache/s2graph/http/AdminRouteSpec.scala b/s2http/src/test/scala/org/apache/s2graph/http/AdminRouteSpec.scala
index eade7e6..26a7045 100644
--- a/s2http/src/test/scala/org/apache/s2graph/http/AdminRouteSpec.scala
+++ b/s2http/src/test/scala/org/apache/s2graph/http/AdminRouteSpec.scala
@@ -32,7 +32,7 @@
"compressionAlgorithm" -> "gz"
)
- val serviceEntity = Marshal(serviceParam.toString).to[MessageEntity].futureValue
+ val serviceEntity = Marshal(serviceParam).to[MessageEntity].futureValue
val request = Post("/createService").withEntity(serviceEntity)
request ~> routes ~> check {
@@ -71,7 +71,7 @@
)
)
- val serviceColumnEntity = Marshal(serviceColumnParam.toString).to[MessageEntity].futureValue
+ val serviceColumnEntity = Marshal(serviceColumnParam).to[MessageEntity].futureValue
val request = Post("/createServiceColumn").withEntity(serviceColumnEntity)
request ~> routes ~> check {
diff --git a/s2http/src/test/scala/org/apache/s2graph/http/MutateRouteSpec.scala b/s2http/src/test/scala/org/apache/s2graph/http/MutateRouteSpec.scala
index f823cd5..943db98 100644
--- a/s2http/src/test/scala/org/apache/s2graph/http/MutateRouteSpec.scala
+++ b/s2http/src/test/scala/org/apache/s2graph/http/MutateRouteSpec.scala
@@ -25,8 +25,11 @@
val columnName = "userName"
"MutateRoute" should {
+
"be able to insert vertex (POST /mutate/vertex/insert)" in {
- // {"timestamp": 10, "serviceName": "s2graph", "columnName": "user", "id": 1, "props": {}}
+ s2graph.management.createService(serviceName, "localhost", s"${serviceName}-dev", 1, None)
+
+ // {"timestamp": 10, "serviceName": "s2graph", "columnName": "user", "id": 1, "props": {}}
val param = Json.obj(
"timestamp" -> 10,
"serviceName" -> serviceName,
@@ -36,7 +39,8 @@
"age" -> 20
)
)
- val entity = Marshal(param.toString).to[MessageEntity].futureValue
+
+ val entity = Marshal(param).to[MessageEntity].futureValue
val request = Post("/vertex/insert").withEntity(entity)
request ~> routes ~> check {