| /* |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, |
| * software distributed under the License is distributed on an |
| * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| * KIND, either express or implied. See the License for the |
| * specific language governing permissions and limitations |
| * under the License. |
| */ |
| |
| package org.apache.s2graph.core.schema |
| |
| import org.apache.s2graph.core.GraphUtil |
| import org.apache.s2graph.core.schema.LabelIndex.LabelIndexMutateOption |
| import org.apache.s2graph.core.utils.logger |
| import play.api.libs.json.{JsObject, JsString, Json} |
| import scalikejdbc._ |
| |
/**
 * Row mapping, cached lookups and mutations for [[LabelIndex]] rows stored in the
 * `label_indices` table.
 *
 * Lookups are routed through cache helpers that are not defined in this file
 * (`withCache`, `withCaches`, `expireCache`, `expireCaches`, `putsToCacheOption`,
 * `putsToCaches` -- presumably provided by `import Schema._`). Every cache key is
 * [[className]] followed by one of these layouts, which the finders, `findAll`,
 * `findOrInsert` and `delete` must all agree on:
 *
 *  - `id=<id>`
 *  - `labelId=<labelId>` (list of indices of a label)
 *  - `labelId=<labelId>:seq=<seq>`
 *  - `labelId=<labelId>:seqs=<comma separated meta seqs>:dir=<Option[Int]>`
 */
object LabelIndex extends SQLSyntaxSupport[LabelIndex] {
  import Schema._

  /** Prefix of every cache key built in this object. */
  val className = LabelIndex.getClass.getSimpleName

  val DefaultName = "_PK"
  val DefaultMetaSeqs = Seq(LabelMeta.timestampSeq)
  val DefaultSeq = 1.toByte
  /** Upper bound enforced by [[findOrInsert]] on the `seq` of a newly created index. */
  val MaxOrderSeq = 7

  /**
   * Builds a [[LabelIndex]] from one `label_indices` row.
   *
   * `meta_seqs` is stored as a comma separated list of byte values; empty tokens
   * (e.g. from an empty column value) are skipped.
   */
  def apply(rs: WrappedResultSet): LabelIndex = {
    LabelIndex(rs.intOpt("id"), rs.int("label_id"), rs.string("name"), rs.byte("seq"),
      rs.string("meta_seqs").split(",").filter(_ != "").map(_.toByte).toList,
      rs.string("formulars"),
      rs.intOpt("dir"),
      rs.stringOpt("options")
    )
  }

  /**
   * Per-direction mutation settings parsed from the `options` JSON of an index.
   *
   * @param dir          direction encoded as a byte
   * @param method       "drop", "sample", "hash_sample"; any other value keeps everything
   * @param rate         acceptance threshold compared against a value in [0, 1)
   * @param totalModular modulus used to bucket the hash for "hash_sample"
   * @param storeDegree  carried as-is; not interpreted in this file
   */
  case class LabelIndexMutateOption(dir: Byte,
                                    method: String,
                                    rate: Double,
                                    totalModular: Long,
                                    storeDegree: Boolean) {

    val isBufferIncrement = method == "drop" || method == "sample" || method == "hash_sample"

    /**
     * Decides whether an element should be kept.
     *
     * @param a       the element being considered (not inspected here)
     * @param hashOpt hash used by the "hash_sample" method
     * @return false for "drop"; a random draw below `rate` for "sample"; a
     *         deterministic hash-bucket test for "hash_sample"; true otherwise
     * @throws RuntimeException when `method` is "hash_sample" and `hashOpt` is empty
     */
    def sample[T](a: T, hashOpt: Option[Long]): Boolean = method match {
      case "drop" => false
      case "sample" => scala.util.Random.nextDouble() < rate
      case "hash_sample" =>
        val hash = hashOpt.getOrElse(throw new RuntimeException("hash_sample need _from_hash value"))
        // Reduce first, then take the absolute value: `Long.MinValue.abs` overflows and
        // stays negative, which made the ratio negative and the sample always pass.
        // For every other hash this equals `hash.abs % totalModular`.
        (hash % totalModular).abs / totalModular.toDouble < rate
      case _ => true
    }
  }

  /**
   * Loads the index with the given primary key (cached under `id=<id>`).
   *
   * @throws NoSuchElementException when no such row exists
   */
  def findById(id: Int)(implicit session: DBSession = AutoSession) = {
    val cacheKey = className + "id=" + id
    withCache(cacheKey) {
      sql"""select * from label_indices where id = ${id}""".map { rs => LabelIndex(rs) }.single.apply()
    }.getOrElse(throw new NoSuchElementException(s"LabelIndex not found: id=$id"))
  }

  /**
   * Returns all indices of a label with `seq > 0`, ordered by `seq` ascending.
   *
   * @param useCache when false the cache helper is bypassed and the query always runs
   */
  def findByLabelIdAll(labelId: Int, useCache: Boolean = true)(implicit session: DBSession = AutoSession) = {
    val cacheKey = className + "labelId=" + labelId
    // Declared as a `def` so the statement only runs when it is actually evaluated.
    def query = sql"""
        select * from label_indices where label_id = ${labelId} and seq > 0 order by seq ASC
      """.map { rs => LabelIndex(rs) }.list.apply()

    if (useCache) withCaches(cacheKey)(query) else query
  }

  /**
   * Inserts a new row and returns its generated key. `metaSeqs` is stored as a comma
   * separated string. No cache entry is touched here.
   */
  def insert(labelId: Int, indexName: String, seq: Byte, metaSeqs: List[Byte], formulars: String,
             direction: Option[Int], options: Option[String])(implicit session: DBSession = AutoSession): Long = {
    sql"""
        insert into label_indices(label_id, name, seq, meta_seqs, formulars, dir, options)
        values (${labelId}, ${indexName}, ${seq}, ${metaSeqs.mkString(",")}, ${formulars}, ${direction}, ${options})
      """
      .updateAndReturnGeneratedKey.apply()
  }

  /**
   * Returns the index matching `(labelId, metaSeqs, direction)`, creating it when absent.
   *
   * A new index gets `seq = (number of existing indices of the label) + 1`, which must
   * not exceed [[MaxOrderSeq]]. After the insert every cache key that may hold a stale
   * entry for the new row is expired before the row is read back.
   */
  def findOrInsert(labelId: Int, indexName: String, metaSeqs: List[Byte], formulars: String,
                   direction: Option[Int], options: Option[String])(implicit session: DBSession = AutoSession): LabelIndex = {
    findByLabelIdAndSeqs(labelId, metaSeqs, direction) match {
      case Some(existing) => existing
      case None =>
        val orders = findByLabelIdAll(labelId, false)
        // Check the bound on the Int: converting to Byte first could wrap to a negative
        // value and slip past the assertion.
        val nextSeq = orders.size + 1
        assert(nextSeq <= MaxOrderSeq, s"label $labelId already has ${orders.size} indices (max $MaxOrderSeq)")
        val seq = nextSeq.toByte
        val createdId = insert(labelId, indexName, seq, metaSeqs, formulars, direction, options)

        // Keys must be spelled exactly like the finders build them: the seqs key uses
        // mkString(",") (not List.toString), and the per-label list key has to be
        // dropped too because the label now has one more index.
        val cacheKeys = List(
          s"id=$createdId",
          s"labelId=$labelId",
          s"labelId=$labelId:seq=$seq",
          s"labelId=$labelId:seqs=${metaSeqs.mkString(",")}:dir=$direction")

        cacheKeys.foreach { key =>
          expireCache(className + key)
          expireCaches(className + key)
        }

        findByLabelIdAndSeq(labelId, seq).getOrElse(
          throw new NoSuchElementException(s"LabelIndex not found after insert: labelId=$labelId, seq=$seq"))
    }
  }

  /**
   * Looks up the index of a label by its exact meta-seq list and direction.
   *
   * An empty `direction` matches rows whose `dir` column is NULL.
   */
  def findByLabelIdAndSeqs(labelId: Int, seqs: List[Byte], direction: Option[Int])(implicit session: DBSession = AutoSession): Option[LabelIndex] = {
    val seqsStr = seqs.mkString(",")
    val cacheKey = className + "labelId=" + labelId + ":seqs=" + seqsStr + ":dir=" + direction
    // `dir = NULL` is never true in SQL, so a missing direction needs an explicit
    // IS NULL test; otherwise direction-less indices can never be found here.
    val dirCondition = direction.fold(sqls"dir is null")(d => sqls"dir = ${d}")
    withCache(cacheKey) {
      sql"""
        select * from label_indices where label_id = ${labelId} and meta_seqs = ${seqsStr} and ${dirCondition}
      """.map { rs => LabelIndex(rs) }.single.apply()
    }
  }

  /**
   * Looks up the index of a label by its `seq`.
   *
   * @param useCache when false the cache helper is bypassed and the query always runs
   */
  def findByLabelIdAndSeq(labelId: Int, seq: Byte, useCache: Boolean = true)(implicit session: DBSession = AutoSession): Option[LabelIndex] = {
    val cacheKey = className + "labelId=" + labelId + ":seq=" + seq
    def query = sql"""
        select * from label_indices where label_id = ${labelId} and seq = ${seq}
      """.map { rs => LabelIndex(rs) }.single.apply()

    if (useCache) withCache(cacheKey)(query) else query
  }

  /**
   * Deletes the index with the given id and expires every cache key that referred to it.
   *
   * @throws NoSuchElementException when the index does not exist (raised by [[findById]])
   */
  def delete(id: Int)(implicit session: DBSession = AutoSession): Unit = {
    // Read the row first: its fields are needed to rebuild the cache keys afterwards.
    val labelIndex = findById(id)
    val seqs = labelIndex.metaSeqs.mkString(",")
    val (labelId, seq) = (labelIndex.labelId, labelIndex.seq)
    sql"""delete from label_indices where id = ${id}""".execute.apply()

    val cacheKeys = List(
      s"id=$id",
      s"labelId=$labelId",
      s"labelId=$labelId:seq=$seq",
      s"labelId=$labelId:seqs=$seqs:dir=${labelIndex.dir}")
    cacheKeys.foreach { key =>
      expireCache(className + key)
      expireCaches(className + key)
    }
  }

  /**
   * Loads every row of `label_indices`, pushes each one into the cache under its
   * single-row keys and each per-label group under its `labelId=<labelId>` key, and
   * returns the loaded rows.
   */
  def findAll()(implicit session: DBSession = AutoSession): List[LabelIndex] = {
    val ls = sql"""select * from label_indices""".map { rs => LabelIndex(rs) }.list.apply()

    putsToCacheOption(ls.flatMap { x =>
      val keys = x.id.map(id => s"id=$id").toSeq ++ Seq(
        s"labelId=${x.labelId}:seq=${x.seq}",
        s"labelId=${x.labelId}:seqs=${x.metaSeqs.mkString(",")}:dir=${x.dir}"
      )
      keys.map(cacheKey => (className + cacheKey, x))
    })

    putsToCaches(ls.groupBy(x => x.labelId).map { case (labelId, group) =>
      (className + s"labelId=${labelId}") -> group
    }.toList)

    ls
  }
}
| |
| |
/**
 * One index definition of a label.
 *
 * @param id        primary key, when known
 * @param labelId   id of the owning label (resolved lazily through `Label.findById`)
 * @param name      index name
 * @param seq       position of this index among the indices of the label
 * @param metaSeqs  seqs of the label metas that make up the sort key, in order
 * @param formulars carried as-is; not interpreted in this class
 * @param dir       optional direction; rendered as "both" by `toJson` when absent
 * @param options   optional JSON text holding per-direction mutate options
 */
case class LabelIndex(id: Option[Int], labelId: Int, name: String, seq: Byte, metaSeqs: Seq[Byte], formulars: String,
                      dir: Option[Int], options: Option[String]) {
  // Everything derived from the owning label is lazy, so the label is only looked up
  // on first use.
  lazy val label = Label.findById(labelId)
  lazy val metas = label.metaPropsMap

  /** Label metas referenced by `metaSeqs`; seqs missing from the label's meta map are skipped. */
  lazy val sortKeyTypes = metaSeqs.flatMap { seqId => label.metaPropsMap.get(seqId) }
  lazy val sortKeyTypesArray = sortKeyTypes.toArray
  lazy val propNames = sortKeyTypes.map(_.name)

  /**
   * JSON view of this index with the fields `name`, `propNames`, `dir` and `options`.
   * `options` falls back to an empty object when it is absent or cannot be parsed.
   */
  lazy val toJson = {
    val dirJs = dir match {
      case Some(direction) => GraphUtil.fromDirection(direction)
      case None => "both"
    }

    val optionsJs =
      try {
        options match {
          case Some(raw) => Json.parse(raw)
          case None => Json.obj()
        }
      } catch {
        case _: Exception => Json.obj()
      }

    Json.obj(
      "name" -> name,
      "propNames" -> sortKeyTypes.map(meta => meta.name),
      "dir" -> dirJs,
      "options" -> optionsJs
    )
  }

  /**
   * Reads the mutate options stored under the key `dir` of the `options` JSON.
   *
   * Missing fields default to method "default", rate 1.0, totalModular 100 and
   * storeDegree true.
   *
   * @param dir direction name used both as the JSON key and as the key into
   *            `GraphUtil.directions`
   * @return None when `options` is absent, or when reading it fails (the failure is logged)
   */
  def parseOption(dir: String): Option[LabelIndexMutateOption] =
    try {
      for (raw <- options) yield {
        val dirNode = Json.parse(raw) \ dir

        val samplingMethod = (dirNode \ "method").asOpt[String].getOrElse("default")
        val samplingRate = (dirNode \ "rate").asOpt[Double].getOrElse(1.0)
        val modular = (dirNode \ "totalModular").asOpt[Long].getOrElse(100L)
        val keepDegree = (dirNode \ "storeDegree").asOpt[Boolean].getOrElse(true)

        LabelIndexMutateOption(
          dir = GraphUtil.directions(dir).toByte,
          method = samplingMethod,
          rate = samplingRate,
          totalModular = modular,
          storeDegree = keepDegree)
      }
    } catch {
      case e: Exception =>
        logger.error(s"Parse failed labelOption: ${this.label}", e)
        None
    }

  lazy val inDirOption = parseOption("in")

  lazy val outDirOption = parseOption("out")
}