blob: fa9ff622d1b3f0802016442bf1bf04fb97a6b9db [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.s2graph.core
import java.util
import java.util.function.{BiConsumer, Consumer}
import org.apache.s2graph.core.GraphExceptions.LabelNotExistException
import org.apache.s2graph.core.S2Edge.{Props, State}
import org.apache.s2graph.core.JSONParser._
import org.apache.s2graph.core.mysqls.{Label, LabelIndex, LabelMeta, ServiceColumn}
import org.apache.s2graph.core.types._
import org.apache.s2graph.core.utils.logger
import org.apache.s2graph.core.io.Conversions._
import org.apache.tinkerpop.gremlin.structure
import org.apache.tinkerpop.gremlin.structure.util.StringFactory
import org.apache.tinkerpop.gremlin.structure.{Direction, Edge, Graph, Property, T, Vertex}
import play.api.libs.json.{JsNumber, JsObject, Json}
import scala.collection.JavaConverters._
import scala.collection.mutable.{Map => MutableMap}
import scala.concurrent.Await
import scala.util.hashing.MurmurHash3
object SnapshotEdge {
def copyFrom(e: SnapshotEdge): SnapshotEdge = {
val copy =
SnapshotEdge(
e.graph,
e.srcVertex,
e.tgtVertex,
e.label,
e.dir,
e.op,
e.version,
S2Edge.EmptyProps,
e.pendingEdgeOpt,
e.statusCode,
e.lockTs,
e.tsInnerValOpt)
copy.updatePropsWithTs(e.propsWithTs)
copy
}
}
case class SnapshotEdge(graph: S2Graph,
srcVertex: S2Vertex,
tgtVertex: S2Vertex,
label: Label,
dir: Int,
op: Byte,
version: Long,
private val propsWithTs: Props,
pendingEdgeOpt: Option[S2Edge],
statusCode: Byte = 0,
lockTs: Option[Long],
tsInnerValOpt: Option[InnerValLike] = None) {
lazy val direction = GraphUtil.fromDirection(dir)
lazy val operation = GraphUtil.fromOp(op)
lazy val edge = toEdge
lazy val labelWithDir = LabelWithDirection(label.id.get, dir)
// if (!propsWithTs.contains(LabelMeta.timestamp.name)) throw new Exception("Timestamp is required.")
// val label = Label.findById(labelWithDir.labelId)
lazy val schemaVer = label.schemaVersion
lazy val ts = propsWithTs.get(LabelMeta.timestamp.name).innerVal.toString().toLong
def propsToKeyValuesWithTs = HBaseSerializable.propsToKeyValuesWithTs(propsWithTs.asScala.map(kv => kv._2.labelMeta.seq -> kv._2.innerValWithTs).toSeq)
def allPropsDeleted = S2Edge.allPropsDeleted(propsWithTs)
def toEdge: S2Edge = {
S2Edge(graph, srcVertex, tgtVertex, label, dir, op,
version, propsWithTs, pendingEdgeOpt = pendingEdgeOpt,
statusCode = statusCode, lockTs = lockTs, tsInnerValOpt = tsInnerValOpt)
}
def propsWithName = (for {
(_, v) <- propsWithTs.asScala
meta = v.labelMeta
jsValue <- innerValToJsValue(v.innerVal, meta.dataType)
} yield meta.name -> jsValue) ++ Map("version" -> JsNumber(version))
def updatePropsWithTs(others: Props = S2Edge.EmptyProps): Props = {
if (others.isEmpty) propsWithTs
else {
val iter = others.entrySet().iterator()
while (iter.hasNext) {
val e = iter.next()
propsWithTs.put(e.getKey, e.getValue)
}
propsWithTs
}
}
// only for debug
def toLogString() = {
List(ts, GraphUtil.fromOp(op), "e", srcVertex.innerId, tgtVertex.innerId, label.label, propsWithName).mkString("\t")
}
def property[V](key: String, value: V, ts: Long): S2Property[V] = {
S2Property.assertValidProp(key, value)
val labelMeta = label.metaPropsInvMap.getOrElse(key, throw new RuntimeException(s"$key is not configured on IndexEdge."))
val newProps = new S2Property(edge, labelMeta, key, value, ts)
propsWithTs.put(key, newProps)
newProps
}
override def hashCode(): Int = {
MurmurHash3.stringHash(srcVertex.innerId + "," + labelWithDir + "," + tgtVertex.innerId)
}
override def equals(other: Any): Boolean = other match {
case e: SnapshotEdge =>
srcVertex.innerId == e.srcVertex.innerId &&
tgtVertex.innerId == e.tgtVertex.innerId &&
labelWithDir == e.labelWithDir && op == e.op && version == e.version &&
pendingEdgeOpt == e.pendingEdgeOpt && lockTs == lockTs && statusCode == statusCode
case _ => false
}
override def toString(): String = {
Map("srcVertex" -> srcVertex.toString, "tgtVertex" -> tgtVertex.toString, "label" -> label.label, "direction" -> direction,
"operation" -> operation, "version" -> version, "props" -> propsWithTs.asScala.map(kv => kv._1 -> kv._2.value).toString,
"statusCode" -> statusCode, "lockTs" -> lockTs).toString
}
}
object IndexEdge {
def copyFrom(e: IndexEdge): IndexEdge = {
val copy = IndexEdge(
e.graph,
e.srcVertex,
e.tgtVertex,
e.label,
e.dir,
e.op,
e.version,
e.labelIndexSeq,
S2Edge.EmptyProps,
e.tsInnerValOpt
)
copy.updatePropsWithTs(e.propsWithTs)
copy
}
}
case class IndexEdge(graph: S2Graph,
srcVertex: S2Vertex,
tgtVertex: S2Vertex,
label: Label,
dir: Int,
op: Byte,
version: Long,
labelIndexSeq: Byte,
private val propsWithTs: Props,
tsInnerValOpt: Option[InnerValLike] = None) {
// if (!props.contains(LabelMeta.timeStampSeq)) throw new Exception("Timestamp is required.")
// assert(props.contains(LabelMeta.timeStampSeq))
lazy val direction = GraphUtil.fromDirection(dir)
lazy val operation = GraphUtil.fromOp(op)
lazy val edge = toEdge
lazy val labelWithDir = LabelWithDirection(label.id.get, dir)
lazy val isInEdge = labelWithDir.dir == GraphUtil.directions("in")
lazy val isOutEdge = !isInEdge
lazy val ts = propsWithTs.get(LabelMeta.timestamp.name).innerVal.toString.toLong
lazy val degreeEdge = propsWithTs.containsKey(LabelMeta.degree.name)
lazy val schemaVer = label.schemaVersion
lazy val labelIndex = LabelIndex.findByLabelIdAndSeq(labelWithDir.labelId, labelIndexSeq).get
lazy val defaultIndexMetas = labelIndex.sortKeyTypes.map { meta =>
val innerVal = toInnerVal(meta.defaultValue, meta.dataType, schemaVer)
meta.seq -> innerVal
}.toMap
lazy val labelIndexMetaSeqs = labelIndex.sortKeyTypes
def indexOption = if (isInEdge) labelIndex.inDirOption else labelIndex.outDirOption
/** TODO: make sure call of this class fill props as this assumes */
lazy val orders = for (meta <- labelIndexMetaSeqs) yield {
propsWithTs.get(meta.name) match {
case null =>
/**
* TODO: agly hack
* now we double store target vertex.innerId/srcVertex.innerId for easy development. later fix this to only store id once
*/
val v = meta match {
case LabelMeta.timestamp=> InnerVal.withLong(version, schemaVer)
case LabelMeta.to => toEdge.tgtVertex.innerId
case LabelMeta.from => toEdge.srcVertex.innerId
case LabelMeta.fromHash => indexOption.map { option =>
InnerVal.withLong(MurmurHash3.stringHash(toEdge.srcVertex.innerId.toString()).abs % option.totalModular, schemaVer)
}.getOrElse(throw new RuntimeException("from_hash must be used with sampling"))
// for now, it does not make sense to build index on srcVertex.innerId since all edges have same data.
case _ => toInnerVal(meta.defaultValue, meta.dataType, schemaVer)
}
meta -> v
case v => meta -> v.innerVal
}
}
lazy val ordersKeyMap = orders.map { case (meta, _) => meta.name }.toSet
lazy val metas = for ((meta, v) <- propsWithTs.asScala if !ordersKeyMap.contains(meta)) yield v.labelMeta -> v.innerVal
// lazy val propsWithTs = props.map { case (k, v) => k -> InnerValLikeWithTs(v, version) }
//TODO:
// lazy val kvs = Graph.client.indexedEdgeSerializer(this).toKeyValues.toList
lazy val hasAllPropsForIndex = orders.length == labelIndexMetaSeqs.length
def propsWithName = for {
(_, v) <- propsWithTs.asScala
meta = v.labelMeta
jsValue <- innerValToJsValue(v.innerVal, meta.dataType)
} yield meta.name -> jsValue
def toEdge: S2Edge = S2Edge(graph, srcVertex, tgtVertex, label, dir, op, version, propsWithTs, tsInnerValOpt = tsInnerValOpt)
// only for debug
def toLogString() = {
List(version, GraphUtil.fromOp(op), "e", srcVertex.innerId, tgtVertex.innerId, label.label, Json.toJson(propsWithName)).mkString("\t")
}
def property(key: String): Option[InnerValLikeWithTs] = {
label.metaPropsInvMap.get(key).map(labelMeta => property(labelMeta))
}
def property(labelMeta: LabelMeta): InnerValLikeWithTs = {
// propsWithTs.get(labelMeta.name).map(_.innerValWithTs).getOrElse(label.metaPropsDefaultMapInner(labelMeta))
if (propsWithTs.containsKey(labelMeta.name)) {
propsWithTs.get(labelMeta.name).innerValWithTs
} else {
label.metaPropsDefaultMapInner(labelMeta)
}
}
def updatePropsWithTs(others: Props = S2Edge.EmptyProps): Props = {
if (others.isEmpty) propsWithTs
else {
val iter = others.entrySet().iterator()
while (iter.hasNext) {
val e = iter.next()
propsWithTs.put(e.getKey, e.getValue)
}
propsWithTs
}
}
def property[V](key: String, value: V, ts: Long): S2Property[V] = {
val labelMeta = label.metaPropsInvMap.getOrElse(key, throw new RuntimeException(s"$key is not configured on IndexEdge."))
val newProps = new S2Property(edge, labelMeta, key, value, ts)
propsWithTs.put(key, newProps)
newProps
}
override def hashCode(): Int = {
MurmurHash3.stringHash(srcVertex.innerId + "," + labelWithDir + "," + tgtVertex.innerId + "," + labelIndexSeq)
}
override def equals(other: Any): Boolean = other match {
case e: IndexEdge =>
srcVertex.innerId == e.srcVertex.innerId &&
tgtVertex.innerId == e.tgtVertex.innerId &&
labelWithDir == e.labelWithDir && op == e.op && version == e.version &&
labelIndexSeq == e.labelIndexSeq
case _ => false
}
override def toString(): String = {
Map("srcVertex" -> srcVertex.toString, "tgtVertex" -> tgtVertex.toString, "label" -> label.label, "direction" -> dir,
"operation" -> operation, "version" -> version, "props" -> propsWithTs.asScala.map(kv => kv._1 -> kv._2.value).toString
).toString
}
}
case class S2Edge(innerGraph: S2Graph,
srcVertex: S2Vertex,
var tgtVertex: S2Vertex,
innerLabel: Label,
dir: Int,
var op: Byte = GraphUtil.defaultOpByte,
var version: Long = System.currentTimeMillis(),
propsWithTs: Props = S2Edge.EmptyProps,
parentEdges: Seq[EdgeWithScore] = Nil,
originalEdgeOpt: Option[S2Edge] = None,
pendingEdgeOpt: Option[S2Edge] = None,
statusCode: Byte = 0,
lockTs: Option[Long] = None,
var tsInnerValOpt: Option[InnerValLike] = None) extends GraphElement with Edge {
lazy val labelWithDir = LabelWithDirection(innerLabel.id.get, dir)
lazy val schemaVer = innerLabel.schemaVersion
lazy val ts = propsWithTs.get(LabelMeta.timestamp.name).innerVal.value match {
case b: BigDecimal => b.longValue()
case l: Long => l
case i: Int => i.toLong
case _ => throw new RuntimeException("ts should be in [BigDecimal/Long/Int].")
}
lazy val operation = GraphUtil.fromOp(op)
lazy val tsInnerVal = tsInnerValOpt.get.value
lazy val srcId = srcVertex.innerIdVal
lazy val tgtId = tgtVertex.innerIdVal
lazy val labelName = innerLabel.label
lazy val direction = GraphUtil.fromDirection(dir)
def toIndexEdge(labelIndexSeq: Byte): IndexEdge = IndexEdge(innerGraph, srcVertex, tgtVertex, innerLabel, dir, op, version, labelIndexSeq, propsWithTs)
def serializePropsWithTs(): Array[Byte] = HBaseSerializable.propsToKeyValuesWithTs(propsWithTs.asScala.map(kv => kv._2.labelMeta.seq -> kv._2.innerValWithTs).toSeq)
def updatePropsWithTs(others: Props = S2Edge.EmptyProps): Props = {
val emptyProp = S2Edge.EmptyProps
propsWithTs.forEach(new BiConsumer[String, S2Property[_]] {
override def accept(key: String, value: S2Property[_]): Unit = emptyProp.put(key, value)
})
others.forEach(new BiConsumer[String, S2Property[_]] {
override def accept(key: String, value: S2Property[_]): Unit = emptyProp.put(key, value)
})
emptyProp
}
def propertyValue(key: String): Option[InnerValLikeWithTs] = {
key match {
case "from" | "_from" => Option(InnerValLikeWithTs(srcVertex.innerId, ts))
case "to" | "_to" => Option(InnerValLikeWithTs(tgtVertex.innerId, ts))
case "label" => Option(InnerValLikeWithTs(InnerVal.withStr(innerLabel.label, schemaVer), ts))
case "direction" => Option(InnerValLikeWithTs(InnerVal.withStr(direction, schemaVer), ts))
case _ =>
innerLabel.metaPropsInvMap.get(key).map(labelMeta => propertyValueInner(labelMeta))
}
}
def propertyValueInner(labelMeta: LabelMeta): InnerValLikeWithTs= {
// propsWithTs.get(labelMeta.name).map(_.innerValWithTs).getOrElse()
if (propsWithTs.containsKey(labelMeta.name)) {
propsWithTs.get(labelMeta.name).innerValWithTs
} else {
innerLabel.metaPropsDefaultMapInner(labelMeta)
}
}
def propertyValues(keys: Seq[String] = Nil): Map[LabelMeta, InnerValLikeWithTs] = {
val labelMetas = for {
key <- keys
labelMeta <- innerLabel.metaPropsInvMap.get(key)
} yield labelMeta
propertyValuesInner(labelMetas)
}
def propertyValuesInner(labelMetas: Seq[LabelMeta] = Nil): Map[LabelMeta, InnerValLikeWithTs] = {
if (labelMetas.isEmpty) {
innerLabel.metaPropsDefaultMapInner.map { case (labelMeta, defaultVal) =>
labelMeta -> propertyValueInner(labelMeta)
}
} else {
// This is important since timestamp is required for all edges.
(LabelMeta.timestamp +: labelMetas).map { labelMeta =>
labelMeta -> propertyValueInner(labelMeta)
}.toMap
}
}
// if (!props.contains(LabelMeta.timestamp)) throw new Exception("Timestamp is required.")
// assert(propsWithTs.contains(LabelMeta.timeStampSeq))
lazy val properties = toProps()
def props = propsWithTs.asScala.mapValues(_.innerVal)
private def toProps(): Map[String, Any] = {
for {
(labelMeta, defaultVal) <- innerLabel.metaPropsDefaultMapInner
} yield {
// labelMeta.name -> propsWithTs.get(labelMeta.name).map(_.innerValWithTs).getOrElse(defaultVal).innerVal.value
val value =
if (propsWithTs.containsKey(labelMeta.name)) {
propsWithTs.get(labelMeta.name).value
} else {
defaultVal.innerVal.value
}
labelMeta.name -> value
}
}
def relatedEdges = {
if (labelWithDir.isDirected) {
val skipReverse = innerLabel.extraOptions.get("skipReverse").map(_.as[Boolean]).getOrElse(false)
if (skipReverse) List(this) else List(this, duplicateEdge)
} else {
// val outDir = labelWithDir.copy(dir = GraphUtil.directions("out"))
// val base = copy(labelWithDir = outDir)
val base = copy(dir = GraphUtil.directions("out"))
List(base, base.reverseSrcTgtEdge)
}
}
// def relatedEdges = List(this)
private def getServiceColumn(vertex: S2Vertex, defaultServiceColumn: ServiceColumn) =
if (vertex.id.column == ServiceColumn.Default) defaultServiceColumn else vertex.id.column
def srcForVertex = {
val belongLabelIds = Seq(labelWithDir.labelId)
if (labelWithDir.dir == GraphUtil.directions("in")) {
val tgtColumn = getServiceColumn(tgtVertex, innerLabel.tgtColumn)
innerGraph.newVertex(VertexId(tgtColumn, tgtVertex.innerId), tgtVertex.ts, tgtVertex.props, belongLabelIds = belongLabelIds)
} else {
val srcColumn = getServiceColumn(srcVertex, innerLabel.srcColumn)
innerGraph.newVertex(VertexId(srcColumn, srcVertex.innerId), srcVertex.ts, srcVertex.props, belongLabelIds = belongLabelIds)
}
}
def tgtForVertex = {
val belongLabelIds = Seq(labelWithDir.labelId)
if (labelWithDir.dir == GraphUtil.directions("in")) {
val srcColumn = getServiceColumn(srcVertex, innerLabel.srcColumn)
innerGraph.newVertex(VertexId(srcColumn, srcVertex.innerId), srcVertex.ts, srcVertex.props, belongLabelIds = belongLabelIds)
} else {
val tgtColumn = getServiceColumn(tgtVertex, innerLabel.tgtColumn)
innerGraph.newVertex(VertexId(tgtColumn, tgtVertex.innerId), tgtVertex.ts, tgtVertex.props, belongLabelIds = belongLabelIds)
}
}
def duplicateEdge = reverseSrcTgtEdge.reverseDirEdge
// def reverseDirEdge = copy(labelWithDir = labelWithDir.dirToggled)
def reverseDirEdge = copy(dir = GraphUtil.toggleDir(dir))
def reverseSrcTgtEdge = copy(srcVertex = tgtVertex, tgtVertex = srcVertex)
def labelOrders = LabelIndex.findByLabelIdAll(labelWithDir.labelId)
override def serviceName = innerLabel.serviceName
override def queueKey = Seq(ts.toString, tgtVertex.serviceName).mkString("|")
override def queuePartitionKey = Seq(srcVertex.innerId, tgtVertex.innerId).mkString("|")
override def isAsync = innerLabel.isAsync
def isDegree = propsWithTs.containsKey(LabelMeta.degree.name)
// def propsPlusTs = propsWithTs.get(LabelMeta.timeStampSeq) match {
// case Some(_) => props
// case None => props ++ Map(LabelMeta.timeStampSeq -> InnerVal.withLong(ts, schemaVer))
// }
def propsPlusTsValid = propsWithTs.asScala.filter(kv => LabelMeta.isValidSeq(kv._2.labelMeta.seq)).asJava
def edgesWithIndex = for (labelOrder <- labelOrders) yield {
IndexEdge(innerGraph, srcVertex, tgtVertex, innerLabel, dir, op, version, labelOrder.seq, propsWithTs, tsInnerValOpt = tsInnerValOpt)
}
def edgesWithIndexValid = for (labelOrder <- labelOrders) yield {
IndexEdge(innerGraph, srcVertex, tgtVertex, innerLabel, dir, op, version, labelOrder.seq, propsPlusTsValid, tsInnerValOpt = tsInnerValOpt)
}
/** force direction as out on invertedEdge */
def toSnapshotEdge: SnapshotEdge = {
val (smaller, larger) = (srcForVertex, tgtForVertex)
// val newLabelWithDir = LabelWithDirection(labelWithDir.labelId, GraphUtil.directions("out"))
propertyInner(LabelMeta.timestamp.name, ts, ts)
val ret = SnapshotEdge(innerGraph, smaller, larger, innerLabel,
GraphUtil.directions("out"), op, version, propsWithTs,
pendingEdgeOpt = pendingEdgeOpt, statusCode = statusCode, lockTs = lockTs, tsInnerValOpt = tsInnerValOpt)
ret
}
def defaultPropsWithName = Json.obj("from" -> srcVertex.innerId.toString(), "to" -> tgtVertex.innerId.toString(),
"label" -> innerLabel.label, "service" -> innerLabel.serviceName)
def propsWithName =
for {
(_, v) <- propsWithTs.asScala
meta = v.labelMeta
jsValue <- innerValToJsValue(v.innerVal, meta.dataType)
} yield meta.name -> jsValue
def updateTgtVertex(id: InnerValLike) = {
val newId = TargetVertexId(tgtVertex.id.column, id)
val newTgtVertex = innerGraph.newVertex(newId, tgtVertex.ts, tgtVertex.props)
S2Edge(innerGraph, srcVertex, newTgtVertex, innerLabel, dir, op, version, propsWithTs, tsInnerValOpt = tsInnerValOpt)
}
def rank(r: RankParam): Double =
if (r.keySeqAndWeights.size <= 0) 1.0f
else {
var sum: Double = 0
for ((labelMeta, w) <- r.keySeqAndWeights) {
if (propsWithTs.containsKey(labelMeta.name)) {
val innerValWithTs = propsWithTs.get(labelMeta.name)
val cost = try innerValWithTs.innerVal.toString.toDouble catch {
case e: Exception =>
logger.error("toInnerval failed in rank", e)
1.0
}
sum += w * cost
}
}
sum
}
def toLogString: String = {
// val allPropsWithName = defaultPropsWithName ++ Json.toJson(propsWithName).asOpt[JsObject].getOrElse(Json.obj())
List(ts, GraphUtil.fromOp(op), "e", srcVertex.innerId, tgtVertex.innerId, innerLabel.label, propsWithTs).mkString("\t")
}
override def hashCode(): Int = {
id().hashCode()
}
override def equals(other: Any): Boolean = other match {
case e: Edge => id().equals(e.id())
case _ => false
}
// override def toString(): String = {
// Map("srcVertex" -> srcVertex.toString, "tgtVertex" -> tgtVertex.toString, "label" -> labelName, "direction" -> direction,
// "operation" -> operation, "version" -> version, "props" -> propsWithTs.asScala.map(kv => kv._1 -> kv._2.value).toString,
// "parentEdges" -> parentEdges, "originalEdge" -> originalEdgeOpt, "statusCode" -> statusCode, "lockTs" -> lockTs
// ).toString
// }
override def toString: String = {
// E + L_BRACKET + edge.id() + R_BRACKET + L_BRACKET + edge.outVertex().id() + DASH + edge.label() + ARROW + edge.inVertex().id() + R_BRACKET;
s"e[${id}][${srcForVertex.id}-${innerLabel.label}->${tgtForVertex.id}]"
// s"e[${srcForVertex.id}-${innerLabel.label}->${tgtForVertex.id}]"
}
def checkProperty(key: String): Boolean = propsWithTs.containsKey(key)
def copyEdge(srcVertex: S2Vertex = srcVertex,
tgtVertex: S2Vertex = tgtVertex,
innerLabel: Label = innerLabel,
dir: Int = dir,
op: Byte = op,
version: Long = version,
propsWithTs: State = S2Edge.propsToState(this.propsWithTs),
parentEdges: Seq[EdgeWithScore] = parentEdges,
originalEdgeOpt: Option[S2Edge] = originalEdgeOpt,
pendingEdgeOpt: Option[S2Edge] = pendingEdgeOpt,
statusCode: Byte = statusCode,
lockTs: Option[Long] = lockTs,
tsInnerValOpt: Option[InnerValLike] = tsInnerValOpt,
ts: Long = ts): S2Edge = {
val edge = new S2Edge(innerGraph, srcVertex, tgtVertex, innerLabel, dir, op, version, S2Edge.EmptyProps,
parentEdges, originalEdgeOpt, pendingEdgeOpt, statusCode, lockTs, tsInnerValOpt)
S2Edge.fillPropsWithTs(edge, propsWithTs)
edge.propertyInner(LabelMeta.timestamp.name, ts, ts)
edge
}
def copyEdgeWithState(state: State, ts: Long): S2Edge = {
val newEdge = copy(propsWithTs = S2Edge.EmptyProps)
S2Edge.fillPropsWithTs(newEdge, state)
newEdge.propertyInner(LabelMeta.timestamp.name, ts, ts)
newEdge
}
def copyEdgeWithState(state: State): S2Edge = {
val newEdge = copy(propsWithTs = S2Edge.EmptyProps)
S2Edge.fillPropsWithTs(newEdge, state)
newEdge
}
override def vertices(direction: Direction): util.Iterator[structure.Vertex] = {
val arr = new util.ArrayList[Vertex]()
direction match {
case Direction.OUT =>
// val newVertexId = this.direction match {
// case "out" => VertexId(innerLabel.srcColumn, srcVertex.innerId)
// case "in" => VertexId(innerLabel.tgtColumn, tgtVertex.innerId)
// case _ => throw new IllegalArgumentException("direction can only be out/in.")
// }
val newVertexId = edgeId.srcVertexId
innerGraph.getVertex(newVertexId).foreach(arr.add)
case Direction.IN =>
// val newVertexId = this.direction match {
// case "in" => VertexId(innerLabel.srcColumn, srcVertex.innerId)
// case "out" => VertexId(innerLabel.tgtColumn, tgtVertex.innerId)
// case _ => throw new IllegalArgumentException("direction can only be out/in.")
// }
val newVertexId = edgeId.tgtVertexId
innerGraph.getVertex(newVertexId).foreach(arr.add)
case _ =>
import scala.collection.JavaConversions._
vertices(Direction.OUT).foreach(arr.add)
vertices(Direction.IN).foreach(arr.add)
}
arr.iterator()
}
override def properties[V](keys: String*): util.Iterator[Property[V]] = {
val ls = new util.ArrayList[Property[V]]()
if (keys.isEmpty) {
propsWithTs.forEach(new BiConsumer[String, S2Property[_]] {
override def accept(key: String, property: S2Property[_]): Unit = {
if (!LabelMeta.reservedMetaNamesSet(key) && property.isPresent && key != T.id.name)
ls.add(property.asInstanceOf[S2Property[V]])
}
})
} else {
keys.foreach { key =>
val prop = property[V](key)
if (prop.isPresent) ls.add(prop)
}
}
ls.iterator()
}
override def property[V](key: String): Property[V] = {
val labelMeta = innerLabel.metaPropsInvMap.getOrElse(key, throw new java.lang.IllegalStateException(s"$key is not configured on Edge."))
if (propsWithTs.containsKey(key)) propsWithTs.get(key).asInstanceOf[Property[V]]
else {
Property.empty()
// val default = innerLabel.metaPropsDefaultMapInner(labelMeta)
// propertyInner(key, default.innerVal.value, default.ts).asInstanceOf[Property[V]]
}
}
// just for tinkerpop: save to storage, do not use for internal
override def property[V](key: String, value: V): Property[V] = {
S2Property.assertValidProp(key, value)
val v = propertyInner(key, value, System.currentTimeMillis())
val newTs = props.get(LabelMeta.timestamp.name).map(_.toString.toLong + 1).getOrElse(System.currentTimeMillis())
val newEdge = this.copyEdge(ts = newTs)
Await.result(innerGraph.mutateEdges(Seq(newEdge), withWait = true), innerGraph.WaitTimeout)
v
}
def propertyInner[V](key: String, value: V, ts: Long): Property[V] = {
val labelMeta = innerLabel.metaPropsInvMap.getOrElse(key, throw new RuntimeException(s"$key is not configured on Edge."))
val newProp = new S2Property[V](this, labelMeta, key, value, ts)
propsWithTs.put(key, newProp)
newProp
}
override def remove(): Unit = {
if (graph.features().edge().supportsRemoveEdges()) {
val requestTs = System.currentTimeMillis()
val edgeToDelete = this.copyEdge(op = GraphUtil.operations("delete"),
version = version + S2Edge.incrementVersion, propsWithTs = S2Edge.propsToState(updatePropsWithTs()), ts = requestTs)
// should we delete related edges also?
val future = innerGraph.mutateEdges(Seq(edgeToDelete), withWait = true)
val mutateSuccess = Await.result(future, innerGraph.WaitTimeout)
if (!mutateSuccess.forall(identity)) throw new RuntimeException("edge remove failed.")
} else {
throw Edge.Exceptions.edgeRemovalNotSupported()
}
}
override def graph(): Graph = innerGraph
lazy val edgeId: EdgeId = {
// NOTE: xxxForVertex makes direction to be "out"
val timestamp = if (this.innerLabel.consistencyLevel == "strong") 0l else ts
// EdgeId(srcVertex.innerId, tgtVertex.innerId, label(), "out", timestamp)
val (srcColumn, tgtColumn) = innerLabel.srcTgtColumn(dir)
if (direction == "out")
EdgeId(VertexId(srcColumn, srcVertex.id.innerId), VertexId(tgtColumn, tgtVertex.id.innerId), label(), "out", timestamp)
else
EdgeId(VertexId(tgtColumn, tgtVertex.id.innerId), VertexId(srcColumn, srcVertex.id.innerId), label(), "out", timestamp)
}
override def id(): AnyRef = edgeId
override def label(): String = innerLabel.label
}
object EdgeId {
val EdgeIdDelimiter = ","
def fromString(s: String): EdgeId = {
// val Array(src, tgt, labelName, dir, ts) = s.split(EdgeIdDelimiter)
// val label = Label.findByName(labelName).getOrElse(throw LabelNotExistException(labelName))
// val srcColumn = label.srcColumnWithDir(GraphUtil.toDirection(dir))
// val tgtColumn = label.tgtColumnWithDir(GraphUtil.toDirection(dir))
// EdgeId(
// JSONParser.toInnerVal(src, srcColumn.columnType, label.schemaVersion),
// JSONParser.toInnerVal(tgt, tgtColumn.columnType, label.schemaVersion),
// labelName,
// dir,
// ts.toLong
// )
val js = Json.parse(s)
s2EdgeIdReads.reads(Json.parse(s)).get
}
}
case class EdgeId(srcVertexId: VertexId,
tgtVertexId: VertexId,
labelName: String,
direction: String,
ts: Long) {
override def toString: String = {
import io.Conversions._
// Seq(srcVertexId.toIdString(), tgtVertexId.toIdString(), labelName, direction, ts.toString).mkString(EdgeId.EdgeIdDelimiter)
s2EdgeIdWrites.writes(this).toString()
}
}
object EdgeMutate {
def partitionBufferedIncrement(edges: Seq[IndexEdge]): (Seq[IndexEdge], Seq[IndexEdge]) = {
edges.partition(_.indexOption.fold(false)(_.isBufferIncrement))
}
def filterIndexOptionForDegree(edges: Seq[IndexEdge]): Seq[IndexEdge] = edges.filter { ie =>
ie.indexOption.fold(true)(_.storeDegree)
}
def filterIndexOption(edges: Seq[IndexEdge]): Seq[IndexEdge] = edges.filter { ie =>
ie.indexOption.fold(true) { option =>
val hashValueOpt = ie.orders.find { case (k, v) => k == LabelMeta.fromHash }.map { case (k, v) =>
v.value.toString.toLong
}
option.sample(ie, hashValueOpt)
}
}
}
case class EdgeMutate(edgesToDelete: List[IndexEdge] = List.empty[IndexEdge],
edgesToInsert: List[IndexEdge] = List.empty[IndexEdge],
newSnapshotEdge: Option[SnapshotEdge] = None) {
def deepCopy: EdgeMutate = copy(
edgesToDelete = edgesToDelete.map(IndexEdge.copyFrom),
edgesToInsert = edgesToInsert.map(IndexEdge.copyFrom),
newSnapshotEdge = newSnapshotEdge.map(SnapshotEdge.copyFrom)
)
val edgesToInsertWithIndexOpt: Seq[IndexEdge] = EdgeMutate.filterIndexOption(edgesToInsert)
val edgesToDeleteWithIndexOpt: Seq[IndexEdge] = EdgeMutate.filterIndexOption(edgesToDelete)
val edgesToInsertWithIndexOptForDegree: Seq[IndexEdge] = EdgeMutate.filterIndexOptionForDegree(edgesToInsert)
val edgesToDeleteWithIndexOptForDegree: Seq[IndexEdge] = EdgeMutate.filterIndexOptionForDegree(edgesToDelete)
def toLogString: String = {
val l = (0 until 50).map(_ => "-").mkString("")
val deletes = s"deletes: ${edgesToDelete.map(e => e.toLogString).mkString("\n")}"
val inserts = s"inserts: ${edgesToInsert.map(e => e.toLogString).mkString("\n")}"
val updates = s"snapshot: ${newSnapshotEdge.map(e => e.toLogString).mkString("\n")}"
List("\n", l, deletes, inserts, updates, l, "\n").mkString("\n")
}
}
object S2Edge {
val incrementVersion = 1L
val minTsVal = 0L
/** now version information is required also **/
type Props = java.util.Map[String, S2Property[_]]
type State = Map[LabelMeta, InnerValLikeWithTs]
type PropsPairWithTs = (State, State, Long, String)
type MergeState = PropsPairWithTs => (State, Boolean)
type UpdateFunc = (Option[S2Edge], S2Edge, MergeState)
def EmptyProps = new java.util.HashMap[String, S2Property[_]]
def EmptyState = Map.empty[LabelMeta, InnerValLikeWithTs]
def sameProps(base: Props, other: Props): Boolean = {
if (base.size != other.size) false
else {
var ret = true
val iter = base.entrySet().iterator()
while (iter.hasNext) {
val e = iter.next()
if (!other.containsKey(e.getKey)) ret = false
else if (e.getValue != other.get(e.getKey)) ret = false
else {
}
}
val otherIter = other.entrySet().iterator()
while (otherIter.hasNext) {
val e = otherIter.next()
if (!base.containsKey(e.getKey)) ret = false
else if (e.getValue != base.get(e.getKey)) ret = false
else {
}
}
ret
}
}
def fillPropsWithTs(snapshotEdge: SnapshotEdge, state: State): Unit = {
state.foreach { case (k, v) => snapshotEdge.property(k.name, v.innerVal.value, v.ts) }
}
def fillPropsWithTs(indexEdge: IndexEdge, state: State): Unit = {
state.foreach { case (k, v) => indexEdge.property(k.name, v.innerVal.value, v.ts) }
}
def fillPropsWithTs(edge: S2Edge, state: State): Unit = {
state.foreach { case (k, v) => edge.propertyInner(k.name, v.innerVal.value, v.ts) }
}
def propsToState(props: Props): State = {
props.asScala.map { case (k, v) =>
v.labelMeta -> v.innerValWithTs
}.toMap
}
def stateToProps(edge: S2Edge, state: State): Props = {
state.foreach { case (k, v) =>
edge.propertyInner(k.name, v.innerVal.value, v.ts)
}
edge.propsWithTs
}
def allPropsDeleted(props: Map[LabelMeta, InnerValLikeWithTs]): Boolean =
if (!props.contains(LabelMeta.lastDeletedAt)) false
else {
val lastDeletedAt = props.get(LabelMeta.lastDeletedAt).get.ts
val propsWithoutLastDeletedAt = props - LabelMeta.lastDeletedAt
propsWithoutLastDeletedAt.forall { case (_, v) => v.ts <= lastDeletedAt }
}
def allPropsDeleted(props: Props): Boolean =
if (!props.containsKey(LabelMeta.lastDeletedAt.name)) false
else {
val lastDeletedAt = props.get(LabelMeta.lastDeletedAt.name).ts
props.remove(LabelMeta.lastDeletedAt.name)
// val propsWithoutLastDeletedAt = props
//
// propsWithoutLastDeletedAt.forall { case (_, v) => v.ts <= lastDeletedAt }
var ret = true
val iter = props.entrySet().iterator()
while (iter.hasNext && ret) {
val e = iter.next()
if (e.getValue.ts > lastDeletedAt) ret = false
}
ret
}
def buildDeleteBulk(invertedEdge: Option[S2Edge], requestEdge: S2Edge): (S2Edge, EdgeMutate) = {
// assert(invertedEdge.isEmpty)
// assert(requestEdge.op == GraphUtil.operations("delete"))
val edgesToDelete = requestEdge.relatedEdges.flatMap { relEdge => relEdge.edgesWithIndexValid }
val edgeInverted = Option(requestEdge.toSnapshotEdge)
(requestEdge, EdgeMutate(edgesToDelete, edgesToInsert = Nil, newSnapshotEdge = edgeInverted))
}
def buildOperation(invertedEdge: Option[S2Edge], requestEdges: Seq[S2Edge]): (S2Edge, EdgeMutate) = {
// logger.debug(s"oldEdge: ${invertedEdge.map(_.toStringRaw)}")
// logger.debug(s"requestEdge: ${requestEdge.toStringRaw}")
val oldPropsWithTs =
if (invertedEdge.isEmpty) Map.empty[LabelMeta, InnerValLikeWithTs]
else propsToState(invertedEdge.get.propsWithTs)
val funcs = requestEdges.map { edge =>
if (edge.op == GraphUtil.operations("insert")) {
edge.innerLabel.consistencyLevel match {
case "strong" => S2Edge.mergeUpsert _
case _ => S2Edge.mergeInsertBulk _
}
} else if (edge.op == GraphUtil.operations("insertBulk")) {
S2Edge.mergeInsertBulk _
} else if (edge.op == GraphUtil.operations("delete")) {
edge.innerLabel.consistencyLevel match {
case "strong" => S2Edge.mergeDelete _
case _ => throw new RuntimeException("not supported")
}
}
else if (edge.op == GraphUtil.operations("update")) S2Edge.mergeUpdate _
else if (edge.op == GraphUtil.operations("increment")) S2Edge.mergeIncrement _
else throw new RuntimeException(s"not supported operation on edge: $edge")
}
val oldTs = invertedEdge.map(_.ts).getOrElse(minTsVal)
val requestWithFuncs = requestEdges.zip(funcs).filter(oldTs != _._1.ts).sortBy(_._1.ts)
if (requestWithFuncs.isEmpty) {
(requestEdges.head, EdgeMutate())
} else {
val requestEdge = requestWithFuncs.last._1
var prevPropsWithTs = oldPropsWithTs
for {
(requestEdge, func) <- requestWithFuncs
} {
val (_newPropsWithTs, _) = func(prevPropsWithTs, propsToState(requestEdge.propsWithTs), requestEdge.ts, requestEdge.schemaVer)
prevPropsWithTs = _newPropsWithTs
// logger.debug(s"${requestEdge.toLogString}\n$oldPropsWithTs\n$prevPropsWithTs\n")
}
val requestTs = requestEdge.ts
/** version should be monotoniously increasing so our RPC mutation should be applied safely */
val newVersion = invertedEdge.map(e => e.version + incrementVersion).getOrElse(requestTs)
val maxTs = prevPropsWithTs.map(_._2.ts).max
val newTs = if (maxTs > requestTs) maxTs else requestTs
val propsWithTs = prevPropsWithTs ++
Map(LabelMeta.timestamp -> InnerValLikeWithTs(InnerVal.withLong(newTs, requestEdge.innerLabel.schemaVersion), newTs))
val edgeMutate = buildMutation(invertedEdge, requestEdge, newVersion, oldPropsWithTs, propsWithTs)
// logger.debug(s"${edgeMutate.toLogString}\n${propsWithTs}")
// logger.error(s"$propsWithTs")
val newEdge = requestEdge.copy(propsWithTs = EmptyProps)
fillPropsWithTs(newEdge, propsWithTs)
(newEdge, edgeMutate)
}
}
def buildMutation(snapshotEdgeOpt: Option[S2Edge],
requestEdge: S2Edge,
newVersion: Long,
oldPropsWithTs: Map[LabelMeta, InnerValLikeWithTs],
newPropsWithTs: Map[LabelMeta, InnerValLikeWithTs]): EdgeMutate = {
if (oldPropsWithTs == newPropsWithTs) {
// all requests should be dropped. so empty mutation.
EdgeMutate(edgesToDelete = Nil, edgesToInsert = Nil, newSnapshotEdge = None)
} else {
val withOutDeletedAt = newPropsWithTs.filter(kv => kv._1 != LabelMeta.lastDeletedAtSeq)
val newOp = snapshotEdgeOpt match {
case None => requestEdge.op
case Some(old) =>
val oldMaxTs = old.propsWithTs.asScala.map(_._2.ts).max
if (oldMaxTs > requestEdge.ts) old.op
else requestEdge.op
}
val newSnapshotEdge = requestEdge.copy(op = newOp, version = newVersion).copyEdgeWithState(newPropsWithTs)
val newSnapshotEdgeOpt = Option(newSnapshotEdge.toSnapshotEdge)
// delete request must always update snapshot.
if (withOutDeletedAt == oldPropsWithTs && newPropsWithTs.contains(LabelMeta.lastDeletedAt)) {
// no mutation on indexEdges. only snapshotEdge should be updated to record lastDeletedAt.
EdgeMutate(edgesToDelete = Nil, edgesToInsert = Nil, newSnapshotEdge = newSnapshotEdgeOpt)
} else {
val edgesToDelete = snapshotEdgeOpt match {
case Some(snapshotEdge) if snapshotEdge.op != GraphUtil.operations("delete") =>
snapshotEdge.copy(op = GraphUtil.defaultOpByte)
.relatedEdges.flatMap { relEdge => relEdge.edgesWithIndexValid }
case _ => Nil
}
val edgesToInsert =
if (newPropsWithTs.isEmpty || allPropsDeleted(newPropsWithTs)) Nil
else {
val newEdge = requestEdge.copy(
version = newVersion,
propsWithTs = S2Edge.EmptyProps,
op = GraphUtil.defaultOpByte
)
newPropsWithTs.foreach { case (k, v) => newEdge.propertyInner(k.name, v.innerVal.value, v.ts) }
newEdge.relatedEdges.flatMap { relEdge => relEdge.edgesWithIndexValid }
}
EdgeMutate(edgesToDelete = edgesToDelete, edgesToInsert = edgesToInsert, newSnapshotEdge = newSnapshotEdgeOpt)
}
}
}
def mergeUpsert(propsPairWithTs: PropsPairWithTs): (State, Boolean) = {
var shouldReplace = false
val (oldPropsWithTs, propsWithTs, requestTs, version) = propsPairWithTs
val lastDeletedAt = oldPropsWithTs.get(LabelMeta.lastDeletedAt).map(v => v.ts).getOrElse(minTsVal)
val existInOld = for ((k, oldValWithTs) <- oldPropsWithTs) yield {
propsWithTs.get(k) match {
case Some(newValWithTs) =>
assert(oldValWithTs.ts >= lastDeletedAt)
val v = if (oldValWithTs.ts >= newValWithTs.ts) oldValWithTs
else {
shouldReplace = true
newValWithTs
}
Some(k -> v)
case None =>
assert(oldValWithTs.ts >= lastDeletedAt)
if (oldValWithTs.ts >= requestTs || k.seq < 0) Some(k -> oldValWithTs)
else {
shouldReplace = true
None
}
}
}
val existInNew =
for {
(k, newValWithTs) <- propsWithTs if !oldPropsWithTs.contains(k) && newValWithTs.ts > lastDeletedAt
} yield {
shouldReplace = true
Some(k -> newValWithTs)
}
((existInOld.flatten ++ existInNew.flatten).toMap, shouldReplace)
}
def mergeUpdate(propsPairWithTs: PropsPairWithTs): (State, Boolean) = {
var shouldReplace = false
val (oldPropsWithTs, propsWithTs, requestTs, version) = propsPairWithTs
val lastDeletedAt = oldPropsWithTs.get(LabelMeta.lastDeletedAt).map(v => v.ts).getOrElse(minTsVal)
val existInOld = for ((k, oldValWithTs) <- oldPropsWithTs) yield {
propsWithTs.get(k) match {
case Some(newValWithTs) =>
assert(oldValWithTs.ts >= lastDeletedAt)
val v = if (oldValWithTs.ts >= newValWithTs.ts) oldValWithTs
else {
shouldReplace = true
newValWithTs
}
Some(k -> v)
case None =>
// important: update need to merge previous valid values.
assert(oldValWithTs.ts >= lastDeletedAt)
Some(k -> oldValWithTs)
}
}
val existInNew = for {
(k, newValWithTs) <- propsWithTs if !oldPropsWithTs.contains(k) && newValWithTs.ts > lastDeletedAt
} yield {
shouldReplace = true
Some(k -> newValWithTs)
}
((existInOld.flatten ++ existInNew.flatten).toMap, shouldReplace)
}
def mergeIncrement(propsPairWithTs: PropsPairWithTs): (State, Boolean) = {
var shouldReplace = false
val (oldPropsWithTs, propsWithTs, requestTs, version) = propsPairWithTs
val lastDeletedAt = oldPropsWithTs.get(LabelMeta.lastDeletedAt).map(v => v.ts).getOrElse(minTsVal)
val existInOld = for ((k, oldValWithTs) <- oldPropsWithTs) yield {
propsWithTs.get(k) match {
case Some(newValWithTs) =>
if (k == LabelMeta.timestamp) {
val v = if (oldValWithTs.ts >= newValWithTs.ts) oldValWithTs
else {
shouldReplace = true
newValWithTs
}
Some(k -> v)
} else {
if (oldValWithTs.ts >= newValWithTs.ts) {
Some(k -> oldValWithTs)
} else {
assert(oldValWithTs.ts < newValWithTs.ts && oldValWithTs.ts >= lastDeletedAt)
shouldReplace = true
// incr(t0), incr(t2), d(t1) => deleted
Some(k -> InnerValLikeWithTs(oldValWithTs.innerVal + newValWithTs.innerVal, oldValWithTs.ts))
}
}
case None =>
assert(oldValWithTs.ts >= lastDeletedAt)
Some(k -> oldValWithTs)
// if (oldValWithTs.ts >= lastDeletedAt) Some(k -> oldValWithTs) else None
}
}
val existInNew = for {
(k, newValWithTs) <- propsWithTs if !oldPropsWithTs.contains(k) && newValWithTs.ts > lastDeletedAt
} yield {
shouldReplace = true
Some(k -> newValWithTs)
}
((existInOld.flatten ++ existInNew.flatten).toMap, shouldReplace)
}
def mergeDelete(propsPairWithTs: PropsPairWithTs): (State, Boolean) = {
var shouldReplace = false
val (oldPropsWithTs, propsWithTs, requestTs, version) = propsPairWithTs
val lastDeletedAt = oldPropsWithTs.get(LabelMeta.lastDeletedAt) match {
case Some(prevDeletedAt) =>
if (prevDeletedAt.ts >= requestTs) prevDeletedAt.ts
else {
shouldReplace = true
requestTs
}
case None => {
shouldReplace = true
requestTs
}
}
val existInOld = for ((k, oldValWithTs) <- oldPropsWithTs) yield {
if (k == LabelMeta.timestamp) {
if (oldValWithTs.ts >= requestTs) Some(k -> oldValWithTs)
else {
shouldReplace = true
Some(k -> InnerValLikeWithTs.withLong(requestTs, requestTs, version))
}
} else {
if (oldValWithTs.ts >= lastDeletedAt) Some(k -> oldValWithTs)
else {
shouldReplace = true
None
}
}
}
val mustExistInNew = Map(LabelMeta.lastDeletedAt -> InnerValLikeWithTs.withLong(lastDeletedAt, lastDeletedAt, version))
((existInOld.flatten ++ mustExistInNew).toMap, shouldReplace)
}
def mergeInsertBulk(propsPairWithTs: PropsPairWithTs): (State, Boolean) = {
val (_, propsWithTs, _, _) = propsPairWithTs
(propsWithTs, true)
}
// def fromString(s: String): Option[Edge] = Graph.toEdge(s)
}