- Refactor WhereParser to accept GraphElement.
- Refactor S2Graph.getVertices to accept VertexQueryParam.
diff --git a/s2core/src/main/scala/org/apache/s2graph/core/QueryParam.scala b/s2core/src/main/scala/org/apache/s2graph/core/QueryParam.scala
index 21aca12..cb1434f 100644
--- a/s2core/src/main/scala/org/apache/s2graph/core/QueryParam.scala
+++ b/s2core/src/main/scala/org/apache/s2graph/core/QueryParam.scala
@@ -294,14 +294,19 @@
}
object VertexQueryParam {
- def Empty: VertexQueryParam = VertexQueryParam(0, 1, None)
+ def Empty: VertexQueryParam = VertexQueryParam(Nil, 0, 1, None)
+
+ def apply(vertexIds: Seq[VertexId]): VertexQueryParam = {
+ VertexQueryParam(vertexIds)
+ }
}
-case class VertexQueryParam(offset: Int,
- limit: Int,
- searchString: Option[String],
- vertexIds: Seq[VertexId] = Nil,
- fetchProp: Boolean = true) {
+case class VertexQueryParam(vertexIds: Seq[VertexId],
+ offset: Int = 0,
+ limit: Int = 1,
+ searchString: Option[String] = None,
+ fetchProp: Boolean = true,
+ where: Try[Where] = Success(WhereParser.success)) {
}
case class QueryParam(labelName: String,
diff --git a/s2core/src/main/scala/org/apache/s2graph/core/S2Graph.scala b/s2core/src/main/scala/org/apache/s2graph/core/S2Graph.scala
index d41bc24..f3010f7 100644
--- a/s2core/src/main/scala/org/apache/s2graph/core/S2Graph.scala
+++ b/s2core/src/main/scala/org/apache/s2graph/core/S2Graph.scala
@@ -335,18 +335,19 @@
}
def searchVertices(queryParam: VertexQueryParam): Future[Seq[S2VertexLike]] = {
- val matchedVertices = indexProvider.fetchVertexIdsAsyncRaw(queryParam).map { vids =>
+ val matchedVerticesFuture = indexProvider.fetchVertexIdsAsyncRaw(queryParam).map { vids =>
(queryParam.vertexIds ++ vids).distinct.map(vid => elementBuilder.newVertex(vid))
}
- if (queryParam.fetchProp) matchedVertices.flatMap(vs => getVertices(vs))
- else matchedVertices
+ if (queryParam.fetchProp) matchedVerticesFuture.flatMap(vs => getVertices(queryParam.copy(vertexIds = vs.map(_.id))))
+ else matchedVerticesFuture
}
- override def getVertices(vertices: Seq[S2VertexLike]): Future[Seq[S2VertexLike]] = {
- val verticesWithIdx = vertices.zipWithIndex
- val futures = verticesWithIdx.groupBy { case (v, idx) => v.serviceColumn }.map { case (serviceColumn, vertexGroup) =>
- getVertexFetcher(serviceColumn).fetchVertices(vertices).map(_.zip(vertexGroup.map(_._2)))
+ override def getVertices(queryParam: VertexQueryParam): Future[Seq[S2VertexLike]] = {
+ val vertexIdsWithIdx = queryParam.vertexIds.zipWithIndex
+ val futures = vertexIdsWithIdx.groupBy { case (vId, idx) => vId.column }.map { case (serviceColumn, vertexGroup) =>
+ val (vertexIds, indices) = vertexGroup.unzip
+ getVertexFetcher(serviceColumn).fetchVertices(queryParam.copy(vertexIds = vertexIds)).map(_.zip(indices))
}
Future.sequence(futures).map { ls =>
@@ -562,8 +563,8 @@
}
override def getVertex(vertexId: VertexId): Option[S2VertexLike] = {
- val v = elementBuilder.newVertex(vertexId)
- Await.result(getVertices(Seq(v)).map { vertices => vertices.headOption }, WaitTimeout)
+ val queryParam = VertexQueryParam(vertexIds = Seq(vertexId))
+ Await.result(getVertices(queryParam).map { vertices => vertices.headOption }, WaitTimeout)
}
override def fetchEdges(vertex: S2VertexLike, labelNameWithDirs: Seq[(String, String)]): util.Iterator[Edge] = {
diff --git a/s2core/src/main/scala/org/apache/s2graph/core/S2GraphLike.scala b/s2core/src/main/scala/org/apache/s2graph/core/S2GraphLike.scala
index fb229ec..cab866d 100644
--- a/s2core/src/main/scala/org/apache/s2graph/core/S2GraphLike.scala
+++ b/s2core/src/main/scala/org/apache/s2graph/core/S2GraphLike.scala
@@ -110,10 +110,7 @@
def shutdown(modelDataDelete: Boolean = false): Unit
- def getVertices(vertices: Seq[S2VertexLike]): Future[Seq[S2VertexLike]]
-
- def getVerticesJava(vertices: util.List[S2VertexLike]): CompletableFuture[util.List[S2VertexLike]] =
- getVertices(vertices.toSeq).map(_.asJava).toJava.toCompletableFuture
+ def getVertices(queryParam: VertexQueryParam): Future[Seq[S2VertexLike]]
def checkEdges(edges: Seq[S2EdgeLike]): Future[StepResult]
@@ -217,22 +214,23 @@
Await.result(future, WaitTimeout).flatten.iterator
} else {
- val vertices = ids.collect {
- case s2Vertex: S2VertexLike => s2Vertex
- case vId: VertexId => elementBuilder.newVertex(vId)
- case vertex: Vertex => elementBuilder.newVertex(vertex.id().asInstanceOf[VertexId])
- case other@_ => elementBuilder.newVertex(VertexId.fromString(other.toString))
+ val vertexIds = ids.collect {
+ case s2Vertex: S2VertexLike => s2Vertex.id
+ case vId: VertexId => vId
+ case vertex: Vertex => vertex.id().asInstanceOf[VertexId]
+ case other@_ => VertexId.fromString(other.toString)
}
if (fetchVertices) {
- val future = getVertices(vertices).map { vs =>
+ val queryParam = VertexQueryParam(vertexIds = vertexIds)
+ val future = getVertices(queryParam).map { vs =>
val ls = new util.ArrayList[structure.Vertex]()
ls.addAll(vs)
ls.iterator()
}
Await.result(future, WaitTimeout)
} else {
- vertices.iterator
+ vertexIds.map(vId => elementBuilder.newVertex(vId)).iterator
}
}
}
diff --git a/s2core/src/main/scala/org/apache/s2graph/core/VertexFetcher.scala b/s2core/src/main/scala/org/apache/s2graph/core/VertexFetcher.scala
index 4addcab..d6c2e64 100644
--- a/s2core/src/main/scala/org/apache/s2graph/core/VertexFetcher.scala
+++ b/s2core/src/main/scala/org/apache/s2graph/core/VertexFetcher.scala
@@ -27,7 +27,7 @@
trait VertexFetcher extends AutoCloseable {
def init(config: Config)(implicit ec: ExecutionContext): Unit = {}
- def fetchVertices(vertices: Seq[S2VertexLike])(implicit ec: ExecutionContext): Future[Seq[S2VertexLike]]
+ def fetchVertices(vertexQueryParam: VertexQueryParam)(implicit ec: ExecutionContext): Future[Seq[S2VertexLike]]
def fetchVerticesAll()(implicit ec: ExecutionContext): Future[Seq[S2VertexLike]]
diff --git a/s2core/src/main/scala/org/apache/s2graph/core/rest/RestHandler.scala b/s2core/src/main/scala/org/apache/s2graph/core/rest/RestHandler.scala
index c768d81..dfdeca4 100644
--- a/s2core/src/main/scala/org/apache/s2graph/core/rest/RestHandler.scala
+++ b/s2core/src/main/scala/org/apache/s2graph/core/rest/RestHandler.scala
@@ -212,18 +212,19 @@
private def getVertices(jsValue: JsValue) = {
val jsonQuery = jsValue
- val vertices = jsonQuery.as[List[JsValue]].flatMap { js =>
+ val vertexIds = jsonQuery.as[List[JsValue]].flatMap { js =>
val serviceName = (js \ "serviceName").as[String]
val columnName = (js \ "columnName").as[String]
for {
idJson <- (js \ "ids").asOpt[List[JsValue]].getOrElse(List.empty[JsValue])
id <- jsValueToAny(idJson)
} yield {
- graph.toVertex(serviceName, columnName, id)
+ graph.elementBuilder.newVertexId(serviceName)(columnName)(id)
}
}
+ val queryParam = VertexQueryParam(vertexIds = vertexIds)
- graph.getVertices(vertices) map { vertices => PostProcess.verticesToJson(vertices) }
+ graph.getVertices(queryParam) map { vertices => PostProcess.verticesToJson(vertices) }
}
private def buildRequestBody(requestKeyJsonOpt: Option[JsValue], bucket: Bucket, uuid: String): String = {
diff --git a/s2core/src/main/scala/org/apache/s2graph/core/storage/hbase/AsynchbaseStorage.scala b/s2core/src/main/scala/org/apache/s2graph/core/storage/hbase/AsynchbaseStorage.scala
index 89303e6..f670e9c 100644
--- a/s2core/src/main/scala/org/apache/s2graph/core/storage/hbase/AsynchbaseStorage.scala
+++ b/s2core/src/main/scala/org/apache/s2graph/core/storage/hbase/AsynchbaseStorage.scala
@@ -202,7 +202,7 @@
}
}
- def buildRequest(serDe: StorageSerDe, queryRequest: QueryRequest, vertex: S2VertexLike) = {
+ def buildRequest(serDe: StorageSerDe, vertex: S2VertexLike) = {
val kvs = serDe.vertexSerializer(vertex).toKeyValues
val get = new GetRequest(vertex.hbaseTableName.getBytes, kvs.head.row, Serializable.vertexCf)
// get.setTimeout(this.singleGetTimeout.toShort)
diff --git a/s2core/src/main/scala/org/apache/s2graph/core/storage/hbase/AsynchbaseVertexFetcher.scala b/s2core/src/main/scala/org/apache/s2graph/core/storage/hbase/AsynchbaseVertexFetcher.scala
index f16c8e9..4815bf3 100644
--- a/s2core/src/main/scala/org/apache/s2graph/core/storage/hbase/AsynchbaseVertexFetcher.scala
+++ b/s2core/src/main/scala/org/apache/s2graph/core/storage/hbase/AsynchbaseVertexFetcher.scala
@@ -21,6 +21,7 @@
import com.typesafe.config.Config
import org.apache.s2graph.core._
+import org.apache.s2graph.core.parsers.Where
import org.apache.s2graph.core.schema.ServiceColumn
import org.apache.s2graph.core.storage.serde.Serializable
import org.apache.s2graph.core.storage.{SKeyValue, StorageIO, StorageSerDe}
@@ -40,23 +41,23 @@
import scala.collection.JavaConverters._
- private def fetchKeyValues(queryRequest: QueryRequest, vertex: S2VertexLike)(implicit ec: ExecutionContext): Future[Seq[SKeyValue]] = {
- val rpc = buildRequest(serDe, queryRequest, vertex)
+ private def fetchKeyValues(vertex: S2VertexLike)(implicit ec: ExecutionContext): Future[Seq[SKeyValue]] = {
+ val rpc = buildRequest(serDe, vertex)
AsynchbaseStorage.fetchKeyValues(client, rpc)
}
- override def fetchVertices(vertices: Seq[S2VertexLike])(implicit ec: ExecutionContext): Future[Seq[S2VertexLike]] = {
- def fromResult(kvs: Seq[SKeyValue], version: String): Seq[S2VertexLike] = {
+ override def fetchVertices(vertexQueryParam: VertexQueryParam)(implicit ec: ExecutionContext): Future[Seq[S2VertexLike]] = {
+ def fromResult(kvs: Seq[SKeyValue],
+ version: String): Seq[S2VertexLike] = {
if (kvs.isEmpty) Nil
- else serDe.vertexDeserializer(version).fromKeyValues(kvs, None).toSeq
+ else {
+ serDe.vertexDeserializer(version).fromKeyValues(kvs, None).toSeq.filter(vertexQueryParam.where.get.filter)
+ }
}
+ val vertices = vertexQueryParam.vertexIds.map(vId => graph.elementBuilder.newVertex(vId))
val futures = vertices.map { vertex =>
- val queryParam = QueryParam.Empty
- val q = Query.toQuery(Seq(vertex), Seq(queryParam))
- val queryRequest = QueryRequest(q, stepIdx = -1, vertex, queryParam)
-
- fetchKeyValues(queryRequest, vertex).map { kvs =>
+ fetchKeyValues(vertex).map { kvs =>
fromResult(kvs, vertex.serviceColumn.schemaVersion)
} recoverWith {
case ex: Throwable => Future.successful(Nil)
diff --git a/s2core/src/main/scala/org/apache/s2graph/core/storage/rocks/RocksVertexFetcher.scala b/s2core/src/main/scala/org/apache/s2graph/core/storage/rocks/RocksVertexFetcher.scala
index 2d3880c..94a0da6 100644
--- a/s2core/src/main/scala/org/apache/s2graph/core/storage/rocks/RocksVertexFetcher.scala
+++ b/s2core/src/main/scala/org/apache/s2graph/core/storage/rocks/RocksVertexFetcher.scala
@@ -43,11 +43,12 @@
RocksStorage.fetchKeyValues(vdb, db, rpc)
}
- override def fetchVertices(vertices: Seq[S2VertexLike])(implicit ec: ExecutionContext): Future[Seq[S2VertexLike]] = {
+ override def fetchVertices(vertexQueryParam: VertexQueryParam)(implicit ec: ExecutionContext): Future[Seq[S2VertexLike]] = {
def fromResult(kvs: Seq[SKeyValue], version: String): Seq[S2VertexLike] = {
if (kvs.isEmpty) Nil
- else serDe.vertexDeserializer(version).fromKeyValues(kvs, None).toSeq
+ else serDe.vertexDeserializer(version).fromKeyValues(kvs, None).toSeq.filter(vertexQueryParam.where.get.filter)
}
+ val vertices = vertexQueryParam.vertexIds.map(vId => graph.elementBuilder.newVertex(vId))
val futures = vertices.map { vertex =>
val queryParam = QueryParam.Empty
diff --git a/s2graphql/src/main/scala/org/apache/s2graph/graphql/types/FieldResolver.scala b/s2graphql/src/main/scala/org/apache/s2graph/graphql/types/FieldResolver.scala
index 478517f..6e1a7cc 100644
--- a/s2graphql/src/main/scala/org/apache/s2graph/graphql/types/FieldResolver.scala
+++ b/s2graphql/src/main/scala/org/apache/s2graph/graphql/types/FieldResolver.scala
@@ -87,7 +87,7 @@
val selectedFields = AstHelper.selectedFields(c.astFields)
val canSkipFetch = selectedFields.forall(f => f == "id" || !columnFields(f))
- val vertexQueryParam = VertexQueryParam(offset, limit, searchOpt, vertices.map(_.id), !canSkipFetch)
+ val vertexQueryParam = VertexQueryParam(vertices.map(_.id), offset, limit, searchOpt, !canSkipFetch)
vertexQueryParam
}
@@ -102,7 +102,7 @@
val columnFields = column.metasInvMap.keySet
val canSkipFetch = selectedFields.forall(f => f == "id" || !columnFields(f))
- val vertexQueryParam = VertexQueryParam(0, 1, None, Seq(vertex.id), !canSkipFetch)
+ val vertexQueryParam = VertexQueryParam(Seq(vertex.id), 0, 1, None, !canSkipFetch)
vertexQueryParam
}