/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 * 
 *   http://www.apache.org/licenses/LICENSE-2.0
 * 
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied.  See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */

package org.apache.s2graph.rest.play.controllers

import org.apache.s2graph.core.Management
import org.apache.s2graph.core.mysqls._
import org.apache.s2graph.core.rest.RequestParser
import org.apache.s2graph.core.utils.logger
import play.api.libs.functional.syntax._
import play.api.libs.json._
import play.api.mvc
import play.api.mvc.{Action, Controller}

import scala.util.{Failure, Success, Try}

object AdminController extends Controller {

  import ApplicationController._
  // Storage management handle taken from the Play Global object; used below for
  // createService, copyLabel and createStorageTable.
  private val management: Management = org.apache.s2graph.rest.play.Global.storageManagement
  // Request parser taken from the Play Global object; turns request JSON into
  // the element tuples / objects consumed by the *Inner helpers below.
  private val requestParser: RequestParser = org.apache.s2graph.rest.play.Global.s2parser

  /**
   * Type class describing how an admin response payload of type `T` is
   * rendered as JSON. Instances are looked up implicitly by the response
   * helpers in this controller (`ok`, `bad`, `notFound`, ...).
   *
   * @tparam T type of the message to render
   */
  trait AdminMessageFormatter[T] {
    def toJson(msg: T): JsValue
  }

  /**
   * Default [[AdminMessageFormatter]] instances, found through implicit scope.
   */
  object AdminMessageFormatter {

    /** A `JsValue` (or any subtype of it) is already JSON and is returned unchanged. */
    implicit def jsValueToJson[T <: JsValue]: AdminMessageFormatter[T] =
      new AdminMessageFormatter[T] {
        def toJson(js: T): JsValue = js
      }

    /** A plain string is wrapped into a JSON object under the `"message"` key. */
    implicit val stringToJson: AdminMessageFormatter[String] =
      new AdminMessageFormatter[String] {
        def toJson(js: String): JsValue = Json.obj("message" -> js)
      }
  }

  /**
   * Renders `message` as JSON with its implicit [[AdminMessageFormatter]] and
   * hands the JSON to `f` to build the result.
   *
   * NOTE(review): `f` is typed against `play.mvc.Result`, while every other
   * helper in this object returns `mvc.Result` (i.e. `play.api.mvc.Result`).
   * Confirm that this is intended before relying on this helper.
   *
   * @param f       builds the result from the rendered JSON
   * @param message payload to render
   * @tparam T      payload type; must have an [[AdminMessageFormatter]] in scope
   * @return whatever `f` returns for the rendered JSON
   */
  def format[T: AdminMessageFormatter](f: JsValue => play.mvc.Result)(message: T): play.mvc.Result =
    f(implicitly[AdminMessageFormatter[T]].toJson(message))

  /**
   * Builds an `Ok` response whose body is `message` rendered as JSON.
   *
   * @param message payload to render
   * @tparam T      payload type; must have an [[AdminMessageFormatter]] in scope
   * @return `Ok` result with the content type set to `applicationJsonHeader`
   */
  def ok[T: AdminMessageFormatter](message: T): mvc.Result = {
    val body = implicitly[AdminMessageFormatter[T]].toJson(message)
    Ok(body).as(applicationJsonHeader)
  }

  /**
   * Builds a `BadRequest` response whose body is `message` rendered as JSON.
   *
   * @param message payload to render
   * @tparam T      payload type; must have an [[AdminMessageFormatter]] in scope
   * @return `BadRequest` result with the content type set to `applicationJsonHeader`
   */
  def bad[T: AdminMessageFormatter](message: T): mvc.Result = {
    val body = implicitly[AdminMessageFormatter[T]].toJson(message)
    BadRequest(body).as(applicationJsonHeader)
  }

  /**
   * Builds a `NotFound` response whose body is `message` rendered as JSON.
   *
   * @param message payload to render
   * @tparam T      payload type; must have an [[AdminMessageFormatter]] in scope
   * @return `NotFound` result with the content type set to `applicationJsonHeader`
   */
  def notFound[T: AdminMessageFormatter](message: T): mvc.Result = {
    val body = implicitly[AdminMessageFormatter[T]].toJson(message)
    NotFound(body).as(applicationJsonHeader)
  }

  /**
   * Turns the outcome of an admin operation into an HTTP response.
   *
   * On success the value is mapped through `callback`, logged at info level
   * and returned via `ok`. On failure the error is logged and returned via
   * `bad`: a `JsResultException` is rendered with `JsError.toJson`, anything
   * else is reported by its message.
   *
   * @param res      outcome of the operation
   * @param callback converts the successful value into the response payload
   * @tparam T       type of the successful value
   * @tparam R       payload type; must have an [[AdminMessageFormatter]] in scope
   * @return `Ok` on success, `BadRequest` on failure
   */
  private[AdminController] def tryResponse[T, R: AdminMessageFormatter](res: Try[T])(callback: T => R): mvc.Result = res match {
    case Success(m) =>
      val ret = callback(m)
      logger.info(ret.toString)
      ok(ret)
    case Failure(error) =>
      // Throwable.getMessage may be null (e.g. NullPointerException); fall back
      // to toString so both the log line and the client get a usable description.
      val reason = Option(error.getMessage).getOrElse(error.toString)
      logger.error(reason, error)
      error match {
        case JsResultException(e) => bad(JsError.toJson(e))
        case _ => bad(reason)
      }
  }

  /**
   * Turns an optional lookup result into an HTTP response.
   *
   * @param res      lookup result
   * @param callback converts the found value into the response payload
   * @tparam T       type of the looked-up value
   * @tparam R       payload type; must have an [[AdminMessageFormatter]] in scope
   * @return `Ok` with the converted value when present, otherwise `NotFound`
   *         carrying the message "not found"
   */
  def optionResponse[T, R: AdminMessageFormatter](res: Option[T])(callback: T => R): mvc.Result =
    res.fold(notFound("not found"))(found => ok(callback(found)))

  /**
   * Runs [[loadCacheInner]], but only while `ApplicationController.isHealthy`
   * is false, and replies with the elapsed time in milliseconds.
   *
   * @return action responding `Ok` with the elapsed milliseconds as the message
   */
  def loadCache() = Action { _ =>
    val begin = System.currentTimeMillis()

    if (!ApplicationController.isHealthy) loadCacheInner()

    val elapsed = System.currentTimeMillis() - begin
    ok(s"$elapsed")
  }

  /**
   * Calls `findAll()` on every schema model in turn: Service, ServiceColumn,
   * Label, LabelMeta, LabelIndex and ColumnMeta.
   * Presumably each `findAll()` populates that model's cache as a side
   * effect - confirm against the model implementations.
   *
   * @return the result of the last call, `ColumnMeta.findAll()`
   */
  def loadCacheInner() = {
    Service.findAll()
    ServiceColumn.findAll()
    Label.findAll()
    LabelMeta.findAll()
    LabelIndex.findAll()
    ColumnMeta.findAll()
  }

  /**
   * read
   */

  /**
   * Returns the JSON description of a service.
   *
   * @param serviceName name of the service to look up
   * @return action responding `Ok` with the service JSON, or `NotFound` when
   *         `Management.findService` yields nothing
   */
  def getService(serviceName: String) = Action { _ =>
    optionResponse(Management.findService(serviceName))(service => service.toJson)
  }

  /**
   * Returns the JSON description of a label.
   *
   * @param labelName name of the label to look up
   * @return action responding `Ok` with the label JSON, or `NotFound` when
   *         `Management.findLabel` yields nothing
   */
  def getLabel(labelName: String) = Action { _ =>
    optionResponse(Management.findLabel(labelName))(label => label.toJson)
  }

  /**
   * Lists the labels attached to a service, split by direction: labels found
   * by source service id go under `"from"`, labels found by target service id
   * go under `"to"`.
   *
   * @param serviceName name of the service
   * @return action responding `Ok` with `{"from": [...], "to": [...]}`, or
   *         `NotFound` when the service does not exist
   */
  def getLabels(serviceName: String) = Action { _ =>
    Service.findByName(serviceName).fold(notFound(s"Service $serviceName not found")) { service =>
      val serviceId = service.id.get
      val outgoing = Label.findBySrcServiceId(serviceId)
      val incoming = Label.findByTgtServiceId(serviceId)

      ok(Json.obj("from" -> outgoing.map(_.toJson), "to" -> incoming.map(_.toJson)))
    }
  }

  /**
   * Returns the JSON description of one column of a service. The column lookup
   * is done with `useCache = false`.
   *
   * @param serviceName name of the owning service
   * @param columnName  name of the column
   * @return action responding `Ok` with the column JSON, or `NotFound` when
   *         either the service or the column is missing
   */
  def getServiceColumn(serviceName: String, columnName: String) = Action { _ =>
    val columnOpt = Service.findByName(serviceName).flatMap { service =>
      ServiceColumn.find(service.id.get, columnName, useCache = false)
    }

    optionResponse(columnOpt)(column => column.toJson)
  }

  /**
   * create
   */

  /**
   * Creates a service from the JSON request body.
   *
   * @return action responding `Ok` with the created service as JSON, or
   *         `BadRequest` when [[createServiceInner]] fails
   */
  def createService() = Action(parse.json) { request =>
    tryResponse(createServiceInner(request.body))(created => created.toJson)
  }


  /**
   * Parses the service definition out of `jsValue` with
   * `requestParser.toServiceElements` and passes the six resulting elements to
   * `management.createService`.
   *
   * @param jsValue JSON describing the service
   * @return the result of `management.createService`
   */
  def createServiceInner(jsValue: JsValue) =
    requestParser.toServiceElements(jsValue) match {
      case (serviceName, cluster, tableName, preSplitSize, ttl, compressionAlgorithm) =>
        management.createService(serviceName, cluster, tableName, preSplitSize, ttl, compressionAlgorithm)
    }

  /**
   * Creates a label from the JSON request body.
   *
   * @return action responding `Ok` with the label as JSON, or `BadRequest`
   *         when [[createLabelInner]] fails
   */
  def createLabel() = Action(parse.json) { request =>
    tryResponse(createLabelInner(request.body))(created => created.toJson)
  }


  /**
   * Delegates to `requestParser.toLabelElements` and returns its outcome.
   *
   * @param json JSON describing the label
   * @return the parser's result, mapped through identity
   */
  def createLabelInner(json: JsValue) =
    requestParser.toLabelElements(json).map(label => label)

  /**
   * Adds indices to a label as described by the JSON request body.
   *
   * @return action responding `Ok` with "<label> is updated", or `BadRequest`
   *         when [[addIndexInner]] fails
   */
  def addIndex() = Action(parse.json) { request =>
    tryResponse(addIndexInner(request.body))(updated => updated.label + " is updated")
  }

  /**
   * Parses a label name and its index definitions out of `json` with
   * `requestParser.toIndexElements`, then hands both to `Management.addIndex`.
   *
   * @param json JSON carrying the label name and the indices to add
   * @return the label produced by `Management.addIndex`, or the first failure
   *         from either step
   */
  def addIndexInner(json: JsValue) = for {
    (targetLabel, indexDefs) <- requestParser.toIndexElements(json)
    updatedLabel <- Management.addIndex(targetLabel, indexDefs)
  } yield updatedLabel

  /**
   * Creates a service column from the JSON request body.
   *
   * @return action responding `Ok` with `{"metas": [...]}` holding the JSON of
   *         each returned `ColumnMeta`, or `BadRequest` when
   *         [[createServiceColumnInner]] fails
   */
  def createServiceColumn() = Action(parse.json) { request =>
    val outcome = createServiceColumnInner(request.body)
    tryResponse(outcome) { (metas: Seq[ColumnMeta]) =>
      Json.obj("metas" -> metas.map(_.toJson))
    }
  }

  /**
   * Parses service name, column name, column type and props out of `jsValue`
   * with `requestParser.toServiceColumnElements`, then hands them to
   * `Management.createServiceColumn`.
   *
   * @param jsValue JSON describing the service column
   * @return the result of `Management.createServiceColumn`, or the first
   *         failure from either step
   */
  def createServiceColumnInner(jsValue: JsValue) = for {
    (service, column, dataType, columnProps) <- requestParser.toServiceColumnElements(jsValue)
    created <- Management.createServiceColumn(service, column, dataType, columnProps)
  } yield created

  /**
   * delete
   */

  /**
   * Deletes a label.
   *
   * @param labelName name of the label to delete
   * @return action responding `Ok` with "<value> is deleted", where the value
   *         is whatever [[deleteLabelInner]] yields, or `BadRequest` on failure
   */
  def deleteLabel(labelName: String) = Action { _ =>
    tryResponse(deleteLabelInner(labelName))(deleted => deleted + " is deleted")
  }

  /**
   * Thin wrapper around `Management.deleteLabel`.
   *
   * @param labelName name of the label to delete
   * @return the result of `Management.deleteLabel`
   */
  def deleteLabelInner(labelName: String) =
    Management.deleteLabel(labelName)

  /**
   * Deletes a service column.
   *
   * @param serviceName name of the owning service
   * @param columnName  name of the column to delete
   * @return action responding `Ok` with "<value> is deleted", where the value
   *         is whatever [[deleteServiceColumnInner]] yields, or `BadRequest`
   *         on failure
   */
  def deleteServiceColumn(serviceName: String, columnName: String) = Action { _ =>
    val outcome = deleteServiceColumnInner(serviceName, columnName)
    tryResponse(outcome)(deleted => deleted + " is deleted")
  }

  /**
   * Thin wrapper around `Management.deleteColumn`.
   *
   * @param serviceName name of the owning service
   * @param columnName  name of the column to delete
   * @return the result of `Management.deleteColumn`
   */
  def deleteServiceColumnInner(serviceName: String, columnName: String) = {
    Management.deleteColumn(serviceName, columnName)
  }

  /**
   * update
   */

  /**
   * Adds a property, described by the JSON request body, to a label.
   *
   * @param labelName name of the label receiving the property
   * @return action responding `Ok` with the resulting label meta as JSON, or
   *         `BadRequest` when [[addPropInner]] fails
   */
  def addProp(labelName: String) = Action(parse.json) { request =>
    tryResponse(addPropInner(labelName, request.body))(meta => meta.toJson)
  }

  /**
   * Parses a property definition out of `js` with
   * `requestParser.toPropElements` and passes it to `Management.addProp`.
   *
   * @param labelName name of the label receiving the property
   * @param js        JSON describing the property
   * @return the result of `Management.addProp`, or the parse failure
   */
  def addPropInner(labelName: String, js: JsValue) =
    requestParser.toPropElements(js).flatMap { parsedProp =>
      Management.addProp(labelName, parsedProp).map(labelMeta => labelMeta)
    }

  /**
   * Adds a single property, described by the JSON request body, to a service
   * column.
   *
   * NOTE(review): [[addServiceColumnPropInner]] also yields `None` when the
   * body cannot be parsed as a prop, so the "can`t find ..." message is
   * returned for malformed input as well.
   *
   * @param serviceName name of the owning service
   * @param columnName  name of the column receiving the property
   * @return action responding `Ok` with the column meta as JSON, or
   *         `BadRequest` when nothing was added
   */
  def addServiceColumnProp(serviceName: String, columnName: String) = Action(parse.json) { request =>
    addServiceColumnPropInner(serviceName, columnName)(request.body)
      .map(meta => Ok(meta.toJson).as(applicationJsonHeader))
      .getOrElse(bad(s"can`t find service with $serviceName or can`t find serviceColumn with $columnName"))
  }

  /**
   * Resolves the service, then the column, then parses the prop from `js`;
   * when all three succeed, calls `ColumnMeta.findOrInsert` for the column.
   * A parse failure is converted with `toOption`, so it is indistinguishable
   * from a missing service or column in the result.
   *
   * NOTE(review): `prop.defaultValue` is passed as the third argument of
   * `ColumnMeta.findOrInsert` - confirm against that method's signature that a
   * default value is what it expects there.
   *
   * @param serviceName name of the owning service
   * @param columnName  name of the column receiving the property
   * @param js          JSON describing the property
   * @return `Some` of the `findOrInsert` result, or `None` when any step
   *         yields nothing
   */
  def addServiceColumnPropInner(serviceName: String, columnName: String)(js: JsValue) = {
    for {
      owner <- Service.findByName(serviceName)
      column <- ServiceColumn.find(owner.id.get, columnName)
      parsedProp <- requestParser.toPropElements(js).toOption
    } yield ColumnMeta.findOrInsert(column.id.get, parsedProp.name, parsedProp.defaultValue)
  }

  /**
   * Adds several properties to a service column. The request body is read as
   * a JSON array of objects; a body of any other shape is treated as an empty
   * list. Entries for which [[addServiceColumnPropInner]] yields nothing are
   * skipped without an error.
   *
   * @param serviecName name of the owning service
   * @param columnName  name of the column receiving the properties
   * @return action responding `Ok` with "<count> is added."
   */
  def addServiceColumnProps(serviecName: String, columnName: String) = Action(parse.json) { request =>
    val propDefs = request.body.asOpt[List[JsObject]].getOrElse(List.empty[JsObject])
    val added = propDefs.flatMap { propDef =>
      addServiceColumnPropInner(serviecName, columnName)(propDef)
    }
    ok(s"${added.size} is added.")
  }

  /**
   * Copies a label under a new name by calling `management.copyLabel` with
   * `Some(newLabelName)` as its third argument.
   *
   * @param oldLabelName name of the label to copy
   * @param newLabelName name of the copy
   * @return action responding `Ok` with "<label> created", or `BadRequest`
   *         when the copy fails
   */
  def copyLabel(oldLabelName: String, newLabelName: String) = Action { _ =>
    val outcome = management.copyLabel(oldLabelName, newLabelName, Some(newLabelName))
    tryResponse(outcome)(copied => copied.label + " created")
  }

  /**
   * Renames a label.
   *
   * The result of `Management.updateLabelName` is not inspected; the action
   * reports success whenever the old label exists and the call returns.
   *
   * @param oldLabelName current name of the label
   * @param newLabelName name the label should get
   * @return action responding `Ok` with "Label was updated.", or `NotFound`
   *         with a JSON message when `oldLabelName` does not exist
   */
  def renameLabel(oldLabelName: String, newLabelName: String) = Action { request =>
    Label.findByName(oldLabelName) match {
      // Result.as(...) sets the Content-Type, so the previous NotFound.as(message)
      // sent the message as a MIME type with an empty body. Use the JSON helper.
      case None => notFound(s"Label $oldLabelName not found.")
      case Some(_) =>
        Management.updateLabelName(oldLabelName, newLabelName)
        ok("Label was updated.")
    }
  }

  /**
   * Points a label at a different HTable via `Management.updateHTable`.
   *
   * @param labelName     name of the label to update
   * @param newHTableName name of the HTable the label should use
   * @return action responding `Ok` with "<value> label(s) updated.", where the
   *         value is whatever `Management.updateHTable` yields, or
   *         `BadRequest` on failure
   */
  def updateHTable(labelName: String, newHTableName: String) = Action { _ =>
    tryResponse(Management.updateHTable(labelName, newHTableName)) { updated =>
      updated.toString + " label(s) updated."
    }
  }

  /**
   * Swaps the names of two labels. Both labels are looked up with
   * `useCache = false` and must exist.
   *
   * NOTE: the schemas of the two labels are not compared here.
   *
   * @param leftLabelName  name of the first label
   * @param rightLabelName name of the second label
   * @return action responding `Ok` with "Labels were swapped.", or `NotFound`
   *         when either label is missing
   */
  def swapLabels(leftLabelName: String, rightLabelName: String) = Action { _ =>
    val leftOpt = Label.findByName(leftLabelName, useCache = false)
    val rightOpt = Label.findByName(rightLabelName, useCache = false)

    if (leftOpt.isDefined && rightOpt.isDefined) {
      Management.swapLabelNames(leftLabelName, rightLabelName)
      ok(s"Labels were swapped.")
    } else {
      notFound(s"Labels ${leftLabelName} or ${rightLabelName} not found.")
    }
  }

  /**
   * Parameters accepted by the create-HTable endpoint.
   *
   * @param cluster              value forwarded as the cluster argument
   * @param hTableName           name of the table to create
   * @param preSplitSize         pre-split size forwarded to table creation
   * @param hTableTTL            optional TTL
   * @param compressionAlgorithm optional compression algorithm
   */
  case class HTableParams(
    cluster: String,
    hTableName: String,
    preSplitSize: Int,
    hTableTTL: Option[Int],
    compressionAlgorithm: Option[String]) {

    /** Multi-line dump: a header line, then one "-- name : value" line per field, each newline-terminated. */
    override def toString(): String =
      Seq(
        "HtableParams",
        s"-- cluster : $cluster",
        s"-- hTableName : $hTableName",
        s"-- preSplitSize : $preSplitSize",
        s"-- hTableTTL : $hTableTTL",
        s"-- compressionAlgorithm : $compressionAlgorithm"
      ).mkString("", "\n", "\n")
  }

  /**
   * JSON format for [[HTableParams]]. `cluster`, `hTableName` and
   * `preSplitSize` are read as required fields; `hTableTTL` and
   * `compressionAlgorithm` are read with `readNullable`.
   */
  implicit object HTableParamsJsonConverter extends Format[HTableParams] {

    def reads(json: JsValue): JsResult[HTableParams] = {
      val fields = (
        (__ \ "cluster").read[String] and
        (__ \ "hTableName").read[String] and
        (__ \ "preSplitSize").read[Int] and
        (__ \ "hTableTTL").readNullable[Int] and
        (__ \ "compressionAlgorithm").readNullable[String]
      )
      fields(HTableParams.apply _).reads(json)
    }

    def writes(o: HTableParams): JsValue =
      Json.obj(
        "cluster" -> o.cluster,
        "hTableName" -> o.hTableName,
        "preSplitSize" -> o.preSplitSize,
        "hTableTTL" -> o.hTableTTL,
        "compressionAlgorithm" -> o.compressionAlgorithm
      )
  }

  /**
   * Renders a `JsError` as
   * `{"errors": [{"path": ..., "validationErrors": [{"message": ..., "args": [...]}]}]}`.
   * `Int` arguments are written as JSON numbers, every other argument through
   * its `toString`.
   */
  implicit object JsErrorJsonWriter extends Writes[JsError] {
    def writes(o: JsError): JsValue = {
      val errorEntries: Seq[JsValue] = o.errors.map { case (path, validationErrors) =>
        val details: Seq[JsValue] = validationErrors.map { validationError =>
          val renderedArgs: Seq[JsValue] = validationError.args.map {
            case x: Int => JsNumber(x)
            case x => JsString(x.toString)
          }
          Json.obj(
            "message" -> JsString(validationError.message),
            "args" -> JsArray(renderedArgs)
          )
        }
        Json.obj(
          "path" -> Json.toJson(path.toString()),
          "validationErrors" -> JsArray(details)
        )
      }

      Json.obj("errors" -> JsArray(errorEntries))
    }
  }

  /**
   * Creates a storage table from the [[HTableParams]] in the request body.
   * The table is created with the column families "e" and "v"; when no
   * compression algorithm is given, `Management.DefaultCompressionAlgorithm`
   * is used.
   *
   * @return action responding `Ok` with "HTable was created.", `BadRequest`
   *         with the validation errors when the JSON does not match
   *         [[HTableParams]], or `BadRequest` with "Invalid Json." when the
   *         body is not JSON
   */
  def createHTable() = Action { request =>
    request.body.asJson match {
      case None => bad("Invalid Json.")
      case Some(json) =>
        json.validate[HTableParams] match {
          case JsSuccess(params, _) =>
            val compression = params.compressionAlgorithm.getOrElse(Management.DefaultCompressionAlgorithm)
            management.createStorageTable(params.cluster, params.hTableName, List("e", "v"),
              params.preSplitSize, params.hTableTTL, compression)
            logger.info(params.toString())
            ok(s"HTable was created.")
          case err@JsError(_) => bad(Json.toJson(err))
        }
    }
  }
}
