blob: a65db5a8298dbdae9ecb4a6e41923a98c3a77b22 [file] [log] [blame]
package org.apache.s2graph.http
import akka.http.scaladsl.model.{ContentTypes, HttpEntity, HttpResponse, StatusCodes}
import akka.http.scaladsl.server.Directives._
import akka.http.scaladsl.server.{ExceptionHandler, Route}
import com.fasterxml.jackson.core.JsonParseException
import org.apache.s2graph.core.rest.RequestParser
import org.apache.s2graph.core.storage.MutateResponse
import org.apache.s2graph.core.{GraphElement, S2Graph}
import org.slf4j.LoggerFactory
import play.api.libs.json.{JsValue, Json}
import scala.concurrent.{ExecutionContext, Future}
/**
  * Akka HTTP routes that mutate vertices and edges of an [[S2Graph]] through POST requests.
  *
  * Implementers supply the `s2graph` instance; the combined route is exposed as `mutateRoute`.
  * Successful requests are answered with a JSON array of booleans built from the
  * `isSuccess` flag of each mutation response.
  */
trait S2GraphMutateRoute extends PlayJsonSupport {

  /** Graph instance that parses and executes the mutations; provided by the implementer. */
  val s2graph: S2Graph

  val logger = LoggerFactory.getLogger(this.getClass)

  /** Request parser bound to `s2graph`; lazy because `s2graph` is abstract in this trait. */
  lazy val parser = new RequestParser(s2graph)

  /**
    * Translates JSON parse errors and `IllegalArgumentException`s into `400 Bad Request`,
    * using the exception message as the response body.
    */
  lazy val exceptionHandler: ExceptionHandler = {
    // getMessage may be null; fall back to toString so that building the error
    // response cannot itself fail.
    def badRequest(ex: Throwable): Route = {
      val message: String = Option(ex.getMessage).getOrElse(ex.toString)
      complete(StatusCodes.BadRequest -> message)
    }

    ExceptionHandler {
      case ex: JsonParseException => badRequest(ex)
      case ex: java.lang.IllegalArgumentException => badRequest(ex)
    }
  }

  /**
    * Route for `vertex/{operation}` and `vertex/{operation}/{serviceName}/{columnName}`.
    *
    * The request entity is read as JSON and handed to `vertexMutate`. Any other number of
    * path segments after `vertex` is answered with `400 Bad Request` ("invalid params").
    */
  lazy val mutateVertex: Route = path("vertex" / Segments) { params =>
    implicit val ec: ExecutionContext = s2graph.ec

    def mutate(operation: String,
               serviceNameOpt: Option[String],
               columnNameOpt: Option[String]): Route =
      entity(as[JsValue]) { payload =>
        val future = vertexMutate(payload, operation, serviceNameOpt, columnNameOpt).map(Json.toJson(_))
        complete(future)
      }

    params match {
      case operation :: serviceName :: columnName :: Nil =>
        mutate(operation, Some(serviceName), Some(columnName))
      case operation :: Nil =>
        mutate(operation, None, None)
      case _ =>
        // A malformed path is a client error: answer 400 here instead of throwing an
        // exception that `exceptionHandler` does not cover.
        complete(StatusCodes.BadRequest -> "invalid params")
    }
  }

  /**
    * Route for `edge/{operation}`.
    *
    * The request entity is read as JSON and handed to `edgeMutate` with `withWait = true`.
    */
  lazy val mutateEdge: Route = path("edge" / Segment) { operation =>
    implicit val ec: ExecutionContext = s2graph.ec

    entity(as[JsValue]) { payload =>
      val future = edgeMutate(payload, operation, withWait = true).map(Json.toJson(_))
      complete(future)
    }
  }

  /**
    * Parses `jsValue` into vertices and mutates the non-async ones.
    *
    * Vertices flagged `isAsync` are filtered out before the mutation, so they have no
    * entry in the result.
    *
    * @param jsValue        JSON payload handed to `RequestParser.toVertices`
    * @param operation      operation name handed to the parser
    * @param serviceNameOpt optional service name handed to the parser
    * @param columnNameOpt  optional column name handed to the parser
    * @param withWait       forwarded unchanged to `S2Graph.mutateVertices`
    * @return the `isSuccess` flag of every response returned by `S2Graph.mutateVertices`
    */
  def vertexMutate(jsValue: JsValue,
                   operation: String,
                   serviceNameOpt: Option[String] = None,
                   columnNameOpt: Option[String] = None,
                   withWait: Boolean = true)(implicit ec: ExecutionContext): Future[Seq[Boolean]] = {
    val vertices = parser.toVertices(jsValue, operation, serviceNameOpt, columnNameOpt)
    val verticesToStore = vertices.filterNot(_.isAsync)

    s2graph.mutateVertices(verticesToStore, withWait).map(_.map(_.isSuccess))
  }

  /**
    * Mutates the non-async elements and reports one success flag per stored or skipped element.
    *
    * Elements flagged `isAsync` are not passed to `S2Graph.mutateElements`; they are reported
    * as `MutateResponse.Success`. Flags are ordered by the element's position in
    * `elementsWithTsv`.
    *
    * @param elementsWithTsv elements paired with a string; only the element is used here
    * @param withWait        forwarded unchanged to `S2Graph.mutateElements`
    */
  def edgeMutate(elementsWithTsv: Seq[(GraphElement, String)], withWait: Boolean)(implicit ec: ExecutionContext): Future[Seq[Boolean]] = {
    // Remember each element's input position so the merged result can be re-ordered.
    val indexed = elementsWithTsv.zipWithIndex
    val (syncElements, asyncElements) = indexed.partition { case ((element, _), _) => !element.isAsync }

    val skipped = asyncElements.map { case (_, idx) => idx -> MutateResponse.Success }
    val elementsToStore = syncElements.map { case ((element, _), _) => element }
    val storedIdxs = syncElements.map { case (_, idx) => idx }

    s2graph.mutateElements(elementsToStore, withWait).map { mutateResponses =>
      (storedIdxs.zip(mutateResponses) ++ skipped).sortBy(_._1).map(_._2.isSuccess)
    }
  }

  /**
    * Parses `jsValue` with `RequestParser.parseJsonFormat` and mutates the resulting elements.
    *
    * @param jsValue   JSON payload handed to the parser
    * @param operation operation name handed to the parser
    * @param withWait  forwarded unchanged to the element-based `edgeMutate`
    */
  def edgeMutate(jsValue: JsValue, operation: String, withWait: Boolean)(implicit ec: ExecutionContext): Future[Seq[Boolean]] = {
    val edgesWithTsv = parser.parseJsonFormat(jsValue, operation)
    edgeMutate(edgesWithTsv, withWait)
  }

  /** Combined POST route: vertex and edge mutation, both guarded by `exceptionHandler`. */
  lazy val mutateRoute: Route =
    post {
      handleExceptions(exceptionHandler) {
        concat(
          mutateVertex,
          mutateEdge
        )
      }
    }
}