blob: 3686f790cee47bca57188114912aa858014db2af [file] [log] [blame]
package controllers
import play.api.mvc._
import scala.concurrent.Future
import com.daumkakao.s2graph.rest.actors.{ KafkaAggregatorActor, Protocol }
import com.daumkakao.s2graph.rest.config.Config
import Protocol._
import org.apache.kafka.clients.producer.ProducerRecord
object PublishController extends Controller {
import play.api.libs.concurrent.Execution.Implicits._
import ApplicationController._
/**
* never check validation on string. just redirect strings to kafka.
*/
def publishOnly(service: String) = withHeaderAsync(parse.text) { request =>
Future {
if (!Config.IS_WRITE_SERVER) Unauthorized
val strs = request.body.split("\n")
strs.foreach(str => {
val keyedMessage = new ProducerRecord[Key, Val](Config.KAFKA_LOG_TOPIC, s"$service\t$str")
KafkaAggregatorActor.enqueue(Protocol.KafkaMessage(keyedMessage))
})
Ok("publish success.\n").withHeaders(CONNECTION -> "Keep-Alive", "Keep-Alive" -> "timeout=10, max=10")
}
}
def publish(topic: String) = publishOnly(topic)
// def mutateBulk(topic: String) = Action.async(parse.text) { request =>
// EdgeController.mutateAndPublish(Config.KAFKA_LOG_TOPIC, Config.KAFKA_FAIL_TOPIC, request.body).map { result =>
// result.withHeaders(CONNECTION -> "Keep-Alive", "Keep-Alive" -> "timeout=10, max=10")
// }
// }
def mutateBulk(topic: String) = withHeaderAsync(parse.text) { request =>
EdgeController.mutateAndPublish(request.body)
}
}