// blob: f7a54a9d5c1d0627a9e8401771f3ed72da24f945 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.s2graph.core
import java.util.Base64
import com.google.protobuf.ByteString
import org.apache.s2graph.core.GraphExceptions.{BadQueryException, LabelNotExistException}
import org.apache.s2graph.core.schema._
import org.apache.s2graph.core.types._
import org.apache.s2graph.core.JSONParser._
import org.apache.s2graph.core.S2Graph.{FilterHashKey, HashKey}
import org.apache.s2graph.core.rest.RequestParser
import org.apache.s2graph.core.utils.logger
import play.api.libs.json.{Json, _}
import scala.annotation.tailrec
import scala.collection.JavaConversions._
import scala.collection.immutable
import scala.collection.mutable.{ArrayBuffer, ListBuffer}
import scala.concurrent.Future
import scala.util.Random
object PostProcess {
// Type aliases for post-processing results. Apart from RAW_EDGE below, nothing
// visible in this object refers to them.
// NOTE(review): presumably kept for callers elsewhere — confirm before removing.

// String-keyed JSON values.
// NOTE(review): by its name, presumably the rendered fields of one edge — confirm.
type EDGE_VALUES = Map[String, JsValue]
// A 4-tuple of untyped components.
// NOTE(review): presumably the sort keys used to order edges — confirm.
type ORDER_BY_VALUES = (Any, Any, Any, Any)
// EDGE_VALUES paired with a Double and an ORDER_BY_VALUES tuple.
// NOTE(review): the Double is presumably the edge score — confirm.
type RAW_EDGE = (EDGE_VALUES, Double, ORDER_BY_VALUES)
// String-keyed JSON values.
// NOTE(review): presumably group-by column name -> value — confirm.
type GROUP_BY_KEY = Map[String, JsValue]
/**
* Empty list of degree entries; `toJson` passes it as the "degrees" value
* when `queryOption.returnDegree` is false.
*/
val emptyDegrees = Seq.empty[JsValue]
/**
* Response object describing an empty result: "size" 0, empty "degrees" and
* "results" arrays, "isEmpty" true and "rpcFail" 0.
*/
val emptyResults = Json.obj("size" -> 0, "degrees" -> Json.arr(), "results" -> Json.arr(), "isEmpty" -> true, "rpcFail" -> 0)
/**
 * Builds the JSON error body for a failed request.
 *
 * `ex` is a by-name parameter: the argument expression is run each time `ex`
 * is referenced. It is therefore referenced exactly once (as the match
 * scrutinee) and every branch works on the value bound by its pattern.
 *
 * @param ex expression producing the exception to report
 * @return an object with a single "message" field: `msg` for a
 *         `BadQueryException`, `getMessage` for any other exception
 */
def badRequestResults(ex: => Exception): JsObject = ex match {
  case badQuery: BadQueryException => Json.obj("message" -> badQuery.msg)
  // Bind the already-evaluated exception; naming `ex` here would evaluate
  // the by-name argument a second time.
  case other => Json.obj("message" -> other.getMessage)
}
/**
 * Renders the ancestors of an edge as JSON by walking `getParentEdges()`
 * recursively.
 *
 * A parent edge is included only when its own parents render to something
 * other than `JsNull`; a parent without parent edges of its own is left out.
 * Each included parent is rendered from its original edge when
 * `getOriginalEdgeOpt()` provides one, otherwise from the edge itself.
 *
 * @param graph       passed through unchanged to the recursive calls
 * @param queryOption rendering options forwarded to `s2EdgeToJsValue`
 * @param parentEdges the parent edges to render
 * @return `JsNull` when `parentEdges` is empty, otherwise a JSON array
 *         (possibly empty) of the rendered parents
 */
def s2EdgeParent(graph: S2GraphLike,
                 queryOption: QueryOption,
                 parentEdges: Seq[EdgeWithScore]): JsValue = {
  if (parentEdges.nonEmpty) {
    // First resolve the ancestry of every parent ...
    val withAncestry = parentEdges.map { parent =>
      parent -> s2EdgeParent(graph, queryOption, parent.edge.getParentEdges())
    }
    // ... then keep and render only the parents that have ancestry themselves.
    val rendered = withAncestry.collect { case (parent, ancestry) if ancestry != JsNull =>
      val displayEdge = parent.edge.getOriginalEdgeOpt().getOrElse(parent.edge)
      s2EdgeToJsValue(queryOption, parent.copy(edge = displayEdge), isDegree = false, parents = ancestry, checkSelectColumns = true)
    }
    Json.toJson(rendered)
  } else JsNull
}
/**
 * Renders one scored edge as a JSON object.
 *
 * Three layouts are produced:
 *  - degree edge (`isDegree`): "from", "label", "direction" and the degree
 *    property, nothing else;
 *  - no select columns: "from", "to", "label", every convertible property
 *    under "props", "direction", "timestamp" and "_timestamp";
 *  - select columns: only the requested built-in columns, plus the requested
 *    label properties under "props".
 * For the two non-degree layouts "score" comes first when
 * `queryOption.withScore` is set, and "parents" comes last when `parents` is
 * not `JsNull`.
 *
 * Built-in fields are converted with `.get`, so a value that
 * `anyValToJsValue` cannot convert makes this method throw; properties under
 * "props" that cannot be converted are skipped instead.
 *
 * @param queryOption        rendering options (score, select columns)
 * @param edgeWithScore      the edge, its score and its label
 * @param isDegree           render the degree layout
 * @param parents            pre-rendered parents, or `JsNull` for none
 * @param checkSelectColumns accepted but not read by this method
 * @return the rendered JSON object
 */
def s2EdgeToJsValue(queryOption: QueryOption,
                    edgeWithScore: EdgeWithScore,
                    isDegree: Boolean = false,
                    parents: JsValue = JsNull,
                    checkSelectColumns: Boolean = false): JsValue = {
  val edge = edgeWithScore.edge
  val edgeLabel = edgeWithScore.label
  val fields = ArrayBuffer.empty[(String, JsValue)]

  // Appends a mandatory field; `.get` throws when the value has no JSON form.
  def put(key: String, value: Any): Unit = fields += (key -> anyValToJsValue(value).get)

  // Appends the pre-rendered parents, when there are any.
  def putParents(): Unit = if (parents != JsNull) fields += ("parents" -> parents)

  if (isDegree) {
    put("from", edge.srcVertex.innerIdVal)
    put("label", edgeLabel.label)
    put("direction", edge.getDirection())
    put(LabelMeta.degree.name, edge.propertyValueInner(LabelMeta.degree).innerVal.value)
  } else {
    if (queryOption.withScore) put("score", edgeWithScore.score)

    if (queryOption.selectColumns.isEmpty) {
      put("from", edge.srcVertex.innerIdVal)
      put("to", edge.tgtVertex.innerIdVal)
      put("label", edgeLabel.label)

      val props = ArrayBuffer.empty[(String, JsValue)]
      for {
        (meta, valueWithTs) <- edge.propertyValues()
        rendered <- anyValToJsValue(valueWithTs.innerVal.value)
      } props += (meta.name -> rendered)
      fields += ("props" -> JsObject(props))

      put("direction", edge.getDirection())
      put("timestamp", edge.getTsInnerValValue())
      put("_timestamp", edge.getTsInnerValValue()) // backward compatibility
      putParents()
    } else {
      // Built-in columns are emitted under the exact name that was requested.
      queryOption.selectColumnsMap.foreach { case (columnName, _) =>
        columnName match {
          case "from" | "_from" => put(columnName, edge.srcVertex.innerIdVal)
          case "to" | "_to" => put(columnName, edge.tgtVertex.innerIdVal)
          case "label" => put(columnName, edgeLabel.label)
          case "direction" => put(columnName, edge.getDirection())
          case "timestamp" | "_timestamp" => put(columnName, edge.getTsInnerValValue())
          case _ => // not a built-in column; label properties are handled below
        }
      }

      val props = ArrayBuffer.empty[(String, JsValue)]
      for {
        (columnName, _) <- queryOption.selectColumnsMap
        meta <- edgeLabel.metaPropsInvMap.get(columnName)
        valueWithTs = edge.propertyValueInner(meta)
        rendered <- anyValToJsValue(valueWithTs.innerVal.value)
      } props += (meta.name -> rendered)
      fields += ("props" -> JsObject(props))
      putParents()
    }
  }

  JsObject(fields)
}
/**
 * Renders a vertex as a JSON object with "serviceName", "columnName", "id",
 * "props" and "timestamp".
 *
 * Properties whose value `anyValToJsValue` cannot convert are omitted from
 * "props".
 *
 * @param s2Vertex the vertex to render
 * @return the rendered vertex, or `None` when the vertex id cannot be
 *         converted to JSON
 */
def s2VertexToJson(s2Vertex: S2VertexLike): Option[JsValue] = {
  val propsByName = for {
    (_, prop) <- s2Vertex.props
    rendered <- anyValToJsValue(prop.value)
  } yield prop.columnMeta.name -> rendered

  anyValToJsValue(s2Vertex.innerIdVal).map { vertexId =>
    Json.obj(
      "serviceName" -> s2Vertex.serviceName,
      "columnName" -> s2Vertex.columnName,
      "id" -> vertexId,
      "props" -> Json.toJson(propsByName),
      "timestamp" -> s2Vertex.ts
    )
  }
}
/**
 * Renders a list of vertices as a JSON array, in input order.
 *
 * Vertices for which `s2VertexToJson` yields `None` are left out.
 *
 * @param s2Vertices the vertices to render
 * @return the JSON array of rendered vertices
 */
def verticesToJson(s2Vertices: Seq[S2VertexLike]): JsValue = {
  val rendered = for {
    vertex <- s2Vertices
    json <- s2VertexToJson(vertex)
  } yield json
  Json.toJson(rendered)
}
/**
 * Assembles the top-level response object: "size", "degrees" and "results",
 * followed by the impression id under `Experiment.ImpressionKey` when
 * `queryOption.impIdOpt` is defined.
 *
 * `failCount`, `cursors` and `nextQuery` are accepted but not read here; as
 * `cursors` and `nextQuery` are by-name, their expressions are never
 * evaluated by this method.
 *
 * @param queryOption source of the optional impression id
 * @param size        value of the "size" field
 * @param degrees     entries of the "degrees" array
 * @param results     entries of the "results" array
 * @param failCount   not used
 * @param cursors     not used
 * @param nextQuery   not used
 * @return the assembled JSON object
 */
def withOptionalFields(queryOption: QueryOption,
                       size: Int,
                       degrees: Seq[JsValue],
                       results: Seq[JsValue],
                       failCount: Int = 0,
                       cursors: => JsValue,
                       nextQuery: => Option[JsValue]): JsValue = {
  val fields = ArrayBuffer[(String, JsValue)](
    "size" -> JsNumber(size),
    "degrees" -> JsArray(degrees),
    "results" -> JsArray(results)
  )
  queryOption.impIdOpt.foreach { impId =>
    fields += (Experiment.ImpressionKey -> JsString(impId))
  }
  JsObject(fields)
}
/**
 * Rebuilds a JSON tree, passing every object field through `fn`.
 *
 * For each field of an object, `fn(key, value)` is applied first and the
 * result is then rebuilt recursively. Array elements are rebuilt recursively
 * as they are; any other value is returned unchanged.
 *
 * @param js the JSON value to rebuild
 * @param fn maps a field name and its value to the replacement value
 * @return the rebuilt JSON value
 */
def buildJsonWith(js: JsValue)(implicit fn: (String, JsValue) => JsValue): JsValue = js match {
  case JsObject(fields) =>
    JsObject(fields.map { case (key, value) => key -> buildJsonWith(fn(key, value))(fn) })
  case JsArray(elements) =>
    JsArray(elements.map(element => buildJsonWith(element)(fn)))
  case _ => js
}
/**
 * Converts the result of a query into the JSON response.
 *
 * Without group-by keys the response lists the (optionally limited) edges;
 * with group-by keys it lists one entry per group holding "groupBy",
 * "scoreSum" and "agg" (the group's edges, or an empty array unless
 * `queryOption.returnAgg` is set). Degree edges are included when
 * `queryOption.returnDegree` is set.
 *
 * The cursor JSON and the follow-up query are lazy and handed to
 * `withOptionalFields` by name, so they are only computed if that method
 * reads them.
 *
 * @param orgQuery    the original query, used as the template for the
 *                    follow-up query
 * @param graph       passed through to `s2EdgeParent`
 * @param queryOption rendering options (limit, group-by, degrees, tree, ...)
 * @param stepResult  edges, groups, degree edges, cursors and fail count
 * @return the response JSON
 */
def toJson(orgQuery: Option[JsValue])(graph: S2GraphLike,
                                     queryOption: QueryOption,
                                     stepResult: StepResult): JsValue = {
  // [[cursor, cursor], [cursor]]
  // encodeToString does not depend on the platform default charset,
  // unlike `new String(bytes)`.
  lazy val cursors: Seq[Seq[String]] = stepResult.accumulatedCursors.map { stepCursors =>
    stepCursors.map { cursor => Base64.getEncoder.encodeToString(cursor) }
  }
  lazy val cursorJson: JsValue = Json.toJson(cursors)

  // build nextQuery with (original query + cursors)
  lazy val nextQuery: Option[JsValue] = {
    if (cursors.exists(_.exists(_.nonEmpty))) {
      val cursorIter = cursors.iterator
      orgQuery.map { query =>
        buildJsonWith(query) { (key, js) =>
          if (key == "step") {
            // Each "step" field consumes the cursors of the next step, in order.
            val currentCursor = cursorIter.next()
            val res = js.as[Seq[JsObject]].toStream.zip(currentCursor).filterNot(_._2 == "").map { case (obj, cursor) =>
              val label = (obj \ "label").as[String]
              // NOTE(review): `.get` fails if the label cannot be found — confirm this is intended.
              if (Label.findByName(label).get.schemaVersion == "v4") obj + ("cursor" -> JsString(cursor))
              else {
                // Labels other than "v4" page by offset: advance it by one page.
                val limit = (obj \ "limit").asOpt[Int].getOrElse(RequestParser.defaultLimit)
                val offset = (obj \ "offset").asOpt[Int].getOrElse(0)
                obj + ("offset" -> JsNumber(offset + limit))
              }
            }
            JsArray(res)
          } else js
        }
      }
    } else Option(JsNull)
  }

  // Renders one result edge, with its parents when a tree was requested.
  def renderEdge(edgeWithScore: EdgeWithScore): JsValue = {
    val parents =
      if (queryOption.returnTree) s2EdgeParent(graph, queryOption, edgeWithScore.edge.getParentEdges())
      else JsNull
    s2EdgeToJsValue(queryOption, edgeWithScore, isDegree = false, parents = parents)
  }

  val limitOpt = queryOption.limitOpt
  val degrees =
    if (queryOption.returnDegree) stepResult.degreeEdges.map(t => s2EdgeToJsValue(queryOption, t, isDegree = true, parents = JsNull))
    else emptyDegrees

  if (queryOption.groupBy.keys.isEmpty) {
    // no group by specified on query.
    val edges = limitOpt match {
      case Some(limit) => stepResult.edgeWithScores.take(limit)
      case None => stepResult.edgeWithScores
    }
    val ls = edges.map(t => renderEdge(t))
    withOptionalFields(queryOption, ls.size, degrees, ls, stepResult.failCount, cursorJson, nextQuery)
  } else {
    val grouped = limitOpt match {
      case Some(limit) => stepResult.grouped.take(limit)
      case None => stepResult.grouped
    }
    val results =
      for {
        (groupByValues, (scoreSum, edges)) <- grouped
      } yield {
        val groupByKeyValues = queryOption.groupBy.keys.zip(groupByValues).map { case (k, valueOpt) =>
          k -> valueOpt.flatMap(anyValToJsValue).getOrElse(JsNull)
        }
        val groupByValuesJson = Json.toJson(groupByKeyValues.toMap)
        // The group's edges are only rendered when aggregation output is requested.
        val aggJson: JsValue =
          if (queryOption.returnAgg) Json.toJson(edges.map(t => renderEdge(t)))
          else Json.arr()
        Json.obj(
          "groupBy" -> groupByValuesJson,
          "scoreSum" -> scoreSum,
          "agg" -> aggJson
        )
      }
    withOptionalFields(queryOption, results.size, degrees, results, stepResult.failCount, cursorJson, nextQuery)
  }
}
/**
 * Serializes the properties of an edge, as produced by `s2EdgePropsJson`,
 * to a JSON string.
 *
 * @param edge the edge whose properties are serialized
 * @return the JSON text of the property map
 */
def s2EdgePropsJsonString(edge: S2EdgeLike): String = {
  val propsByName = s2EdgePropsJson(edge)
  Json.toJson(propsByName).toString()
}
/**
 * Serializes the properties of a vertex, as produced by `s2VertexPropsJson`,
 * to a JSON string.
 *
 * @param vertex the vertex whose properties are serialized
 * @return the JSON text of the property map
 */
def s2VertexPropsJsonString(vertex: S2VertexLike): String = {
  val propsByName = s2VertexPropsJson(vertex)
  Json.toJson(propsByName).toString()
}
/**
 * Collects the properties of an edge as a map from label-meta name to JSON
 * value.
 *
 * Properties whose value `anyValToJsValue` cannot convert are left out.
 *
 * @param edge the edge to read properties from
 * @return property name -> JSON value
 */
def s2EdgePropsJson(edge: S2EdgeLike): Map[String, JsValue] = {
  import scala.collection.JavaConverters._

  val propsByKey = edge.getPropsWithTs().asScala.toMap
  propsByKey.flatMap { case (_, prop) =>
    JSONParser.anyValToJsValue(prop.innerVal.value).map(rendered => prop.labelMeta.name -> rendered)
  }
}
/**
 * Collects the properties of a vertex as a map from column-meta name to JSON
 * value.
 *
 * Properties whose value `anyValToJsValue` cannot convert are left out.
 *
 * @param vertex the vertex to read properties from
 * @return property name -> JSON value
 */
def s2VertexPropsJson(vertex: S2VertexLike): Map[String, JsValue] = {
  import scala.collection.JavaConverters._

  val propsByKey = vertex.props.asScala.toMap
  propsByKey.flatMap { case (_, prop) =>
    JSONParser.anyValToJsValue(prop.innerVal.value).map(rendered => prop.columnMeta.name -> rendered)
  }
}
}