blob: 047841344ff6ff1063d7c4b36f5bcb22e225b279 [file] [log] [blame]
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
package org.apache.s2graph.core
import org.apache.s2graph.core.GraphExceptions.LabelNotExistException
import org.apache.s2graph.core.JSONParser.{fromJsonToProperties, toInnerVal}
import org.apache.s2graph.core.S2Graph.{DefaultColumnName, DefaultServiceName}
import org.apache.s2graph.core.mysqls._
import org.apache.s2graph.core.types._
import org.apache.s2graph.core.utils.logger
import org.apache.tinkerpop.gremlin.structure.T
import play.api.libs.json.{JsObject, Json}
import scala.util.Try
class GraphElementBuilder(graph: S2GraphLike) {
def toGraphElement(s: String, labelMapping: Map[String, String] = Map.empty): Option[GraphElement] = Try {
val parts = GraphUtil.split(s)
val logType = parts(2)
val element = if (logType == "edge" | logType == "e") {
/* current only edge is considered to be bulk loaded */
labelMapping.get(parts(5)) match {
case None =>
case Some(toReplace) =>
parts(5) = toReplace
} else if (logType == "vertex" | logType == "v") {
} else {
throw new GraphExceptions.JsonParseException("log type is not exist in log.")
} recover {
case e: Exception =>
logger.error(s"[toElement]: $s", e)
} get
def toVertex(s: String): Option[S2VertexLike] = {
def toEdge(s: String): Option[S2EdgeLike] = {
def toEdge(parts: Array[String]): Option[S2EdgeLike] = Try {
val (ts, operation, logType, srcId, tgtId, label) = (parts(0), parts(1), parts(2), parts(3), parts(4), parts(5))
val props = if (parts.length >= 7) fromJsonToProperties(Json.parse(parts(6)).asOpt[JsObject].getOrElse(Json.obj())) else Map.empty[String, Any]
val tempDirection = if (parts.length >= 8) parts(7) else "out"
val direction = if (tempDirection != "out" && tempDirection != "in") "out" else tempDirection
val edge = toEdge(srcId, tgtId, label, direction, props, ts.toLong, operation)
} recover {
case e: Exception =>
logger.error(s"[toEdge]: $e", e)
throw e
} get
def toVertex(parts: Array[String]): Option[S2VertexLike] = Try {
val (ts, operation, logType, srcId, serviceName, colName) = (parts(0), parts(1), parts(2), parts(3), parts(4), parts(5))
val props = if (parts.length >= 7) fromJsonToProperties(Json.parse(parts(6)).asOpt[JsObject].getOrElse(Json.obj())) else Map.empty[String, Any]
val vertex = toVertex(serviceName, colName, srcId, props, ts.toLong, operation)
} recover {
case e: Throwable =>
logger.error(s"[toVertex]: $e", e)
throw e
} get
def toEdge(srcId: Any,
tgtId: Any,
labelName: String,
direction: String,
props: Map[String, Any] = Map.empty,
ts: Long = System.currentTimeMillis(),
operation: String = "insert"): S2EdgeLike = {
val label = Label.findByName(labelName).getOrElse(throw new LabelNotExistException(labelName))
val srcColumn = if (direction == "out") label.srcColumn else label.tgtColumn
val tgtColumn = if (direction == "out") label.tgtColumn else label.srcColumn
val srcVertexIdInnerVal = toInnerVal(srcId, srcColumn.columnType, label.schemaVersion)
val tgtVertexIdInnerVal = toInnerVal(tgtId, tgtColumn.columnType, label.schemaVersion)
val srcVertex = newVertex(new SourceVertexId(srcColumn, srcVertexIdInnerVal), System.currentTimeMillis())
val tgtVertex = newVertex(new TargetVertexId(tgtColumn, tgtVertexIdInnerVal), System.currentTimeMillis())
val dir = GraphUtil.toDir(direction).getOrElse(throw new RuntimeException(s"$direction is not supported."))
val propsPlusTs = props ++ Map( -> ts)
val propsWithTs = label.propsToInnerValsWithTs(propsPlusTs, ts)
val op = GraphUtil.toOp(operation).getOrElse(throw new RuntimeException(s"$operation is not supported."))
new S2Edge(graph, srcVertex, tgtVertex, label, dir, op = op, version = ts).copyEdgeWithState(propsWithTs)
def toVertex(serviceName: String,
columnName: String,
id: Any,
props: Map[String, Any] = Map.empty,
ts: Long = System.currentTimeMillis(),
operation: String = "insert"): S2VertexLike = {
val service = Service.findByName(serviceName).getOrElse(throw new java.lang.IllegalArgumentException(s"$serviceName is not found."))
val column = ServiceColumn.find(, columnName).getOrElse(throw new java.lang.IllegalArgumentException(s"$columnName is not found."))
val op = GraphUtil.toOp(operation).getOrElse(throw new RuntimeException(s"$operation is not supported."))
val srcVertexId = id match {
case vid: VertexId => id.asInstanceOf[VertexId]
case _ => new VertexId(column, toInnerVal(id, column.columnType, column.schemaVersion))
val propsInner = column.propsToInnerVals(props) ++
Map(ColumnMeta.timestamp -> InnerVal.withLong(ts, column.schemaVersion))
val vertex = new S2Vertex(graph, srcVertexId, ts, S2Vertex.EmptyProps, op)
S2Vertex.fillPropsWithTs(vertex, propsInner)
* helper to create new Edge instance from given parameters on memory(not actually stored in storage).
* Since we are using mutable map for property value(propsWithTs),
* we should make sure that reference for mutable map never be shared between multiple Edge instances.
* To guarantee this, we never create Edge directly, but rather use this helper which is available on S2Graph.
* Note that we are using following convention
* 1. `add*` for method that actually store instance into storage,
* 2. `new*` for method that only create instance on memory, but not store it into storage.
* @param srcVertex
* @param tgtVertex
* @param innerLabel
* @param dir
* @param op
* @param version
* @param propsWithTs
* @param parentEdges
* @param originalEdgeOpt
* @param pendingEdgeOpt
* @param statusCode
* @param lockTs
* @param tsInnerValOpt
* @return
def newEdge(srcVertex: S2VertexLike,
tgtVertex: S2VertexLike,
innerLabel: Label,
dir: Int,
op: Byte = GraphUtil.defaultOpByte,
version: Long = System.currentTimeMillis(),
propsWithTs: S2Edge.State,
parentEdges: Seq[EdgeWithScore] = Nil,
originalEdgeOpt: Option[S2EdgeLike] = None,
pendingEdgeOpt: Option[S2EdgeLike] = None,
statusCode: Byte = 0,
lockTs: Option[Long] = None,
tsInnerValOpt: Option[InnerValLike] = None): S2EdgeLike = {
val edge = S2Edge(
S2Edge.fillPropsWithTs(edge, propsWithTs)
* helper to create new SnapshotEdge instance from given parameters on memory(not actually stored in storage).
* Note that this is only available to S2Graph, not structure.Graph so only internal code should use this method.
* @param srcVertex
* @param tgtVertex
* @param label
* @param dir
* @param op
* @param version
* @param propsWithTs
* @param pendingEdgeOpt
* @param statusCode
* @param lockTs
* @param tsInnerValOpt
* @return
private[core] def newSnapshotEdge(srcVertex: S2VertexLike,
tgtVertex: S2VertexLike,
label: Label,
dir: Int,
op: Byte,
version: Long,
propsWithTs: S2Edge.State,
pendingEdgeOpt: Option[S2EdgeLike],
statusCode: Byte = 0,
lockTs: Option[Long],
tsInnerValOpt: Option[InnerValLike] = None): SnapshotEdge = {
val snapshotEdge = new SnapshotEdge(graph, srcVertex, tgtVertex, label, dir, op, version, S2Edge.EmptyProps,
pendingEdgeOpt, statusCode, lockTs, tsInnerValOpt)
S2Edge.fillPropsWithTs(snapshotEdge, propsWithTs)
def newVertexId(serviceName: String)(columnName: String)(id: Any): VertexId = {
val service = Service.findByName(serviceName).getOrElse(throw new RuntimeException(s"$serviceName is not found."))
val column = ServiceColumn.find(, columnName).getOrElse(throw new RuntimeException(s"$columnName is not found."))
newVertexId(service, column, id)
* helper to create S2Graph's internal S2VertexId instance with given parameters.
* @param service
* @param column
* @param id
* @return
def newVertexId(service: Service,
column: ServiceColumn,
id: Any): VertexId = {
val innerVal = CanInnerValLike.anyToInnerValLike.toInnerVal(id)(column.schemaVersion)
new VertexId(column, innerVal)
def newVertex(id: VertexId,
ts: Long = System.currentTimeMillis(),
props: S2Vertex.Props = S2Vertex.EmptyProps,
op: Byte = 0,
belongLabelIds: Seq[Int] = Seq.empty): S2VertexLike = {
val vertex = new S2Vertex(graph, id, ts, S2Vertex.EmptyProps, op, belongLabelIds)
S2Vertex.fillPropsWithTs(vertex, props)
def makeVertex(idValue: AnyRef, kvsMap: Map[String, AnyRef]): S2VertexLike = {
idValue match {
case vId: VertexId =>
toVertex(vId.column.service.serviceName, vId.column.columnName, vId, kvsMap)
case _ =>
val serviceColumnNames = kvsMap.getOrElse(T.label.toString, DefaultColumnName).toString
val names = serviceColumnNames.split(S2Vertex.VertexLabelDelimiter)
val (serviceName, columnName) =
if (names.length == 1) (DefaultServiceName, names(0))
else throw new RuntimeException("malformed data on vertex label.")
toVertex(serviceName, columnName, idValue, kvsMap)
def toRequestEdge(queryRequest: QueryRequest, parentEdges: Seq[EdgeWithScore]): S2EdgeLike = {
val srcVertex = queryRequest.vertex
val queryParam = queryRequest.queryParam
val tgtVertexIdOpt = queryParam.tgtVertexInnerIdOpt
val label = queryParam.label
val labelWithDir = queryParam.labelWithDir
val (srcColumn, tgtColumn) = label.srcTgtColumn(labelWithDir.dir)
val propsWithTs = label.EmptyPropsWithTs
tgtVertexIdOpt match {
case Some(tgtVertexId) => // _to is given.
/* we use toSnapshotEdge so dont need to swap src, tgt */
val src = srcVertex.innerId
val tgt = tgtVertexId
val (srcVId, tgtVId) = (new SourceVertexId(srcColumn, src), new TargetVertexId(tgtColumn, tgt))
val (srcV, tgtV) = (newVertex(srcVId), newVertex(tgtVId))
newEdge(srcV, tgtV, label, labelWithDir.dir, propsWithTs = propsWithTs)
case None =>
val src = srcVertex.innerId
val srcVId = new SourceVertexId(srcColumn, src)
val srcV = newVertex(srcVId)
newEdge(srcV, srcV, label, labelWithDir.dir, propsWithTs = propsWithTs, parentEdges = parentEdges)
def buildEdgesToDelete(edgeWithScoreLs: Seq[EdgeWithScore], requestTs: Long): Seq[EdgeWithScore] = {
if (edgeWithScoreLs.isEmpty) Nil
else {
val head = edgeWithScoreLs.head
val label = head.edge.innerLabel
//Degree edge? { case edgeWithScore =>
val edge = edgeWithScore.edge
val copiedEdge = label.consistencyLevel match {
case "strong" =>
case _ =>
val edgeToDelete = edgeWithScore.copy(edge = copiedEdge)
// logger.debug(s"delete edge from deleteAll: ${edgeToDelete.edge.toLogString}")