// blob: 91a7eda691a87f4b75c6520264f67594fd39bad7 [file] [log] [blame]
package controllers
import com.codahale.metrics.Meter
import com.daumkakao.s2graph.core.HBaseElement._
import com.daumkakao.s2graph.core._
import com.daumkakao.s2graph.rest.config.{Instrumented, Config}
import play.api.Logger
//import models.response.param.{ VertexQueryResponse, EdgeQueryResponse }
import play.api.libs.json.{ JsValue, Json }
import play.api.mvc.{ Action, Controller, Result }
import util.TestDataLoader
import scala.concurrent._
object QueryController extends Controller with RequestParser with Instrumented {
// import play.api.libs.concurrent.Execution.Implicits._
// implicit val ex = ApplicationController.globalExecutionContext
import ApplicationController._
// implicit val ex = ApplicationController.globalExecutionContext
import play.api.libs.concurrent.Execution.Implicits.defaultContext
// implicit val context = Akka.system.dispatchers.lookup("akka.actor.multiget-context")
/**
 * Test-only switches and limits.
 * NOTE(review): queryUseMultithread, buildResultJson and maxLength are not referenced
 * anywhere else in the visible part of this object.
 */
private val queryUseMultithread: Boolean = true
private val buildResultJson: Boolean = true
// NOTE(review): the meaning of the individual terms is not documented here; confirm before changing.
private val maxLength: Int = 64 * 1024 + 255 + 2 + 1
// A result holding a single empty inner sequence; the handlers in this object render it
// as the fallback payload when a query fails unexpectedly.
private val emptyResult: Seq[Seq[(Edge, Double)]] = Seq(Seq.empty[(Edge, Double)])
/**
 * End of the test-only section.
 */
/** Content type applied to most of the responses built in this controller. */
val applicationJsonHeader: String = "application/json"
/** NOTE(review): not referenced in this object; presumably the keys accepted for ordering. Confirm against callers. */
val orderByKeys: Seq[String] = Seq("weight")
// select
/** Parses the request body as JSON and hands it to `getEdgesInner`. */
def getEdges() = withHeaderAsync(parse.json)(req => getEdgesInner(req.body))
/** Parses the request body as JSON and hands it to `getEdgesExcludedInner`. */
def getEdgesExcluded() = withHeaderAsync(parse.json)(req => getEdgesExcludedInner(req.body))
/**
 * Runs the query described by `jsonQuery` and renders the fetched edges with `post`.
 *
 * A meter named after the query's "steps" JSON is marked for every call. When this instance is
 * not configured as a query server (`Config.IS_QUERY_SERVER` is false) the call is answered with
 * 401 and no query is executed.
 *
 * Failures are mapped the same way whether they happen while the query is being built or
 * after the returned future has been created:
 *  - `KGraphExceptions.BadQueryException` becomes 400
 *  - anything else becomes 200 with `post(emptyResult)` (the "watch tower" fallback)
 *
 * @param jsonQuery the query in JSON form, as accepted by `toQuery`
 * @param post      turns the fetched edges into the JSON body of the response
 * @return the HTTP result, always delivered through the future
 */
private def getEdgesAsync(jsonQuery: JsValue)(post: Seq[Iterable[(Edge, Double)]] => JsValue): Future[Result] = {
  // Single failure mapping shared by the synchronous catch and the asynchronous recover.
  val onFailure: PartialFunction[Throwable, Result] = {
    case e: KGraphExceptions.BadQueryException =>
      Logger.error(s"$e", e)
      BadRequest.as(applicationJsonHeader)
    case e: Throwable =>
      Logger.error(s"$e", e)
      // watch tower
      Ok(s"${post(emptyResult)}\n").as(applicationJsonHeader)
  }
  try {
    val queryTemplateId = (jsonQuery \ "steps").toString()
    getOrElseUpdateMetric[Meter](queryTemplateId)(metricRegistry.meter(queryTemplateId)).mark()
    if (!Config.IS_QUERY_SERVER) {
      // The result has to be returned; as a bare statement it would be discarded.
      Future.successful(Unauthorized.as(applicationJsonHeader))
    } else {
      Logger.info(s"$jsonQuery")
      val q = toQuery(jsonQuery)
      Graph.getEdgesAsync(q).map { edges =>
        Ok(s"${post(edges)}\n").as(applicationJsonHeader)
      }.recover(onFailure) // a failed future never reaches the catch below
    }
  } catch {
    case e: Throwable => Future.successful(onFailure(e))
  }
}
/**
 * Runs two queries derived from `jsonQuery` and renders both results with `post`:
 * the full query, and a query made of the same vertices and only the last step
 * (passed to `post` as its first argument).
 *
 * A meter named after the query's "steps" JSON is marked for every call. When this instance is
 * not configured as a query server (`Config.IS_QUERY_SERVER` is false) the call is answered with
 * 401 and no query is executed.
 *
 * Failures are mapped the same way whether they happen while the queries are being built or
 * after the futures have been created:
 *  - `KGraphExceptions.BadQueryException` becomes 400
 *  - anything else becomes 200 with `post(emptyResult, emptyResult)` (the "watch tower" fallback)
 *
 * @param jsonQuery the query in JSON form, as accepted by `toQuery`
 * @param post      turns (last-step-only edges, full-query edges) into the JSON body of the response
 * @return the HTTP result, always delivered through the future
 */
private def getEdgesExcludedAsync(jsonQuery: JsValue)(
    post: (Seq[Iterable[(Edge, Double)]], Seq[Iterable[(Edge, Double)]]) => JsValue): Future[Result] = {
  // Single failure mapping shared by the synchronous catch and the asynchronous recover.
  val onFailure: PartialFunction[Throwable, Result] = {
    case e: KGraphExceptions.BadQueryException =>
      Logger.error(s"$e", e)
      BadRequest.as(applicationJsonHeader)
    case e: Throwable =>
      Logger.error(s"$e", e)
      // watch tower
      Ok(s"${post(emptyResult, emptyResult)}\n").as(applicationJsonHeader)
  }
  try {
    val queryTemplateId = (jsonQuery \ "steps").toString()
    getOrElseUpdateMetric[Meter](queryTemplateId)(metricRegistry.meter(queryTemplateId)).mark()
    if (!Config.IS_QUERY_SERVER) {
      // The result has to be returned; as a bare statement it would be discarded.
      Future.successful(Unauthorized.as(applicationJsonHeader))
    } else {
      Logger.info(s"$jsonQuery")
      val q = toQuery(jsonQuery)
      val mineQ = Query(q.vertices, List(q.steps.last))
      // Start both independent queries before combining them so they can run concurrently.
      val mineF = Graph.getEdgesAsync(mineQ)
      val othersF = Graph.getEdgesAsync(q)
      val rendered = for (mine <- mineF; others <- othersF) yield {
        Ok(s"${post(mine, others)}\n").as(applicationJsonHeader)
      }
      rendered.recover(onFailure) // a failed future never reaches the catch below
    }
  } catch {
    case e: Throwable => Future.successful(onFailure(e))
  }
}
/** Runs `getEdgesAsync` on `jsValue`, rendering with `PostProcess.toSimpleVertexArrJson`. */
private def getEdgesInner(jsValue: JsValue): Future[Result] =
  getEdgesAsync(jsValue)(PostProcess.toSimpleVertexArrJson)
/**
 * Runs `getEdgesExcludedAsync` on `jsValue`, rendering with `PostProcess.toSiimpleVertexArrJson`.
 * NOTE(review): the double "i" is the name as referenced here; PostProcess is not visible in
 * this file, so verify the spelling there before renaming.
 */
private def getEdgesExcludedInner(jsValue: JsValue): Future[Result] =
  getEdgesExcludedAsync(jsValue)(PostProcess.toSiimpleVertexArrJson)
/** Parses the request body as JSON and runs `getEdgesAsync`, rendering with `PostProcess.summarizeWithListFormatted`. */
def getEdgesWithGrouping() =
  withHeaderAsync(parse.json)(req => getEdgesAsync(req.body)(PostProcess.summarizeWithListFormatted))
/** Parses the request body as JSON and runs `getEdgesExcludedAsync`, rendering with `PostProcess.summarizeWithListExcludeFormatted`. */
def getEdgesExcludedWithGrouping() =
  withHeaderAsync(parse.json)(req => getEdgesExcludedAsync(req.body)(PostProcess.summarizeWithListExcludeFormatted))
/** Parses the request body as JSON and runs `getEdgesAsync`, rendering with `PostProcess.summarizeWithList`. */
@deprecated
def getEdgesGrouped() =
  withHeaderAsync(parse.json)(req => getEdgesAsync(req.body)(PostProcess.summarizeWithList))
/**
 * Parses the request body as a JSON query, runs the full query and a query made of the same
 * vertices and only the last step, and renders both with `PostProcess.summarizeWithListExclude`.
 *
 * When this instance is not configured as a query server (`Config.IS_QUERY_SERVER` is false)
 * the call is answered with 401 and no query is executed.
 *
 * Failures are logged and mapped the same way whether they happen while the queries are being
 * built or after the futures have been created:
 *  - `KGraphExceptions.BadQueryException` becomes 400 echoing the request body
 *  - anything else becomes 200 with the rendering of two empty results ("watch tower" fallback)
 */
@deprecated
def getEdgesGroupedExcluded() = withHeaderAsync(parse.json) { request =>
  // Single failure mapping shared by the synchronous catch and the asynchronous recover.
  val onFailure: PartialFunction[Throwable, Result] = {
    case e: KGraphExceptions.BadQueryException =>
      Logger.error(s"$e", e)
      BadRequest(request.body).as(applicationJsonHeader)
    case e: Throwable =>
      Logger.error(s"$e", e)
      // watch tower
      Ok(s"${PostProcess.summarizeWithListExclude(emptyResult, emptyResult)}\n").as(applicationJsonHeader)
  }
  if (!Config.IS_QUERY_SERVER) {
    // The result has to be returned; as a bare statement it would be discarded.
    Future.successful(Unauthorized.as(applicationJsonHeader))
  } else {
    try {
      Logger.info(request.body.toString)
      val q = toQuery(request.body)
      val mineQ = Query(q.vertices, List(q.steps.last))
      // Start both independent queries before combining them so they can run concurrently.
      val mineF = Graph.getEdgesAsync(mineQ)
      val othersF = Graph.getEdgesAsync(q)
      val rendered = for (mine <- mineF; others <- othersF) yield {
        val json = PostProcess.summarizeWithListExclude(mine, others)
        Ok(s"$json\n").as(applicationJsonHeader)
      }
      rendered.recover(onFailure) // a failed future never reaches the catch below
    } catch {
      case e: Throwable => Future.successful(onFailure(e))
    }
  }
}
/**
 * Parses the request body as a JSON query, runs the full query and a query made of the same
 * vertices and only the last step, and renders both with
 * `PostProcess.summarizeWithListExcludeFormatted`.
 *
 * When this instance is not configured as a query server (`Config.IS_QUERY_SERVER` is false)
 * the call is answered with 401 and no query is executed.
 *
 * Failures are logged and mapped the same way whether they happen while the queries are being
 * built or after the futures have been created:
 *  - `KGraphExceptions.BadQueryException` becomes 400 echoing the request body
 *  - anything else becomes 200 with the rendering of two empty results ("watch tower" fallback)
 */
@deprecated
def getEdgesGroupedExcludedFormatted() = withHeaderAsync(parse.json) { request =>
  // Single failure mapping shared by the synchronous catch and the asynchronous recover.
  val onFailure: PartialFunction[Throwable, Result] = {
    case e: KGraphExceptions.BadQueryException =>
      Logger.error(s"$e", e)
      BadRequest(request.body).as(applicationJsonHeader)
    case e: Throwable =>
      Logger.error(s"$e", e)
      // watch tower
      Ok(s"${PostProcess.summarizeWithListExcludeFormatted(emptyResult, emptyResult)}\n").as(applicationJsonHeader)
  }
  if (!Config.IS_QUERY_SERVER) {
    // The result has to be returned; as a bare statement it would be discarded.
    Future.successful(Unauthorized.as(applicationJsonHeader))
  } else {
    try {
      Logger.info(request.body.toString)
      val q = toQuery(request.body)
      val mineQ = Query(q.vertices, List(q.steps.last))
      // Start both independent queries before combining them so they can run concurrently.
      val mineF = Graph.getEdgesAsync(mineQ)
      val othersF = Graph.getEdgesAsync(q)
      val rendered = for (mine <- mineF; others <- othersF) yield {
        val json = PostProcess.summarizeWithListExcludeFormatted(mine, others)
        Ok(s"$json\n").as(applicationJsonHeader)
      }
      rendered.recover(onFailure) // a failed future never reaches the catch below
    } catch {
      case e: Throwable => Future.successful(onFailure(e))
    }
  }
}
/**
 * Looks up the edge between two vertices for a given label and direction.
 *
 * Answers 401 without touching the graph when this instance is not configured as a query
 * server (`Config.IS_QUERY_SERVER` is false). Otherwise the first edge returned by
 * `Graph.getEdge` is rendered with `PostProcess.edgeToJson` (score 1.0); when no edge comes
 * back the answer is 404. Any exception thrown while the lookup is being prepared is turned
 * into a 400 carrying the exception's `toString`.
 *
 * @param srcId     source vertex id, converted with the label's source column type
 * @param tgtId     target vertex id, converted with the label's target column type
 * @param labelName name of the label, resolved through `Label.findByName`
 * @param direction direction string, resolved through `GraphUtil.toDir`
 */
@deprecated
def getEdge(srcId: String, tgtId: String, labelName: String, direction: String) = Action.async {
  if (!Config.IS_QUERY_SERVER) {
    // The future has to be returned; as a bare statement it would be discarded.
    Future.successful[Result](Unauthorized)
  } else {
    try {
      val label = Management.tryOption((labelName, true), Label.findByName)
      val dir = Management.tryOption(direction, GraphUtil.toDir)
      val srcVertexId = toInnerVal(srcId, label.srcColumnType)
      val tgtVertexId = toInnerVal(tgtId, label.tgtColumnType)
      val src = Vertex(CompositeId(label.srcColumn.id.get, srcVertexId, true, true), System.currentTimeMillis())
      val tgt = Vertex(CompositeId(label.tgtColumn.id.get, tgtVertexId, true, false), System.currentTimeMillis())
      Graph.getEdge(src, tgt, label, dir).map { edges =>
        val found = edges.headOption.map { edge =>
          val json = PostProcess.edgeToJson(edge, 1.0)
          Ok(s"$json\n").as(applicationJsonHeader)
        }
        found.getOrElse(NotFound("NotFound\n").as(applicationJsonHeader))
      }
    } catch {
      case e: Throwable => Future.successful(BadRequest(e.toString()).as(applicationJsonHeader))
    }
  }
}
/**
* Vertex
*/
/**
 * Fetches vertices for the ids listed in the JSON request body.
 *
 * The body is read as a list of objects, each with "serviceName", "columnName" and an
 * optional "ids" list; one vertex is built per id through `Management.toVertex` and the
 * whole batch is passed to `Graph.getVerticesAsync`.
 *
 * Answers 401 without reading the body when this instance is not configured as a query
 * server (`Config.IS_QUERY_SERVER` is false). A `JsResultException` or `RuntimeException`
 * raised while the request is being prepared becomes a 400 carrying the exception message;
 * any other throwable is logged and answered with 200 and the rendering of an empty vertex list.
 */
def getVertices() = withHeaderAsync(parse.json) { request =>
  if (!Config.IS_QUERY_SERVER) {
    // The result has to be returned; as a bare statement it would be discarded.
    Future.successful(Unauthorized.as(applicationJsonHeader))
  } else {
    val ts = System.currentTimeMillis()
    val props = "{}"
    try {
      val vertices = request.body.as[List[JsValue]].flatMap { js =>
        val serviceName = (js \ "serviceName").as[String]
        val columnName = (js \ "columnName").as[String]
        val ids = (js \ "ids").asOpt[List[JsValue]].getOrElse(List.empty[JsValue])
        // NOTE(review): id.toString is the JSON rendering of the id (a JSON string keeps its quotes);
        // presumably Management.toVertex expects that form. Confirm.
        ids.map(id => Management.toVertex(ts, "insert", id.toString, serviceName, columnName, props))
      }
      Graph.getVerticesAsync(vertices) map { found =>
        val json = PostProcess.verticesToJson(found)
        Ok(s"$json\n").as(applicationJsonHeader)
      }
    } catch {
      case e @ (_: play.api.libs.json.JsResultException | _: RuntimeException) =>
        Future.successful(BadRequest(e.getMessage()).as(applicationJsonHeader))
      case e: Throwable =>
        // Keep the best-effort answer, but leave a trace of what went wrong.
        Logger.error(s"$e", e)
        // watch tower
        Future.successful(Ok(s"${PostProcess.verticesToJson(Seq.empty[Vertex])}").as(applicationJsonHeader))
    }
  }
}
/**
* Only for test
*/
/**
 * Test helper: builds a one-step "out" query on `label` for an id supplied by
 * `TestDataLoader` and runs it through `getEdgesAsync` with `PostProcess.simple`.
 * A negative `friendCntStep` uses `TestDataLoader.randomId`; otherwise
 * `TestDataLoader.randomId(friendCntStep)` is used and 404 is returned when it yields nothing.
 */
def testGetEdges(label: String, limit: Int, friendCntStep: Int) = withHeaderAsync { _ =>
  val pickedId =
    if (friendCntStep >= 0) TestDataLoader.randomId(friendCntStep)
    else Some(TestDataLoader.randomId)
  pickedId match {
    case None =>
      Future(NotFound.as(applicationJsonHeader))
    case Some(id) =>
      val srcLabel = Label.findByName(label).get
      val srcColumnName = srcLabel.srcColumn.columnName
      val srcServiceName = Service.findById(srcLabel.srcServiceId).serviceName
      // The literal's lines are kept flush left so the produced string is unchanged.
      val queryJson = s"""
{
"srcVertices": [{"serviceName": "$srcServiceName", "columnName": "$srcColumnName", "id":$id}],
"steps": [
[{"label": "$label", "direction": "out", "limit": $limit}]
]
}
"""
      getEdgesAsync(Json.parse(queryJson))(PostProcess.simple)
  }
}
/**
 * Test helper: builds a two-step "out" query (`label1` then `label2`) starting from
 * `TestDataLoader.randomId` and runs it through `getEdgesAsync` with `PostProcess.simple`.
 * The source service and column come from `label1`.
 */
def testGetEdges2(label1: String, limit1: Int, label2: String, limit2: Int) = withHeaderAsync { _ =>
  val id = TestDataLoader.randomId.toString
  val firstLabel = Label.findByName(label1).get
  val srcColumnName = firstLabel.srcColumn.columnName
  val srcServiceName = Service.findById(firstLabel.srcServiceId).serviceName
  // The literal's lines are kept flush left so the produced string is unchanged.
  val queryJson = s"""
{
"srcVertices": [{"serviceName": "$srcServiceName", "columnName": "$srcColumnName", "id":$id}],
"steps": [
[{"label": "$label1", "direction": "out", "limit": $limit1}],
[{"label": "$label2", "direction": "out", "limit": $limit2}]
]
}
"""
  getEdgesAsync(Json.parse(queryJson))(PostProcess.simple)
}
/**
 * Test helper: builds a three-step "out" query (`label1`, `label2`, `label3`) starting from
 * `TestDataLoader.randomId` and runs it through `getEdgesAsync` with `PostProcess.simple`.
 * The source service and column come from `label1`.
 */
def testGetEdges3(label1: String, limit1: Int, label2: String, limit2: Int, label3: String, limit3: Int) = withHeaderAsync { _ =>
  val id = TestDataLoader.randomId.toString
  val firstLabel = Label.findByName(label1).get
  val srcColumnName = firstLabel.srcColumn.columnName
  val srcServiceName = Service.findById(firstLabel.srcServiceId).serviceName
  // The literal's lines are kept flush left so the produced string is unchanged.
  val queryJson = s"""
{
"srcVertices": [{"serviceName": "$srcServiceName", "columnName": "$srcColumnName", "id":$id}],
"steps": [
[{"label": "$label1", "direction": "out", "limit": $limit1}],
[{"label": "$label2", "direction": "out", "limit": $limit2}],
[{"label": "$label3", "direction": "out", "limit": $limit3}]
]
}
"""
  getEdgesAsync(Json.parse(queryJson))(PostProcess.simple)
}
/** Liveness check: always answers 200 with the body "Pong\n". */
def ping() = withHeaderAsync { _ => Future(Ok("Pong\n")) }
}