blob: 4209ca5f37091fd6342ce30540a053adea2be321 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.s2graph.core.schema
import org.apache.s2graph.core.GraphUtil
import org.apache.s2graph.core.schema.LabelIndex.LabelIndexMutateOption
import org.apache.s2graph.core.utils.logger
import play.api.libs.json.{JsObject, JsString, Json}
import scalikejdbc._
object LabelIndex extends SQLSyntaxSupport[LabelIndex] {
import Schema._
// Simple name of this object's runtime class; it is prepended to every cache key built in this object.
val className = LabelIndex.getClass.getSimpleName
// NOTE(review): presumably the name of the implicit default index -- confirm against callers.
val DefaultName = "_PK"
// Default meta sequence list: only the timestamp meta sequence.
val DefaultMetaSeqs = Seq(LabelMeta.timestampSeq)
// NOTE(review): presumably the seq of the default index -- confirm against callers.
val DefaultSeq = 1.toByte
// Upper bound that findOrInsert asserts on the seq it assigns to a new index.
val MaxOrderSeq = 7
/**
 * Builds a [[LabelIndex]] from the current row of a result set.
 *
 * The `meta_seqs` column is read as a comma-separated string: empty fragments are
 * skipped and every remaining fragment is parsed as a byte.
 *
 * @param rs row holding the columns id, label_id, name, seq, meta_seqs, formulars, dir and options
 * @return the row mapped to a LabelIndex
 */
def apply(rs: WrappedResultSet): LabelIndex = {
  // Columns are read in the same order as the constructor parameters.
  val id = rs.intOpt("id")
  val labelId = rs.int("label_id")
  val name = rs.string("name")
  val seq = rs.byte("seq")
  val metaSeqs = rs.string("meta_seqs").split(",").filter(_.nonEmpty).map(_.toByte).toList
  val formulars = rs.string("formulars")
  val dir = rs.intOpt("dir")
  val options = rs.stringOpt("options")
  LabelIndex(id, labelId, name, seq, metaSeqs, formulars, dir, options)
}
/**
 * Mutation options attached to one direction of a label index.
 *
 * @param dir          direction these options apply to, as a byte
 * @param method       sampling method; "drop", "sample" and "hash_sample" are handled
 *                     specially, any other value means "always keep"
 * @param rate         threshold in [[sample]]; a draw is kept when it is strictly below this value
 * @param totalModular number of buckets used by "hash_sample"; a value of 0 makes the
 *                     remainder operation throw ArithmeticException
 * @param storeDegree  flag carried along for callers; not read inside this class
 */
case class LabelIndexMutateOption(dir: Byte,
                                  method: String,
                                  rate: Double,
                                  totalModular: Long,
                                  storeDegree: Boolean) {
  /** True when `method` is one of "drop", "sample" or "hash_sample". */
  val isBufferIncrement = method == "drop" || method == "sample" || method == "hash_sample"

  /**
   * Decides whether an element is kept according to `method`.
   *
   *  - "drop": never kept.
   *  - "sample": kept when a fresh random double is below `rate`.
   *  - "hash_sample": kept when the hash's bucket, scaled by `totalModular`, is below `rate`.
   *  - anything else: always kept.
   *
   * @param a       the element under consideration (not inspected)
   * @param hashOpt hash used by "hash_sample"
   * @return true when the element is kept
   * @throws RuntimeException when `method` is "hash_sample" and `hashOpt` is empty
   */
  def sample[T](a: T, hashOpt: Option[Long]): Boolean = method match {
    case "drop" => false
    case "sample" => scala.util.Random.nextDouble() < rate
    case "hash_sample" =>
      val hash = hashOpt.getOrElse(throw new RuntimeException("hash_sample need _from_hash value"))
      // Take the remainder before the absolute value: Long.MinValue.abs overflows and stays
      // negative, which used to make that hash pass for every non-negative rate. For all
      // other hashes (hash % m).abs equals hash.abs % m, so existing buckets are unchanged.
      val bucket = (hash % totalModular).abs
      bucket / totalModular.toDouble < rate
    case _ => true
  }
}
/**
 * Looks up a label index by its primary key through `withCache`.
 *
 * The value coming back from `withCache` is unwrapped with `.get`, so callers receive
 * the index itself rather than an Option.
 *
 * @param id primary key in `label_indices`
 */
def findById(id: Int)(implicit session: DBSession = AutoSession) = {
  val cacheKey = s"${className}id=$id"
  val found = withCache(cacheKey) {
    sql"""select * from label_indices where id = ${id}""".map(row => LabelIndex(row)).single.apply()
  }
  found.get
}
/**
 * Returns the indices of a label whose seq is greater than 0, ordered by seq ascending.
 *
 * @param labelId  label whose indices are requested
 * @param useCache when true the query goes through `withCaches`; when false it is run directly
 */
def findByLabelIdAll(labelId: Int, useCache: Boolean = true)(implicit session: DBSession = AutoSession) = {
  // Single definition of the query shared by the cached and uncached paths.
  def loadIndices: List[LabelIndex] = sql"""
select * from label_indices where label_id = ${labelId} and seq > 0 order by seq ASC
""".map(row => LabelIndex(row)).list.apply()

  if (useCache) withCaches(s"${className}labelId=$labelId")(loadIndices)
  else loadIndices
}
/**
 * Inserts one row into `label_indices` and returns its generated key.
 *
 * @param metaSeqs stored in the `meta_seqs` column as a comma-separated string
 * @return the key generated for the new row
 */
def insert(labelId: Int, indexName: String, seq: Byte, metaSeqs: List[Byte], formulars: String,
           direction: Option[Int], options: Option[String])(implicit session: DBSession = AutoSession): Long = {
  val metaSeqsCsv = metaSeqs.mkString(",")
  val statement = sql"""
insert into label_indices(label_id, name, seq, meta_seqs, formulars, dir, options)
values (${labelId}, ${indexName}, ${seq}, ${metaSeqsCsv}, ${formulars}, ${direction}, ${options})
"""
  statement.updateAndReturnGeneratedKey.apply()
}
/**
 * Returns the index of `labelId` matching `metaSeqs` and `direction`, creating it when
 * no such index is found.
 *
 * A new index gets seq = (number of existing indices of the label) + 1, which is asserted
 * not to exceed [[MaxOrderSeq]]. After the insert, every cache key under which the new row
 * (or its previous absence) may have been cached is expired, and the row is read back.
 *
 * NOTE(review): seq is derived from the row count, so it can repeat an existing seq once an
 * index in the middle has been deleted -- confirm whether that can happen in practice.
 *
 * @return the existing or newly created index
 */
def findOrInsert(labelId: Int, indexName: String, metaSeqs: List[Byte], formulars: String,
                 direction: Option[Int], options: Option[String])(implicit session: DBSession = AutoSession): LabelIndex = {
  findByLabelIdAndSeqs(labelId, metaSeqs, direction) match {
    case Some(existing) => existing
    case None =>
      // Bypass the cache so the count reflects what is actually stored.
      val existingIndices = findByLabelIdAll(labelId, useCache = false)
      val seq = (existingIndices.size + 1).toByte
      assert(seq <= MaxOrderSeq)
      val createdId = insert(labelId, indexName, seq, metaSeqs, formulars, direction, options)
      // These must match the keys built by findById, findByLabelIdAll, findByLabelIdAndSeq
      // and findByLabelIdAndSeqs. In particular the seqs key uses the comma-joined form;
      // interpolating the List directly would render "List(1, 2)" and never match, leaving
      // the lookup result from the top of this method in the cache.
      val cacheKeys = List(
        s"id=$createdId",
        s"labelId=$labelId",
        s"labelId=$labelId:seq=$seq",
        s"labelId=$labelId:seqs=${metaSeqs.mkString(",")}:dir=$direction"
      )
      cacheKeys.foreach { key =>
        expireCache(className + key)
        expireCaches(className + key)
      }
      findByLabelIdAndSeq(labelId, seq).get
  }
}
/**
 * Finds the index of a label by its exact meta sequence list and direction, through `withCache`.
 *
 * @param labelId   label the index belongs to
 * @param seqs      meta sequences; compared against the `meta_seqs` column as a comma-separated string
 * @param direction direction to match; None matches rows whose `dir` column is NULL
 * @return the matching index, if any
 */
def findByLabelIdAndSeqs(labelId: Int, seqs: List[Byte], direction: Option[Int])(implicit session: DBSession = AutoSession): Option[LabelIndex] = {
  val metaSeqsCsv = seqs.mkString(",")
  val cacheKey = className + "labelId=" + labelId + ":seqs=" + metaSeqsCsv + ":dir=" + direction
  // A None direction is bound as NULL, and `dir = NULL` is never true in SQL, so rows stored
  // without a direction could not be found. Match them with IS NULL instead.
  val dirCondition = direction match {
    case Some(d) => sqls"dir = ${d}"
    case None => sqls"dir is null"
  }
  withCache(cacheKey) {
    sql"""
select * from label_indices where label_id = ${labelId} and meta_seqs = ${metaSeqsCsv} and ${dirCondition}
""".map { rs => LabelIndex(rs) }.single.apply()
  }
}
/**
 * Finds the index of a label by its seq.
 *
 * @param labelId  label the index belongs to
 * @param seq      seq of the index within the label
 * @param useCache when true the query goes through `withCache`; when false it is run directly
 * @return the matching index, if any
 */
def findByLabelIdAndSeq(labelId: Int, seq: Byte, useCache: Boolean = true)(implicit session: DBSession = AutoSession): Option[LabelIndex] = {
  // Single definition of the query shared by the cached and uncached paths.
  def loadIndex: Option[LabelIndex] = sql"""
select * from label_indices where label_id = ${labelId} and seq = ${seq}
""".map(row => LabelIndex(row)).single.apply()

  if (useCache) withCache(s"${className}labelId=$labelId:seq=$seq")(loadIndex)
  else loadIndex
}
/**
 * Deletes the index with the given id and expires the cache keys that refer to it.
 *
 * The row is loaded with `findById` first, because its labelId, seq, meta sequences and
 * direction are needed to rebuild the cache keys after the row is gone.
 *
 * @param id primary key of the index to delete
 */
def delete(id: Int)(implicit session: DBSession = AutoSession) = {
  val target = findById(id)
  val metaSeqsCsv = target.metaSeqs.mkString(",")
  val labelId = target.labelId
  val seq = target.seq
  sql"""delete from label_indices where id = ${id}""".execute.apply()
  val staleKeys = List(
    s"id=$id",
    s"labelId=$labelId",
    s"labelId=$labelId:seq=$seq",
    s"labelId=$labelId:seqs=$metaSeqsCsv:dir=${target.dir}"
  )
  for (key <- staleKeys) {
    val fullKey = className + key
    expireCache(fullKey)
    expireCaches(fullKey)
  }
}
/**
 * Loads every row of `label_indices` and pushes the rows into the caches.
 *
 * Each index is handed to `putsToCacheOption` under its id key, its (labelId, seq) key and
 * its (labelId, meta sequences, direction) key; the indices grouped by label are handed to
 * `putsToCaches` under the per-label key.
 *
 * @return all loaded indices
 */
def findAll()(implicit session: DBSession = AutoSession) = {
  val allIndices = sql"""select * from label_indices""".map(row => LabelIndex(row)).list.apply()

  val singleEntries = for {
    index <- allIndices
    key <- Seq(
      s"id=${index.id.get}",
      s"labelId=${index.labelId}:seq=${index.seq}",
      s"labelId=${index.labelId}:seqs=${index.metaSeqs.mkString(",")}:dir=${index.dir}"
    )
  } yield (className + key, index)
  putsToCacheOption(singleEntries)

  val groupedEntries = allIndices.groupBy(_.labelId).map { case (labelId, indices) =>
    (className + s"labelId=${labelId}") -> indices
  }.toList
  putsToCaches(groupedEntries)

  allIndices
}
}
/**
 * An index defined on a label.
 *
 * @param id        primary key, when known
 * @param labelId   id of the label this index belongs to
 * @param name      index name
 * @param seq       seq of the index within its label
 * @param metaSeqs  meta sequences that make up the index, in order
 * @param formulars formulars string, carried as-is
 * @param dir       direction as an int; None is rendered as "both" in [[toJson]]
 * @param options   optional JSON string holding per-direction mutation options
 */
case class LabelIndex(id: Option[Int], labelId: Int, name: String, seq: Byte, metaSeqs: Seq[Byte], formulars: String,
                      dir: Option[Int], options: Option[String]) {
  /** Label this index belongs to, looked up on first access. */
  lazy val label = Label.findById(labelId)

  /** Meta property map of the label. */
  lazy val metas = label.metaPropsMap

  /** Metas for `metaSeqs`, in order; sequences missing from the label's map are skipped. */
  lazy val sortKeyTypes = for {
    metaSeq <- metaSeqs
    meta <- label.metaPropsMap.get(metaSeq)
  } yield meta

  lazy val sortKeyTypesArray = sortKeyTypes.toArray

  /** Names of the metas in `sortKeyTypes`. */
  lazy val propNames = sortKeyTypes.map(_.name)

  /**
   * JSON view with the fields name, propNames, dir and options.
   * Options that are absent or fail to parse are rendered as an empty object.
   */
  lazy val toJson = {
    val dirJs = dir match {
      case Some(d) => GraphUtil.fromDirection(d)
      case None => "both"
    }
    val optionsJs =
      try {
        options match {
          case Some(raw) => Json.parse(raw)
          case None => Json.obj()
        }
      } catch {
        case _: Exception => Json.obj()
      }
    Json.obj(
      "name" -> name,
      "propNames" -> propNames,
      "dir" -> dirJs,
      "options" -> optionsJs
    )
  }

  /**
   * Reads the mutation options stored under the key `dir` of the `options` JSON.
   *
   * Missing fields fall back to method "default", rate 1.0, totalModular 100 and
   * storeDegree true.
   *
   * @param dir direction name used both as the JSON key and for the `GraphUtil.directions` lookup
   * @return None when `options` is empty or when anything throws while parsing (the failure is logged)
   */
  def parseOption(dir: String): Option[LabelIndexMutateOption] =
    try {
      for (raw <- options) yield {
        val section = Json.parse(raw) \ dir
        val method = (section \ "method").asOpt[String].getOrElse("default")
        val rate = (section \ "rate").asOpt[Double].getOrElse(1.0)
        val totalModular = (section \ "totalModular").asOpt[Long].getOrElse(100L)
        val storeDegree = (section \ "storeDegree").asOpt[Boolean].getOrElse(true)
        LabelIndexMutateOption(GraphUtil.directions(dir).toByte, method, rate, totalModular, storeDegree)
      }
    } catch {
      case e: Exception =>
        logger.error(s"Parse failed labelOption: ${this.label}", e)
        None
    }

  /** Options parsed for the "in" direction. */
  lazy val inDirOption = parseOption("in")

  /** Options parsed for the "out" direction. */
  lazy val outDirOption = parseOption("out")
}