blob: a73f5c2276d694e9a4284d3e1317f7d86ed8eeb8 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.s2graph.core
import org.apache.s2graph.core.GraphExceptions.LabelNotExistException
import org.apache.s2graph.core.JSONParser.{fromJsonToProperties, toInnerVal}
import org.apache.s2graph.core.S2Graph.{DefaultColumnName, DefaultServiceName}
import org.apache.s2graph.core.schema._
import org.apache.s2graph.core.types._
import org.apache.s2graph.core.utils.logger
import org.apache.tinkerpop.gremlin.structure.T
import play.api.libs.json.{JsObject, Json}
import scala.util.Try
class GraphElementBuilder(graph: S2GraphLike) {
def toGraphElement(s: String, labelMapping: Map[String, String] = Map.empty): Option[GraphElement] = Try {
val parts = GraphUtil.split(s)
val logType = parts(2)
val element = if (logType == "edge" | logType == "e") {
/* current only edge is considered to be bulk loaded */
labelMapping.get(parts(5)) match {
case None =>
case Some(toReplace) =>
parts(5) = toReplace
}
toEdge(parts)
} else if (logType == "vertex" | logType == "v") {
toVertex(parts)
} else {
throw new GraphExceptions.JsonParseException("log type is not exist in log.")
}
element
} recover {
case e: Exception =>
logger.error(s"[toElement]: $s", e)
None
} get
def toVertex(s: String): Option[S2VertexLike] = {
toVertex(GraphUtil.split(s))
}
def toEdge(s: String): Option[S2EdgeLike] = {
toEdge(GraphUtil.split(s))
}
def toEdge(parts: Array[String]): Option[S2EdgeLike] = Try {
val (ts, operation, logType, srcId, tgtId, label) = (parts(0), parts(1), parts(2), parts(3), parts(4), parts(5))
val props = if (parts.length >= 7) fromJsonToProperties(Json.parse(parts(6)).asOpt[JsObject].getOrElse(Json.obj())) else Map.empty[String, Any]
val tempDirection = if (parts.length >= 8) parts(7) else "out"
val direction = if (tempDirection != "out" && tempDirection != "in") "out" else tempDirection
val edge = toEdge(srcId, tgtId, label, direction, props, ts.toLong, operation)
Option(edge)
} recover {
case e: Exception =>
logger.error(s"[toEdge]: $e", e)
throw e
} get
def toVertex(parts: Array[String]): Option[S2VertexLike] = Try {
val (ts, operation, logType, srcId, serviceName, colName) = (parts(0), parts(1), parts(2), parts(3), parts(4), parts(5))
val props = if (parts.length >= 7) fromJsonToProperties(Json.parse(parts(6)).asOpt[JsObject].getOrElse(Json.obj())) else Map.empty[String, Any]
val vertex = toVertex(serviceName, colName, srcId, props, ts.toLong, operation)
Option(vertex)
} recover {
case e: Throwable =>
logger.error(s"[toVertex]: $e", e)
throw e
} get
def toEdge(srcId: Any,
tgtId: Any,
labelName: String,
direction: String,
props: Map[String, Any] = Map.empty,
ts: Long = System.currentTimeMillis(),
operation: String = "insert"): S2EdgeLike = {
val label = Label.findByName(labelName).getOrElse(throw new LabelNotExistException(labelName))
val srcColumn = if (direction == "out") label.srcColumn else label.tgtColumn
val tgtColumn = if (direction == "out") label.tgtColumn else label.srcColumn
val srcVertexIdInnerVal = toInnerVal(srcId, srcColumn.columnType, label.schemaVersion)
val tgtVertexIdInnerVal = toInnerVal(tgtId, tgtColumn.columnType, label.schemaVersion)
val srcVertex = newVertex(new SourceVertexId(srcColumn, srcVertexIdInnerVal), System.currentTimeMillis())
val tgtVertex = newVertex(new TargetVertexId(tgtColumn, tgtVertexIdInnerVal), System.currentTimeMillis())
val dir = GraphUtil.toDir(direction).getOrElse(throw new RuntimeException(s"$direction is not supported."))
val propsPlusTs = props ++ Map(LabelMeta.timestamp.name -> ts)
val propsWithTs = label.propsToInnerValsWithTs(propsPlusTs, ts)
val op = GraphUtil.toOp(operation).getOrElse(throw new RuntimeException(s"$operation is not supported."))
new S2Edge(graph, srcVertex, tgtVertex, label, dir, op = op, version = ts).copyEdgeWithState(propsWithTs)
}
def toVertex(serviceName: String,
columnName: String,
id: Any,
props: Map[String, Any] = Map.empty,
ts: Long = System.currentTimeMillis(),
operation: String = "insert"): S2VertexLike = {
val service = Service.findByName(serviceName).getOrElse(throw new java.lang.IllegalArgumentException(s"$serviceName is not found."))
val column = ServiceColumn.find(service.id.get, columnName).getOrElse(throw new java.lang.IllegalArgumentException(s"$columnName is not found."))
val op = GraphUtil.toOp(operation).getOrElse(throw new RuntimeException(s"$operation is not supported."))
val srcVertexId = id match {
case vid: VertexId => id.asInstanceOf[VertexId]
case _ => new VertexId(column, toInnerVal(id, column.columnType, column.schemaVersion))
}
val propsInner = column.propsToInnerVals(props) ++
Map(ColumnMeta.timestamp -> InnerVal.withLong(ts, column.schemaVersion))
val vertex = new S2Vertex(graph, srcVertexId, ts, S2Vertex.EmptyProps, op)
S2Vertex.fillPropsWithTs(vertex, propsInner)
vertex
}
/**
* helper to create new Edge instance from given parameters on memory(not actually stored in storage).
*
* Since we are using mutable map for property value(propsWithTs),
* we should make sure that reference for mutable map never be shared between multiple Edge instances.
* To guarantee this, we never create Edge directly, but rather use this helper which is available on S2Graph.
*
* Note that we are using following convention
* 1. `add*` for method that actually store instance into storage,
* 2. `new*` for method that only create instance on memory, but not store it into storage.
*
* @param srcVertex
* @param tgtVertex
* @param innerLabel
* @param dir
* @param op
* @param version
* @param propsWithTs
* @param parentEdges
* @param originalEdgeOpt
* @param pendingEdgeOpt
* @param statusCode
* @param lockTs
* @param tsInnerValOpt
* @return
*/
def newEdge(srcVertex: S2VertexLike,
tgtVertex: S2VertexLike,
innerLabel: Label,
dir: Int,
op: Byte = GraphUtil.defaultOpByte,
version: Long = System.currentTimeMillis(),
propsWithTs: S2Edge.State,
parentEdges: Seq[EdgeWithScore] = Nil,
originalEdgeOpt: Option[S2EdgeLike] = None,
pendingEdgeOpt: Option[S2EdgeLike] = None,
statusCode: Byte = 0,
lockTs: Option[Long] = None,
tsInnerValOpt: Option[InnerValLike] = None): S2EdgeLike = {
val edge = S2Edge(
graph,
srcVertex,
tgtVertex,
innerLabel,
dir,
op,
version,
S2Edge.EmptyProps,
parentEdges,
originalEdgeOpt,
pendingEdgeOpt,
statusCode,
lockTs,
tsInnerValOpt)
S2Edge.fillPropsWithTs(edge, propsWithTs)
edge
}
/**
* helper to create new SnapshotEdge instance from given parameters on memory(not actually stored in storage).
*
* Note that this is only available to S2Graph, not structure.Graph so only internal code should use this method.
* @param srcVertex
* @param tgtVertex
* @param label
* @param dir
* @param op
* @param version
* @param propsWithTs
* @param pendingEdgeOpt
* @param statusCode
* @param lockTs
* @param tsInnerValOpt
* @return
*/
private[core] def newSnapshotEdge(srcVertex: S2VertexLike,
tgtVertex: S2VertexLike,
label: Label,
dir: Int,
op: Byte,
version: Long,
propsWithTs: S2Edge.State,
pendingEdgeOpt: Option[S2EdgeLike],
statusCode: Byte = 0,
lockTs: Option[Long],
tsInnerValOpt: Option[InnerValLike] = None): SnapshotEdge = {
val snapshotEdge = new SnapshotEdge(graph, srcVertex, tgtVertex, label, dir, op, version, S2Edge.EmptyProps,
pendingEdgeOpt, statusCode, lockTs, tsInnerValOpt)
S2Edge.fillPropsWithTs(snapshotEdge, propsWithTs)
snapshotEdge
}
def newVertexId(serviceName: String)(columnName: String)(id: Any): VertexId = {
val service = Service.findByName(serviceName).getOrElse(throw new RuntimeException(s"$serviceName is not found."))
val column = ServiceColumn.find(service.id.get, columnName).getOrElse(throw new RuntimeException(s"$columnName is not found."))
newVertexId(service, column, id)
}
/**
* helper to create S2Graph's internal S2VertexId instance with given parameters.
* @param service
* @param column
* @param id
* @return
*/
def newVertexId(service: Service,
column: ServiceColumn,
id: Any): VertexId = {
val innerVal = CanInnerValLike.anyToInnerValLike.toInnerVal(id)(column.schemaVersion)
new VertexId(column, innerVal)
}
def newVertex(id: VertexId,
ts: Long = System.currentTimeMillis(),
props: S2Vertex.Props = S2Vertex.EmptyProps,
op: Byte = 0,
belongLabelIds: Seq[Int] = Seq.empty): S2VertexLike = {
val vertex = new S2Vertex(graph, id, ts, S2Vertex.EmptyProps, op, belongLabelIds)
S2Vertex.fillPropsWithTs(vertex, props)
vertex
}
def makeVertex(idValue: AnyRef, kvsMap: Map[String, AnyRef]): S2VertexLike = {
idValue match {
case vId: VertexId =>
toVertex(vId.column.service.serviceName, vId.column.columnName, vId, kvsMap)
case _ =>
val serviceColumnNames = kvsMap.getOrElse(T.label.toString, DefaultColumnName).toString
val names = serviceColumnNames.split(S2Vertex.VertexLabelDelimiter)
val (serviceName, columnName) =
if (names.length == 1) (DefaultServiceName, names(0))
else throw new RuntimeException("malformed data on vertex label.")
toVertex(serviceName, columnName, idValue, kvsMap)
}
}
def toRequestEdge(queryRequest: QueryRequest, parentEdges: Seq[EdgeWithScore]): S2EdgeLike = {
val srcVertex = queryRequest.vertex
val queryParam = queryRequest.queryParam
val tgtVertexIdOpt = queryParam.tgtVertexInnerIdOpt
val label = queryParam.label
val labelWithDir = queryParam.labelWithDir
val (srcColumn, tgtColumn) = label.srcTgtColumn(labelWithDir.dir)
val propsWithTs = label.EmptyPropsWithTs
tgtVertexIdOpt match {
case Some(tgtVertexId) => // _to is given.
/* we use toSnapshotEdge so dont need to swap src, tgt */
val src = srcVertex.innerId
val tgt = tgtVertexId
val (srcVId, tgtVId) = (new SourceVertexId(srcColumn, src), new TargetVertexId(tgtColumn, tgt))
val (srcV, tgtV) = (newVertex(srcVId), newVertex(tgtVId))
newEdge(srcV, tgtV, label, labelWithDir.dir, propsWithTs = propsWithTs)
case None =>
val src = srcVertex.innerId
val srcVId = new SourceVertexId(srcColumn, src)
val srcV = newVertex(srcVId)
newEdge(srcV, srcV, label, labelWithDir.dir, propsWithTs = propsWithTs, parentEdges = parentEdges)
}
}
def buildEdgesToDelete(edgeWithScoreLs: Seq[EdgeWithScore], requestTs: Long): Seq[EdgeWithScore] = {
if (edgeWithScoreLs.isEmpty) Nil
else {
val head = edgeWithScoreLs.head
val label = head.edge.innerLabel
//Degree edge?
edgeWithScoreLs.map { case edgeWithScore =>
val edge = edgeWithScore.edge
val copiedEdge = label.consistencyLevel match {
case "strong" =>
edge
.copyEdgeWithState(S2Edge.propsToState(edge.updatePropsWithTs()))
.copyTs(requestTs)
.copyOp(GraphUtil.operations("delete"))
.copyVersion(requestTs)
case _ =>
edge
.copyEdgeWithState(S2Edge.propsToState(edge.updatePropsWithTs()))
.copyTs(requestTs)
}
val edgeToDelete = edgeWithScore.copy(edge = copiedEdge)
// logger.debug(s"delete edge from deleteAll: ${edgeToDelete.edge.toLogString}")
edgeToDelete
}
}
}
}