// blob: b3d255d440f98c4f74d0a4771bdf3b8f11c7427a [file] [log] [blame]
package controllers
import com.daumkakao.s2graph.rest.config.{Instrumented, Config}
import com.daumkakao.s2graph.core.{ Edge, Graph, GraphElement, GraphUtil, Vertex, KGraphExceptions }
import play.api.Logger
import play.api.libs.json.JsValue
import play.api.mvc.{ Controller, Result }
import scala.collection.mutable.ListBuffer
import scala.concurrent.Future
/**
 * HTTP endpoints that accept edge/vertex mutations and forward them to [[Graph]].
 *
 * Two input formats are handled:
 *  - JSON bodies, converted with `toEdges` (see [[tryMutates]]);
 *  - newline-separated text bodies, parsed with `GraphUtil.parseString` and
 *    `Graph.toGraphElement` (see [[mutateAndPublish]]).
 *
 * Every mutation entry point answers `401 Unauthorized` without touching the graph
 * when `Config.IS_WRITE_SERVER` is false.
 */
object EdgeController extends Controller with Instrumented with RequestParser {

  import controllers.ApplicationController._
  import play.api.libs.concurrent.Execution.Implicits._

  // Not referenced anywhere in this object.
  // NOTE(review): looks like a request-size cap (16 * 1024 * 1024) — confirm before wiring it up or removing it.
  private val maxLength = 1024 * 1024 * 16

  /**
   * Runs `body` asynchronously, but only on a write server.
   *
   * @param body computation producing the HTTP result; evaluated only when
   *             `Config.IS_WRITE_SERVER` is true
   * @return the result of `body`, or an already-completed `Unauthorized`
   */
  private def runOnWriteServer(body: => Result): Future[Result] =
    if (Config.IS_WRITE_SERVER) Future(body) else Future.successful(Unauthorized)

  /**
   * Maps a failure of a whole request to an HTTP result.
   *
   * @param caller name of the entry point, used as the log-message prefix
   * @param error  the exception that aborted the request
   * @return `BadRequest` for JSON parse errors, otherwise `InternalServerError`
   *         carrying the stack trace (the latter is also logged)
   */
  private def toFailureResult(caller: String, error: Throwable): Result = error match {
    case e: KGraphExceptions.JsonParseException => BadRequest(s"$e")
    case e =>
      Logger.error(s"$caller: $e", e)
      InternalServerError(s"${e.getStackTraceString}")
  }

  /**
   * Applies one element via [[aggregateElement]]; a failure is logged and swallowed
   * so that the remaining elements of the batch are still processed.
   */
  private def mutateSafely(caller: String, element: GraphElement, originalString: Option[String]): Unit =
    try {
      aggregateElement(element, originalString)
    } catch {
      // Kept as Throwable (not narrowed) so one bad element can never abort the batch.
      case e: Throwable => Logger.error(s"$caller: $element, $e", e)
    }

  /** Adds `amount` to the counter registered under `metricName`. */
  private def countIncoming(metricName: String, amount: Long): Unit =
    getOrElseUpdateMetric(metricName)(metricRegistry.counter(metricName)).inc(amount)

  /**
   * Converts a JSON body to edges and applies each of them.
   *
   * @param jsValue   request body, handed to `toEdges`
   * @param operation operation name handed to `toEdges` and echoed in the response
   * @return `Ok` with the number of edges received (edges whose mutation failed are
   *         logged but still counted), `BadRequest` when the JSON cannot be parsed,
   *         `InternalServerError` on any other failure, `Unauthorized` on a
   *         non-write server
   */
  private[controllers] def tryMutates(jsValue: JsValue, operation: String): Future[Result] =
    runOnWriteServer {
      try {
        val edges = toEdges(jsValue, operation).toList
        edges.foreach(edge => mutateSafely("tryMutates", edge, None))
        countIncoming("IncommingEdges", edges.size)
        Ok(s"${edges.size} $operation success\n")
      } catch {
        // Kept broad so that every failure still produces an HTTP response.
        case e: Throwable => toFailureResult("tryMutates", e)
      }
    }

  /**
   * Dispatches a single element to the matching `Graph` mutation.
   *
   * @param element        vertex or edge to apply
   * @param originalString raw input the element was parsed from, if any; only used
   *                       in the error log for unsupported element types
   */
  private[controllers] def aggregateElement(element: GraphElement, originalString: Option[String]) = {
    //KafkaAggregatorActor.enqueue(Protocol.elementToKafkaMessage(Config.KAFKA_LOG_TOPIC, element, originalString))
    element match {
      case v: Vertex => Graph.mutateVertex(v)
      case e: Edge => Graph.mutateEdge(e)
      case _ => Logger.error(s"InvalidType: $element, $originalString")
    }
  }

  /**
   * Parses a newline-separated text body into graph elements and applies each one.
   *
   * Lines for which `GraphUtil.parseString` or `Graph.toGraphElement` yields nothing
   * are skipped.
   *
   * @param str request body, one element per line
   * @return `Ok` with the number of elements parsed, `BadRequest` /
   *         `InternalServerError` on failure of the request as a whole,
   *         `Unauthorized` on a non-write server
   */
  private[controllers] def mutateAndPublish(str: String): Future[Result] =
    runOnWriteServer {
      val lines = str.split("\\n")
      try {
        val parsedElements =
          for (line <- lines; parsed <- GraphUtil.parseString(line); element <- Graph.toGraphElement(parsed)) yield {
            mutateSafely("mutateAndPublish", element, Some(parsed))
            element
          }
        // Elements that are neither Vertex nor Edge are counted in neither metric.
        val vertexCnt = parsedElements.count {
          case _: Vertex => true
          case _ => false
        }
        val edgeCnt = parsedElements.count {
          case _: Edge => true
          case _ => false
        }
        countIncoming("IncommingVertices", vertexCnt)
        countIncoming("IncommingEdges", edgeCnt)
        Ok(s" ${parsedElements.size} mutation success.\n")
      } catch {
        // Kept broad so that every failure still produces an HTTP response.
        case e: Throwable => toFailureResult("mutateAndPublish", e)
      }
    }

  /** Text endpoint: applies every element found in the newline-separated body. */
  def mutateBulk() = withHeaderAsync(parse.text) { request =>
    mutateAndPublish(request.body)
  }

  /** JSON endpoint: runs [[tryMutates]] with operation "insert". */
  def inserts() = withHeaderAsync(parse.json) { request =>
    tryMutates(request.body, "insert")
  }

  /** JSON endpoint: runs [[tryMutates]] with operation "insertBulk". */
  def insertsBulk() = withHeaderAsync(parse.json) { request =>
    tryMutates(request.body, "insertBulk")
  }

  /** JSON endpoint: runs [[tryMutates]] with operation "delete". */
  def deletes() = withHeaderAsync(parse.json) { request =>
    tryMutates(request.body, "delete")
  }

  /** JSON endpoint: runs [[tryMutates]] with operation "update". */
  def updates() = withHeaderAsync(parse.json) { request =>
    tryMutates(request.body, "update")
  }

  /** JSON endpoint: runs [[tryMutates]] with operation "increment". */
  def increments() = withHeaderAsync(parse.json) { request =>
    tryMutates(request.body, "increment")
  }
}