// blob: 75be84d54f54b34362f25aa97091db60a3bda68e [file] [log] [blame]
package com.daumkakao.s2graph.core
import HBaseElement._
import org.apache.hadoop.hbase.client.Put
import org.apache.hadoop.hbase.client.Delete
import org.apache.hadoop.hbase.client.Mutation
import play.api.libs.json.Json
import scala.collection.mutable.ListBuffer
import org.hbase.async.{DeleteRequest, HBaseRpc, PutRequest, GetRequest}
/**
 * A graph vertex together with the HBase mutations/reads needed to persist and fetch it.
 *
 * Equality and hashing are based on `id` only (see `equals` / `hashCode` below), so two
 * vertices with the same id but different `ts`, `props` or `op` compare equal.
 *
 * @param id    composite id of the vertex; its `colId` selects the service column and its
 *              `innerId` is used as the queue partition key
 * @param ts    timestamp of this vertex; used as the cell timestamp of every put/delete built here
 * @param props property values keyed by column-meta sequence
 * @param op    operation code; compared against `GraphUtil.operations("delete")` in `buildPutsAll`
 */
case class Vertex(id: CompositeId,
                  ts: Long,
                  props: Map[Byte, InnerVal] = Map.empty[Byte, InnerVal],
                  op: Byte = 0) extends GraphElement {

  import GraphConstant._

  lazy val serviceColumn = ServiceColumn.findById(id.colId)
  lazy val service = Service.findById(serviceColumn.serviceId)
  lazy val (hbaseZkAddr, hbaseTableName) = (service.cluster, service.hTableName)
  lazy val rowKey = VertexRowKey(id)

  /** Properties written on every put in addition to `props`: the last-modified column set to `ts`. */
  lazy val defaultProps = Map(ColumnMeta.lastModifiedAtColumnSeq -> InnerVal.withLong(ts))

  /** Qualifier/value pairs to store; `defaultProps` wins over `props` on a key collision. */
  lazy val qualifiersWithValues = for ((k, v) <- props ++ defaultProps) yield (VertexQualifier(k), v)

  lazy val innerId = id.innerId

  /** TODO: make this as configurable */
  override lazy val serviceName = service.serviceName
  override lazy val isAsync = false
  override lazy val queueKey = Seq(ts.toString, serviceName).mkString("|")
  override lazy val queuePartitionKey = id.innerId.toString

  /** Properties keyed by column-meta name; entries whose meta lookup yields nothing are dropped. */
  lazy val propsWithName = for {
    (seq, v) <- props
    meta <- ColumnMeta.findByIdAndSeq(id.colId, seq)
  } yield (meta.name -> v.toString)

  /**
   * Table name encoded with an explicit charset so the bytes do not depend on the JVM's
   * platform-default encoding. A fresh array is returned on every call.
   */
  private def tableNameBytes: Array[Byte] = hbaseTableName.getBytes("UTF-8")

  /**
   * Builds one synchronous-client `Put` per qualifier/value pair.
   *
   * TODO: for now a user has to update properties one by one; when a single update carries
   * multiple key/values they all get the same timestamp version (`ts`) in HBase.
   */
  def buildPuts(): List[Put] =
    qualifiersWithValues.map { case (qualifier, value) =>
      val put = new Put(rowKey.bytes)
      put.add(vertexCf, qualifier.bytes, ts, value.bytes)
    }.toList

  /** Builds one asynchbase `PutRequest` per qualifier/value pair, all stamped with `ts`. */
  def buildPutsAsync(): List[PutRequest] =
    qualifiersWithValues.map { case (qualifier, value) =>
      new PutRequest(tableNameBytes, rowKey.bytes, vertexCf, qualifier.bytes, value.bytes, ts)
    }.toList

  /** Returns delete requests when `op` is the "delete" operation, put requests otherwise. */
  def buildPutsAll(): List[HBaseRpc] =
    if (op == GraphUtil.operations("delete")) buildDeleteAsync()
    else buildPutsAsync()

  /** Builds the synchronous-client `Delete` for this vertex's row at `ts`. */
  def buildDelete(): List[Delete] =
    List(new Delete(rowKey.bytes, ts))

  /** Builds the asynchbase `DeleteRequest` for the vertex column family of this row at `ts`. */
  def buildDeleteAsync(): List[DeleteRequest] =
    List(new DeleteRequest(tableNameBytes, rowKey.bytes, vertexCf, ts))

  /** Builds the asynchbase `GetRequest` reading the vertex column family of this row. */
  def buildGet(): GetRequest =
    new GetRequest(tableNameBytes, rowKey.bytes, vertexCf)

  /**
   * Returns this vertex with its id flagged as an edge id.
   *
   * Uses `copy` so that `op` is carried over as well; previously `op` was dropped and the
   * result always had the default operation, unlike `withProps`.
   */
  def toEdgeVertex(): Vertex = copy(id = id.updateIsEdge(true))

  /**
   * Tab-separated representation: ts, operation name, "v", inner id, service name, column name
   * and, when there are named properties, their JSON form. Service and column names are blank
   * when `id.isEdge` is true.
   */
  override def toString(): String = {
    val (svcName, colName) =
      if (id.isEdge) ("", "")
      else {
        val column = ServiceColumn.findById(id.colId)
        (column.service.serviceName, column.columnName)
      }
    val baseFields = Seq[Any](ts, GraphUtil.fromOp(op), "v", innerId, svcName, colName)
    val allFields =
      if (propsWithName.nonEmpty) baseFields :+ Json.toJson(propsWithName)
      else baseFields
    allFields.mkString("\t")
  }

  /** Hash of `id` only, consistent with `equals`. */
  override def hashCode(): Int = id.hashCode()

  /** Two vertices are equal when their ids are equal; anything else is unequal. */
  override def equals(obj: Any): Boolean = obj match {
    case other: Vertex => id.equals(other.id)
    case _ => false
  }

  /** Returns a copy of this vertex whose properties are replaced by `newProps`. */
  def withProps(newProps: Map[Byte, InnerVal]): Vertex = copy(props = newProps)
}
/** Companion of [[Vertex]]: an empty placeholder instance plus factories from text and HBase cells. */
object Vertex {

  /** Vertex with the empty composite id; its timestamp is taken once, when this object initialises. */
  val emptyVertex = Vertex(CompositeId.emptyCompositeId, System.currentTimeMillis())

  /** Parses a vertex from its string form by delegating to `Graph.toVertex`. */
  def fromString(s: String): Option[Vertex] = Graph.toVertex(s)

  /**
   * Rebuilds a vertex from the cells of one row.
   *
   * The row key is decoded from the first cell, every cell contributes one property
   * (qualifier's prop key -> value), and the vertex timestamp is the largest cell timestamp.
   *
   * TODO: make sure this doesn't violate any MVCC version.
   *
   * @param kvs cells to decode
   * @return `None` when `kvs` is empty, otherwise the decoded vertex
   */
  def apply(kvs: Seq[org.hbase.async.KeyValue]): Option[Vertex] =
    kvs.headOption.map { firstCell =>
      val rowKey = VertexRowKey(firstCell.key(), 0)

      val decodedProps = kvs.map { cell =>
        val qualifierBytes = cell.qualifier()
        val qualifier = VertexQualifier(qualifierBytes, 0, qualifierBytes.length)
        val value = InnerVal(cell.value(), 0)
        (qualifier.propKey, value)
      }

      // Start from Long.MinValue so the check below still detects "no timestamp seen".
      val newestTs = kvs.foldLeft(Long.MinValue) { (newest, cell) =>
        math.max(newest, cell.timestamp())
      }
      assert(newestTs != Long.MinValue)

      Vertex(rowKey.id, newestTs, decodedProps.toMap)
    }
}