blob: bd98504cf1ce12f06f35e0a19f8e1435c02c6a12 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.s2graph.graphql.repository
import org.apache.s2graph.core.Management.JsonModel._
import org.apache.s2graph.core._
import org.apache.s2graph.core.schema._
import org.apache.s2graph.core.rest.RequestParser
import org.apache.s2graph.core.storage.MutateResponse
import org.apache.s2graph.core.types._
import org.apache.s2graph.graphql.types.S2Type._
import org.slf4j.{Logger, LoggerFactory}
import sangria.execution.deferred._
import sangria.schema._
import scala.collection.immutable
import scala.concurrent._
import scala.util.{Failure, Success, Try}
object GraphRepository {
implicit val vertexHasId = new HasId[(VertexQueryParam, Seq[S2VertexLike]), VertexQueryParam] {
override def id(value: (VertexQueryParam, Seq[S2VertexLike])): VertexQueryParam = value._1
}
implicit val edgeHasId = new HasId[(EdgeQueryParam, Seq[S2EdgeLike]), EdgeQueryParam] {
override def id(value: (EdgeQueryParam, Seq[S2EdgeLike])): EdgeQueryParam = value._1
}
val vertexFetcher =
Fetcher((ctx: GraphRepository, queryParams: Seq[VertexQueryParam]) => {
implicit val ec = ctx.ec
Future.traverse(queryParams)(ctx.getVertices).map(vs => queryParams.zip(vs))
})
val edgeFetcher = Fetcher((ctx: GraphRepository, edgeQueryParams: Seq[EdgeQueryParam]) => {
implicit val ec = ctx.ec
val edgesByParam = edgeQueryParams.groupBy(_.qp).toSeq.map { case (qp, edgeQueryParams) =>
val vertices = edgeQueryParams.map(_.v)
ctx.getEdges(vertices, qp).map(edges => qp -> edges)
}
val f: Future[Seq[(QueryParam, Seq[S2EdgeLike])]] = Future.sequence(edgesByParam)
val grouped: Future[Seq[(EdgeQueryParam, Seq[S2EdgeLike])]] = f.map { tpLs =>
tpLs.flatMap { case (qp, edges) =>
edges.groupBy(_.srcForVertex).map {
case (v, edges) => EdgeQueryParam(v, qp) -> edges
}
}
}
grouped
})
def withLogTryResponse[A](opName: String, tryObj: Try[A])(implicit logger: Logger): Try[A] = {
tryObj match {
case Success(v) => logger.info(s"${opName} Success:", v)
case Failure(e) => logger.warn(s"${opName} Failed:", e)
}
tryObj
}
}
class GraphRepository(val graph: S2GraphLike) {
implicit val logger = LoggerFactory.getLogger(this.getClass)
import GraphRepository._
val management = graph.management
val parser = new RequestParser(graph)
implicit val ec = graph.ec
def toS2EdgeLike(labelName: String, param: AddEdgeParam): S2EdgeLike = {
graph.toEdge(
srcId = param.from,
tgtId = param.to,
labelName = labelName,
props = param.props,
direction = param.direction
)
}
def toS2VertexLike(vid: Any, column: ServiceColumn): S2VertexLike = {
graph.toVertex(column.service.serviceName, column.columnName, vid)
}
def toS2VertexLike(serviceName: String, param: AddVertexParam): S2VertexLike = {
graph.toVertex(
serviceName = serviceName,
columnName = param.columnName,
id = param.id,
props = param.props,
ts = param.timestamp)
}
def addVertices(vertices: Seq[S2VertexLike]): Future[Seq[MutateResponse]] = {
graph.mutateVertices(vertices, withWait = true)
}
def addEdges(edges: Seq[S2EdgeLike]): Future[Seq[MutateResponse]] = {
graph.mutateEdges(edges, withWait = true)
}
def getVertices(queryParam: VertexQueryParam): Future[Seq[S2VertexLike]] = {
graph.asInstanceOf[S2Graph].searchVertices(queryParam).map { v =>
v
}
}
def getEdges(vertices: Seq[S2VertexLike], queryParam: QueryParam): Future[Seq[S2EdgeLike]] = {
val step = Step(Seq(queryParam))
val q = Query(vertices, steps = Vector(step))
graph.getEdges(q).map(_.edgeWithScores.map(_.edge))
}
def getEdges(vertex: S2VertexLike, queryParam: QueryParam): Future[Seq[S2EdgeLike]] = {
val step = Step(Seq(queryParam))
val q = Query(Seq(vertex), steps = Vector(step))
graph.getEdges(q).map(_.edgeWithScores.map(_.edge))
}
def createService(args: Args): Try[Service] = {
val serviceName = args.arg[String]("name")
val serviceTry = Service.findByName(serviceName) match {
case Some(_) => Failure(new RuntimeException(s"Service (${serviceName}) already exists"))
case None =>
val cluster = args.argOpt[String]("cluster").getOrElse(parser.DefaultCluster)
val hTableName = args.argOpt[String]("hTableName").getOrElse(s"${serviceName}-${parser.DefaultPhase}")
val preSplitSize = args.argOpt[Int]("preSplitSize").getOrElse(1)
val hTableTTL = args.argOpt[Int]("hTableTTL")
val compressionAlgorithm = args.argOpt[String]("compressionAlgorithm").getOrElse(parser.DefaultCompressionAlgorithm)
val serviceTry = management
.createService(serviceName,
cluster,
hTableName,
preSplitSize,
hTableTTL,
compressionAlgorithm)
serviceTry
}
withLogTryResponse("createService", serviceTry)
}
def createServiceColumn(args: Args): Try[ServiceColumn] = {
val serviceName = args.arg[String]("serviceName")
val columnName = args.arg[String]("columnName")
val columnType = args.arg[String]("columnType")
val props = args.argOpt[Vector[Prop]]("props").getOrElse(Vector.empty)
val tryColumn = Try {
management.createServiceColumn(serviceName, columnName, columnType, props)
}
withLogTryResponse("createServiceColumn", tryColumn)
}
def deleteServiceColumn(args: Args): Try[ServiceColumn] = {
val serviceColumnParam = args.arg[ServiceColumnParam]("service")
val serviceName = serviceColumnParam.serviceName
val columnName = serviceColumnParam.columnName
val deleteTry = Management.deleteColumn(serviceName, columnName)
withLogTryResponse("deleteServiceColumn", deleteTry)
}
def addPropsToLabel(args: Args): Try[Label] = {
val addPropToLabelTry = Try {
val labelName = args.arg[String]("labelName")
val props = args.arg[Vector[Prop]]("props").toList
props.foreach { prop =>
Management.addProp(labelName, prop).get
}
Label.findByName(labelName, false).get
}
withLogTryResponse("addPropToLabel", addPropToLabelTry)
}
def addPropsToServiceColumn(args: Args): Try[ServiceColumn] = {
val addPropsToServiceColumnTry = Try {
val serviceColumnParam = args.arg[ServiceColumnParam]("service")
val serviceName = serviceColumnParam.serviceName
val columnName = serviceColumnParam.columnName
serviceColumnParam.props.foreach { prop =>
Management.addVertexProp(serviceName, columnName, prop.name, prop.dataType, prop.defaultValue, prop.storeInGlobalIndex)
}
val src = Service.findByName(serviceName)
ServiceColumn.find(src.get.id.get, columnName, false).get
}
withLogTryResponse("addPropsToServiceColumn", addPropsToServiceColumnTry)
}
def createLabel(args: Args): Try[Label] = {
val labelName = args.arg[String]("name")
val srcServiceProp = args.arg[ServiceColumnParam]("sourceService")
val srcServiceColumn = ServiceColumn.find(Service.findByName(srcServiceProp.serviceName).get.id.get, srcServiceProp.columnName).get
val tgtServiceProp = args.arg[ServiceColumnParam]("targetService")
val tgtServiceColumn = ServiceColumn.find(Service.findByName(tgtServiceProp.serviceName).get.id.get, tgtServiceProp.columnName).get
val allProps = args.argOpt[Vector[Prop]]("props").getOrElse(Vector.empty)
val indices = args.argOpt[Vector[Index]]("indices").getOrElse(Vector.empty)
val serviceName = args.argOpt[String]("serviceName").getOrElse(tgtServiceColumn.service.serviceName)
val consistencyLevel = args.argOpt[String]("consistencyLevel").getOrElse("weak")
val hTableName = args.argOpt[String]("hTableName")
val hTableTTL = args.argOpt[Int]("hTableTTL")
val schemaVersion = args.argOpt[String]("schemaVersion").getOrElse(HBaseType.DEFAULT_VERSION)
val isAsync = args.argOpt("isAsync").getOrElse(false)
val compressionAlgorithm = args.argOpt[String]("compressionAlgorithm").getOrElse(parser.DefaultCompressionAlgorithm)
val isDirected = args.argOpt[Boolean]("isDirected").getOrElse(true)
// val options = args.argOpt[String]("options") // TODO: support option type
val options = Option("""{"storeVertex": true}""")
val labelTry: scala.util.Try[Label] = management.createLabel(
labelName,
srcServiceProp.serviceName,
srcServiceColumn.columnName,
srcServiceColumn.columnType,
tgtServiceProp.serviceName,
tgtServiceColumn.columnName,
tgtServiceColumn.columnType,
serviceName,
indices,
allProps,
isDirected,
consistencyLevel,
hTableName,
hTableTTL,
schemaVersion,
isAsync,
compressionAlgorithm,
options
)
withLogTryResponse("createLabel", labelTry)
}
def deleteLabel(args: Args): Try[Label] = {
val labelName = args.arg[String]("name")
val deleteLabelTry = Management.deleteLabel(labelName)
withLogTryResponse("deleteLabel", deleteLabelTry)
}
def services(): List[Service] = {
Service.findAll()
}
def serviceColumns(): List[ServiceColumn] = {
val allServices = services().toSet
ServiceColumn
.findAll()
.filter(sc => allServices(sc.service))
}
def labels() = {
val allServiceColumns = serviceColumns().toSet
Label
.findAll()
.filter(l => allServiceColumns(l.srcColumn) || allServiceColumns(l.tgtColumn))
}
}