blob: 4bc93767aa9c16bf3ff4a8650b197547e0e80a73 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.s2graph.core.rest
import java.util.concurrent.{Callable, TimeUnit}
import com.google.common.cache.CacheBuilder
import org.apache.s2graph.core.GraphExceptions.{BadQueryException, ModelNotFoundException}
import org.apache.s2graph.core.JSONParser._
import org.apache.s2graph.core._
import org.apache.s2graph.core.mysqls._
import org.apache.s2graph.core.parsers.{Where, WhereParser}
import org.apache.s2graph.core.types._
import play.api.libs.json._
import scala.util.{Random, Failure, Success, Try}
object TemplateHelper {
val findVar = """\"?\$\{(.*?)\}\"?""".r
val num = """(next_minute|next_day|next_hour|next_week|now)?\s*(-?\s*[0-9]+)?\s*(minute|hour|day|week)?""".r
val randIntRegex = """randint\((.*,.*)\)""".r
val minute: Long = 60 * 1000L
val hour = 60 * minute
val day = 24 * hour
val week = 7 * day
def calculate(now: Long, n: Int, unit: String): Long = {
val duration = unit match {
case "minute" | "MINUTE" => n * minute
case "hour" | "HOUR" => n * hour
case "day" | "DAY" => n * day
case "week" | "WEEK" => n * week
case _ => n
}
duration + now
}
def randInt(s: String): Long = {
val tokens = s.split(",").map(_.trim)
if (tokens.length != 2) throw new RuntimeException(s"TemplateHelper.randint has wrong format. $s")
val (from, to) = try {
(tokens.head.toInt, tokens.last.toInt)
} catch {
case e: Exception => throw new RuntimeException(s"TemplateHelper.randint has wrong format. $s")
}
if (from > to) throw new RuntimeException(s"TemplateHelper.randint has wrong format. $s")
val diff = to - from
val r = Random.nextInt(diff + 1)
assert(diff >= 0 && diff < Int.MaxValue && from + r < Int.MaxValue)
from + r
}
def replaceVariable(now: Long, body: String): String = {
findVar.replaceAllIn(body, m => {
val matched = m group 1
randIntRegex.findFirstMatchIn(matched) match {
case None =>
num.replaceSomeIn(matched, m => {
val (_pivot, n, unit) = (m.group(1), m.group(2), m.group(3))
val ts = _pivot match {
case null => now
case "now" | "NOW" => now
case "next_minute" | "NEXT_MINUTE" => now / minute * minute + minute
case "next_week" | "NEXT_WEEK" => now / week * week + week
case "next_day" | "NEXT_DAY" => now / day * day + day
case "next_hour" | "NEXT_HOUR" => now / hour * hour + hour
}
if (_pivot == null && n == null && unit == null) None
else if (n == null || unit == null) Option(ts.toString)
else Option(calculate(ts, n.replaceAll(" ", "").toInt, unit).toString)
})
case Some(m) =>
val range = m group 1
randInt(range).toString
}
})
}
}
object RequestParser {
type ExperimentParam = (JsObject, String, String, String, Option[String])
val defaultLimit = 100
def toJsValues(jsValue: JsValue): List[JsValue] = {
jsValue match {
case obj: JsObject => List(obj)
case arr: JsArray => arr.as[List[JsValue]]
case _ => List.empty[JsValue]
}
}
def jsToStr(js: JsValue): String = js match {
case JsString(s) => s
case _ => js.toString()
}
}
class RequestParser(graph: S2Graph) {
import Management.JsonModel._
import RequestParser._
val config = graph.config
val hardLimit = config.getInt("query.hardlimit")
val maxLimit = Int.MaxValue - 1
val DefaultRpcTimeout = config.getInt("hbase.rpc.timeout")
val DefaultMaxAttempt = config.getInt("hbase.client.retries.number")
val DefaultCluster = config.getString("hbase.zookeeper.quorum")
val DefaultCompressionAlgorithm = config.getString("hbase.table.compression.algorithm")
val DefaultPhase = config.getString("phase")
val parserCache = CacheBuilder.newBuilder()
.expireAfterAccess(10000, TimeUnit.MILLISECONDS)
.expireAfterWrite(10000, TimeUnit.MILLISECONDS)
.maximumSize(10000)
.initialCapacity(1000)
.build[String, Try[Where]]
private def extractScoring(label: Label, value: JsValue): Option[Seq[(LabelMeta, Double)]] = {
val ret = for {
js <- parseOption[JsObject](value, "scoring")
} yield {
for {
(k, v) <- js.fields
labelMata <- label.metaPropsInvMap.get(k)
} yield {
val value = v match {
case n: JsNumber => n.as[Double]
case _ => throw new Exception("scoring weight should be double.")
}
(labelMata, value)
}
}
ret
}
def extractInterval(label: Label, jsValue: JsValue): Option[(Seq[(String, JsValue)], Seq[(String, JsValue)])] = {
def extractKv(js: JsValue): Seq[(String, JsValue)] = js match {
case JsObject(obj) => obj.toSeq
case JsArray(arr) => arr.flatMap {
case JsObject(obj) => obj.toSeq
case _ => throw new RuntimeException(s"cannot support json type $js")
}
case _ => throw new RuntimeException(s"cannot support json type: $js")
}
val ret = for {
js <- parseOption[JsObject](jsValue, "interval")
fromJs <- (js \ "from").asOpt[JsValue]
toJs <- (js \ "to").asOpt[JsValue]
} yield {
val from = extractKv(fromJs)
val to = extractKv(toJs)
(from, to)
}
ret
}
def extractDuration(label: Label, jsValue: JsValue) = {
for {
js <- parseOption[JsObject](jsValue, "duration")
} yield {
val minTs = (js \ "from").get match {
case JsString(s) => TemplateHelper.replaceVariable(System.currentTimeMillis(), s).toLong
case JsNumber(n) => n.toLong
case _ => Long.MinValue
}
val maxTs = (js \ "to").get match {
case JsString(s) => TemplateHelper.replaceVariable(System.currentTimeMillis(), s).toLong
case JsNumber(n) => n.toLong
case _ => Long.MaxValue
}
if (minTs > maxTs) {
throw new BadQueryException("Duration error. Timestamp of From cannot be larger than To.")
}
(minTs, maxTs)
}
}
def extractHas(label: Label, jsValue: JsValue) = {
val ret = for {
js <- parseOption[JsObject](jsValue, "has")
} yield {
for {
(k, v) <- js.fields
labelMeta <- LabelMeta.findByName(label.id.get, k)
value <- jsValueToInnerVal(v, labelMeta.dataType, label.schemaVersion)
} yield {
labelMeta.name -> value
}
}
ret.map(_.toMap).getOrElse(Map.empty[String, InnerValLike])
}
def extractWhere(label: Label, whereClauseOpt: Option[String]): Try[Where] = {
whereClauseOpt match {
case None => Success(WhereParser.success)
case Some(where) =>
val whereParserKey = s"${label.label}_${where}"
parserCache.get(whereParserKey, new Callable[Try[Where]] {
override def call(): Try[Where] = {
val _where = TemplateHelper.replaceVariable(System.currentTimeMillis(), where)
WhereParser(label).parse(_where) match {
case s@Success(_) => s
case Failure(ex) => throw BadQueryException(ex.getMessage, ex)
}
}
})
}
}
def extractGroupBy(value: Option[JsValue]): GroupBy = value.map {
case obj: JsObject =>
val keys = (obj \ "keys").asOpt[Seq[String]].getOrElse(Nil)
val groupByLimit = (obj \ "limit").asOpt[Int].getOrElse(hardLimit)
val minShouldMatchOpt = (obj \ "minimumShouldMatch").asOpt[JsObject].map { o =>
val prop = (o \ "prop").asOpt[String].getOrElse("to")
val count = (o \ "count").asOpt[Int].getOrElse(0)
val terms = (o \ "terms").asOpt[Set[JsValue]].getOrElse(Set.empty).map {
case JsString(s) => s
case JsNumber(n) => n
case _ => throw new RuntimeException("not supported data type")
}.map(_.asInstanceOf[Any])
MinShouldMatchParam(prop, count, terms)
}
GroupBy(keys, groupByLimit, minShouldMatch = minShouldMatchOpt)
case arr: JsArray =>
val keys = arr.asOpt[Seq[String]].getOrElse(Nil)
GroupBy(keys)
case _ => GroupBy.Empty
}.getOrElse(GroupBy.Empty)
def toVertices(labelName: String, direction: String, ids: Seq[JsValue]): Seq[S2Vertex] = {
val vertices = for {
label <- Label.findByName(labelName).toSeq
serviceColumn = if (direction == "out") label.srcColumn else label.tgtColumn
id <- ids
innerId <- jsValueToInnerVal(id, serviceColumn.columnType, label.schemaVersion)
} yield {
graph.newVertex(SourceVertexId(serviceColumn, innerId), System.currentTimeMillis())
}
vertices
}
def toMultiQuery(jsValue: JsValue, impIdOpt: Option[String]): MultiQuery = {
val globalQueryOption = toQueryOption(jsValue, impIdOpt)
val queries = for {
queryJson <- (jsValue \ "queries").asOpt[Seq[JsValue]].getOrElse(Seq.empty)
} yield {
val innerQuery = toQuery(queryJson, impIdOpt = impIdOpt)
val queryOption = innerQuery.queryOption
if (queryOption.groupBy.keys.nonEmpty) throw new BadQueryException("Group by option is not allowed in multiple queries.")
if (queryOption.orderByKeys.nonEmpty) throw new BadQueryException("Order by option is not allowed in multiple queries.")
if (globalQueryOption.withScore) innerQuery.copy(queryOption = innerQuery.queryOption.copy(withScore = false))
else innerQuery
// val innerQuery3 =
// if (globalQueryOption.groupBy.keys.nonEmpty) innerQuery2.copy(queryOption = innerQuery2.queryOption.copy(groupBy = GroupBy.Empty))
// else innerQuery2
}
val weights = (jsValue \ "weights").asOpt[Seq[Double]].getOrElse(queries.map(_ => 1.0))
MultiQuery(queries = queries, weights = weights, queryOption = globalQueryOption)
}
def toQueryOption(jsValue: JsValue, impIdOpt: Option[String]): QueryOption = {
val filterOutFields = (jsValue \ "filterOutFields").asOpt[List[String]].getOrElse(List(LabelMeta.to.name))
val filterOutQuery = (jsValue \ "filterOut").asOpt[JsValue].map { v =>
toQuery(v, impIdOpt = impIdOpt)
}.map { q =>
q.copy(queryOption = q.queryOption.copy(filterOutFields = filterOutFields, selectColumns = filterOutFields))
}
val removeCycle = (jsValue \ "removeCycle").asOpt[Boolean].getOrElse(true)
val selectColumns = (jsValue \ "select").asOpt[List[String]].getOrElse(List.empty)
val groupBy = extractGroupBy((jsValue \ "groupBy").asOpt[JsValue])
val orderByColumns: List[(String, Boolean)] = (jsValue \ "orderBy").asOpt[List[JsObject]].map { jsLs =>
for {
js <- jsLs
(column, orderJs) <- js.fields
} yield {
val ascending = orderJs.as[String].toUpperCase match {
case "ASC" => true
case "DESC" => false
}
column -> ascending
}
}.getOrElse(Nil)
val withScore = (jsValue \ "withScore").asOpt[Boolean].getOrElse(true)
val returnTree = (jsValue \ "returnTree").asOpt[Boolean].getOrElse(false)
//TODO: Refactor this
val limitOpt = (jsValue \ "limit").asOpt[Int]
val returnAgg = (jsValue \ "returnAgg").asOpt[Boolean].getOrElse(true)
val scoreThreshold = (jsValue \ "scoreThreshold").asOpt[Double].getOrElse(Double.MinValue)
val returnDegree = (jsValue \ "returnDegree").asOpt[Boolean].getOrElse(true)
val ignorePrevStepCache = (jsValue \ "ignorePrevStepCache").asOpt[Boolean].getOrElse(false)
val shouldPropagateScore = (jsValue \ "shouldPropagateScore").asOpt[Boolean].getOrElse(true)
QueryOption(removeCycle = removeCycle,
selectColumns = selectColumns,
groupBy = groupBy,
orderByColumns = orderByColumns,
filterOutQuery = filterOutQuery,
filterOutFields = filterOutFields,
withScore = withScore,
returnTree = returnTree,
limitOpt = limitOpt,
returnAgg = returnAgg,
scoreThreshold = scoreThreshold,
returnDegree = returnDegree,
impIdOpt = impIdOpt,
ignorePrevStepCache,
shouldPropagateScore
)
}
def toQuery(jsValue: JsValue, impIdOpt: Option[String]): Query = {
try {
val vertices = for {
value <- (jsValue \ "srcVertices").asOpt[Seq[JsValue]].getOrElse(Nil)
serviceName <- (value \ "serviceName").asOpt[String].toSeq
columnName <- (value \ "columnName").asOpt[String].toSeq
idJson = (value \ "id").asOpt[JsValue].map(Seq(_)).getOrElse(Nil)
idsJson = (value \ "ids").asOpt[Seq[JsValue]].getOrElse(Nil)
id <- (idJson ++ idsJson).flatMap(jsValueToAny(_).toSeq).distinct
} yield graph.toVertex(serviceName, columnName, id)
if (vertices.isEmpty) throw BadQueryException("srcVertices`s id is empty")
val steps = parse[Vector[JsValue]](jsValue, "steps")
val queryOption = toQueryOption(jsValue, impIdOpt)
val querySteps =
steps.zipWithIndex.map { case (step, stepIdx) =>
val labelWeights = step match {
case obj: JsObject =>
val converted = for {
(k, v) <- (obj \ "weights").asOpt[JsObject].getOrElse(Json.obj()).fields
l <- Label.findByName(k)
} yield {
l.id.get -> v.toString().toDouble
}
converted.toMap
case _ => Map.empty[Int, Double]
}
val queryParamJsVals = step match {
case arr: JsArray => arr.as[List[JsValue]]
case obj: JsObject => (obj \ "step").as[List[JsValue]]
case _ => List.empty[JsValue]
}
val nextStepScoreThreshold = step match {
case obj: JsObject => (obj \ "nextStepThreshold").asOpt[Double].getOrElse(QueryParam.DefaultThreshold)
case _ => QueryParam.DefaultThreshold
}
val nextStepLimit = step match {
case obj: JsObject => (obj \ "nextStepLimit").asOpt[Int].getOrElse(-1)
case _ => -1
}
val cacheTTL = step match {
case obj: JsObject => (obj \ "cacheTTL").asOpt[Int].getOrElse(-1)
case _ => -1
}
val queryParams =
for {
labelGroup <- queryParamJsVals
queryParam <- parseQueryParam(labelGroup, queryOption)
} yield {
val (_, columnName) =
if (queryParam.dir == GraphUtil.directions("out")) {
(queryParam.label.srcService.serviceName, queryParam.label.srcColumnName)
} else {
(queryParam.label.tgtService.serviceName, queryParam.label.tgtColumnName)
}
//FIXME:
if (stepIdx == 0 && vertices.nonEmpty && !vertices.exists(v => v.columnName == columnName)) {
throw BadQueryException("srcVertices contains incompatiable serviceName or columnName with first step.")
}
queryParam
}
val groupBy = extractGroupBy((step \ "groupBy").asOpt[JsValue])
Step(queryParams = queryParams,
labelWeights = labelWeights,
nextStepScoreThreshold = nextStepScoreThreshold,
nextStepLimit = nextStepLimit,
cacheTTL = cacheTTL,
groupBy = groupBy)
}
Query(vertices, querySteps, queryOption)
} catch {
case e: BadQueryException =>
throw e
case e: ModelNotFoundException =>
throw BadQueryException(e.getMessage, e)
case e: Exception =>
throw BadQueryException(s"$jsValue, $e", e)
}
}
private def parseQueryParam(labelGroup: JsValue, queryOption: QueryOption): Option[QueryParam] = {
for {
labelName <- parseOption[String](labelGroup, "label")
} yield {
val label = Label.findByName(labelName).getOrElse(throw BadQueryException(s"$labelName not found"))
val direction = parseOption[String](labelGroup, "direction").getOrElse("out")
val limit = {
parseOption[Int](labelGroup, "limit") match {
case None => defaultLimit
case Some(l) if l < 0 => hardLimit
case Some(l) if l >= 0 => Math.min(l, hardLimit)
}
}
val offset = parseOption[Int](labelGroup, "offset").getOrElse(0)
val interval = extractInterval(label, labelGroup)
val duration = extractDuration(label, labelGroup)
val scoring = extractScoring(label, labelGroup).getOrElse(Nil).toList
val exclude = parseOption[Boolean](labelGroup, "exclude").getOrElse(false)
val include = parseOption[Boolean](labelGroup, "include").getOrElse(false)
val hasFilter = extractHas(label, labelGroup)
val indexName = (labelGroup \ "index").asOpt[String].getOrElse(LabelIndex.DefaultName)
val whereClauseOpt = (labelGroup \ "where").asOpt[String]
val where = extractWhere(label, whereClauseOpt)
val includeDegree = (labelGroup \ "includeDegree").asOpt[Boolean].getOrElse(true)
val rpcTimeout = (labelGroup \ "rpcTimeout").asOpt[Int].getOrElse(DefaultRpcTimeout)
val maxAttempt = (labelGroup \ "maxAttempt").asOpt[Int].getOrElse(DefaultMaxAttempt)
val tgtVertexInnerIdOpt = (labelGroup \ "_to").asOpt[JsValue].filterNot(_ == JsNull).flatMap(jsValueToAny)
val cacheTTL = (labelGroup \ "cacheTTL").asOpt[Long].getOrElse(-1L)
val timeDecayFactor = (labelGroup \ "timeDecay").asOpt[JsObject].map { jsVal =>
val propName = (jsVal \ "propName").asOpt[String].getOrElse(LabelMeta.timestamp.name)
val propNameSeq = label.metaPropsInvMap.get(propName).getOrElse(LabelMeta.timestamp)
val initial = (jsVal \ "initial").asOpt[Double].getOrElse(1.0)
val decayRate = (jsVal \ "decayRate").asOpt[Double].getOrElse(0.1)
if (decayRate >= 1.0 || decayRate <= 0.0) throw new BadQueryException("decay rate should be 0.0 ~ 1.0")
val timeUnit = (jsVal \ "timeUnit").asOpt[Double].getOrElse(60 * 60 * 24.0)
TimeDecay(initial, decayRate, timeUnit, propNameSeq)
}
val threshold = (labelGroup \ "threshold").asOpt[Double].getOrElse(QueryParam.DefaultThreshold)
// TODO: refactor this. dirty
val duplicate = parseOption[String](labelGroup, "duplicate").map(s => DuplicatePolicy(s)).getOrElse(DuplicatePolicy.First)
val outputField = (labelGroup \ "outputField").asOpt[String].map(s => Json.arr(Json.arr(s)))
val transformer = (if (outputField.isDefined) outputField else (labelGroup \ "transform").asOpt[JsValue]) match {
case None => EdgeTransformer(EdgeTransformer.DefaultJson)
case Some(json) => EdgeTransformer(json)
}
val scorePropagateOp = (labelGroup \ "scorePropagateOp").asOpt[String].getOrElse("multiply")
val scorePropagateShrinkage = (labelGroup \ "scorePropagateShrinkage").asOpt[Long].getOrElse(500l)
val sample = (labelGroup \ "sample").asOpt[Int].getOrElse(-1)
val shouldNormalize = (labelGroup \ "normalize").asOpt[Boolean].getOrElse(false)
val cursorOpt = (labelGroup \ "cursor").asOpt[String]
// FIXME: Order of command matter
QueryParam(labelName = labelName,
direction = direction,
offset = offset,
limit = limit,
sample = sample,
maxAttempt = maxAttempt,
rpcTimeout = rpcTimeout,
cacheTTLInMillis = cacheTTL,
indexName = indexName, where = where, threshold = threshold,
rank = RankParam(scoring), intervalOpt = interval, durationOpt = duration,
exclude = exclude, include = include, has = hasFilter, duplicatePolicy = duplicate,
includeDegree = includeDegree, scorePropagateShrinkage = scorePropagateShrinkage,
scorePropagateOp = scorePropagateOp, shouldNormalize = shouldNormalize,
whereRawOpt = whereClauseOpt, cursorOpt = cursorOpt,
tgtVertexIdOpt = tgtVertexInnerIdOpt,
edgeTransformer = transformer, timeDecay = timeDecayFactor
)
}
}
private def parse[R](js: JsValue, key: String)(implicit read: Reads[R]): R = {
(js \ key).validate[R] match {
case JsError(errors) =>
val msg = (JsError.toJson(errors) \ "obj").as[List[JsValue]].flatMap(x => (x \ "msg").toOption)
val e = Json.obj("args" -> key, "error" -> msg)
throw new GraphExceptions.JsonParseException(Json.obj("error" -> key).toString)
case JsSuccess(result, _) => result
}
}
private def parseOption[R](js: JsValue, key: String)(implicit read: Reads[R]): Option[R] = {
(js \ key).validateOpt[R] match {
case JsError(errors) =>
val msg = (JsError.toJson(errors) \ "obj").as[List[JsValue]].flatMap(x => (x \ "msg").toOption)
val e = Json.obj("args" -> key, "error" -> msg)
throw new GraphExceptions.JsonParseException(Json.obj("error" -> key).toString)
case JsSuccess(result, _) => result
}
}
def jsToStr(js: JsValue): String = js match {
case JsString(s) => s
case _ => js.toString()
}
def parseBulkFormat(str: String): Seq[(GraphElement, String)] = {
val edgeStrs = str.split("\\n").filterNot(_.isEmpty)
val elementsWithTsv = for {
edgeStr <- edgeStrs
str <- GraphUtil.parseString(edgeStr)
element <- graph.toGraphElement(str)
} yield (element, str)
elementsWithTsv
}
def parseJsonFormat(jsValue: JsValue, operation: String): Seq[(S2Edge, String)] = {
val jsValues = toJsValues(jsValue)
jsValues.flatMap(toEdgeWithTsv(_, operation))
}
private def toEdgeWithTsv(jsValue: JsValue, operation: String): Seq[(S2Edge, String)] = {
val srcIds = (jsValue \ "from").asOpt[JsValue].map(Seq(_)).getOrElse(Nil) ++ (jsValue \ "froms").asOpt[Seq[JsValue]].getOrElse(Nil)
val tgtIds = (jsValue \ "to").asOpt[JsValue].map(Seq(_)).getOrElse(Nil) ++ (jsValue \ "tos").asOpt[Seq[JsValue]].getOrElse(Nil)
val label = parse[String](jsValue, "label")
val timestamp = parse[Long](jsValue, "timestamp")
val direction = parseOption[String](jsValue, "direction").getOrElse("out")
val propsJson = (jsValue \ "props").asOpt[JsObject].getOrElse(Json.obj())
for {
srcId <- srcIds.flatMap(jsValueToAny(_).toSeq)
tgtId <- tgtIds.flatMap(jsValueToAny(_).toSeq)
} yield {
// val edge = Management.toEdge(graph, timestamp, operation, srcId, tgtId, label, direction, fromJsonToProperties(propsJson))
val edge = graph.toEdge(srcId, tgtId, label, direction, fromJsonToProperties(propsJson), ts = timestamp, operation = operation)
val tsv = (jsValue \ "direction").asOpt[String] match {
case None => Seq(timestamp, operation, "e", srcId, tgtId, label, propsJson.toString).mkString("\t")
case Some(dir) => Seq(timestamp, operation, "e", srcId, tgtId, label, propsJson.toString, dir).mkString("\t")
}
(edge, tsv)
}
}
def toVertices(jsValue: JsValue, operation: String, serviceName: Option[String] = None, columnName: Option[String] = None) = {
toJsValues(jsValue).map(toVertex(_, operation, serviceName, columnName))
}
def toVertex(jsValue: JsValue, operation: String, serviceName: Option[String] = None, columnName: Option[String] = None): S2Vertex = {
val id = parse[JsValue](jsValue, "id")
val ts = parseOption[Long](jsValue, "timestamp").getOrElse(System.currentTimeMillis())
val sName = if (serviceName.isEmpty) parse[String](jsValue, "serviceName") else serviceName.get
val cName = if (columnName.isEmpty) parse[String](jsValue, "columnName") else columnName.get
val props = fromJsonToProperties((jsValue \ "props").asOpt[JsObject].getOrElse(Json.obj()))
graph.toVertex(sName, cName, id.toString, props, ts, operation)
}
def toPropElements(jsObj: JsValue) = Try {
val propName = (jsObj \ "name").as[String]
val dataType = InnerVal.toInnerDataType((jsObj \ "dataType").as[String])
val defaultValue = (jsObj \ "defaultValue").as[JsValue] match {
case JsString(s) => s
case _@js => js.toString
}
Prop(propName, defaultValue, dataType)
}
def toPropsElements(jsValue: JsLookupResult): Seq[Prop] = for {
jsObj <- jsValue.asOpt[Seq[JsValue]].getOrElse(Nil)
} yield {
val propName = (jsObj \ "name").as[String]
val dataType = InnerVal.toInnerDataType((jsObj \ "dataType").as[String])
val defaultValue = (jsObj \ "defaultValue").as[JsValue] match {
case JsString(s) => s
case _@js => js.toString
}
Prop(propName, defaultValue, dataType)
}
def toIndicesElements(jsValue: JsLookupResult): Seq[Index] = {
val indices = for {
jsObj <- jsValue.as[Seq[JsValue]]
indexName = (jsObj \ "name").as[String]
propNames = (jsObj \ "propNames").as[Seq[String]]
direction = (jsObj \ "direction").asOpt[String].map(GraphUtil.toDirection)
options = (jsObj \ "options").asOpt[JsValue].map(_.toString)
} yield {
Index(indexName, propNames, direction, options)
}
val (pk, others) = indices.partition(index => index.name == LabelIndex.DefaultName)
val (both, inOut) = others.partition(index => index.direction.isEmpty)
val (in, out) = inOut.partition(index => index.direction.get == GraphUtil.directions("in"))
pk ++ both ++ in ++ out
}
def toLabelElements(jsValue: JsValue): Try[Label] = {
val labelName = parse[String](jsValue, "label")
val srcServiceName = parse[String](jsValue, "srcServiceName")
val tgtServiceName = parse[String](jsValue, "tgtServiceName")
val srcColumnName = parse[String](jsValue, "srcColumnName")
val tgtColumnName = parse[String](jsValue, "tgtColumnName")
val srcColumnType = parse[String](jsValue, "srcColumnType")
val tgtColumnType = parse[String](jsValue, "tgtColumnType")
val serviceName = (jsValue \ "serviceName").asOpt[String].getOrElse(tgtServiceName)
val isDirected = (jsValue \ "isDirected").asOpt[Boolean].getOrElse(true)
val allProps = toPropsElements(jsValue \ "props")
val indices = toIndicesElements(jsValue \ "indices")
val consistencyLevel = (jsValue \ "consistencyLevel").asOpt[String].getOrElse("weak")
// expect new label don`t provide hTableName
val hTableName = (jsValue \ "hTableName").asOpt[String]
val hTableTTL = (jsValue \ "hTableTTL").asOpt[Int]
val schemaVersion = (jsValue \ "schemaVersion").asOpt[String].getOrElse(HBaseType.DEFAULT_VERSION)
val isAsync = (jsValue \ "isAsync").asOpt[Boolean].getOrElse(false)
val compressionAlgorithm = (jsValue \ "compressionAlgorithm").asOpt[String].getOrElse(DefaultCompressionAlgorithm)
val options = (jsValue \ "options").asOpt[JsValue].map(_.toString())
graph.management.createLabel(labelName, srcServiceName, srcColumnName, srcColumnType,
tgtServiceName, tgtColumnName, tgtColumnType, isDirected, serviceName,
indices, allProps, consistencyLevel, hTableName, hTableTTL, schemaVersion, isAsync, compressionAlgorithm, options)
}
def toIndexElements(jsValue: JsValue) = Try {
val labelName = parse[String](jsValue, "label")
val indices = toIndicesElements(jsValue \ "indices")
(labelName, indices)
}
def toServiceElements(jsValue: JsValue) = {
val serviceName = parse[String](jsValue, "serviceName")
val cluster = (jsValue \ "cluster").asOpt[String].getOrElse(DefaultCluster)
val hTableName = (jsValue \ "hTableName").asOpt[String].getOrElse(s"${serviceName}-${DefaultPhase}")
val preSplitSize = (jsValue \ "preSplitSize").asOpt[Int].getOrElse(1)
val hTableTTL = (jsValue \ "hTableTTL").asOpt[Int]
val compressionAlgorithm = (jsValue \ "compressionAlgorithm").asOpt[String].getOrElse(DefaultCompressionAlgorithm)
(serviceName, cluster, hTableName, preSplitSize, hTableTTL, compressionAlgorithm)
}
def toServiceColumnElements(jsValue: JsValue) = Try {
val serviceName = parse[String](jsValue, "serviceName")
val columnName = parse[String](jsValue, "columnName")
val columnType = parse[String](jsValue, "columnType")
val props = toPropsElements(jsValue \ "props")
(serviceName, columnName, columnType, props)
}
def toCheckEdgeParam(jsValue: JsValue) = {
for {
json <- jsValue.asOpt[Seq[JsValue]].getOrElse(Nil)
from <- (json \ "from").asOpt[JsValue].flatMap(jsValueToAny(_))
to <- (json \ "to").asOpt[JsValue].flatMap(jsValueToAny(_))
labelName <- (json \ "label").asOpt[String]
direction = (json \ "direction").asOpt[String].getOrElse("out")
} yield {
graph.toEdge(from, to, labelName, direction, Map.empty)
}
}
def toGraphElements(str: String): Seq[GraphElement] = {
val edgeStrs = str.split("\\n")
for {
edgeStr <- edgeStrs
str <- GraphUtil.parseString(edgeStr)
element <- graph.toGraphElement(str)
} yield element
}
def toDeleteParam(json: JsValue) = {
val labelName = (json \ "label").as[String]
val labels = Label.findByName(labelName).map { l => Seq(l) }.getOrElse(Nil)
val direction = (json \ "direction").asOpt[String].getOrElse("out")
val ids = (json \ "ids").asOpt[List[JsValue]].getOrElse(Nil)
val ts = (json \ "timestamp").asOpt[Long].getOrElse(System.currentTimeMillis())
val vertices = toVertices(labelName, direction, ids)
(labels, direction, ids, ts, vertices)
}
def toFetchAndDeleteParam(json: JsValue) = {
val labelName = (json \ "label").as[String]
val fromOpt = (json \ "from").asOpt[JsValue]
val toOpt = (json \ "to").asOpt[JsValue]
val direction = (json \ "direction").asOpt[String].getOrElse("out")
val indexOpt = (json \ "index").asOpt[String]
val propsOpt = (json \ "props").asOpt[JsObject]
(labelName, fromOpt, toOpt, direction, indexOpt, propsOpt)
}
def parseExperiment(jsQuery: JsValue): Seq[ExperimentParam] = jsQuery.as[Seq[JsObject]].map { obj =>
def _require(field: String) = throw new RuntimeException(s"${field} not found")
val accessToken = (obj \ "accessToken").asOpt[String].getOrElse(_require("accessToken"))
val experimentName = (obj \ "experiment").asOpt[String].getOrElse(_require("experiment"))
val uuid = (obj \ "#uuid").get match {
case JsString(s) => s
case JsNumber(n) => n.toString
case _ => _require("#uuid")
}
val body = (obj \ "params").asOpt[JsObject].getOrElse(Json.obj())
val impKeyOpt = (obj \ Experiment.ImpressionKey).asOpt[String]
(body, accessToken, experimentName, uuid, impKeyOpt)
}
}