/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.s2graph.rest.play.controllers

import com.fasterxml.jackson.databind.JsonMappingException
import org.apache.s2graph.core.ExceptionHandler.KafkaMessage
import org.apache.s2graph.core._
import org.apache.s2graph.core.mysqls.Label
import org.apache.s2graph.core.rest.RequestParser
import org.apache.s2graph.core.utils.logger
import org.apache.s2graph.rest.play.actors.QueueActor
import org.apache.s2graph.rest.play.config.Config
import play.api.libs.json._
import play.api.mvc.{Controller, Result}

import scala.collection.Seq
import scala.concurrent.Future

object EdgeController extends Controller {

  import ApplicationController._
  import play.api.libs.concurrent.Execution.Implicits._

  private val s2: S2Graph = org.apache.s2graph.rest.play.Global.s2graph
  private val requestParser: RequestParser = org.apache.s2graph.rest.play.Global.s2parser
  private val walLogHandler: ExceptionHandler = org.apache.s2graph.rest.play.Global.wallLogHandler
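
  /** Wraps a graph element and its original TSV payload into a KafkaMessage and enqueues it on the WAL log handler. */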
  private def enqueue(topic: String, elem: GraphElement, tsv: String, publishJson: Boolean = false) = {
    val kafkaMessage = ExceptionHandler.toKafkaMessage(topic, elem, Option(tsv), publishJson)
    walLogHandler.enqueue(kafkaMessage)
  }
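
  /**
   * Publishes a mutated element to the WAL Kafka topic. Edges honor the label-level
   * "walLog" option: "drop" suppresses the message, "sample" publishes with probability
   * "rate", and any other value publishes unconditionally. Vertices are always published.
   */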
  private def publish(graphElem: GraphElement, tsv: String) = {
    val kafkaTopic = toKafkaTopic(graphElem.isAsync)

    graphElem match {
      case v: S2Vertex =>
        enqueue(kafkaTopic, graphElem, tsv)
      case e: S2Edge =>
        e.innerLabel.extraOptions.get("walLog") match {
          case None =>
            enqueue(kafkaTopic, e, tsv)
          case Some(walLogOpt) =>
            (walLogOpt \ "method").get match {
              case JsString("drop") => // pass
              case JsString("sample") =>
                val rate = (walLogOpt \ "rate").as[Double]
                if (scala.util.Random.nextDouble() < rate) {
                  enqueue(kafkaTopic, e, tsv)
                }
              case _ =>
                enqueue(kafkaTopic, e, tsv)
            }
        }
      case _ => logger.error(s"Unknown graph element type: ${graphElem}")
    }
  }
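
  /**
   * Builds one TSV "deleteAll" message per (source vertex, label) pair for the
   * mutate-fail Kafka topic, so that failed deleteAll requests are recorded.
   */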
  private def toDeleteAllFailMessages(srcVertices: Seq[S2Vertex], labels: Seq[Label], dir: Int, ts: Long) = {
    for {
      vertex <- srcVertices
      id = vertex.id.toString
      label <- labels
    } yield {
      val tsv = Seq(ts, "deleteAll", "e", id, id, label.label, "{}", GraphUtil.fromOp(dir.toByte)).mkString("\t")
      ExceptionHandler.toKafkaMessage(Config.KAFKA_MUTATE_FAIL_TOPIC, tsv)
    }
  }

  private def publishFailTopic(kafkaMessages: Seq[KafkaMessage]): Unit = {
    kafkaMessages.foreach(walLogHandler.enqueue)
  }
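
  /**
   * Mutates elements with wait semantics; any edge whose mutation returns false is
   * republished to the mutate-fail Kafka topic (deleteAll failures are expanded per label).
   */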
  def mutateElementsWithFailLog(elements: Seq[(GraphElement, String)]) = {
    val result = s2.mutateElements(elements.map(_._1), true)

    result onComplete { results =>
      results.get.zip(elements).map {
        case (false, (e: S2Edge, tsv: String)) =>
          val kafkaMessages = if (e.op == GraphUtil.operations("deleteAll")) {
            toDeleteAllFailMessages(Seq(e.srcVertex), Seq(e.innerLabel), e.labelWithDir.dir, e.ts)
          } else {
            Seq(ExceptionHandler.toKafkaMessage(Config.KAFKA_MUTATE_FAIL_TOPIC, e, Some(tsv)))
          }
          publishFailTopic(kafkaMessages)
        case _ =>
      }
    }

    result
  }
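
  /**
   * Core write path: rejects requests on non-write servers, WAL-publishes every element,
   * then either stores synchronously (withWait) and reports per-element results in request
   * order, or forwards elements to QueueActor for asynchronous storage.
   */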
  private def tryMutate(elementsWithTsv: Seq[(GraphElement, String)], withWait: Boolean): Future[Result] = {
    if (!Config.IS_WRITE_SERVER) Future.successful(Unauthorized)
    else {
      elementsWithTsv.foreach { case (graphElem, tsv) =>
        publish(graphElem, tsv)
      }

      if (elementsWithTsv.isEmpty) Future.successful(jsonResponse(JsArray()))
      else {
        val elementWithIdxs = elementsWithTsv.zipWithIndex
        if (withWait) {
          val (elementSync, elementAsync) = elementWithIdxs.partition { case ((element, tsv), idx) =>
            !skipElement(element.isAsync)
          }
          val retToSkip = elementAsync.map(_._2 -> true)
          val elementsToStore = elementSync.map(_._1)
          val elementsIdxToStore = elementSync.map(_._2)

          mutateElementsWithFailLog(elementsToStore).map { rets =>
            elementsIdxToStore.zip(rets) ++ retToSkip
          }.map { rets =>
            Json.toJson(rets.sortBy(_._1).map(_._2))
          }.map(jsonResponse(_))
        } else {
          val rets = elementWithIdxs.map { case ((element, tsv), idx) =>
            if (!skipElement(element.isAsync)) QueueActor.router ! (element, tsv)
            true
          }
          Future.successful(jsonResponse(Json.toJson(rets)))
        }
      }
    }
  }
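
  /**
   * Parses a JSON mutation request for the given operation and delegates to tryMutate.
   * Parse failures return 400 Bad Request; unexpected errors fail the returned future.
   */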
  def mutateJsonFormat(jsValue: JsValue, operation: String, withWait: Boolean = false): Future[Result] = {
    logger.debug(s"$jsValue")

    try {
      val edgesWithTsv = requestParser.parseJsonFormat(jsValue, operation)
      tryMutate(edgesWithTsv, withWait)
    } catch {
      case e: JsonMappingException =>
        logger.malformed(jsValue, e)
        Future.successful(BadRequest(s"${e.getMessage}"))
      case e: GraphExceptions.JsonParseException =>
        logger.malformed(jsValue, e)
        Future.successful(BadRequest(s"${e.getMessage}"))
      case e: Exception =>
        logger.malformed(jsValue, e)
        Future.failed(e)
    }
  }
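
  /** Same flow as mutateJsonFormat, but for the tab-separated bulk request format. */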
  def mutateBulkFormat(str: String, withWait: Boolean = false): Future[Result] = {
    logger.debug(s"$str")

    try {
      val elementsWithTsv = requestParser.parseBulkFormat(str)
      tryMutate(elementsWithTsv, withWait)
    } catch {
      case e: JsonMappingException =>
        logger.malformed(str, e)
        Future.successful(BadRequest(s"${e.getMessage}"))
      case e: GraphExceptions.JsonParseException =>
        logger.malformed(str, e)
        Future.successful(BadRequest(s"${e.getMessage}"))
      case e: Exception =>
        logger.malformed(str, e)
        Future.failed(e)
    }
  }
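
  /** HTTP action endpoints. The *WithWait variants wait for storage before responding. */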
  def mutateBulk() = withHeaderAsync(parse.text) { request =>
    mutateBulkFormat(request.body, withWait = false)
  }

  def mutateBulkWithWait() = withHeaderAsync(parse.text) { request =>
    mutateBulkFormat(request.body, withWait = true)
  }

  def inserts() = withHeaderAsync(jsonParser) { request =>
    mutateJsonFormat(request.body, "insert")
  }

  def insertsWithWait() = withHeaderAsync(jsonParser) { request =>
    mutateJsonFormat(request.body, "insert", withWait = true)
  }

  def insertsBulk() = withHeaderAsync(jsonParser) { request =>
    mutateJsonFormat(request.body, "insertBulk")
  }

  def deletes() = withHeaderAsync(jsonParser) { request =>
    mutateJsonFormat(request.body, "delete")
  }

  def deletesWithWait() = withHeaderAsync(jsonParser) { request =>
    mutateJsonFormat(request.body, "delete", withWait = true)
  }

  def updates() = withHeaderAsync(jsonParser) { request =>
    mutateJsonFormat(request.body, "update")
  }

  def updatesWithWait() = withHeaderAsync(jsonParser) { request =>
    mutateJsonFormat(request.body, "update", withWait = true)
  }

  def increments() = withHeaderAsync(jsonParser) { request =>
    mutateJsonFormat(request.body, "increment")
  }

  def incrementsWithWait() = withHeaderAsync(jsonParser) { request =>
    mutateJsonFormat(request.body, "increment", withWait = true)
  }
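
  /**
   * Parses "incrementCount" edges, drops those filtered out by skipElement, and increments
   * their counters synchronously, returning a {success, result, _count} object per edge.
   */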
  def incrementCounts() = withHeaderAsync(jsonParser) { request =>
    val jsValue = request.body
    val edgesWithTsv = requestParser.parseJsonFormat(jsValue, "incrementCount")

    val edges = for {
      (e, _tsv) <- edgesWithTsv if !skipElement(e.isAsync)
    } yield e

    if (edges.isEmpty) Future.successful(jsonResponse(JsArray()))
    else {
      s2.incrementCounts(edges, withWait = true).map { results =>
        val json = results.map { case (isSuccess, resultCount, count) =>
          Json.obj("success" -> isSuccess, "result" -> resultCount, "_count" -> count)
        }

        jsonResponse(Json.toJson(json))
      }
    }
  }

  def deleteAll() = withHeaderAsync(jsonParser) { request =>
    deleteAllInner(request.body, withWait = true)
  }

  def deleteAllWithOutWait() = withHeaderAsync(jsonParser) { request =>
    deleteAllInner(request.body, withWait = false)
  }
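
  /**
   * Deletes all adjacent edges of the requested vertices for the given labels and direction.
   * Each request is WAL-logged up front; with withWait set, failures are republished to the
   * mutate-fail topic via toDeleteAllFailMessages.
   */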
  def deleteAllInner(jsValue: JsValue, withWait: Boolean) = {

    /** logging for delete all request */
    def enqueueLogMessage(ids: Seq[JsValue], labels: Seq[Label], ts: Long, direction: String, topicOpt: Option[String]) = {
      val kafkaMessages = for {
        id <- ids
        label <- labels
      } yield {
        val tsv = Seq(ts, "deleteAll", "e", RequestParser.jsToStr(id), RequestParser.jsToStr(id), label.label, "{}", direction).mkString("\t")
        val topic = topicOpt.getOrElse { toKafkaTopic(label.isAsync) }

        ExceptionHandler.toKafkaMessage(topic, tsv)
      }

      kafkaMessages.foreach(walLogHandler.enqueue)
    }

    def deleteEach(labels: Seq[Label], direction: String, ids: Seq[JsValue],
                   ts: Long, vertices: Seq[S2Vertex]) = {
      val future = s2.deleteAllAdjacentEdges(vertices.toList, labels, GraphUtil.directions(direction), ts)

      if (withWait) {
        future onComplete {
          case ret =>
            if (!ret.get) {
              val messages = toDeleteAllFailMessages(vertices.toList, labels, GraphUtil.directions(direction), ts)
              publishFailTopic(messages)
            }
        }
        future
      } else {
        Future.successful(true)
      }
    }

    val deleteFutures = jsValue.as[Seq[JsValue]].map { json =>
      val (_labels, direction, ids, ts, vertices) = requestParser.toDeleteParam(json)
      val srcVertices = vertices
      enqueueLogMessage(ids, _labels, ts, direction, None)
      val labels = _labels.filterNot(e => skipElement(e.isAsync))

      if (labels.isEmpty || ids.isEmpty) Future.successful(true)
      else deleteEach(labels, direction, ids, ts, srcVertices)
    }

    val deleteResults = Future.sequence(deleteFutures)
    deleteResults.map { rst =>
      logger.debug(s"deleteAllInner: $rst")
      Ok(s"deleted... ${rst.toString()}")
    }
  }
}