package org.apache.s2graph.core.schema
import org.apache.s2graph.core.GraphUtil
import org.apache.s2graph.core.schema.LabelIndex.LabelIndexMutateOption
import org.apache.s2graph.core.utils.logger
import play.api.libs.json.{JsObject, JsString, Json}
import scalikejdbc._
/**
 * Companion of the [[LabelIndex]] case class: row mapping plus cached lookups and
 * mutations against the `label_indices` table.
 */
object LabelIndex extends SQLSyntaxSupport[LabelIndex] {
import Schema._
// Prefix prepended to every cache key built in this object.
val className = LabelIndex.getClass.getSimpleName
// NOTE(review): presumably the name given to a label's default index — confirm against callers.
val DefaultName = "_PK"
// Meta sequence list made of the timestamp meta only.
val DefaultMetaSeqs = Seq(LabelMeta.timestampSeq)
val DefaultSeq = 1.toByte
// Upper bound asserted on a newly assigned `seq` in findOrInsert.
val MaxOrderSeq = 7
/**
 * Builds a [[LabelIndex]] from one result-set row.
 *
 * The `meta_seqs` column is read as a comma-separated string; empty tokens are
 * dropped and every remaining token is converted with `toByte`.
 *
 * @param rs row to read from
 */
def apply(rs: WrappedResultSet): LabelIndex = {
// NOTE(review): the parentheses on the next line do not balance as written —
// confirm the argument list against version control before touching it.
LabelIndex(rs.intOpt("id"),"label_id"), rs.string("name"), rs.byte("seq"),
rs.string("meta_seqs").split(",").filter(_ != "").map(s => s.toByte).toList match {
// Single catch-all case: the match only passes the parsed list through.
case metaSeqsList => metaSeqsList
/**
 * Mutation settings attached to one direction of a label index.
 *
 * @param dir          direction code these settings belong to
 * @param method       strategy name: "drop", "sample" or "hash_sample"; any other
 *                     value makes [[sample]] accept everything
 * @param rate         acceptance threshold compared against a random draw ("sample")
 *                     or against the hash bucket ratio ("hash_sample")
 * @param totalModular number of buckets the hash is reduced to for "hash_sample"
 * @param storeDegree  flag carried with the option; it is not read inside this class
 */
case class LabelIndexMutateOption(dir: Byte,
                                  method: String,
                                  rate: Double,
                                  totalModular: Long,
                                  storeDegree: Boolean) {

  /** True when `method` is one of "drop", "sample" or "hash_sample". */
  val isBufferIncrement: Boolean = method == "drop" || method == "sample" || method == "hash_sample"

  /**
   * Decides whether a single element passes the configured strategy.
   *
   * @param a       the element being considered; it is not inspected
   * @param hashOpt hash value used by the "hash_sample" strategy
   * @return false for "drop"; a random decision with probability `rate` for "sample";
   *         a deterministic decision based on the hash bucket for "hash_sample";
   *         true for every other method
   * @throws RuntimeException when `method` is "hash_sample" and `hashOpt` is empty
   */
  def sample[T](a: T, hashOpt: Option[Long]): Boolean = method match {
    case "drop" => false
    case "sample" => scala.util.Random.nextDouble() < rate
    case "hash_sample" =>
      val hash = hashOpt.getOrElse(throw new RuntimeException("hash_sample need _from_hash value"))
      // Reduce first, then take the absolute value. `Long.MinValue.abs` is still
      // negative, which used to yield a negative ratio that passed any rate >= 0.
      // For every other hash the result equals `hash.abs % totalModular`.
      val bucket = math.abs(hash % totalModular)
      bucket / totalModular.toDouble < rate
    case _ => true
  }
}
/**
 * Looks up the `label_indices` row with the given primary key.
 *
 * The query runs inside `withCache` under the key `className + "id=" + id`.
 *
 * @param id primary key of the row
 */
def findById(id: Int)(implicit session: DBSession = AutoSession) = {
val cacheKey = className + "id=" + id
withCache(cacheKey) {
sql"""select * from label_indices where id = ${id}""".map { rs => LabelIndex(rs) }.single.apply
/**
 * Lists the indices of a label whose `seq` is greater than 0, ordered by `seq` ascending.
 *
 * @param labelId  label whose indices are listed
 * @param useCache when true the query is wrapped in `withCaches` under the key
 *                 `className + "labelId=" + labelId`; when false the same query is
 *                 issued without the cache wrapper
 */
def findByLabelIdAll(labelId: Int, useCache: Boolean = true)(implicit session: DBSession = AutoSession) = {
val cacheKey = className + "labelId=" + labelId
// Both branches issue the same SELECT; only the cache wrapper differs.
if (useCache) {
withCaches(cacheKey)( sql"""
select * from label_indices where label_id = ${labelId} and seq > 0 order by seq ASC
""".map { rs => LabelIndex(rs) }.list.apply)
} else {
select * from label_indices where label_id = ${labelId} and seq > 0 order by seq ASC
""".map { rs => LabelIndex(rs) }.list.apply
/**
 * Inserts one row into `label_indices`.
 *
 * `metaSeqs` is stored in the `meta_seqs` column as a comma-joined string, the same
 * format that `apply` splits when reading a row back.
 *
 * @param labelId   value for `label_id`
 * @param indexName value for `name`
 * @param seq       value for `seq`
 * @param metaSeqs  meta sequences, written as `metaSeqs.mkString(",")`
 * @param formulars value for `formulars`
 * @param direction value for `dir`
 * @param options   value for `options`
 * @return a Long — presumably the generated row id; the executing call is not
 *         visible here, TODO confirm
 */
def insert(labelId: Int, indexName: String, seq: Byte, metaSeqs: List[Byte], formulars: String,
direction: Option[Int], options: Option[String])(implicit session: DBSession = AutoSession): Long = {
insert into label_indices(label_id, name, seq, meta_seqs, formulars, dir, options)
values (${labelId}, ${indexName}, ${seq}, ${metaSeqs.mkString(",")}, ${formulars}, ${direction}, ${options})
/**
 * Returns the index of `labelId` that matches `metaSeqs` and `direction`, creating
 * it first when no such index exists.
 *
 * A new index gets `seq = (number of existing indices of the label) + 1`; the
 * existing indices are read with the cache bypassed. After the insert, every cache
 * key that can refer to the new row is expired through both `expireCache` and
 * `expireCaches`, and the row is then re-read by `(labelId, seq)`.
 *
 * @param labelId   owning label
 * @param indexName name for a newly created index
 * @param metaSeqs  meta sequences that identify the index
 * @param formulars value stored in `formulars` for a newly created index
 * @param direction direction of the index
 * @param options   value stored in `options` for a newly created index
 * @return the existing or newly created index
 * @throws AssertionError         when the next `seq` would exceed `MaxOrderSeq`
 * @throws NoSuchElementException when the row cannot be read back after the insert
 */
def findOrInsert(labelId: Int, indexName: String, metaSeqs: List[Byte], formulars: String,
                 direction: Option[Int], options: Option[String])(implicit session: DBSession = AutoSession): LabelIndex = {
  findByLabelIdAndSeqs(labelId, metaSeqs, direction) match {
    case Some(existing) => existing
    case None =>
      // Bypass the cache so the next seq is derived from what the query returns now.
      val orders = findByLabelIdAll(labelId, useCache = false)
      val seq = (orders.size + 1).toByte
      assert(seq <= MaxOrderSeq)
      val createdId = insert(labelId, indexName, seq, metaSeqs, formulars, direction, options)

      // The keys must be spelled exactly like the lookups build them:
      // findByLabelIdAndSeqs and delete join the seqs with ",", so interpolating the
      // List itself ("List(1, 2)") would never hit the entry that has to be expired.
      // "labelId=<id>" is the key of the per-label list, which delete also expires.
      val seqs = metaSeqs.mkString(",")
      val cacheKeys = List(
        s"id=$createdId",
        s"labelId=$labelId",
        s"labelId=$labelId:seq=$seq",
        s"labelId=$labelId:seqs=$seqs:dir=$direction")

      cacheKeys.foreach { key =>
        expireCache(className + key)
        expireCaches(className + key)
      }

      findByLabelIdAndSeq(labelId, seq).getOrElse(
        throw new NoSuchElementException(
          s"label index not found after insert: labelId=$labelId, seq=$seq, id=$createdId"))
  }
}
/**
 * Looks up the index of a label by its meta-sequence list and direction.
 *
 * The meta sequences are compared against the `meta_seqs` column in their
 * comma-joined form. The query runs inside `withCache` under the key
 * `className + "labelId=<labelId>:seqs=<comma-joined seqs>:dir=<direction>"`.
 *
 * NOTE(review): `direction` is bound into `dir = ...`; when it is `None` the bound
 * value is presumably SQL NULL, and `dir = NULL` does not match any row in standard
 * SQL — confirm whether indices without a direction are meant to be found here.
 *
 * @param labelId   owning label
 * @param seqs      meta sequences identifying the index
 * @param direction direction to match
 */
def findByLabelIdAndSeqs(labelId: Int, seqs: List[Byte], direction: Option[Int])(implicit session: DBSession = AutoSession): Option[LabelIndex] = {
val cacheKey = className + "labelId=" + labelId + ":seqs=" + seqs.mkString(",") + ":dir=" + direction
withCache(cacheKey) {
select * from label_indices where label_id = ${labelId} and meta_seqs = ${seqs.mkString(",")} and dir = ${direction}
""".map { rs => LabelIndex(rs) }.single.apply
/**
 * Looks up the index of a label by its `seq`.
 *
 * @param labelId  owning label
 * @param seq      sequence number of the index within the label
 * @param useCache when true the query is wrapped in `withCache` under the key
 *                 `className + "labelId=<labelId>:seq=<seq>"`; when false the same
 *                 query is issued without the cache wrapper
 */
def findByLabelIdAndSeq(labelId: Int, seq: Byte, useCache: Boolean = true)(implicit session: DBSession = AutoSession): Option[LabelIndex] = {
// val cacheKey = s"labelId=$labelId:seq=$seq"
val cacheKey = className + "labelId=" + labelId + ":seq=" + seq
// Both branches issue the same SELECT; only the cache wrapper differs.
if (useCache) {
withCache(cacheKey)( sql"""
select * from label_indices where label_id = ${labelId} and seq = ${seq}
""".map { rs => LabelIndex(rs) }.single.apply)
} else {
select * from label_indices where label_id = ${labelId} and seq = ${seq}
""".map { rs => LabelIndex(rs) }.single.apply
/**
 * Deletes the `label_indices` row with the given id and expires its cache entries.
 *
 * The row is loaded through `findById` first, because its label id, seq, meta
 * sequences and dir are needed to rebuild the cache keys once the row is gone.
 *
 * @param id primary key of the row to delete
 */
def delete(id: Int)(implicit session: DBSession = AutoSession) = {
val labelIndex = findById(id)
// Comma-joined form, matching how findByLabelIdAndSeqs builds its cache key.
val seqs = labelIndex.metaSeqs.mkString(",")
val (labelId, seq) = (labelIndex.labelId, labelIndex.seq)
sql"""delete from label_indices where id = ${id}""".execute.apply()
// Keys for the by-id, by-label, by-(label, seq) and by-(label, seqs, dir) lookups.
val cacheKeys = List(s"id=$id", s"labelId=$labelId", s"labelId=$labelId:seq=$seq", s"labelId=$labelId:seqs=$seqs:dir=${labelIndex.dir}")
// Every key is expired through both expireCache and expireCaches.
cacheKeys.foreach { key =>
expireCache(className + key)
expireCaches(className + key)
/**
 * Loads every row of `label_indices` and hands the results to `putsToCacheOption`
 * and `putsToCaches`.
 */
def findAll()(implicit session: DBSession = AutoSession) = {
val ls = sql"""select * from label_indices""".map { rs => LabelIndex(rs) }.list.apply
// Per-row entries: each generated key is prefixed with className and paired with its row.
putsToCacheOption(ls.flatMap { x =>
).map(cacheKey => (className + cacheKey, x))
// Per-label entries: rows grouped by labelId under className + "labelId=<labelId>".
putsToCaches(ls.groupBy(x => x.labelId).map { case (labelId, ls) =>
val cacheKey = s"labelId=${labelId}"
(className + cacheKey -> ls)
/**
 * One index definition of a label, mirroring the columns written by
 * `LabelIndex.insert` (`label_id`, `name`, `seq`, `meta_seqs`, `formulars`, `dir`,
 * `options`) plus the row id.
 *
 * @param id        row id
 * @param labelId   id of the owning label
 * @param name      index name
 * @param seq       sequence number of the index
 * @param metaSeqs  meta sequences that make up the index
 * @param formulars value of the `formulars` column
 * @param dir       optional direction code
 * @param options   optional string; `parseOption` parses it as JSON
 */
case class LabelIndex(id: Option[Int], labelId: Int, name: String, seq: Byte, metaSeqs: Seq[Byte], formulars: String,
dir: Option[Int], options: Option[String]) {
// both
// Owning label, resolved on first access.
lazy val label = Label.findById(labelId)
lazy val metas = label.metaPropsMap
// Entries of the label's metaPropsMap for each meta sequence, in metaSeqs order;
// sequences without an entry are dropped by the flatMap.
lazy val sortKeyTypes = metaSeqs.flatMap(metaSeq => label.metaPropsMap.get(metaSeq))
lazy val sortKeyTypesArray = sortKeyTypes.toArray
lazy val propNames = { labelMeta => }
// JSON view of the index with the fields "name", "propNames", "dir" and "options".
lazy val toJson = {
val dirJs ="both")
// Falls back to an empty JSON object when an Exception is thrown.
val optionsJs = try { } catch { case e: Exception => Json.obj() }
"name" -> name,
"propNames" -> =>,
"dir" -> dirJs,
"options" -> optionsJs
// Reads the settings stored under the key `dir` of the parsed JSON and builds a
// LabelIndexMutateOption from them. Missing fields fall back to method "default",
// rate 1.0, totalModular 100 and storeDegree true. An Exception is logged through
// logger.error.
def parseOption(dir: String): Option[LabelIndexMutateOption] = try { { string =>
val jsObj = Json.parse(string) \ dir
val method = (jsObj \ "method").asOpt[String].getOrElse("default")
val rate = (jsObj \ "rate").asOpt[Double].getOrElse(1.0)
val totalModular = (jsObj \ "totalModular").asOpt[Long].getOrElse(100L)
val storeDegree = (jsObj \ "storeDegree").asOpt[Boolean].getOrElse(true)
// The direction name is translated to its code through GraphUtil.directions.
LabelIndexMutateOption(GraphUtil.directions(dir).toByte, method, rate, totalModular, storeDegree)
} catch {
case e: Exception =>
logger.error(s"Parse failed labelOption: ${this.label}", e)
// Parsed settings for the "in" and "out" keys, evaluated on first access.
lazy val inDirOption = parseOption("in")
lazy val outDirOption = parseOption("out")