blob: ad326c64c5db0bbb6080755a05917308ab3ddd53 [file] [log] [blame]
package com.daumkakao.s2graph.core
import play.api.libs.json.{ JsValue, Json }
import scalikejdbc._
object Label extends LocalCache[Label] {
val maxHBaseTableNames = 2
def apply(rs: WrappedResultSet): Label = {
Label(Some(rs.int("id")), rs.string("label"),
rs.int("src_service_id"), rs.string("src_column_name"), rs.string("src_column_type"),
rs.int("tgt_service_id"), rs.string("tgt_column_name"), rs.string("tgt_column_type"),
rs.boolean("is_directed"), rs.string("service_name"), rs.int("service_id"), rs.string("consistency_level"),
rs.string("hbase_table_name"), rs.intOpt("hbase_table_ttl"))
}
def findByName(labelUseCache: (String, Boolean)): Option[Label] = {
val (label, useCache) = labelUseCache
findByName(label, useCache)
}
def findByName(label: String, useCache: Boolean = true): Option[Label] = {
val cacheKey = s"label=$label"
if (useCache) {
withCache(cacheKey)(
sql"""
select *
from labels
where label = ${label}"""
.map { rs => Label(rs) }.single.apply())
} else {
sql"""
select *
from labels
where label = ${label}"""
.map { rs => Label(rs) }.single.apply()
}
}
def insert(label: String,
srcServiceId: Int,
srcColumnName: String,
srcColumnType: String,
tgtServiceId: Int,
tgtColumnName: String,
tgtColumnType: String,
isDirected: Boolean,
serviceName: String,
serviceId: Int,
consistencyLevel: String,
hTableName: String,
hTableTTL: Option[Int]) = {
sql"""
insert into labels(label,
src_service_id, src_column_name, src_column_type,
tgt_service_id, tgt_column_name, tgt_column_type,
is_directed, service_name, service_id, consistency_level, hbase_table_name, hbase_table_ttl)
values (${label},
${srcServiceId}, ${srcColumnName}, ${srcColumnType},
${tgtServiceId}, ${tgtColumnName}, ${tgtColumnType},
${isDirected}, ${serviceName}, ${serviceId}, ${consistencyLevel}, ${hTableName}, ${hTableTTL})
"""
.updateAndReturnGeneratedKey.apply()
}
def findById(id: Int): Label = {
val cacheKey = s"id=$id"
withCache(cacheKey)(sql"""
select *
from labels
where id = ${id}"""
.map { rs => Label(rs) }.single.apply()).get
}
def findByTgtColumnId(columnId: Int): List[Label] = {
val col = ServiceColumn.findById(columnId)
sql"""
select *
from labels
where tgt_column_name = ${col.columnName}
""".map { rs => Label(rs) }.list().apply()
}
def findBySrcColumnId(columnId: Int): List[Label] = {
val col = ServiceColumn.findById(columnId)
sql"""
select *
from labels
where src_column_name = ${col.columnName}
""".map { rs => Label(rs) }.list().apply()
}
def findBySrcServiceId(serviceId: Int): List[Label] = {
sql"""select * from labels where src_service_id = ${serviceId}""".map { rs => Label(rs) }.list().apply
}
def findByTgtServiceId(serviceId: Int): List[Label] = {
sql"""select * from labels where tgt_service_id = ${serviceId}""".map { rs => Label(rs) }.list().apply
}
def insertAll(label: String,
srcServiceId: Int,
srcColumnName: String,
srcColumnType: String,
tgtServiceId: Int,
tgtColumnName: String,
tgtColumnType: String,
isDirected: Boolean = true,
serviceName: String,
serviceId: Int,
props: Seq[(String, Any, String, Boolean)] = Seq.empty[(String, Any, String, Boolean)],
consistencyLevel: String,
hTableName: Option[String],
hTableTTL: Option[Int]) = {
// val ls = List(label, srcServiceId, srcColumnName, srcColumnType, tgtServiceId, tgtColumnName, tgtColumnType, isDirected
// , serviceName, serviceId, props.toString, consistencyLevel, hTableName)
// Logger.error(s"insertAll: $ls")
val srcCol = ServiceColumn.findOrInsert(srcServiceId, srcColumnName, Some(srcColumnType))
val tgtCol = ServiceColumn.findOrInsert(tgtServiceId, tgtColumnName, Some(tgtColumnType))
val service = Service.findById(serviceId)
// require(service.id.get == srcServiceId || service.id.get == tgtServiceId)
val createdId = insert(label, srcServiceId, srcColumnName, srcColumnType,
tgtServiceId, tgtColumnName, tgtColumnType, isDirected, serviceName, serviceId, consistencyLevel,
hTableName.getOrElse(service.hTableName), hTableTTL.orElse(service.hTableTTL))
val labelMetas =
if (props.isEmpty) List(LabelMeta.timestamp)
else props.toList.map {
case (name, defaultVal, dataType, usedInIndex) =>
LabelMeta.findOrInsert(createdId.toInt, name, defaultVal.toString, dataType, usedInIndex)
}
// Logger.error(s"$labelMetas")
val defaultIndexMetaSeqs = labelMetas.filter(_.usedInIndex).map(_.seq) match {
case metaSeqs => if (metaSeqs.isEmpty) List(LabelMeta.timestamp.seq) else metaSeqs
}
// Logger.error(s"$defaultIndexMetaSeqs")
// kgraph.Logger.debug(s"Label: $defaultIndexMetaSeqs")
/** deprecated */
// 0 is reserved labelOrderSeq for delete, update
// LabelIndex.findOrInsert(createdId.toInt, 0, List(LabelMeta.timeStampSeq), "")
LabelIndex.findOrInsert(createdId.toInt, LabelIndex.defaultSeq, defaultIndexMetaSeqs, "")
/** TODO: */
(hTableName, hTableTTL) match {
case (None, None) => // do nothing
case (None, Some(hbaseTableTTL)) => throw new RuntimeException("if want to specify ttl, give hbaseTableName also")
case (Some(hbaseTableName), None) =>
// create own hbase table with default ttl on service level.
Management.createTable(service.cluster, hbaseTableName, List("e", "v"), service.preSplitSize, service.hTableTTL)
case (Some(hbaseTableName), Some(hbaseTableTTL)) =>
// create own hbase table with own ttl.
Management.createTable(service.cluster, hbaseTableName, List("e", "v"), service.preSplitSize, hTableTTL)
}
}
def findOrInsert(label: String,
srcServiceId: Int,
srcColumnName: String,
srcColumnType: String,
tgtServiceId: Int,
tgtColumnName: String,
tgtColumnType: String,
isDirected: Boolean = true,
serviceName: String,
serviceId: Int,
props: Seq[(String, Any, String, Boolean)] = Seq.empty[(String, Any, String, Boolean)],
consistencyLevel: String,
hTableName: Option[String],
hTableTTL: Option[Int]): Label = {
findByName(label, false) match {
case Some(l) => l
case None =>
insertAll(label, srcServiceId, srcColumnName, srcColumnType, tgtServiceId, tgtColumnName,
tgtColumnType, isDirected, serviceName, serviceId, props, consistencyLevel, hTableName, hTableTTL)
val cacheKey = s"label=$label"
expireCache(cacheKey)
findByName(label).get
}
}
def findAllLabels(): List[Label] = {
val labels = sql"""
select *
from labels
""".map { rs => Label(rs) }.list().apply()
labels.foreach(_.init)
putsToCache(labels.map { label =>
val cacheKey = s"label=${label.label}"
(cacheKey -> label)
})
putsToCache(labels.map { label =>
val cacheKey = s"id=${label.id.get}"
(cacheKey -> label)
})
labels
}
def findAllLabels(serviceName: Option[String] = None, offset: Int = 0, limit: Int = 10): List[Label] = {
val sql = serviceName match {
case None =>
sql"""
select *
from labels
limit ${offset}, ${limit}
"""
case Some(sName) =>
sql"""
select *
from labels
where service_name = ${sName}
limit ${offset}, ${limit}
"""
}
sql.map { rs => Label(rs) }.list().apply()
}
def delete(id: Int) = {
val label = findById(id)
sql"""delete from labels where id = ${label.id.get}""".execute.apply()
val cacheKeys = List(s"id=$id", s"label=${label.label}")
cacheKeys.foreach(expireCache(_))
}
}
case class Label(id: Option[Int], label: String,
srcServiceId: Int, srcColumnName: String, srcColumnType: String,
tgtServiceId: Int, tgtColumnName: String, tgtColumnType: String,
isDirected: Boolean = true, serviceName: String, serviceId: Int,
consistencyLevel: String = "strong", hTableName: String,
hTableTTL: Option[Int]) extends JSONParser {
def metas = LabelMeta.findAllByLabelId(id.get)
def metaSeqsToNames = metas.map(x => (x.seq, x.name)) toMap
// lazy val firstHBaseTableName = hbaseTableName.split(",").headOption.getOrElse(Config.HBASE_TABLE_NAME)
lazy val srcService = Service.findById(srcServiceId)
lazy val tgtService = Service.findById(tgtServiceId)
lazy val service = Service.findById(serviceId)
/**
* TODO
* change this to apply hbase table from target serviceName
*/
// lazy val (hbaseZkAddr, hbaseTableName) = (service.cluster, service.tableName.split(",").headOption.getOrElse(Config.HBASE_TABLE_NAME))
// lazy val (hbaseZkAddr, hbaseTableName) = (Config.HBASE_ZOOKEEPER_QUORUM, hTableName.split(",").headOption.getOrElse(Config.HBASE_TABLE_NAME))
// lazy val (hbaseZkAddr, hbaseTableName) = (service.cluster, hTableName.split(",").headOption.getOrElse(GraphConnection.getConfVal("hbase.table.name")))
lazy val (hbaseZkAddr, hbaseTableName) = (service.cluster, hTableName.split(",").head)
lazy val (srcColumn, tgtColumn) = (ServiceColumn.find(srcServiceId, srcColumnName).get, ServiceColumn.find(tgtServiceId, tgtColumnName).get)
lazy val direction = if (isDirected) "out" else "undirected"
lazy val defaultIndex = LabelIndex.findByLabelIdAndSeq(id.get, LabelIndex.defaultSeq)
//TODO: Make sure this is correct
lazy val indices = LabelIndex.findByLabelIdAll(id.get, useCache = true)
lazy val indicesMap = indices.map(idx => (idx.seq, idx)) toMap
lazy val indexSeqsMap = indices.map(idx => (idx.metaSeqs, idx)) toMap
lazy val extraIndices = indices.filter(idx => defaultIndex.isDefined && idx.id.get != defaultIndex.get.id.get)
// indices filterNot (_.id.get == defaultIndex.get.id.get)
lazy val extraIndicesMap = extraIndices.map(idx => (idx.seq, idx)) toMap
lazy val metaProps = LabelMeta.reservedMetas ::: LabelMeta.findAllByLabelId(id.get, useCache = true)
lazy val metaPropsMap = metaProps.map(x => (x.seq, x)).toMap
lazy val metaPropsInvMap = metaProps.map(x => (x.name, x)).toMap
lazy val metaPropNames = metaProps.map(x => x.name)
lazy val metaPropNamesMap = metaProps.map(x => (x.seq, x.name)) toMap
def init() = {
metas
metaSeqsToNames
service
srcColumn
tgtColumn
defaultIndex
indices
metaProps
}
def srcColumnInnerVal(jsValue: JsValue) = {
jsValueToInnerVal(jsValue, srcColumnType)
}
def tgtColumnInnerVal(jsValue: JsValue) = {
jsValueToInnerVal(jsValue, tgtColumnType)
}
override def toString(): String = {
val orderByKeys = LabelMeta.findAllByLabelId(id.get)
super.toString() + orderByKeys.toString()
}
def findLabelIndexSeq(scoring: List[(Byte, Double)]): Byte = {
if (scoring.isEmpty) LabelIndex.defaultSeq
else {
LabelIndex.findByLabeIdAndSeqs(id.get, scoring.map(_._1).sorted).map(_.seq).getOrElse(LabelIndex.defaultSeq)
}
}
lazy val toJson = Json.obj("labelName" -> label,
"from" -> srcColumn.toJson, "to" -> tgtColumn.toJson,
// "indexProps" -> indexPropNames,
"defaultIndex" -> defaultIndex.map(x => x.toJson),
"extraIndex" -> extraIndices.map(exIdx => exIdx.toJson),
"metaProps" -> metaProps.map(_.toJson) // , "indices" -> indices.map(idx => idx.toJson)
)
def deleteAll() = {
LabelMeta.findAllByLabelId(id.get, false).foreach { x => LabelMeta.delete(x.id.get) }
// LabelIndexProp.findAllByLabel(id.get, false).foreach { x => LabelIndexProp.delete(x.id.get) }
LabelIndex.findByLabelIdAll(id.get, false).foreach { x => LabelIndex.delete(x.id.get) }
Label.delete(id.get)
}
}