| package com.daumkakao.s2graph.core |
| |
| import HBaseElement._ |
| import play.api.libs.json._ |
| import org.apache.hadoop.hbase.client.HBaseAdmin |
| import org.apache.hadoop.hbase.HTableDescriptor |
| import org.apache.hadoop.hbase.HBaseConfiguration |
| import org.apache.hadoop.hbase.util.Bytes |
| import org.apache.hadoop.hbase.TableName |
| import org.apache.hadoop.hbase.HColumnDescriptor |
| import org.apache.hadoop.hbase.client.Durability |
| import org.apache.hadoop.hbase.io.compress.Compression |
| import org.apache.hadoop.hbase.regionserver.BloomType |
| import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding |
| |
/**
 * Orchestrates the Kafka queue and DB metadata.
 */
| object Management extends JSONParser { |
| |
| val hardLimit = 10000 |
| val defaultLimit = 100 |
| |
| // val labels = new ConcurrentHashMap[String, Label] |
| /** |
| * source, target, label, orderBy, extra properties |
| * e src tgt label key:value,key:value... key:value,key:value... |
| * v src, key:value,key:value |
| * |
| * talk_friend 10 -> story:friend 10 -> |
| */ |
| def copyLabel(oldLabelName: String, newLabelName: String, hTableName: Option[String]) = { |
| for { |
| old <- Label.findByName(oldLabelName) |
| } { |
| Label.findByName(newLabelName) match { |
| case None => |
| val (indexProps, metaProps) = old.metaPropsInvMap.partition(nameMeta => nameMeta._2.usedInIndex) |
| |
| createLabel(newLabelName, old.srcService.serviceName, old.srcColumnName, old.srcColumnType, |
| old.tgtService.serviceName, old.tgtColumnName, old.tgtColumnType, |
| old.isDirected, old.serviceName, |
| indexProps.map(t => t._1 -> innerValToJsValue(t._2.defaultInnerVal)).toSeq, |
| metaProps.map(t => t._1 -> innerValToJsValue(t._2.defaultInnerVal)).toSeq, |
| old.consistencyLevel, hTableName, old.hTableTTL) |
| case Some(_) => |
| } |
| } |
| } |
| /** |
| * label |
| */ |
| /** |
| * create {labelName, labelName_delete, labelName_update} labels |
| */ |
| def createLabel(label: String, |
| srcServiceName: String, |
| srcColumnName: String, |
| srcColumnType: String, |
| tgtServiceName: String, |
| tgtColumnName: String, |
| tgtColumType: String, |
| isDirected: Boolean = true, |
| serviceName: String, |
| indexProps: Seq[(String, JsValue)], |
| props: Seq[(String, JsValue)], |
| consistencyLevel: String, |
| hTableName: Option[String], |
| hTableTTL: Option[Int]): Label = { |
| |
| val idxProps = for ((k, v) <- indexProps; (innerVal, dataType) = toInnerVal(v)) yield (k, innerVal, dataType, true) |
| val metaProps = for ((k, v) <- props; (innerVal, dataType) = toInnerVal(v)) yield (k, innerVal, dataType, false) |
| |
| val indexPropsWithType = |
| for ((k, v) <- indexProps) yield { |
| val (innerVal, dataType) = toInnerVal(v) |
| (k, innerVal, dataType) |
| } |
| var cacheKey = s"serviceName=$srcServiceName" |
| Service.expireCache(cacheKey) |
| val srcService = tryOption(srcServiceName, Service.findByName) |
| cacheKey = s"serviceName=$tgtServiceName" |
| Service.expireCache(cacheKey) |
| val tgtService = tryOption(tgtServiceName, Service.findByName) |
| cacheKey = s"serviceName=$serviceName" |
| Service.expireCache(cacheKey) |
| val service = tryOption(serviceName, Service.findByName) |
| |
| cacheKey = s"label=$label" |
| Label.expireCache(cacheKey) |
| val labelOpt = Label.findByName(label, useCache = false) |
| |
| labelOpt match { |
| case Some(l) => |
| throw new KGraphExceptions.LabelAlreadyExistException(s"Label name ${l.label} already exist.") |
| case None => |
| Label.insertAll(label, |
| srcService.id.get, srcColumnName, srcColumnType, |
| tgtService.id.get, tgtColumnName, tgtColumType, |
| isDirected, serviceName, service.id.get, idxProps ++ metaProps, consistencyLevel, hTableName, hTableTTL) |
| Label.expireCache(cacheKey) |
| Label.findByName(label).get |
| } |
| } |
| |
| def addIndex(labelStr: String, orderByKeys: Seq[(String, JsValue)]) = { |
| val label = try { |
| Label.findByName(labelStr).get |
| } catch { |
| case e: Throwable => |
| throw new KGraphExceptions.LabelNotExistException(labelStr) |
| } |
| val labelOrderTypes = |
| for ((k, v) <- orderByKeys; (innerVal, dataType) = toInnerVal(v)) yield { |
| |
| val lblMeta = LabelMeta.findOrInsert(label.id.get, k, innerVal.toString, dataType, true) |
| if (lblMeta.usedInIndex) lblMeta.seq |
| else throw new KGraphExceptions.LabelMetaExistException(s"") |
| } |
| LabelIndex.findOrInsert(label.id.get, labelOrderTypes.toList, "") |
| } |
| |
| def getServiceLable(label: String): Option[Label] = { |
| Label.findByName(label) |
| } |
| |
| /** |
| * |
| */ |
| |
| def toLabelWithDirectionAndOp(label: Label, direction: String): Option[LabelWithDirection] = { |
| for { |
| labelId <- label.id |
| dir = GraphUtil.toDirection(direction) |
| } yield LabelWithDirection(labelId, dir) |
| } |
| |
| def tryOption[A, R](key: A, f: A => Option[R]) = { |
| f(key) match { |
| case None => throw new KGraphExceptions.InternalException(s"$key is not found in DB. create $key first.") |
| case Some(r) => r |
| } |
| } |
| |
| def toEdge(ts: Long, operation: String, srcId: String, tgtId: String, |
| labelStr: String, direction: String = "", props: String): Edge = { |
| |
| val label = tryOption(labelStr, getServiceLable) |
| |
| val src = toInnerVal(srcId, label.srcColumnType) |
| val tgt = toInnerVal(tgtId, label.tgtColumnType) |
| |
| val srcVertex = Vertex(CompositeId(label.srcColumn.id.get, src, true, true), ts) |
| val tgtVertex = Vertex(CompositeId(label.tgtColumn.id.get, tgt, true, true), ts) |
| val dir = if (direction == "") GraphUtil.toDirection(label.direction) else GraphUtil.toDirection(direction) |
| val labelWithDir = LabelWithDirection(label.id.get, dir) |
| val op = tryOption(operation, GraphUtil.toOp) |
| |
| val jsObject = Json.parse(props).asOpt[JsObject].getOrElse(Json.obj()) |
| val parsedProps = toProps(label, jsObject).toMap |
| val propsWithTs = parsedProps.map(kv => (kv._1 -> InnerValWithTs(kv._2, ts))) ++ Map(LabelMeta.timeStampSeq -> InnerValWithTs(InnerVal.withLong(ts), ts)) |
| Edge(srcVertex, tgtVertex, labelWithDir, op, ts, version = ts, propsWithTs) |
| |
| } |
| |
| def toVertex(ts: Long, operation: String, id: String, serviceName: String, columnName: String, props: String): Vertex = { |
| Service.findByName(serviceName) match { |
| case None => throw new RuntimeException(s"$serviceName does not exist. create service first.") |
| case Some(service) => |
| ServiceColumn.find(service.id.get, columnName) match { |
| case None => throw new RuntimeException(s"$columnName is not exist. create service column first.") |
| case Some(col) => |
| val idVal = toInnerVal(id, col.columnType) |
| val op = tryOption(operation, GraphUtil.toOp) |
| val jsObject = Json.parse(props).asOpt[JsObject].getOrElse(Json.obj()) |
| val parsedProps = toProps(col, jsObject).toMap |
| Vertex(CompositeId(col.id.get, idVal, isEdge = false, useHash = true), ts, parsedProps, op = op) |
| } |
| } |
| } |
| |
| def toProps(column: ServiceColumn, js: JsObject): Seq[(Byte, InnerVal)] = { |
| |
| val props = for { |
| (k, v) <- js.fields |
| } yield { |
| val colMeta = ColumnMeta.findOrInsert(column.id.get, k) |
| val (innerVal, dataType) = toInnerVal(v) |
| (colMeta.seq, innerVal) |
| } |
| // Logger.debug(s"vertex.ToProps: $column, $js => $props") |
| props |
| |
| } |
| |
| def toProps(label: Label, js: JsObject): Seq[(Byte, InnerVal)] = { |
| |
| val props = for { |
| (k, v) <- js.fields |
| meta <- label.metaPropsInvMap.get(k) |
| // meta <- LabelMeta.findByName(label.id.get, k) |
| // meta = tryOption((label.id.get, k), LabelMeta.findByName) |
| innerVal <- jsValueToInnerVal(v, meta.dataType) |
| } yield { |
| (meta.seq, innerVal) |
| } |
| // Logger.error(s"toProps: $js => $props") |
| props |
| |
| } |
| |
| val idTableName = "id" |
| val cf = "a" |
| val idColName = "id" |
| val regionCnt = 10 |
| |
| def getAdmin(zkAddr: String) = { |
| val conf = HBaseConfiguration.create() |
| conf.set("hbase.zookeeper.quorum", zkAddr) |
| |
| val adm = new HBaseAdmin(conf) |
| adm |
| } |
| def enableTable(zkAddr: String, tableName: String) = { |
| getAdmin(zkAddr).enableTable(tableName) |
| } |
| def disableTable(zkAddr: String, tableName: String) = { |
| getAdmin(zkAddr).disableTable(tableName) |
| } |
| def dropTable(zkAddr: String, tableName: String) = { |
| getAdmin(zkAddr).disableTable(tableName) |
| getAdmin(zkAddr).deleteTable(tableName) |
| } |
| // def deleteEdgesByLabelIds(zkAddr: String, |
| // tableName: String, |
| // labelIds: String = "", |
| // minTs: Long = 0L, |
| // maxTs: Long = Long.MaxValue, |
| // include: Boolean = true) = { |
| // val conf = HBaseConfiguration.create() |
| // val longTimeout = "1200000" |
| // conf.set("hbase.rpc.timeout", longTimeout) |
| // conf.set("hbase.client.operation.timeout", longTimeout) |
| // conf.set("hbase.client.scanner.timeout.period", longTimeout) |
| // conf.set("hbase.zookeeper.quorum", zkAddr) |
| // val conn = HConnectionManager.createConnection(conf) |
| // val table = conn.getTable(tableName.getBytes) |
| // var builder = DeleteLabelsArgument.newBuilder() |
| // val scanner = Scan.newBuilder() |
| // |
| // scanner.setTimeRange(TimeRange.newBuilder().setFrom(minTs).setTo(maxTs)) |
| // /** |
| // * when we clean up all data does not match current database ids |
| // * we will delete row completely |
| // */ |
| // if (!include) scanner.setFilter(ProtobufUtil.toFilter(new FirstKeyOnlyFilter)) |
| // |
| // builder.setScan(scanner) |
| // for (id <- labelIds.split(",")) { |
| // builder.addId(id.toInt) |
| // } |
| // |
| // val argument = builder.build() |
| // |
| // val regionStats = table.coprocessorService(classOf[GraphStatService], null, null, |
| // new Batch.Call[GraphStatService, Long]() { |
| // override def call(counter: GraphStatService): Long = { |
| // val controller: ServerRpcController = new ServerRpcController() |
| // val rpcCallback: BlockingRpcCallback[CountResponse] = new BlockingRpcCallback[CountResponse]() |
| // |
| // if (include) { |
| // counter.cleanUpDeleteLabelsRows(controller, argument, rpcCallback) |
| // } else { |
| // counter.cleanUpDeleteLabelsRowsExclude(controller, argument, rpcCallback) |
| // } |
| // |
| // val response: CountResponse = rpcCallback.get() |
| // if (controller.failedOnException()) throw controller.getFailedOn() |
| // if (response != null && response.hasCount()) { |
| // response.getCount() |
| // } else { |
| // 0L |
| // } |
| // } |
| // }) |
| // |
| // // regionStats.map(kv => Bytes.toString(kv._1) -> kv._2) ++ Map("total" -> regionStats.values().sum) |
| // } |
| def createTable(zkAddr: String, tableName: String, cfs: List[String], regionCnt: Int, ttl: Option[Int]) = { |
| try { |
| val admin = getAdmin(zkAddr) |
| println(admin) |
| if (!admin.tableExists(tableName)) { |
| println("createTable") |
| val desc = new HTableDescriptor(TableName.valueOf(tableName)) |
| desc.setDurability(Durability.ASYNC_WAL) |
| for (cf <- cfs) { |
| val columnDesc = new HColumnDescriptor(cf) |
| .setCompressionType(Compression.Algorithm.GZ) |
| .setBloomFilterType(BloomType.ROW) |
| .setDataBlockEncoding(DataBlockEncoding.FAST_DIFF) |
| .setMaxVersions(1) |
| .setTimeToLive(2147483647) |
| .setMinVersions(0) |
| .setBlocksize(32768) |
| .setBlockCacheEnabled(true) |
| if (ttl.isDefined) columnDesc.setTimeToLive(ttl.get) |
| desc.addFamily(columnDesc) |
| } |
| |
| if (regionCnt <= 1) admin.createTable(desc) |
| else admin.createTable(desc, getStartKey(regionCnt), getEndKey(regionCnt), regionCnt) |
| } else { |
| // already exist |
| } |
| } catch { |
| case e: Throwable => println(e) |
| } |
| } |
| // we only use murmur hash to distribute row key. |
| private def getStartKey(regionCount: Int) = { |
| Bytes.toBytes((Int.MaxValue / regionCount)) |
| } |
| |
| private def getEndKey(regionCount: Int) = { |
| Bytes.toBytes((Int.MaxValue / regionCount * (regionCount - 1))) |
| } |
| |
| |
| } |