package com.daumkakao.s2graph.core

import HBaseElement._
import play.api.libs.json._
import org.apache.hadoop.hbase.client.HBaseAdmin
import org.apache.hadoop.hbase.HTableDescriptor
import org.apache.hadoop.hbase.HBaseConfiguration
import org.apache.hadoop.hbase.util.Bytes
import org.apache.hadoop.hbase.TableName
import org.apache.hadoop.hbase.HColumnDescriptor
import org.apache.hadoop.hbase.client.Durability
import org.apache.hadoop.hbase.io.compress.Compression
import org.apache.hadoop.hbase.regionserver.BloomType
import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding

/**
 * orchestrate kafka queue, db meta.
 */
object Management extends JSONParser {

  val hardLimit = 10000
  val defaultLimit = 100

  //  val labels = new ConcurrentHashMap[String, Label]
  /**
   * source, target, label, orderBy, extra properties
   * e	src	tgt	label	key:value,key:value...	key:value,key:value...
   * v src, key:value,key:value
   *
   * talk_friend 10 -> story:friend 10 ->
   */
  def copyLabel(oldLabelName: String, newLabelName: String, hTableName: Option[String]) = {
    for {
      old <- Label.findByName(oldLabelName)
    } {
      Label.findByName(newLabelName) match {
        case None =>
          val (indexProps, metaProps) = old.metaPropsInvMap.partition(nameMeta => nameMeta._2.usedInIndex)

          createLabel(newLabelName, old.srcService.serviceName, old.srcColumnName, old.srcColumnType,
            old.tgtService.serviceName, old.tgtColumnName, old.tgtColumnType,
            old.isDirected, old.serviceName,
            indexProps.map(t => t._1 -> innerValToJsValue(t._2.defaultInnerVal)).toSeq,
            metaProps.map(t => t._1 -> innerValToJsValue(t._2.defaultInnerVal)).toSeq,
            old.consistencyLevel, hTableName, old.hTableTTL)
        case Some(_) =>
      }
    }
  }
  /**
   * label
   */
  /**
   * create {labelName, labelName_delete, labelName_update} labels
   */
  def createLabel(label: String,
    srcServiceName: String,
    srcColumnName: String,
    srcColumnType: String,
    tgtServiceName: String,
    tgtColumnName: String,
    tgtColumType: String,
    isDirected: Boolean = true,
    serviceName: String,
    indexProps: Seq[(String, JsValue)],
    props: Seq[(String, JsValue)],
    consistencyLevel: String,
    hTableName: Option[String],
    hTableTTL: Option[Int]): Label = {

    val idxProps = for ((k, v) <- indexProps; (innerVal, dataType) = toInnerVal(v)) yield (k, innerVal, dataType, true)
    val metaProps = for ((k, v) <- props; (innerVal, dataType) = toInnerVal(v)) yield (k, innerVal, dataType, false)

    val indexPropsWithType =
      for ((k, v) <- indexProps) yield {
        val (innerVal, dataType) = toInnerVal(v)
        (k, innerVal, dataType)
      }
    var cacheKey = s"serviceName=$srcServiceName"
    Service.expireCache(cacheKey)
    val srcService = tryOption(srcServiceName, Service.findByName)
    cacheKey = s"serviceName=$tgtServiceName"
    Service.expireCache(cacheKey)
    val tgtService = tryOption(tgtServiceName, Service.findByName)
    cacheKey = s"serviceName=$serviceName"
    Service.expireCache(cacheKey)
    val service = tryOption(serviceName, Service.findByName)

    cacheKey = s"label=$label"
    Label.expireCache(cacheKey)
    val labelOpt = Label.findByName(label, useCache = false)

    labelOpt match {
      case Some(l) =>
        throw new KGraphExceptions.LabelAlreadyExistException(s"Label name ${l.label} already exist.")
      case None =>
        Label.insertAll(label,
          srcService.id.get, srcColumnName, srcColumnType,
          tgtService.id.get, tgtColumnName, tgtColumType,
          isDirected, serviceName, service.id.get, idxProps ++ metaProps, consistencyLevel, hTableName, hTableTTL)
        Label.expireCache(cacheKey)
        Label.findByName(label).get
    }
  }

  def addIndex(labelStr: String, orderByKeys: Seq[(String, JsValue)]) = {
    val label = try {
      Label.findByName(labelStr).get
    } catch {
      case e: Throwable =>
        throw new KGraphExceptions.LabelNotExistException(labelStr)
    }
    val labelOrderTypes =
      for ((k, v) <- orderByKeys; (innerVal, dataType) = toInnerVal(v)) yield {

        val lblMeta = LabelMeta.findOrInsert(label.id.get, k, innerVal.toString, dataType, true)
        if (lblMeta.usedInIndex) lblMeta.seq
        else throw new KGraphExceptions.LabelMetaExistException(s"")
      }
    LabelIndex.findOrInsert(label.id.get, labelOrderTypes.toList, "")
  }

  def getServiceLable(label: String): Option[Label] = {
    Label.findByName(label)
  }

  /**
   *
   */

  def toLabelWithDirectionAndOp(label: Label, direction: String): Option[LabelWithDirection] = {
    for {
      labelId <- label.id
      dir = GraphUtil.toDirection(direction)
    } yield LabelWithDirection(labelId, dir)
  }

  def tryOption[A, R](key: A, f: A => Option[R]) = {
    f(key) match {
      case None => throw new KGraphExceptions.InternalException(s"$key is not found in DB. create $key first.")
      case Some(r) => r
    }
  }

  def toEdge(ts: Long, operation: String, srcId: String, tgtId: String,
    labelStr: String, direction: String = "", props: String): Edge = {

    val label = tryOption(labelStr, getServiceLable)

    val src = toInnerVal(srcId, label.srcColumnType)
    val tgt = toInnerVal(tgtId, label.tgtColumnType)

    val srcVertex = Vertex(CompositeId(label.srcColumn.id.get, src, true, true), ts)
    val tgtVertex = Vertex(CompositeId(label.tgtColumn.id.get, tgt, true, true), ts)
    val dir = if (direction == "") GraphUtil.toDirection(label.direction) else GraphUtil.toDirection(direction)
    val labelWithDir = LabelWithDirection(label.id.get, dir)
    val op = tryOption(operation, GraphUtil.toOp)

    val jsObject = Json.parse(props).asOpt[JsObject].getOrElse(Json.obj())
    val parsedProps = toProps(label, jsObject).toMap
    val propsWithTs = parsedProps.map(kv => (kv._1 -> InnerValWithTs(kv._2, ts))) ++ Map(LabelMeta.timeStampSeq -> InnerValWithTs(InnerVal.withLong(ts), ts))
    Edge(srcVertex, tgtVertex, labelWithDir, op, ts, version = ts, propsWithTs)

  }

  def toVertex(ts: Long, operation: String, id: String, serviceName: String, columnName: String, props: String): Vertex = {
    Service.findByName(serviceName) match {
      case None => throw new RuntimeException(s"$serviceName does not exist. create service first.")
      case Some(service) =>
        ServiceColumn.find(service.id.get, columnName) match {
          case None => throw new RuntimeException(s"$columnName is not exist. create service column first.")
          case Some(col) =>
            val idVal = toInnerVal(id, col.columnType)
            val op = tryOption(operation, GraphUtil.toOp)
            val jsObject = Json.parse(props).asOpt[JsObject].getOrElse(Json.obj())
            val parsedProps = toProps(col, jsObject).toMap
            Vertex(CompositeId(col.id.get, idVal, isEdge = false, useHash = true), ts, parsedProps, op = op)
        }
    }
  }

  def toProps(column: ServiceColumn, js: JsObject): Seq[(Byte, InnerVal)] = {

    val props = for {
      (k, v) <- js.fields
    } yield {
      val colMeta = ColumnMeta.findOrInsert(column.id.get, k)
      val (innerVal, dataType) = toInnerVal(v)
      (colMeta.seq, innerVal)
    }
    //    Logger.debug(s"vertex.ToProps: $column, $js => $props")
    props

  }

  def toProps(label: Label, js: JsObject): Seq[(Byte, InnerVal)] = {

    val props = for {
      (k, v) <- js.fields
      meta <- label.metaPropsInvMap.get(k)
      //        meta <- LabelMeta.findByName(label.id.get, k)
      //      meta = tryOption((label.id.get, k), LabelMeta.findByName)
      innerVal <- jsValueToInnerVal(v, meta.dataType)
    } yield {
      (meta.seq, innerVal)
    }
    //    Logger.error(s"toProps: $js => $props")
    props

  }

  val idTableName = "id"
  val cf = "a"
  val idColName = "id"
  val regionCnt = 10

  def getAdmin(zkAddr: String) = {
    val conf = HBaseConfiguration.create()
    conf.set("hbase.zookeeper.quorum", zkAddr)

    val adm = new HBaseAdmin(conf)
    adm
  }
  def enableTable(zkAddr: String, tableName: String) = {
    getAdmin(zkAddr).enableTable(tableName)
  }
  def disableTable(zkAddr: String, tableName: String) = {
    getAdmin(zkAddr).disableTable(tableName)
  }
  def dropTable(zkAddr: String, tableName: String) = {
    getAdmin(zkAddr).disableTable(tableName)
    getAdmin(zkAddr).deleteTable(tableName)
  }
//  def deleteEdgesByLabelIds(zkAddr: String,
//    tableName: String,
//    labelIds: String = "",
//    minTs: Long = 0L,
//    maxTs: Long = Long.MaxValue,
//    include: Boolean = true) = {
//    val conf = HBaseConfiguration.create()
//    val longTimeout = "1200000"
//    conf.set("hbase.rpc.timeout", longTimeout)
//    conf.set("hbase.client.operation.timeout", longTimeout)
//    conf.set("hbase.client.scanner.timeout.period", longTimeout)
//    conf.set("hbase.zookeeper.quorum", zkAddr)
//    val conn = HConnectionManager.createConnection(conf)
//    val table = conn.getTable(tableName.getBytes)
//    var builder = DeleteLabelsArgument.newBuilder()
//    val scanner = Scan.newBuilder()
//
//    scanner.setTimeRange(TimeRange.newBuilder().setFrom(minTs).setTo(maxTs))
//    /**
//     *  when we clean up all data does not match current database ids
//     *  we will delete row completely
//     */
//    if (!include) scanner.setFilter(ProtobufUtil.toFilter(new FirstKeyOnlyFilter))
//
//    builder.setScan(scanner)
//    for (id <- labelIds.split(",")) {
//      builder.addId(id.toInt)
//    }
//
//    val argument = builder.build()
//
//    val regionStats = table.coprocessorService(classOf[GraphStatService], null, null,
//      new Batch.Call[GraphStatService, Long]() {
//        override def call(counter: GraphStatService): Long = {
//          val controller: ServerRpcController = new ServerRpcController()
//          val rpcCallback: BlockingRpcCallback[CountResponse] = new BlockingRpcCallback[CountResponse]()
//
//          if (include) {
//            counter.cleanUpDeleteLabelsRows(controller, argument, rpcCallback)
//          } else {
//            counter.cleanUpDeleteLabelsRowsExclude(controller, argument, rpcCallback)
//          }
//
//          val response: CountResponse = rpcCallback.get()
//          if (controller.failedOnException()) throw controller.getFailedOn()
//          if (response != null && response.hasCount()) {
//            response.getCount()
//          } else {
//            0L
//          }
//        }
//      })
//
//    //    regionStats.map(kv => Bytes.toString(kv._1) -> kv._2) ++ Map("total" -> regionStats.values().sum)
//  }
  def createTable(zkAddr: String, tableName: String, cfs: List[String], regionCnt: Int, ttl: Option[Int]) = {
    try {
      val admin = getAdmin(zkAddr)
      println(admin)
      if (!admin.tableExists(tableName)) {
        println("createTable")
        val desc = new HTableDescriptor(TableName.valueOf(tableName))
        desc.setDurability(Durability.ASYNC_WAL)
        for (cf <- cfs) {
          val columnDesc = new HColumnDescriptor(cf)
            .setCompressionType(Compression.Algorithm.GZ)
            .setBloomFilterType(BloomType.ROW)
            .setDataBlockEncoding(DataBlockEncoding.FAST_DIFF)
            .setMaxVersions(1)
            .setTimeToLive(2147483647)
            .setMinVersions(0)
            .setBlocksize(32768)
            .setBlockCacheEnabled(true)
          if (ttl.isDefined) columnDesc.setTimeToLive(ttl.get)
          desc.addFamily(columnDesc)
        }

        if (regionCnt <= 1) admin.createTable(desc)
        else admin.createTable(desc, getStartKey(regionCnt), getEndKey(regionCnt), regionCnt)
      } else {
        // already exist
      }
    } catch {
      case e: Throwable => println(e)
    }
  }
  // we only use murmur hash to distribute row key.
  private def getStartKey(regionCount: Int) = {
    Bytes.toBytes((Int.MaxValue / regionCount))
  }

  private def getEndKey(regionCount: Int) = {
    Bytes.toBytes((Int.MaxValue / regionCount * (regionCount - 1)))
  }


}