package com.daumkakao.s2graph.core
import org.apache.hadoop.hbase.HBaseConfiguration
import org.apache.hadoop.hbase.client.HConnection
import org.apache.hadoop.hbase.client.HConnectionManager
import org.apache.hadoop.hbase.client.HTableInterface
import java.util.concurrent.Executors
import java.util.concurrent.ConcurrentHashMap
import org.slf4j.LoggerFactory
import scala.collection.mutable.SynchronizedQueue
import org.apache.hadoop.hbase.client.Put
import org.apache.hadoop.hbase.filter.FilterList
import HBaseElement._
import org.apache.hadoop.hbase.client.Get
import org.apache.hadoop.hbase.client.Result
import scala.collection.JavaConversions._
import org.apache.hadoop.hbase.util.Bytes
import scala.collection.mutable.HashSet
import GraphUtil._
import org.apache.hadoop.hbase.filter.ColumnPaginationFilter
import scala.concurrent._
import scala.concurrent.duration._
import org.apache.hadoop.hbase.client.HTable
import scala.collection.mutable.ListBuffer
import scala.annotation.tailrec
import scala.collection.mutable.HashMap
import org.apache.hadoop.hbase.filter.ColumnPrefixFilter
import org.apache.hadoop.hbase.Cell
import org.apache.hadoop.hbase.client.Delete
import Management._
import org.apache.hadoop.hbase.client.Row
import play.api.libs.json.Json
import KGraphExceptions._
import org.apache.hadoop.hbase.client.Mutation
import play.libs.Akka
import com.typesafe.config.Config
import scala.reflect.ClassTag
import org.hbase.async._
import com.stumbleupon.async.Deferred
import com.stumbleupon.async.Callback
import java.util.ArrayList
import scala.util.{Try, Success, Failure}
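/**
* shared storage constants: column family names, delimiters, buffer sizes and the query logger.
*/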
object GraphConstant {
val vertexCf = "v".getBytes()
val edgeCf = "e".getBytes()
val updateCf = "u".getBytes()
val ttsForActivity = 60 * 60 * 24 * 30
val delimiter = "|"
val seperator = ":"
val writeBufferSize = 1024 * 1024 * 2
val maxValidEdgeListSize = 10000
// implicit val ex = play.api.libs.concurrent.Execution.Implicits.defaultContext
val queryLogger = play.api.Logger("query")
}
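/**
* translates typesafe config into an HBaseConfiguration and creates HConnections from it.
*/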
object GraphConnection {
val logger = Graph.logger
lazy val tablePool = Executors.newFixedThreadPool(1)
lazy val connectionPool = Executors.newFixedThreadPool(1)
val defaultConfigs = Map(
"hbase.zookeeper.quorum" -> "localhost",
"hbase.table.name" -> "s2graph",
"phase" -> "dev")
var config: Config = null
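/**
* typed config lookup: returns the value at `key` converted to the runtime type of `default`, or `default` when the key is missing.
* note: only String, Int, Float/Double and Boolean defaults are resolved from the config; any other type always falls back to `default`.
*/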
def getOrElse[T: ClassTag](conf: com.typesafe.config.Config)(key: String, default: T): T = {
if (conf.hasPath(key)) (default match {
case _: String => conf.getString(key)
case _: Int | _: Integer => conf.getInt(key)
case _: Float | _: Double => conf.getDouble(key)
case _: Boolean => conf.getBoolean(key)
case _ => default
}).asInstanceOf[T]
else default
}
/**
* required: hbase.zookeeper.quorum
* optional: any configuration key with the "hbase." prefix.
*/
def toHBaseConfig(config: com.typesafe.config.Config) = {
val configVals = for ((k, v) <- defaultConfigs) yield {
val currentVal = getOrElse(config)(k, v)
logger.debug(s"$k -> $currentVal")
k -> currentVal
}
val conf = HBaseConfiguration.create()
conf.set("hbase.zookeeper.quorum", configVals("hbase.zookeeper.quorum"))
for (entry <- config.entrySet() if entry.getKey().startsWith("hbase.")) {
val value = entry.getValue().unwrapped().toString
conf.set(entry.getKey(), value)
}
conf
}
def apply(config: Config) = {
this.config = config
val hbaseConfig = toHBaseConfig(config)
(hbaseConfig -> HConnectionManager.createConnection(hbaseConfig))
}
}
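/**
* entry point for asynchronous reads/writes against HBase: keeps per-quorum HConnection/HBaseClient instances,
* the shared execution context, and the traversal (getEdgesAsync) and mutation helpers.
*/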
object Graph {
import GraphConstant._
import GraphConnection._
val logger = Edge.logger
val conns = scala.collection.mutable.Map[String, HConnection]()
val clients = scala.collection.mutable.Map[String, HBaseClient]()
val emptyKVs = new ArrayList[KeyValue]()
val emptyKVlist = new ArrayList[ArrayList[KeyValue]]();
val emptyEdgeList = new ArrayList[ArrayList[(Edge, Double, QueryParam)]]
val emptyEdges = new ArrayList[(Edge, Double, QueryParam)]
var executionContext: ExecutionContext = null
var config: com.typesafe.config.Config = null
var hbaseConfig: org.apache.hadoop.conf.Configuration = null
var storageExceptionCount = 0L
var singleGetTimeout = 1000 millis
// implicit val ex = ExecutionContext.fromExecutor(Executors.newFixedThreadPool(16))
def apply(config: com.typesafe.config.Config)(implicit ex: ExecutionContext) = {
this.config = config
val (hbaseConfig, conn) = GraphConnection.apply(config)
this.hbaseConfig = hbaseConfig
Model.apply(config)
this.executionContext = ex
// read the timeout as milliseconds; getOrElse does not resolve Duration values from config
this.singleGetTimeout = getOrElse(config)("hbase.client.operation.timeout", 1000).millis
val zkQuorum = hbaseConfig.get("hbase.zookeeper.quorum")
conns += (zkQuorum -> conn)
// clients += (zkQuorum -> new HBaseClient(zkQuorum, "/hbase", Executors.newCachedThreadPool(), 8))
clients += (zkQuorum -> new HBaseClient(zkQuorum))
}
def getClient(zkQuorum: String) = {
clients.get(zkQuorum) match {
case None =>
val client = new HBaseClient(zkQuorum)
clients += (zkQuorum -> client)
client
// throw new RuntimeException(s"connection to $zkQuorum is not established.")
case Some(c) => c
}
}
def getConn(zkQuorum: String) = {
conns.get(zkQuorum) match {
case None =>
val conn = HConnectionManager.createConnection(this.hbaseConfig)
conns += (zkQuorum -> conn)
conn
// throw new RuntimeException(s"connection to $zkQuorum is not established.")
case Some(c) => c
}
}
//
// def withWriteTable[T](connName: String, tName: String)(op: HTableInterface => T)(fallback: => T): T = {
// // Logger.debug(s"withWriteTable: $connName, $tName")
// try {
// val conn = getConn(connName)
// if (!conn.isMasterRunning()) throw new RuntimeException(s"master is not running. $connName")
// val table = conn.getTable(tName, tablePool)
//
// table.setAutoFlush(false, false)
// table.setWriteBufferSize(writeBufferSize)
// try {
// op(table)
// } catch {
// case e: Exception =>
// Logger.error(s"Write Operation to table ($connName), ($tName) is failed: ${e.getMessage}", e)
// fallback
// throw e
// } finally {
// table.close()
// }
// } catch {
// case e: Exception =>
// Logger.error(s"withWriteTable ($connName) ($tName) is failed: ${e.getMessage}", e)
// fallback
// throw e
// }
// }
//
// def withReadTable[T](connName: String, tName: String)(op: HTableInterface => T)(fallback: => T): T = {
// // queryLogger.debug(s"withReadTable: $connName, $tName")
// try {
// val conn = getConn(connName)
// if (!conn.isMasterRunning()) throw new RuntimeException(s"master is not running. $connName")
// val table = conn.getTable(tName, tablePool)
//
// table.setAutoFlush(false, false)
// table.setWriteBufferSize(writeBufferSize)
// try {
// op(table)
// } catch {
// case e: Exception =>
// queryLogger.error(s"Read Operation to table ($connName) ($tName) is failed: ${e.getMessage}", e)
// fallback
// } finally {
// table.close()
// }
// } catch {
// case e: Exception =>
// queryLogger.error(s"withReadTable ($connName) ($tName) is failed: ${e.getMessage}", e)
// fallback
// }
// }
//
// // to catch exception on htable.get
// def htableGet(table: HTableInterface)(get: Get) = {
// try {
// table.get(get)
// } catch {
// case e: Throwable =>
// // if hbase is throw any exception, simply return empty result
// queryLogger.error(s"hbaseGet: $get, $e", e)
// new Result()
// }
// }
// def withTimeout[T](op: => Future[T], fallback: => T)(implicit timeout: Duration): Future[T] = {
// // val timeoutFuture = play.api.libs.concurrent.Promise.timeout(fallback, timeout)
// // Future.firstCompletedOf(Seq(op, timeoutFuture))
// TimeoutFuture(op, fallback)(executionContext, timeout)
// // val timeoutFuture = akka.pattern.after(timeout.toMillis millis, using = Akka.system.scheduler) { Future { fallback } }
// // Future.firstCompletedOf(Seq(op, timeoutFuture))
// }
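/**
* runs `op`, falling back to `fallback` when building the future throws.
* note: despite the name, no timeout is currently enforced here (TimeoutFuture is commented out).
*/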
def withTimeout[T](zkQuorum: String, op: => Future[T], fallback: => T)(implicit timeout: Duration): Future[T] = {
try {
val client = getClient(zkQuorum)
// TimeoutFuture(op, fallback)(this.executionContext, timeout)
op
} catch {
case e: Throwable =>
queryLogger.error(s"withTimeout: $e", e)
Future { fallback }(this.executionContext)
}
}
// def save(zkQuorum: String, tableName: String, mutations: Seq[Mutation]): Unit = {
// // Logger.debug(s"save to $zkQuorum, $tableName, ${mutations.size} mutations.")
// if (mutations.isEmpty) {}
// else {
// withWriteTable[Unit](zkQuorum, tableName) { table =>
// val ret: Array[Object] = Array.fill(mutations.size)(null)
// table.batch(mutations, ret)
// // TODO: retry
// for (r <- ret if r == null) {
// Logger.error(s"save failed after batch.")
// }
// } {
// Logger.error(s"save mutation failed.")
// }
// }
// }
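/**
* bridges an asynchbase Deferred into a Scala Future; a Throwable result completes the future with `fallback` instead of failing it.
*/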
def defferedToFuture[A](d: Deferred[A])(fallback: A): Future[A] = {
val promise = Promise[A]
d.addBoth(new Callback[Unit, A] {
def call(arg: A) = arg match {
case e: Throwable =>
queryLogger.error(s"deferred return throwable: $e", e)
promise.success(fallback)
case _ => promise.success(arg)
}
})
promise.future
}
def deferredToFutureWithoutFallback[T](d: Deferred[T]) = {
val promise = Promise[T]
d.addBoth(new Callback[Unit, T]{
def call(arg: T) = arg match {
case e: Throwable =>
queryLogger.error(s"deferred return throwable: $e", e)
promise.failure(e)
case _ => promise.success(arg)
}
})
promise.future
}
def deferredCallbackWithFallback[T, R](d: Deferred[T])(f: T => R, fallback: => R) = {
d.addCallback(new Callback[R, T] {
def call(args: T): R = {
f(args)
}
}).addErrback(new Callback[R, Exception] {
def call(e: Exception): R = {
queryLogger.error(s"Exception on deferred: $e", e)
fallback
}
})
}
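/**
* fire-and-forget writes: issues delete/put RPCs through the async client for the given zkQuorum.
* the resulting futures are neither sequenced nor awaited, so failures are only logged.
*/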
def writeAsync(zkQuorum: String, rpcs: Seq[HBaseRpc]) = {
if (rpcs.isEmpty) {}
else {
try {
val client = getClient(zkQuorum)
val futures = rpcs.map { rpc =>
val deferred = rpc match {
case d: DeleteRequest => client.delete(d)
case p: PutRequest => client.put(p)
// case i: AtomicIncrementRequest => client.atomicIncrement(i)
}
deferredToFutureWithoutFallback(deferred)
}
// Future.sequence(futures)
} catch {
case e: Throwable =>
queryLogger.error(s"writeAsync failed. $e", e)
}
}
}
/**
* Edge
*/
// def mutateEdge(edge: Edge): Unit = {
// save(edge.label.hbaseZkAddr, edge.label.hbaseTableName, edge.buildPutsAll())
// }
def mutateEdge(edge: Edge) = {
writeAsync(edge.label.hbaseZkAddr, edge.buildPutsAll())
}
// def mutateEdges(edges: Seq[Edge], mutateInPlace: Boolean = false): Unit = {
//
// val edgesPerTable = edges.groupBy { edge => (edge.label.hbaseZkAddr, edge.label.hbaseTableName) }
// for (((zkQuorum, tableName), edges) <- edgesPerTable) {
// /**
// * delete/update/increment can`t be batched.
// */
// val (batches, others) = edges.partition(e => e.canBeBatched)
// save(zkQuorum, tableName, batches.flatMap(_.buildPutsAll))
// others.foreach(other => save(zkQuorum, tableName, other.buildPutsAll))
// }
// val verticesPerTable = edges.flatMap { edge => List(edge.srcForVertex, edge.tgtForVertex) }.groupBy { v => (v.hbaseZkAddr, v.hbaseTableName) }
// for (((zkQuorum, tableName), vertices) <- verticesPerTable) {
// save(zkQuorum, tableName, vertices.flatMap(v => v.buildPuts))
// }
// }
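/**
* groups edges by zookeeper quorum and writes them asynchronously; edges that cannot be batched are written one by one.
* also writes the source/target vertices derived from each edge.
*/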
def mutateEdges(edges: Seq[Edge]) = {
val edgesPerTable = edges.groupBy { edge => edge.label.hbaseZkAddr }
for ((zkQuorum, edges) <- edgesPerTable) {
val (batches, others) = edges.partition(e => e.canBeBatched)
writeAsync(zkQuorum, batches.flatMap(_.buildPutsAll))
others.foreach(other => writeAsync(zkQuorum, other.buildPutsAll))
}
val verticesPerTable = edges.flatMap { edge => List(edge.srcForVertex, edge.tgtForVertex) }.groupBy { v => v.hbaseZkAddr }
for ((zkQuorum, vertices) <- verticesPerTable) {
writeAsync(zkQuorum, vertices.flatMap(v => v.buildPutsAsync))
}
}
// select
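/**
* executes a multi-step query: the first step starts from the query vertices and each later step expands from the edges returned by the previous step.
*/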
def getEdgesAsync(q: Query): Future[Seq[Iterable[(Edge, Double)]]] = {
implicit val ex = this.executionContext
// not sure this is right. make sure to refactor this later.
try {
if (q.steps.isEmpty) {
// TODO: this should be get vertex query.
Future { q.vertices.map(v => List.empty[(Edge, Double)]) }
} else {
val stepLen = q.steps.length
var step = q.steps.head
var seedEdgesFuture = getEdgesAsyncWithRankForFistStep(q.vertices, q, 0)
for (i <- (1 until stepLen)) {
seedEdgesFuture = getEdgesAsyncWithRank(seedEdgesFuture, q, i)
}
seedEdgesFuture
}
} catch {
case e: Throwable =>
queryLogger.error(s"getEdgesAsync: $e", e)
Future { q.vertices.map(v => List.empty[(Edge, Double)]) }
}
}
// only for test cases.
def getEdgesSync(q: Query): Seq[Iterable[(Edge, Double)]] = {
Await.result(getEdgesAsync(q), 10 seconds)
}
// only for test cases.
// def getEdgeSync(srcVertex: Vertex, tgtVertex: Vertex, label: Label, dir: Int) = {
// val rowKey = EdgeRowKey(srcVertex.id, LabelWithDirection(label.id.get, dir), LabelIndex.defaultSeq, isInverted = true)
// val qualifier = EdgeQualifierInverted(tgtVertex.id)
// val get = new Get(rowKey.bytes)
// get.addColumn(edgeCf, qualifier.bytes)
// withReadTable(label.hbaseZkAddr, label.hbaseTableName) { table =>
// // table.get(get)
// Edge.toEdges(htableGet(table)(get))
// } {
// List.empty[Edge]
// }
// }
def getEdge(srcVertex: Vertex, tgtVertex: Vertex, label: Label, dir: Int): Future[Iterable[Edge]] = {
implicit val ex = this.executionContext
val rowKey = EdgeRowKey(srcVertex.id, LabelWithDirection(label.id.get, dir), label.defaultIndex.get.seq, isInverted = true)
val qualifier = EdgeQualifierInverted(tgtVertex.id)
val client = getClient(label.hbaseZkAddr)
val getRequest = new GetRequest(label.hbaseTableName.getBytes(), rowKey.bytes, edgeCf, qualifier.bytes)
defferedToFuture(client.get(getRequest))(emptyKVs).map { kvs =>
for {
kv <- kvs
edge <- Edge.toEdge(kv, QueryParam(LabelWithDirection(label.id.get, dir.toByte)))
} yield edge
}
}
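/**
* builds one GetRequest per (source vertex, query param) pair.
*/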
def buildGetRequests(srcVertices: Seq[Vertex], params: List[QueryParam]): Seq[Iterable[(GetRequest, QueryParam)]] = {
srcVertices.map { vertex =>
params.map { param =>
(param.buildGetRequest(vertex), param)
}
}
}
// def buildGets(srcVertices: Seq[Vertex], params: List[QueryParam]): Seq[Iterable[(Get, QueryParam)]] = {
// srcVertices.map { vertex =>
// params.map { param =>
// (param.buildGet(vertex), param)
// }
// }
// }
def singleGet(table: Array[Byte], rowKey: Array[Byte], cf: Array[Byte], offset: Int, limit: Int,
minTs: Long, maxTs: Long,
maxAttempt: Int, rpcTimeoutInMillis: Int,
columnRangeFilter: ColumnRangeFilter) = {
val get = new GetRequest(table, rowKey, cf)
get.maxVersions(1)
get.setFailfast(true)
get.setMaxResultsPerColumnFamily(limit)
get.setRowOffsetPerColumnFamily(offset)
get.setMinTimestamp(minTs)
get.setMaxTimestamp(maxTs)
get.setMaxAttempt(maxAttempt.toByte)
get.setRpcTimeout(rpcTimeoutInMillis)
if (columnRangeFilter != null) get.filter(columnRangeFilter)
get
}
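/**
* when no output field is configured for the edge's label the edge is returned unchanged;
* otherwise the target vertex is rewritten from the configured property value, and the edge is dropped when that property is missing.
*/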
def convertEdge(edge: Edge, labelOutputFields: Map[Int, Byte]): Option[Edge] = {
labelOutputFields.get(edge.labelWithDir.labelId) match {
case None => Some(edge)
case Some(outputField) =>
edge.propsWithTs.get(outputField) match {
case None => None
case Some(outputFieldVal) =>
Some(edge.updateTgtVertex(outputFieldVal.innerVal))
}
}
}
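/**
* post-processes fetched edges for a step: applies include/exclude labels, converts output fields,
* removes duplicates according to each query param's duplicate policy, and optionally drops already visited vertices when removeCycle is set.
*/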
def filterEdges(edgesFuture: Future[ArrayList[ArrayList[(Edge, Double, QueryParam)]]],
q: Query,
stepIdx: Int,
alreadyVisited: Map[(LabelWithDirection, Vertex), Boolean] = Map.empty[(LabelWithDirection, Vertex), Boolean]): Future[Seq[Iterable[(Edge, Double)]]] = {
implicit val ex = Graph.executionContext
edgesFuture.map { edgesByVertices =>
val step = q.steps(stepIdx)
val labelOutputFields = step.queryParams.map { qParam => qParam.outputField.map(outputField => qParam.labelWithDir.labelId -> outputField) }.flatten.toMap
val excludeLabelWithDirs = (for (queryParam <- step.queryParams if queryParam.exclude) yield queryParam.labelWithDir).toSet
val includeLabelWithDirOpt = (for (queryParam <- step.queryParams if queryParam.include) yield queryParam.labelWithDir).toSet.headOption
val seen = new HashMap[(Vertex, LabelWithDirection, Vertex), Double]
// val seen = new HashSet[(LabelWithDirection, Vertex)]
// val excludeFromTos = new HashSet[(CompositeId, CompositeId)]
// val includeFromTos = new HashSet[(CompositeId, CompositeId)]
val excludeFromTos = new HashSet[(Vertex, Vertex)]
val includeFromTos = new HashSet[(Vertex, Vertex)]
val hasIncludeLabel = includeLabelWithDirOpt.isDefined
// if (hasIncludeLabel) {
for {
edgesWithScore <- edgesByVertices
(edge, score, queryParam) <- edgesWithScore
} {
// Logger.debug(s"${edge.toStringRaw}")
if (hasIncludeLabel && edge.labelWithDir == includeLabelWithDirOpt.get) {
// includeFromTos += (edge.srcVertex.id -> edge.tgtVertex.id)
includeFromTos += (edge.srcVertex -> edge.tgtVertex)
}
if (excludeLabelWithDirs.contains(edge.labelWithDir)) {
// excludeFromTos += (edge.srcVertex.id -> edge.tgtVertex.id)
excludeFromTos += (edge.srcVertex -> edge.tgtVertex)
}
}
// }
// for {
// edgesWithScore <- edgesByVertices
// (edge, score) <- edgesWithScore if excludeLabelWithDirs.contains(edge.labelWithDir)
// } {
// excludeFromTos += (edge.srcVertex -> edge.tgtVertex)
// }
// Logger.debug(s"${excludeFromTos.mkString("\n")}")
// Logger.debug(s"$includeFromTos")
val convertedEdges = for {
edgesWithScore <- edgesByVertices
} yield {
for {
// (edge, score) <- edgesWithScore if !excludeFromTos.contains((edge.srcVertex.id -> edge.tgtVertex.id))
// if (!hasIncludeLabel || includeFromTos.contains((edge.srcVertex.id -> edge.tgtVertex.id)))
(edge, score, queryParam) <- edgesWithScore
fromTo = (edge.srcVertex -> edge.tgtVertex)
if !excludeFromTos.contains(fromTo)
if (!hasIncludeLabel || includeFromTos.contains(fromTo))
convertedEdge <- convertEdge(edge, labelOutputFields)
key = (convertedEdge.labelWithDir, convertedEdge.tgtVertex)
// if !seen.contains(key)
if filterDuplicates(seen, edge, score, queryParam)
if !(q.removeCycle && alreadyVisited.contains(key))
} yield {
// seen += key
(convertedEdge, score)
}
}
for {
edgesWithScore <- convertedEdges
} yield {
for {
(edge, score) <- edgesWithScore
key = (edge.srcVertex, edge.labelWithDir, edge.tgtVertex)
aggregatedScore = seen.getOrElse(key, score)
} yield {
(edge, aggregatedScore)
}
}
}
}
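/**
* tracks (src, labelWithDir, tgt) keys and decides whether an edge should be kept under the query param's duplicate policy:
* First keeps only the first occurrence, Raw keeps every occurrence, and CountSum/other policies aggregate scores onto the first occurrence.
*/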
private def filterDuplicates(seen: HashMap[(Vertex, LabelWithDirection, Vertex), Double], edge: Edge, score: Double, queryParam: QueryParam) = {
val key = (edge.srcVertex, edge.labelWithDir, edge.tgtVertex)
val newScore = queryParam.duplicatePolicy match {
case Query.DuplicatePolicy.CountSum => 1.0
case _ => score
}
seen.get(key) match {
case None =>
seen += (key -> newScore)
true
case Some(oldScore) =>
queryParam.duplicatePolicy match {
case Query.DuplicatePolicy.First =>
// use first occurrence`s score
false
case Query.DuplicatePolicy.Raw =>
// TODO: assumes all duplicate vertices will have same score
seen += (key -> newScore)
true
case _ =>
// aggregate score
seen += (key -> (oldScore + newScore))
false
}
}
}
def getEdgesAsyncWithRankForFistStep(srcVertices: Seq[Vertex], q: Query, stepIdx: Int): Future[Seq[Iterable[(Edge, Double)]]] = {
val step = q.steps(stepIdx)
val uniqSrcVertices = srcVertices.groupBy(v => v.id).map { kv => (kv._2.head, 1.0) }
getEdgesAsyncWithRank(uniqSrcVertices.toSeq, q, stepIdx)
}
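/**
* expands one step from previously traversed edges: issues async gets per (vertex, query param),
* multiplies each fetched edge's rank by the score carried over from the previous step, then filters the results.
* the vertices already visited are passed along so cycles can be removed when q.removeCycle is set.
*/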
private def getEdgesAsyncWithRank(prevEdges: Seq[(Edge, Double)], q: Query, stepIdx: Int): Future[Seq[Iterable[(Edge, Double)]]] = {
val step = q.steps(stepIdx)
val alreadyVisited = prevEdges.map {
case (edge, score) =>
(edge.labelWithDir, if (edge.labelWithDir.dir == GraphUtil.directions("out")) edge.tgtVertex else edge.srcVertex) -> true
}.toMap
val srcVertices = prevEdges.map { case (edge, score) => (edge.tgtVertex -> score) }
// val getsAll = buildGets(srcVertices.map(_._1), step.queryParams).zip(srcVertices.map(_._2))
val getsAll = buildGetRequests(srcVertices.map(_._1), step.queryParams).zip(srcVertices.map(_._2))
implicit val ex = executionContext
val deferred = getsAll.flatMap { // by vertices
case (getsWithQueryParams, prevScore) =>
getsWithQueryParams.map { // by labels
case (get, queryParam) =>
try {
val client = getClient(queryParam.label.hbaseZkAddr)
deferredCallbackWithFallback(client.get(get))({ kvs =>
val edges = for {
kv <- kvs
edge <- Edge.toEdge(kv, queryParam)
} yield {
(edge, edge.rank(queryParam.rank) * prevScore, queryParam)
}
new ArrayList(edges)
}, emptyEdges)
} catch {
case e: Throwable =>
queryLogger.error(s"Exception: $e", e)
Deferred.fromResult(emptyEdges)
}
}
}
val grouped = Deferred.group(deferred)
filterEdges(defferedToFuture(grouped)(emptyEdgeList), q, stepIdx, alreadyVisited)
}
private def getEdgesAsyncWithRank(srcVertices: Seq[(Vertex, Double)], q: Query, stepIdx: Int): Future[Seq[Iterable[(Edge, Double)]]] = {
val step = q.steps(stepIdx)
// val getsAll = buildGets(srcVertices.map(_._1), step.queryParams).zip(srcVertices.map(_._2))
val getsAll = buildGetRequests(srcVertices.map(_._1), step.queryParams).zip(srcVertices.map(_._2))
implicit val ex = executionContext
val deferred = getsAll.flatMap { // by vertices
case (getsWithQueryParams, prevScore) =>
getsWithQueryParams.map { // by labels
case (get, queryParam) =>
try {
val client = getClient(queryParam.label.hbaseZkAddr)
deferredCallbackWithFallback(client.get(get))({ kvs =>
val edges = for {
kv <- kvs
edge <- Edge.toEdge(kv, queryParam)
} yield {
(edge, edge.rank(queryParam.rank) * prevScore, queryParam)
}
new ArrayList(edges)
}, emptyEdges)
} catch {
case e: Throwable =>
queryLogger.error(s"Exception: $e", e)
Deferred.fromResult(emptyEdges)
}
}
}
val grouped = Deferred.group(deferred)
filterEdges(defferedToFuture(grouped)(emptyEdgeList), q, stepIdx)
}
def getEdgesAsyncWithRank(srcEdgesFuture: Future[Seq[Iterable[(Edge, Double)]]], q: Query, stepIdx: Int): Future[Seq[Iterable[(Edge, Double)]]] = {
implicit val ex = executionContext
for {
srcEdges <- srcEdgesFuture
edgesWithScore = srcEdges.flatten
ret <- getEdgesAsyncWithRank(edgesWithScore, q, stepIdx)
// verticeWithRanks = edgesWithScore.map(t => (t._1.tgtVertex, t._2)).toSeq
// ret <- getEdgesAsyncWithRank(verticeWithRanks, step)
} yield {
ret
}
}
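/**
* fetches each vertex asynchronously; lookups that fail fall back to None and are dropped from the result.
*/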
def getVerticesAsync(vertices: Seq[Vertex]): Future[Seq[Vertex]] = {
// TODO: vertex needs meta for hbase table.
// play.api.Logger.error(s"$vertices")
implicit val ex = executionContext
val futures = vertices.map { vertex =>
withTimeout[Option[Vertex]](vertex.hbaseZkAddr, {
// val get = vertex.buildGet
val client = getClient(vertex.hbaseZkAddr)
// val get = vertex.buildGetRequest()
val get = vertex.buildGet
defferedToFuture(client.get(get))(emptyKVs).map { kvs =>
Vertex(kvs)
}
// Logger.error(s"$get")
}, { None })(singleGetTimeout)
}
Future.sequence(futures).map { result => result.toList.flatten }
}
/**
* Vertex
*/
def mutateVertex(vertex: Vertex) = {
mutateVertices(List(vertex))
}
// def mutateVertices(vertices: Seq[Vertex]) = {
// for (((zkQuorum, tableName, op), vertices) <- vertices.groupBy(v => (v.hbaseZkAddr, v.hbaseTableName, v.op))) {
// if (op == GraphUtil.operations("delete") || op == GraphUtil.operations("deleteAll")) deleteVertexAll(vertices)
// else save(zkQuorum, tableName, vertices.flatMap(v => v.buildPutsAll()))
// }
// }
def mutateVertices(vertices: Seq[Vertex]) = {
for (((zkQuorum, op), vertices) <- vertices.groupBy(v => (v.hbaseZkAddr, v.op))) {
if (op == GraphUtil.operations("delete") || op == GraphUtil.operations("deleteAll")) deleteVertexAll(vertices)
else writeAsync(zkQuorum, vertices.flatMap(v => v.buildPutsAll()))
}
}
// delete only vertices
def deleteVertices(vertices: Seq[Vertex]) = {
for ((zkQuorum, vs) <- vertices.groupBy(v => v.hbaseZkAddr)) {
writeAsync(zkQuorum, vs.flatMap(v => v.buildDeleteAsync()))
}
}
/**
* O(E): removes every edge attached to the vertex, so this may not be feasible for high-degree vertices.
*/
def deleteVertexAll(vertices: Seq[Vertex]): Unit = {
for {
vertex <- vertices
label <- (Label.findBySrcColumnId(vertex.id.colId) ++ Label.findByTgtColumnId(vertex.id.colId)).groupBy(_.id.get).map { _._2.head }
} {
deleteVertexAllAsync(vertex.toEdgeVertex, label)
}
deleteVertices(vertices)
}
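/**
* fetches every edge attached to the vertex for the given label (both directions) and writes delete mutations for them.
*/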
private def deleteVertexAllAsync(srcVertex: Vertex, label: Label): Future[Boolean] = {
implicit val ex = Graph.executionContext
val qParams = for (dir <- List(0, 1)) yield {
val labelWithDir = LabelWithDirection(label.id.get, dir)
QueryParam(labelWithDir).limit(0, maxValidEdgeListSize * 5)
}
val step = Step(qParams)
val q = Query(List(srcVertex), List(step), true)
val seen = new HashMap[(CompositeId, LabelWithDirection), Boolean]
for {
edgesByVertex <- getEdgesAsync(q)
} yield {
val fetchedEdges = for {
edges <- edgesByVertex
(edge, score) <- edges if edge.ts <= srcVertex.ts && !seen.contains((edge.tgtVertex.id, edge.labelWithDir))
} yield {
val labelWithDir = if (label.isDirected) edge.labelWithDir.updateDir(2) else edge.labelWithDir
val edgeToDelete = Edge(edge.srcVertex, edge.tgtVertex, labelWithDir, GraphUtil.operations("delete"), srcVertex.ts, edge.version + Edge.incrementVersion, edge.propsWithTs)
seen += ((edgeToDelete.tgtVertex.id, edgeToDelete.labelWithDir) -> true)
// reverse or not? => reverse.
// delete edge or real delete operation? => ?
// play.api.Logger.debug(s"EdgeToDelete: $edgeToDelete")
// (edge.label.hbaseTableName, edgeToDelete.relatedEdges.flatMap(e => e.buildPutsAll()))
edgeToDelete
}
for ((zkQuorum, edges) <- fetchedEdges.groupBy(e => e.label.hbaseZkAddr)) {
writeAsync(zkQuorum, edges.flatMap(_.buildPutsAll))
}
true
}
}
// select
def getVertex(vertex: Vertex): Future[Option[Vertex]] = {
implicit val ex = executionContext
val client = getClient(vertex.hbaseZkAddr)
defferedToFuture(client.get(vertex.buildGet))(emptyKVs).map { kvs =>
Vertex(kvs)
}
}
/**
* Bulk
*/
/**
* bulk line format: when how from to what direction (meta info key:value)
* ex) timestamp insert shon sol talk_friend directed/undirected properties
* ex) timestamp insert shon talk_user_id properties
*/
def toGraphElement(s: String): Option[GraphElement] = {
val parts = GraphUtil.split(s)
try {
val logType = parts(2)
val element = if (logType == "edge" || logType == "e") {
toEdge(parts)
} else if (logType == "vertex" || logType == "v") {
toVertex(parts)
} else {
throw new KGraphExceptions.JsonParseException("log type does not exist in log.")
}
element
} catch {
case e: Throwable =>
logger.error(s"toGraphElement: $s => $e", e)
None
}
}
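/**
* splits parsed graph elements into vertices and edges and mutates each group asynchronously.
*/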
def bulkMutates(elements: Iterable[GraphElement], mutateInPlace: Boolean = false) = {
val vertices = new ListBuffer[Vertex]
val edges = new ListBuffer[Edge]
for (e <- elements) {
e match {
case edge: Edge => edges += edge
case vertex: Vertex => vertices += vertex
case _ => throw new Exception("GraphElement should be either vertex or edge.")
}
}
mutateVertices(vertices)
mutateEdges(edges)
}
def toVertex(s: String): Option[Vertex] = {
toVertex(GraphUtil.split(s))
}
def toEdge(s: String): Option[Edge] = {
toEdge(GraphUtil.split(s))
}
//"1418342849000\tu\te\t3286249\t71770\ttalk_friend\t{\"is_hidden\":false}"
//{"from":1,"to":101,"label":"graph_test","props":{"time":-1, "weight":10},"timestamp":1417616431},
def toEdge(parts: Array[String]): Option[Edge] = {
try {
val (ts, operation, logType, srcId, tgtId, label) = (parts(0), parts(1), parts(2), parts(3), parts(4), parts(5))
val props = if (parts.length >= 7) parts(6) else "{}"
// use db field is_directed.
val direction = ""
val edge = Management.toEdge(ts.toLong, operation, srcId, tgtId, label, direction, props)
// Logger.debug(s"toEdge: $edge")
Some(edge)
} catch {
case e: Throwable =>
logger.error(s"toEdge: $e", e)
throw e
}
}
//"1418342850000\ti\tv\t168756793\ttalk_user_id\t{\"country_iso\":\"KR\"}"
def toVertex(parts: Array[String]): Option[Vertex] = {
try {
val (ts, operation, logType, srcId, serviceName, colName) = (parts(0), parts(1), parts(2), parts(3), parts(4), parts(5))
val props = if (parts.length >= 7) parts(6) else "{}"
Some(Management.toVertex(ts.toLong, operation, srcId, serviceName, colName, props))
} catch {
case e: Throwable =>
logger.error(s"toVertex: $e", e)
throw e
}
}
}