blob: dd73fd0345be496e9bba44dd9fc49c910dd36b30 [file] [log] [blame]
package com.daumkakao.s2graph.core
import HBaseElement._
import org.apache.hadoop.hbase.Cell
import org.apache.hadoop.hbase.client.{ Delete, Mutation, Put, Result }
import org.hbase.async.{HBaseRpc, DeleteRequest, PutRequest}
import org.slf4j.LoggerFactory
import scala.collection.JavaConversions._
import scala.collection.mutable.ListBuffer
import play.api.libs.json.Json
case class EdgeWithIndexInverted(srcVertex: Vertex, tgtVertex: Vertex, labelWithDir: LabelWithDirection, op: Byte, version: Long, props: Map[Byte, InnerValWithTs]) {
import GraphConstant._
import Edge._
// Logger.error(s"EdgeWithIndexInverted${this.toString}")
lazy val lastModifiedAt = props.map(_._2.ts).max
lazy val rowKey = EdgeRowKey(srcVertex.id, labelWithDir, LabelIndex.defaultSeq, isInverted = true)
lazy val qualifier = EdgeQualifierInverted(tgtVertex.id)
// only for toString.
lazy val label = Label.findById(labelWithDir.labelId)
lazy val propsWithoutTs = props.map(kv => (kv._1 -> kv._2.innerVal))
lazy val value = EdgeValueInverted(op, props.toList)
def buildPut() = {
val put = new Put(rowKey.bytes)
put.add(edgeCf, qualifier.bytes, version, value.bytes)
}
def buildPutAsync() = {
new PutRequest(label.hbaseTableName.getBytes, rowKey.bytes, edgeCf, qualifier.bytes, value.bytes, version)
}
def isSame(other: Any): Boolean = {
val ret = this.toString == other.toString
logger.debug(s"EdgeWithIndexInverted\n$this\n$other\n$ret")
ret
}
override def toString(): String = {
val ls = ListBuffer(lastModifiedAt, GraphUtil.fromOp(op), "e",
srcVertex.innerId, tgtVertex.innerId, label.label)
// if (!propsWithName.isEmpty) ls += Json.toJson(propsWithName)
if (!props.isEmpty) ls += props
ls.mkString("\t")
}
}
case class EdgeWithIndex(srcVertex: Vertex, tgtVertex: Vertex, labelWithDir: LabelWithDirection, op: Byte, version: Long, labelIndexSeq: Byte, props: Map[Byte, InnerVal]) {
assert(props.get(LabelMeta.timeStampSeq).isDefined)
lazy val ts = props(LabelMeta.timeStampSeq).longV.get
import GraphConstant._
import Edge._
lazy val rowKey = EdgeRowKey(srcVertex.id, labelWithDir, labelIndexSeq, isInverted = false)
lazy val labelIndex = LabelIndex.findByLabelIdAndSeq(labelWithDir.labelId, labelIndexSeq).get
lazy val defaultIndexMetas = labelIndex.sortKeyTypes.map(meta => meta.seq -> meta.defaultInnerVal).toMap
lazy val labelIndexMetaSeqs = labelIndex.metaSeqs
lazy val orders = for (k <- labelIndexMetaSeqs) yield {
props.get(k) match {
case None =>
/**
* TODO: agly hack
* now we double store target vertex.innerId/srcVertex.innerId for easy development. later fix this to only store id once
*/
val v = k match {
case LabelMeta.timeStampSeq => InnerVal.withLong(ts)
case LabelMeta.toSeq => tgtVertex.innerId
case LabelMeta.fromSeq => //srcVertex.innerId
// for now, it does not make sense to build index on srcVertex.innerId since all edges have same data.
throw new RuntimeException("_from on indexProps is not supported")
case _ => defaultIndexMetas(k)
}
// val v = if (k == LabelMeta.timeStampSeq) InnerVal.withLong(ts) else defaultIndexMetas(k)
(k -> v)
case Some(v) => (k -> v)
}
}
lazy val metas = for ((k, v) <- props if !defaultIndexMetas.containsKey(k)) yield (k -> v)
lazy val qualifier = EdgeQualifier(orders, tgtVertex.id, op)
lazy val value = EdgeValue(metas.toList)
lazy val hasAllPropsForIndex = orders.length == labelIndexMetaSeqs.length
// only for toString.
lazy val label = Label.findById(labelWithDir.labelId)
def buildPuts(): List[Put] = {
if (!hasAllPropsForIndex) {
logger.error(s"$this dont have all props for index")
List.empty[Put]
} else {
val put = new Put(rowKey.bytes)
// Logger.debug(s"$this")
// Logger.debug(s"EdgeWithIndex.buildPut: $rowKey, $qualifier, $value")
put.add(edgeCf, qualifier.bytes, version, value.bytes)
List(put)
}
}
def buildPutsAsync(): List[PutRequest] = {
if (!hasAllPropsForIndex) {
logger.error(s"$this dont have all props for index")
List.empty[PutRequest]
} else {
List(new PutRequest(label.hbaseTableName.getBytes, rowKey.bytes, edgeCf, qualifier.bytes, value.bytes, version))
}
}
def buildDeletes(): List[Delete] = {
if (!hasAllPropsForIndex) List.empty[Delete]
else {
val delete = new Delete(rowKey.bytes)
delete.deleteColumns(edgeCf, qualifier.bytes, version)
List(delete)
}
}
def buildDeletesAsync(): List[DeleteRequest] = {
if (!hasAllPropsForIndex) List.empty[DeleteRequest]
else {
List(new DeleteRequest(label.hbaseTableName.getBytes, rowKey.bytes, edgeCf, qualifier.bytes, version))
}
}
override def toString(): String = {
val ls = ListBuffer(ts, GraphUtil.fromOp(op), "e",
srcVertex.innerId, tgtVertex.innerId, label.label)
// if (!propsWithName.isEmpty) ls += Json.toJson(propsWithName ++ Map("version" -> version.toString))
if (!props.isEmpty) ls += props
ls.mkString("\t")
}
def isSame(other: Any): Boolean = {
val ret = this.toString == other.toString
logger.debug(s"EdgeWithIndex\n$this\n$other\n$ret")
ret
}
}
/**
*
*/
case class Edge(srcVertex: Vertex, tgtVertex: Vertex, labelWithDir: LabelWithDirection, op: Byte,
ts: Long, version: Long, propsWithTs: Map[Byte, InnerValWithTs]) extends GraphElement with JSONParser {
val logger = Edge.logger
implicit val ex = Graph.executionContext
lazy val props =
if (op == GraphUtil.operations("delete")) Map(LabelMeta.timeStampSeq -> InnerVal.withLong(ts))
else for ((k, v) <- propsWithTs) yield (k -> v.innerVal)
lazy val relatedEdges = {
labelWithDir.dir match {
case 2 => //undirected
val out = LabelWithDirection(labelWithDir.labelId, 0)
val base = Edge(srcVertex, tgtVertex, out, op, ts, version, propsWithTs)
List(base, base.duplicateEdge, base.reverseDirEdge, base.reverseSrcTgtEdge)
case 0 | 1 =>
List(this, duplicateEdge)
}
}
lazy val srcForVertex = Vertex(srcVertex.id.updateIsEdge(false), srcVertex.ts, srcVertex.props)
lazy val tgtForVertex = Vertex(tgtVertex.id.updateIsEdge(false), tgtVertex.ts, tgtVertex.props)
lazy val duplicateEdge = Edge(tgtVertex, srcVertex, labelWithDir.dirToggled, op, ts, version, propsWithTs)
lazy val reverseDirEdge = Edge(srcVertex, tgtVertex, labelWithDir.dirToggled, op, ts, version, propsWithTs)
lazy val reverseSrcTgtEdge = Edge(tgtVertex, srcVertex, labelWithDir, op, ts, version, propsWithTs)
lazy val label = Label.findById(labelWithDir.labelId)
lazy val labelOrders = LabelIndex.findByLabelIdAll(labelWithDir.labelId)
override lazy val serviceName = label.serviceName
override lazy val queueKey = Seq(ts.toString, tgtVertex.serviceName).mkString("|")
override lazy val queuePartitionKey = Seq(srcVertex.innerId, tgtVertex.innerId).mkString("|")
lazy val propsPlusTs = propsWithTs.get(LabelMeta.timeStampSeq) match {
case Some(_) => props
case None => props ++ Map(LabelMeta.timeStampSeq -> InnerVal.withLong(ts))
}
lazy val propsPlusTsValid = propsPlusTs.filter(kv => kv._1 >= 0)
lazy val edgesWithIndex = {
for (labelOrder <- labelOrders) yield {
EdgeWithIndex(srcVertex, tgtVertex, labelWithDir, op, version, labelOrder.seq, propsPlusTs)
}
}
lazy val edgesWithIndexValid = {
for (labelOrder <- labelOrders) yield {
EdgeWithIndex(srcVertex, tgtVertex, labelWithDir, op, version, labelOrder.seq, propsPlusTsValid)
}
}
def edgesWithIndex(newOp: Byte) = {
for (labelOrder <- labelOrders) yield {
EdgeWithIndex(srcVertex, tgtVertex, labelWithDir, newOp, version, labelOrder.seq, propsPlusTs)
}
}
def edgesWithIndexValid(newOp: Byte) = {
for (labelOrder <- labelOrders) yield {
EdgeWithIndex(srcVertex, tgtVertex, labelWithDir, newOp, version, labelOrder.seq, propsPlusTsValid)
}
}
lazy val edgesWithInvertedIndex = {
EdgeWithIndexInverted(srcVertex, tgtVertex, labelWithDir, op, version, propsWithTs ++
Map(LabelMeta.timeStampSeq -> InnerValWithTs(InnerVal.withLong(ts), ts)))
}
def edgesWithInvertedIndex(newOp: Byte) = {
EdgeWithIndexInverted(srcVertex, tgtVertex, labelWithDir, newOp, version, propsWithTs ++
Map(LabelMeta.timeStampSeq -> InnerValWithTs(InnerVal.withLong(ts), ts)))
}
lazy val propsWithName = for {
(seq, v) <- props
name <- label.metaPropNamesMap.get(seq) if seq > 0
} yield (name -> innerValToJsValue(v))
lazy val canBeBatched =
op == GraphUtil.operations("insertBulk") ||
(op == GraphUtil.operations("insert") && label.consistencyLevel != "strong")
def updateTgtVertex(id: InnerVal) = {
val newCompositeId = CompositeId(tgtVertex.id.colId, id, isEdge = tgtVertex.id.isEdge, useHash = tgtVertex.id.useHash)
val newTgtVertex = Vertex(newCompositeId, tgtVertex.ts, tgtVertex.props)
Edge(srcVertex, newTgtVertex, labelWithDir, op, ts, version, propsWithTs)
}
/**
* methods for build mutations.
*/
def buildVertexPuts(): List[Put] = srcForVertex.buildPuts ++ tgtForVertex.buildPuts
//
def buildVertexPutsAsync(): List[PutRequest] = srcForVertex.buildPutsAsync() ++ tgtForVertex.buildPutsAsync()
def buildPutsAll(): List[HBaseRpc] = {
val edgePuts =
relatedEdges.flatMap { edge =>
if (edge.op == GraphUtil.operations("insert")) {
if (label.consistencyLevel == "strong") {
edge.upsert()
List.empty[PutRequest]
} else {
edge.insert()
}
} else if (edge.op == GraphUtil.operations("delete")) {
edge.delete(label.consistencyLevel == "strong")
List.empty[PutRequest]
} else if (edge.op == GraphUtil.operations("update")) {
edge.update()
List.empty[PutRequest]
} else if (edge.op == GraphUtil.operations("increment")) {
edge.increment()
List.empty[PutRequest]
} else if (edge.op == GraphUtil.operations("insertBulk")) {
edge.insert()
} else {
throw new Exception(s"operation[${edge.op}] is not supported on edge.")
}
}
edgePuts
}
def insertBulk() = {
edgesWithInvertedIndex.buildPut :: edgesWithIndex.flatMap(e => e.buildPuts())
}
def insert() = {
edgesWithInvertedIndex.buildPutAsync :: edgesWithIndex.flatMap(e => e.buildPutsAsync)
}
/**
* write must be synchronous to keep consistencyLevel
*/
// def fetchInverted() = {
// Graph.getEdgeSync(srcVertex, tgtVertex, label, labelWithDir.dir)
// }
def fetchInvertedAsync() = {
Graph.getEdge(srcVertex, tgtVertex, label, labelWithDir.dir)
}
/**
*
*/
// def mutate(f: (Option[Edge], Edge) => EdgeUpdate): Unit = {
// val edgeUpdate = f(fetchInverted().headOption, this)
// withWriteTable(label.hbaseZkAddr, label.hbaseTableName) { table =>
// val size = edgeUpdate.mutations.size
// val rets: Array[Object] = Array.fill(size)(null)
// table.batch(edgeUpdate.mutations, rets)
// for (ret <- rets if ret == null) {
// Logger.error(s"mutation tryed but failed.")
// }
// } {
// Logger.error(s"mutation failed.")
// }
// }
def mutate(f: (Option[Edge], Edge) => EdgeUpdate): Unit = {
try {
val client = Graph.getClient(label.hbaseZkAddr)
for {
edges <- fetchInvertedAsync()
invertedEdgeOpt = edges.headOption
edgeUpdate = f(invertedEdgeOpt, this)
} yield {
/**
* we can use CAS operation on inverted Edge checkAndSet() and if it return false
* then re-read and build update and write recursively.
*/
Graph.writeAsync(label.hbaseZkAddr, edgeUpdate.mutations)
}
// client.flush()
} catch {
case e: Throwable =>
logger.error(s"mutate failed. $e", e)
}
}
def upsert(): Unit = {
mutate(Edge.buildUpsert)
}
def delete(mutateInPlace: Boolean = true) = {
mutate(Edge.buildDelete)
}
def update() = {
mutate(Edge.buildUpdate)
}
def increment() = {
mutate(Edge.buildIncrement)
}
def toJson() = {}
def rank(r: RankParam): Double = {
if (r.keySeqAndWeights.size <= 0) 1.0f
else {
// Logger.warn(s"orderMapping : ${orderByKeyValues} keyIdAndWeights : ${r.keyIdAndWeights}")
var sum: Double = 0
for {
(seq, w) <- r.keySeqAndWeights
} {
seq match {
case -1 => { //case key == "count"
sum += 1
}
case _ => {
props.get(seq) match {
case None =>
// Logger.error(s"Not Found SortKeyType : ${seq} for rank in Label(${labelWithDir.labelId}})'s OrderByKeys(${orderByKey.typeIds}})")
case Some(innerVal) => {
sum += w * innerVal.longV.getOrElse(1L).toDouble
}
}
}
}
}
sum
}
}
override def toString(): String = {
val ls = ListBuffer(ts, GraphUtil.fromOp(op), "e",
srcVertex.innerId, tgtVertex.innerId, label.label)
if (!propsWithName.isEmpty) ls += Json.toJson(propsWithName)
// if (!propsPlusTs.isEmpty) ls += propsPlusTs
ls.mkString("\t")
}
def toStringRaw(): String = {
val ls = ListBuffer(ts, GraphUtil.fromOp(op), "e",
srcVertex.innerId, tgtVertex.innerId, label.label)
if (!propsPlusTs.isEmpty) ls += propsPlusTs
ls.mkString("\t")
}
}
case class EdgeUpdate(mutations: List[HBaseRpc] = List.empty[HBaseRpc],
edgesToDelete: List[EdgeWithIndex] = List.empty[EdgeWithIndex],
edgesToInsert: List[EdgeWithIndex] = List.empty[EdgeWithIndex],
newInvertedEdge: Option[EdgeWithIndexInverted] = None) {
override def toString(): String = {
val size = s"mutations: ${mutations.size}"
val deletes = s"deletes: ${edgesToDelete.map(e => e.toString).mkString("\n")}"
val inserts = s"inserts: ${edgesToInsert.map(e => e.toString).mkString("\n")}"
val updates = s"snapshot: $newInvertedEdge"
List(size, deletes, inserts, updates).mkString("\n")
}
}
object Edge {
val logger = LoggerFactory.getLogger(classOf[Edge])
// val initialVersion = 2L
val incrementVersion = 1L
val minOperationTs = 1L
val minTsVal = 0L
type PropsPairWithTs = (Map[Byte, InnerValWithTs], Map[Byte, InnerValWithTs], Long)
def buildUpsert(invertedEdge: Option[Edge], requestEdge: Edge): EdgeUpdate = {
assert(requestEdge.op == GraphUtil.operations("insert"))
buildOperation(invertedEdge, requestEdge)(buildUpsert)
}
def buildUpdate(invertedEdge: Option[Edge], requestEdge: Edge): EdgeUpdate = {
assert(requestEdge.op == GraphUtil.operations("update"))
buildOperation(invertedEdge, requestEdge)(buildUpdate)
}
def buildDelete(invertedEdge: Option[Edge], requestEdge: Edge): EdgeUpdate = {
assert(requestEdge.op == GraphUtil.operations("delete"))
buildOperation(invertedEdge, requestEdge)(buildDelete)
}
def buildIncrement(invertedEdge: Option[Edge], requestEdge: Edge): EdgeUpdate = {
assert(requestEdge.op == GraphUtil.operations("increment"))
buildOperation(invertedEdge, requestEdge)(buildIncrement)
}
def buildOperation(invertedEdge: Option[Edge], requestEdge: Edge)(f: PropsPairWithTs => (Map[Byte, InnerValWithTs], Boolean)) = {
// Logger.debug(s"oldEdge: ${invertedEdge.map(_.toStringRaw)}")
// Logger.debug(s"requestEdge: ${requestEdge.toStringRaw}")
val oldPropsWithTs = if (invertedEdge.isEmpty) Map.empty[Byte, InnerValWithTs] else invertedEdge.get.propsWithTs
val minTsInOldProps = if (invertedEdge.isEmpty) minTsVal else invertedEdge.get.propsWithTs.map(kv => kv._2.ts).min
val oldTs = invertedEdge.map(e => e.ts).getOrElse(minTsVal)
if (oldTs == requestEdge.ts) {
logger.error(s"duplicate timestamp on same edge. $requestEdge")
EdgeUpdate()
} else {
val (newPropsWithTs, shouldReplace) = f(oldPropsWithTs, requestEdge.propsWithTs, requestEdge.ts)
if (!shouldReplace) {
logger.error(s"drop request $requestEdge becaseu shouldReplace is $shouldReplace")
EdgeUpdate()
} else {
val maxTsInNewProps = newPropsWithTs.map(kv => kv._2.ts).max
val newOp = if (maxTsInNewProps > requestEdge.ts) {
invertedEdge match {
case None => requestEdge.op
case Some(old) => old.op
}
} else {
requestEdge.op
}
val newEdgeVersion = invertedEdge.map(e => e.version + incrementVersion).getOrElse(requestEdge.ts)
val maxTs = if (oldTs > requestEdge.ts) oldTs else requestEdge.ts
val newEdge = Edge(requestEdge.srcVertex, requestEdge.tgtVertex, requestEdge.labelWithDir,
newOp, maxTs, newEdgeVersion, newPropsWithTs)
buildReplace(invertedEdge, newEdge, newPropsWithTs)
}
}
}
def buildUpsert(propsPairWithTs: PropsPairWithTs) = {
var shouldReplace = false
val (oldPropsWithTs, propsWithTs, requestTs) = propsPairWithTs
val lastDeletedAt = oldPropsWithTs.get(LabelMeta.lastDeletedAt).map(v => v.ts).getOrElse(minTsVal)
val existInOld = for ((k, oldValWithTs) <- oldPropsWithTs) yield {
propsWithTs.get(k) match {
case Some(newValWithTs) =>
assert(oldValWithTs.ts >= lastDeletedAt)
val v = if (oldValWithTs.ts >= newValWithTs.ts) oldValWithTs else {
shouldReplace = true
newValWithTs
}
Some(k -> v)
// if (v.ts > lastDeletedAt) Some(k -> v)
// else {
// shouldReplace = true
// None
// }
case None =>
assert(oldValWithTs.ts >= lastDeletedAt)
if (oldValWithTs.ts >= requestTs || k < 0) Some(k -> oldValWithTs)
else {
shouldReplace = true
None
}
}
}
val existInNew =
for {
(k, newValWithTs) <- propsWithTs if !oldPropsWithTs.contains(k) && newValWithTs.ts > lastDeletedAt
} yield {
shouldReplace = true
Some(k -> newValWithTs)
}
((existInOld.flatten ++ existInNew.flatten).toMap, shouldReplace)
}
def buildUpdate(propsPairWithTs: PropsPairWithTs) = {
var shouldReplace = false
val (oldPropsWithTs, propsWithTs, requestTs) = propsPairWithTs
val lastDeletedAt = oldPropsWithTs.get(LabelMeta.lastDeletedAt).map(v => v.ts).getOrElse(minTsVal)
val existInOld = for ((k, oldValWithTs) <- oldPropsWithTs) yield {
propsWithTs.get(k) match {
case Some(newValWithTs) =>
assert(oldValWithTs.ts >= lastDeletedAt)
val v = if (oldValWithTs.ts >= newValWithTs.ts) oldValWithTs else {
shouldReplace = true
newValWithTs
}
Some(k -> v)
// if (v.ts > lastDeletedAt) Some(k -> v)
// else None
case None =>
// important: update need to merge previous valid values.
assert(oldValWithTs.ts >= lastDeletedAt)
Some(k -> oldValWithTs)
// if (oldValWithTs.ts >= lastDeletedAt) Some(k -> oldValWithTs) else None
}
}
val existInNew = for {
(k, newValWithTs) <- propsWithTs if !oldPropsWithTs.contains(k) && newValWithTs.ts > lastDeletedAt
} yield {
shouldReplace = true
Some(k -> newValWithTs)
}
((existInOld.flatten ++ existInNew.flatten).toMap, shouldReplace)
}
def buildIncrement(propsPairWithTs: PropsPairWithTs) = {
var shouldReplace = false
val (oldPropsWithTs, propsWithTs, requestTs) = propsPairWithTs
val lastDeletedAt = oldPropsWithTs.get(LabelMeta.lastDeletedAt).map(v => v.ts).getOrElse(minTsVal)
val existInOld = for ((k, oldValWithTs) <- oldPropsWithTs) yield {
propsWithTs.get(k) match {
case Some(newValWithTs) =>
if (k == LabelMeta.timeStampSeq) {
val v = if (oldValWithTs.ts >= newValWithTs.ts) oldValWithTs else {
shouldReplace = true
newValWithTs
}
Some(k -> v)
} else {
// assert(oldValWithTs.ts >= lastDeletedAt)
// if (oldValWithTs.ts < newValWithTs.ts) {
// shouldReplace = true
// Some(k -> InnerValWithTs(oldValWithTs.innerVal + newValWithTs.innerVal, oldValWithTs.ts))
// } else if (newValWithTs.ts >= lastDeletedAt){
// shouldReplace = true
// Some(k -> InnerValWithTs(oldValWithTs.innerVal + newValWithTs.innerVal, newValWithTs.ts))
// } else {
// Some(k -> oldValWithTs)
// }
if (oldValWithTs.ts >= newValWithTs.ts) {
Some(k -> oldValWithTs)
// if (newValWithTs.ts > lastDeletedAt) {
// shouldReplace = true
// Some(k -> InnerValWithTs(oldValWithTs.innerVal + newValWithTs.innerVal, newValWithTs.ts))
// } else {
// Some(k -> oldValWithTs)
// }
// if (oldValWithTs.ts > lastDeletedAt) Some(k -> oldValWithTs)
// else None
} else {
assert(oldValWithTs.ts < newValWithTs.ts && oldValWithTs.ts >= lastDeletedAt)
shouldReplace = true
// incr(t0), incr(t2), d(t1) => deleted
Some(k -> InnerValWithTs(oldValWithTs.innerVal + newValWithTs.innerVal, oldValWithTs.ts))
// if (oldValWithTs.ts > lastDeletedAt) {
//
// Some(k -> InnerValWithTs(oldValWithTs.innerVal + newValWithTs.innerVal, newValWithTs.ts))
// } else if (oldValWithTs.ts <= lastDeletedAt && lastDeletedAt < newValWithTs.ts) {
// Some(k -> newValWithTs)
// } else None
}
}
case None =>
assert(oldValWithTs.ts >= lastDeletedAt)
Some(k -> oldValWithTs)
// if (oldValWithTs.ts >= lastDeletedAt) Some(k -> oldValWithTs) else None
}
}
val existInNew = for {
(k, newValWithTs) <- propsWithTs if !oldPropsWithTs.contains(k) && newValWithTs.ts > lastDeletedAt
} yield {
shouldReplace = true
Some(k -> newValWithTs)
}
((existInOld.flatten ++ existInNew.flatten).toMap, shouldReplace)
}
def buildDelete(propsPairWithTs: PropsPairWithTs) = {
var shouldReplace = false
val (oldPropsWithTs, propsWithTs, requestTs) = propsPairWithTs
val lastDeletedAt = oldPropsWithTs.get(LabelMeta.lastDeletedAt) match {
case Some(prevDeletedAt) =>
if (prevDeletedAt.ts >= requestTs) prevDeletedAt.ts
else {
shouldReplace = true
requestTs
}
case None => {
shouldReplace = true
requestTs
}
}
val existInOld = for ((k, oldValWithTs) <- oldPropsWithTs) yield {
if (k == LabelMeta.timeStampSeq) {
if (oldValWithTs.ts >= requestTs) Some(k -> oldValWithTs)
else {
shouldReplace = true
Some(k -> InnerValWithTs.withLong(requestTs, requestTs))
}
} else {
if (oldValWithTs.ts >= lastDeletedAt) Some(k -> oldValWithTs)
else {
shouldReplace = true
None
}
}
}
val mustExistInNew = Map(LabelMeta.lastDeletedAt -> InnerValWithTs.withLong(lastDeletedAt, lastDeletedAt))
((existInOld.flatten ++ mustExistInNew).toMap, shouldReplace)
}
private def allPropsDeleted(props: Map[Byte, InnerValWithTs]) = {
val maxTsProp = props.toList.sortBy(kv => (kv._2.ts, -1 * kv._1)).reverse.head
maxTsProp._1 == LabelMeta.lastDeletedAt
}
/**
* delete invertedEdge.edgesWithIndex
* insert requestEdge.edgesWithIndex
* update requestEdge.edgesWithIndexInverted
*/
def buildReplace(invertedEdge: Option[Edge], requestEdge: Edge, newPropsWithTs: Map[Byte, InnerValWithTs]): EdgeUpdate = {
val edgesToDelete = invertedEdge match {
// case Some(e) if e.op != GraphUtil.operations("delete") => e.edgesWithIndexValid
case Some(e) => e.edgesWithIndexValid
case _ =>
// nothing to remove on indexed.
List.empty[EdgeWithIndex]
}
val edgesToInsert = {
if (newPropsWithTs.isEmpty) List.empty[EdgeWithIndex]
else {
// val maxTsProp = newPropsWithTs.toList.sortBy(kv => (kv._2.ts, -1 * kv._1)).reverse.head
// if (maxTsProp._1 == LabelMeta.lastDeletedAt) {
if (allPropsDeleted(newPropsWithTs)) {
// all props is older than lastDeletedAt so nothing to insert on indexed.
List.empty[EdgeWithIndex]
} else {
requestEdge.edgesWithIndexValid
}
}
}
val edgeInverted = if (newPropsWithTs.isEmpty) None else Some(requestEdge.edgesWithInvertedIndex)
val deleteMutations = edgesToDelete.flatMap(edge => edge.buildDeletesAsync).toList
val insertMutations = edgesToInsert.flatMap(edge => edge.buildPutsAsync).toList
val invertMutations = edgeInverted.map(e => List(e.buildPutAsync)).getOrElse(List.empty[PutRequest])
val mutations = deleteMutations ++ insertMutations ++ invertMutations
val update = EdgeUpdate(mutations, edgesToDelete, edgesToInsert, edgeInverted)
// Logger.debug(s"UpdatedProps: ${newPropsWithTs}\n")
// Logger.debug(s"EdgeUpdate: $update\n")
update
}
def fromString(s: String): Option[Edge] = Graph.toEdge(s)
def toEdge(kv: org.hbase.async.KeyValue, param: QueryParam): Option[Edge] = {
val version = kv.timestamp()
val rowKey = EdgeRowKey(kv.key(), 0)
val srcVertexId = rowKey.srcVertexId
val (tgtVertexId, props, op, ts) = rowKey.isInverted match {
case true =>
val qualifier = EdgeQualifierInverted(kv.qualifier(), 0)
val value = EdgeValueInverted(kv.value(), 0)
val kvsMap = value.props.toMap
val ts = kvsMap.get(LabelMeta.timeStampSeq) match {
case None => version
case Some(v) => v.innerVal.longV.get
}
(qualifier.tgtVertexId, kvsMap, value.op, ts)
case false =>
val kvQual = kv.qualifier()
val qualifier = EdgeQualifier(kvQual, 0, kvQual.length)
val value = EdgeValue(kv.value(), 0)
val kvs = qualifier.propsKVs(rowKey.labelWithDir.labelId, rowKey.labelOrderSeq) ::: value.props.toList
val kvsMap = kvs.toMap
val ts = kvsMap.get(LabelMeta.timeStampSeq) match {
case None => version
case Some(v) => v.longV.get
}
val kvsMapWithoutTs = kvsMap.map(kv => (kv._1 -> InnerValWithTs(kv._2, ts)))
(qualifier.tgtVertexId, kvsMapWithoutTs, qualifier.op, ts)
}
// Logger.error(s"toEdge: $rowKey, $tgtVertexId")
val labelMetas = LabelMeta.findAllByLabelId(rowKey.labelWithDir.labelId, useCache = true)
val propsWithDefault = (for (meta <- labelMetas) yield {
props.get(meta.seq) match {
case Some(v) => (meta.seq -> v)
case None => (meta.seq -> InnerValWithTs(meta.defaultInnerVal, minTsVal))
}
}).toMap
// play.api.Logger.debug(s"$rowKey, $tgtVertexId, $props, $op, $ts => ${param.hasFilters}")
/**
* TODO: backward compatibility only. deprecate has field
*/
val edge = Edge(Vertex(srcVertexId, ts), Vertex(tgtVertexId, ts), rowKey.labelWithDir, op, ts, version, props)
// if (param.propsFilters.filter(edge)) Some(edge) else None
val matches =
for {
(k, v) <- param.hasFilters
edgeVal <- propsWithDefault.get(k) if edgeVal.innerVal.value == v.value
} yield (k -> v)
val ret = if (matches.size == param.hasFilters.size && param.where.map(_.filter(edge)).getOrElse(true)) {
// val edge = Edge(Vertex(srcVertexId, ts), Vertex(tgtVertexId, ts), rowKey.labelWithDir, op, ts, version, props)
// Logger.debug(s"fetchedEdge: $edge")
Some(edge)
} else {
None
}
// Logger.debug(s"${cell.getQualifier().toList}, ${ret.map(x => x.toStringRaw)}")
ret
}
def toEdge(kv: org.hbase.async.KeyValue): Edge = {
val version = kv.timestamp()
val rowKey = EdgeRowKey(kv.key(), 0)
val srcVertexId = rowKey.srcVertexId
val (tgtVertexId, props, op, ts) = rowKey.isInverted match {
case true =>
val qualifier = EdgeQualifierInverted(kv.qualifier(), 0)
val value = EdgeValueInverted(kv.value(), 0)
val kvsMap = value.props.toMap
val ts = kvsMap.get(LabelMeta.timeStampSeq) match {
case None => version
case Some(v) => v.innerVal.longV.get
}
(qualifier.tgtVertexId, kvsMap, value.op, ts)
case false =>
val kvQual = kv.qualifier()
val qualifier = EdgeQualifier(kvQual, 0, kvQual.length)
val value = EdgeValue(kv.value(), 0)
val kvs = qualifier.propsKVs(rowKey.labelWithDir.labelId, rowKey.labelOrderSeq) ::: value.props.toList
val kvsMap = kvs.toMap
val ts = kvsMap.get(LabelMeta.timeStampSeq) match {
case None => version
case Some(v) => v.longV.get
}
val kvsMapWithoutTs = kvsMap.map(kv => (kv._1 -> InnerValWithTs(kv._2, ts)))
(qualifier.tgtVertexId, kvsMapWithoutTs, qualifier.op, ts)
}
// Logger.error(s"toEdge: $rowKey, $tgtVertexId")
val labelMetas = LabelMeta.findAllByLabelId(rowKey.labelWithDir.labelId, useCache = true)
val propsWithDefault = (for (meta <- labelMetas) yield {
props.get(meta.seq) match {
case Some(v) => (meta.seq -> v)
case None => (meta.seq -> InnerValWithTs(meta.defaultInnerVal, minTsVal))
}
}).toMap
// play.api.Logger.debug(s"$rowKey, $tgtVertexId, $props, $op, $ts => ${param.hasFilters}")
/**
* TODO: backward compatibility only. deprecate has field
*/
Edge(Vertex(srcVertexId, ts), Vertex(tgtVertexId, ts), rowKey.labelWithDir, op, ts, version, props)
}
}