/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 * 
 *   http://www.apache.org/licenses/LICENSE-2.0
 * 
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied.  See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */

package org.apache.s2graph.rest.play.controllers

import org.apache.kafka.clients.producer.ProducerRecord
import org.apache.s2graph.core.ExceptionHandler
import org.apache.s2graph.core.ExceptionHandler.KafkaMessage
import org.apache.s2graph.core.mysqls.Label
import org.apache.s2graph.counter
import org.apache.s2graph.counter.config.S2CounterConfig
import org.apache.s2graph.counter.core.TimedQualifier.IntervalUnit
import org.apache.s2graph.counter.core._
import org.apache.s2graph.counter.core.v1.{RankingStorageRedis, ExactStorageAsyncHBase}
import org.apache.s2graph.counter.core.v2.{RankingStorageGraph, ExactStorageGraph}
import org.apache.s2graph.counter.models.Counter.ItemType
import org.apache.s2graph.counter.models.{Counter, CounterModel}
import org.apache.s2graph.counter.util.{ReduceMapValue, CartesianProduct, UnitConverter}
import org.apache.s2graph.rest.play.config.CounterConfig
import org.apache.s2graph.rest.play.models._
import play.api.Play
import play.api.libs.json.Reads._
import play.api.libs.json._
import play.api.mvc.{Action, Controller, Request}
import scala.concurrent.Future
import scala.util.{Failure, Success, Try}

object CounterController extends Controller {
  import play.api.libs.concurrent.Execution.Implicits.defaultContext

  val config = Play.current.configuration.underlying
  val s2config = new S2CounterConfig(config)

  private lazy val walLogHandler: ExceptionHandler = org.apache.s2graph.rest.play.Global.wallLogHandler
  private val exactCounterMap = Map(
    counter.VERSION_1 -> new ExactCounter(config, new ExactStorageAsyncHBase(config)),
    counter.VERSION_2 -> new ExactCounter(config, new ExactStorageGraph(config))
  )
  private val rankingCounterMap = Map(
    counter.VERSION_1 -> new RankingCounter(config, new RankingStorageRedis(config)),
    counter.VERSION_2 -> new RankingCounter(config, new RankingStorageGraph(config))
  )

  private val tablePrefixMap = Map (
    counter.VERSION_1 -> "s2counter",
    counter.VERSION_2 -> "s2counter_v2"
  )

  private def exactCounter(version: Byte): ExactCounter = exactCounterMap(version)
  private def rankingCounter(version: Byte): RankingCounter = rankingCounterMap(version)

  lazy val counterModel = new CounterModel(config)

  def getQueryString[T](key: String, default: String)(implicit request: Request[T]): String = {
    request.getQueryString(key).getOrElse(default)
  }

  implicit val counterWrites = new Writes[Counter] {
    override def writes(o: Counter): JsValue = Json.obj(
      "version" -> o.version.toInt,
      "autoComb" -> o.autoComb,
      "dimension" -> o.dimension,
      "useProfile" -> o.useProfile,
      "bucketImpId" -> o.bucketImpId,
      "useRank" -> o.useRank,
      "intervalUnit" -> o.intervalUnit,
      "ttl" -> o.ttl,
      "dailyTtl" -> o.dailyTtl,
      "rateAction" -> o.rateActionId.flatMap { actionId =>
        counterModel.findById(actionId, useCache = false).map { actionPolicy =>
          Json.obj("service" -> actionPolicy.service, "action" -> actionPolicy.action)
        }
      },
      "rateBase" -> o.rateBaseId.flatMap { baseId =>
        counterModel.findById(baseId, useCache = false).map { basePolicy =>
          Json.obj("service" -> basePolicy.service, "action" -> basePolicy.action)
        }
      },
      "rateThreshold" -> o.rateThreshold
    )
  }

  def createAction(service: String, action: String) = Action(s2parse.json) { implicit request =>
    counterModel.findByServiceAction(service, action, useCache = false) match {
      case None =>
        val body = request.body
        val version = (body \ "version").asOpt[Int].map(_.toByte).getOrElse(counter.VERSION_2)
        val autoComb = (body \ "autoComb").asOpt[Boolean].getOrElse(true)
        val dimension = (body \ "dimension").asOpt[String].getOrElse("")
        val useProfile = (body \ "useProfile").asOpt[Boolean].getOrElse(false)
        val bucketImpId = (body \ "bucketImpId").asOpt[String]

        val useExact = (body \ "useExact").asOpt[Boolean].getOrElse(true)
        val useRank = (body \ "useRank").asOpt[Boolean].getOrElse(true)

        val intervalUnit = (body \ "intervalUnit").asOpt[String]
        // 2 day
        val ttl = (body \ "ttl").asOpt[Int].getOrElse(2 * 24 * 60 * 60)
        val dailyTtl = (body \ "dailyTtl").asOpt[Int]
        val regionMultiplier = (body \ "regionMultiplier").asOpt[Int].getOrElse(1)

        val rateAction = (body \ "rateAction").asOpt[Map[String, String]]
        val rateBase = (body \ "rateBase").asOpt[Map[String, String]]
        val rateThreshold = (body \ "rateThreshold").asOpt[Int]

        val rateActionId = {
          for {
            actionMap <- rateAction
            service <- actionMap.get("service")
            action <- actionMap.get("action")
            policy <- counterModel.findByServiceAction(service, action)
          } yield {
            policy.id
          }
        }
        val rateBaseId = {
          for {
            actionMap <- rateBase
            service <- actionMap.get("service")
            action <- actionMap.get("action")
            policy <- counterModel.findByServiceAction(service, action)
          } yield {
            policy.id
          }
        }

        val hbaseTable = {
          Seq(tablePrefixMap(version), service, ttl) ++ dailyTtl mkString "_"
        }

        // find label
        val itemType = Label.findByName(action, useCache = false) match {
          case Some(label) =>
            ItemType.withName(label.tgtColumnType.toUpperCase)
          case None =>
            val strItemType = (body \ "itemType").asOpt[String].getOrElse("STRING")
            ItemType.withName(strItemType.toUpperCase)
        }
        val policy = Counter(useFlag = true, version, service, action, itemType, autoComb = autoComb, dimension,
          useProfile = useProfile, bucketImpId, useRank = useRank, ttl, dailyTtl, Some(hbaseTable), intervalUnit,
          rateActionId, rateBaseId, rateThreshold)

        if (rateAction.isEmpty) {
          // prepare exact storage
          exactCounter(version).prepare(policy)
        }
        if (useRank || rateAction.isDefined) {
          // prepare ranking storage
          rankingCounter(version).prepare(policy)
        }
        counterModel.createServiceAction(policy)
        Ok(Json.toJson(Map("msg" -> s"created $service/$action")))
      case Some(policy) =>
        Ok(Json.toJson(Map("msg" -> s"already exist $service/$action")))
    }
  }

  def getAction(service: String, action: String) = Action { request =>
    counterModel.findByServiceAction(service, action, useCache = false) match {
      case Some(policy) =>
        Ok(Json.toJson(policy))
      case None =>
        NotFound(Json.toJson(Map("msg" -> s"$service.$action not found")))
    }
  }

  def updateAction(service: String, action: String) = Action(s2parse.json) { request =>
    counterModel.findByServiceAction(service, action, useCache = false) match {
      case Some(oldPolicy) =>
        val body = request.body
        val autoComb = (body \ "autoComb").asOpt[Boolean].getOrElse(oldPolicy.autoComb)
        val dimension = (body \ "dimension").asOpt[String].getOrElse(oldPolicy.dimension)
        val useProfile = (body \ "useProfile").asOpt[Boolean].getOrElse(oldPolicy.useProfile)
        val bucketImpId = (body \ "bucketImpId").asOpt[String] match {
          case Some(s) => Some(s)
          case None => oldPolicy.bucketImpId
        }

        val useRank = (body \ "useRank").asOpt[Boolean].getOrElse(oldPolicy.useRank)

        val intervalUnit = (body \ "intervalUnit").asOpt[String] match {
          case Some(s) => Some(s)
          case None => oldPolicy.intervalUnit
        }

        val rateAction = (body \ "rateAction").asOpt[Map[String, String]]
        val rateBase = (body \ "rateBase").asOpt[Map[String, String]]
        val rateThreshold = (body \ "rateThreshold").asOpt[Int] match {
          case Some(i) => Some(i)
          case None => oldPolicy.rateThreshold
        }

        val rateActionId = {
          for {
            actionMap <- rateAction
            service <- actionMap.get("service")
            action <- actionMap.get("action")
            policy <- counterModel.findByServiceAction(service, action, useCache = false)
          } yield {
            policy.id
          }
        } match {
          case Some(i) => Some(i)
          case None => oldPolicy.rateActionId
        }
        val rateBaseId = {
          for {
            actionMap <- rateBase
            service <- actionMap.get("service")
            action <- actionMap.get("action")
            policy <- counterModel.findByServiceAction(service, action, useCache = false)
          } yield {
            policy.id
          }
        } match {
          case Some(i) => Some(i)
          case None => oldPolicy.rateBaseId
        }

        // new counter
        val policy = Counter(id = oldPolicy.id, useFlag = oldPolicy.useFlag, oldPolicy.version, service, action, oldPolicy.itemType, autoComb = autoComb, dimension,
          useProfile = useProfile, bucketImpId, useRank = useRank, oldPolicy.ttl, oldPolicy.dailyTtl, oldPolicy.hbaseTable, intervalUnit,
          rateActionId, rateBaseId, rateThreshold)

        counterModel.updateServiceAction(policy)
        Ok(Json.toJson(Map("msg" -> s"updated $service/$action")))
      case None =>
        NotFound(Json.toJson(Map("msg" -> s"$service.$action not found")))
    }
  }

  def prepareAction(service: String, action: String) = Action(s2parse.json) { request =>
    // for data migration
    counterModel.findByServiceAction(service, action, useCache = false) match {
      case Some(policy) =>
        val body = request.body
        val version = (body \ "version").as[Int].toByte
        if (version != policy.version) {
          // change table name
          val newTableName = Seq(tablePrefixMap(version), service, policy.ttl) ++ policy.dailyTtl mkString "_"
          val newPolicy = policy.copy(version = version, hbaseTable = Some(newTableName))

          if (newPolicy.rateActionId.isEmpty) {
            exactCounter(version).prepare(newPolicy)
          }
          if (newPolicy.useRank || newPolicy.rateActionId.isDefined) {
            rankingCounter(version).prepare(newPolicy)
          }
          Ok(Json.toJson(Map("msg" -> s"prepare storage v$version $service/$action")))
        } else {
          Ok(Json.toJson(Map("msg" -> s"already prepared storage v$version $service/$action")))
        }
      case None =>
        NotFound(Json.toJson(Map("msg" -> s"$service.$action not found")))
    }
  }

  def deleteAction(service: String, action: String) = Action.apply {
    {
      for {
        policy <- counterModel.findByServiceAction(service, action, useCache = false)
      } yield {
        Try {
          if (policy.rateActionId.isEmpty) {
            exactCounter(policy.version).destroy(policy)
          }
          if (policy.useRank || policy.rateActionId.isDefined) {
            rankingCounter(policy.version).destroy(policy)
          }
          counterModel.deleteServiceAction(policy)
        } match {
          case Success(v) =>
            Ok(Json.toJson(Map("msg" -> s"deleted $service/$action")))
          case Failure(ex) =>
            throw ex
        }
      }
    }.getOrElse(NotFound(Json.toJson(Map("msg" -> s"$service.$action not found"))))
  }

  def getExactCountAsync(service: String, action: String, item: String) = Action.async { implicit request =>
    val intervalUnits = getQueryString("interval", getQueryString("step", "t")).split(',').toSeq
      .map(IntervalUnit.withName)
    val limit = getQueryString("limit", "1").toInt

    val qsSum = request.getQueryString("sum")

    val optFrom = request.getQueryString("from").filter(!_.isEmpty).map(UnitConverter.toMillis)
    val optTo = request.getQueryString("to").filter(!_.isEmpty).map(UnitConverter.toMillis)

    val limitOpt = (optFrom, optTo) match {
      case (Some(_), Some(_)) =>
        None
      case _ =>
        Some(limit)
    }

    // find dimension
    lazy val dimQueryValues = request.queryString.filterKeys { k => k.charAt(0) == ':' }.map { case (k, v) =>
      (k.substring(1), v.mkString(",").split(',').filter(_.nonEmpty).toSet)
    }
//    Logger.warn(s"$dimQueryValues")

    counterModel.findByServiceAction(service, action) match {
      case Some(policy) =>
        val tqs = TimedQualifier.getQualifiersToLimit(intervalUnits, limit, optFrom, optTo)
        val timeRange = TimedQualifier.getTimeRange(intervalUnits, limit, optFrom, optTo)
        try {
//          Logger.warn(s"$tqs $qsSum")
          if (tqs.head.length > 1 && qsSum.isDefined) {
            getDecayedCountToJs(policy, item, timeRange, dimQueryValues, qsSum).map { jsVal =>
              Ok(jsVal)
            }
          } else {
            getExactCountToJs(policy, item, timeRange, limitOpt, dimQueryValues).map { jsVal =>
              Ok(jsVal)
            }
          }
        } catch {
          case e: Exception =>
            throw e
//            Future.successful(BadRequest(s"$service, $action, $item"))
        }
      case None =>
        Future.successful(NotFound(Json.toJson(Map("msg" -> s"$service.$action not found"))))
    }
  }

  /*
   * [{
   *    "service": , "action", "itemIds": [], "interval": string, "limit": int, "from": ts, "to": ts,
   *    "dimensions": [{"key": list[String]}]
   * }]
   * @return
   */
  private def parseExactCountParam(jsValue: JsValue) = {
    val service = (jsValue \ "service").as[String]
    val action = (jsValue \ "action").as[String]
    val itemIds = (jsValue \ "itemIds").as[Seq[String]]
    val intervals = (jsValue \ "intervals").asOpt[Seq[String]].getOrElse(Seq("t")).distinct.map(IntervalUnit.withName)
    val limit = (jsValue \ "limit").asOpt[Int].getOrElse(1)
    val from = (jsValue \ "from").asOpt[Long]
    val to = (jsValue \ "to").asOpt[Long]
    val sum = (jsValue \ "sum").asOpt[String]
    val dimensions = {
      for {
        dimension <- (jsValue \ "dimensions").asOpt[Seq[JsObject]].getOrElse(Nil)
        (k, vs) <- dimension.fields
      } yield {
        k -> vs.as[Seq[String]].toSet
      }
    }.toMap
    (service, action, itemIds, intervals, limit, from, to, dimensions, sum)
  }

  def getExactCountAsyncMulti = Action.async(s2parse.json) { implicit request =>
    val jsValue = request.body
    try {
      val futures = {
        for {
          jsObject <- jsValue.asOpt[List[JsObject]].getOrElse(Nil)
          (service, action, itemIds, intervalUnits, limit, from, to, dimQueryValues, qsSum) = parseExactCountParam(jsObject)
          optFrom = from.map(UnitConverter.toMillis)
          optTo = to.map(UnitConverter.toMillis)
          timeRange = TimedQualifier.getTimeRange(intervalUnits, limit, optFrom, optTo)
          policy <- counterModel.findByServiceAction(service, action).toSeq
          item <- itemIds
        } yield {
          val tqs = TimedQualifier.getQualifiersToLimit(intervalUnits, limit, optFrom, optTo)
          val timeRange = TimedQualifier.getTimeRange(intervalUnits, limit, optFrom, optTo)
          val limitOpt = (optFrom, optTo) match {
            case (Some(_), Some(_)) =>
              None
            case _ =>
              Some(limit)
          }

//          Logger.warn(s"$item, $limit, $optFrom, $optTo, $qsSum, $tqs")

          if (tqs.head.length > 1 && qsSum.isDefined) {
            getDecayedCountToJs(policy, item, timeRange, dimQueryValues, qsSum)
          } else {
            getExactCountToJs(policy, item, timeRange, limitOpt, dimQueryValues)
          }
        }
      }
      Future.sequence(futures).map { rets =>
        Ok(Json.toJson(rets))
      }
    } catch {
      case e: Exception =>
        throw e
//        Future.successful(BadRequest(s"$jsValue"))
    }
  }

  private [controllers] def fetchedToResult(fetchedCounts: FetchedCountsGrouped, limitOpt: Option[Int]): Seq[ExactCounterIntervalItem] = {
    for {
      ((interval, dimKeyValues), values) <- fetchedCounts.intervalWithCountMap
    } yield {
      val counterItems = {
        val sortedItems = values.toSeq.sortBy { case (eq, v) => -eq.tq.ts }
        val limited = limitOpt match {
          case Some(limit) => sortedItems.take(limit)
          case None => sortedItems
        }
        for {
          (eq, value) <- limited
        } yield {
          ExactCounterItem(eq.tq.ts, value, value.toDouble)
        }
      }
      ExactCounterIntervalItem(interval.toString, dimKeyValues, counterItems)
    }
  }.toSeq

  private def decayedToResult(decayedCounts: DecayedCounts): Seq[ExactCounterIntervalItem] = {
    for {
      (eq, score) <- decayedCounts.qualifierWithCountMap
    } yield {
      ExactCounterIntervalItem(eq.tq.q.toString, eq.dimKeyValues, Seq(ExactCounterItem(eq.tq.ts, score.toLong, score)))
    }
  }.toSeq

  private def getExactCountToJs(policy: Counter,
                                item: String,
                                timeRange: Seq[(TimedQualifier, TimedQualifier)],
                                limitOpt: Option[Int],
                                dimQueryValues: Map[String, Set[String]]): Future[JsValue] = {
    exactCounter(policy.version).getCountsAsync(policy, Seq(item), timeRange, dimQueryValues).map { seq =>
      val items = {
        for {
          fetched <- seq
        } yield {
          fetchedToResult(fetched, limitOpt)
        }
      }.flatten
      Json.toJson(ExactCounterResult(ExactCounterResultMeta(policy.service, policy.action, item), items))
    }
  }

  private def getDecayedCountToJs(policy: Counter,
                                  item: String,
                                  timeRange: Seq[(TimedQualifier, TimedQualifier)],
                                  dimQueryValues: Map[String, Set[String]],
                                  qsSum: Option[String]): Future[JsValue] = {
    exactCounter(policy.version).getDecayedCountsAsync(policy, Seq(item), timeRange, dimQueryValues, qsSum).map { seq =>
      val decayedCounts = seq.head
      val meta = ExactCounterResultMeta(policy.service, policy.action, decayedCounts.exactKey.itemKey)
      val intervalItems = decayedToResult(decayedCounts)
      Json.toJson(ExactCounterResult(meta, intervalItems))
    }
  }

  def getRankingCountAsync(service: String, action: String) = Action.async { implicit request =>
    lazy val intervalUnits = getQueryString("interval", getQueryString("step", "t")).split(',').toSeq
      .map(IntervalUnit.withName)
    lazy val limit = getQueryString("limit", "1").toInt
    lazy val kValue = getQueryString("k", "10").toInt

    lazy val qsSum = request.getQueryString("sum")

    lazy val optFrom = request.getQueryString("from").filter(!_.isEmpty).map(UnitConverter.toMillis)
    lazy val optTo = request.getQueryString("to").filter(!_.isEmpty).map(UnitConverter.toMillis)

    // find dimension
    lazy val dimensionMap = request.queryString.filterKeys { k => k.charAt(0) == ':' }.map { case (k, v) =>
      (k.substring(1), v.mkString(",").split(',').toList)
    }

    val dimensions = {
      for {
        values <- CartesianProduct(dimensionMap.values.toList).toSeq
      } yield {
        dimensionMap.keys.zip(values).toMap
      }
    }

    counterModel.findByServiceAction(service, action) match {
      case Some(policy) =>
        val tqs = TimedQualifier.getQualifiersToLimit(intervalUnits, limit, optTo)
        val dimKeys = {
          for {
            dimension <- dimensions
          } yield {
            dimension -> tqs.map(tq => RankingKey(policy.id, policy.version, ExactQualifier(tq, dimension)))
          }
        }

        // if tqs has only 1 tq, do not apply sum function
        try {
          val rankResult = {
            if (tqs.length > 1 && qsSum.isDefined) {
              getSumRankCounterResultAsync(policy, dimKeys, kValue, qsSum)
            } else {
              // no summary
              Future.successful(getRankCounterResult(policy, dimKeys, kValue))
            }
          }

          rankResult.map { result =>
            Ok(Json.toJson(result))
          }
        } catch {
          case e: UnsupportedOperationException =>
            Future.successful(NotImplemented(Json.toJson(
              Map("msg" -> e.getMessage)
            )))
          case e: Throwable =>
            throw e
        }
      case None =>
        Future.successful(NotFound(Json.toJson(Map("msg" -> s"$service.$action not found"))))
    }
  }

  def deleteRankingCount(service: String, action: String) = Action.async { implicit request =>
    lazy val intervalUnits = getQueryString("interval", getQueryString("step", "t")).split(',').toSeq
      .map(IntervalUnit.withName)
    lazy val limit = getQueryString("limit", "1").toInt

    // find dimension
    lazy val dimensionMap = request.queryString.filterKeys { k => k.charAt(0) == ':' }.map { case (k, v) =>
      (k.substring(1), v.mkString(",").split(',').toList)
    }

    val dimensions = {
      for {
        values <- CartesianProduct(dimensionMap.values.toList).toSeq
      } yield {
        dimensionMap.keys.zip(values).toMap
      }
    }

    Future {
      counterModel.findByServiceAction(service, action) match {
        case Some(policy) =>
          val tqs = TimedQualifier.getQualifiersToLimit(intervalUnits, limit)
          val keys = {
            for {
              dimension <- dimensions
              tq <- tqs
            } yield {
              RankingKey(policy.id, policy.version, ExactQualifier(tq, dimension))
            }
          }

          for {
            key <- keys
          } {
            rankingCounter(policy.version).delete(key)
          }

          Ok(JsObject(
            Seq(
              ("msg", Json.toJson(s"delete ranking in $service.$action")),
              ("items", Json.toJson({
                for {
                  key <- keys
                } yield {
                  s"${key.eq.tq.q}.${key.eq.tq.ts}.${key.eq.dimension}"
                }
              }))
            )
          ))
        case None =>
          NotFound(Json.toJson(
            Map("msg" -> s"$service.$action not found")
          ))
      }
    }
  }

  val reduceRateRankingValue = new ReduceMapValue[ExactKeyTrait, RateRankingValue](RateRankingValue.reduce, RateRankingValue(-1, -1))

  // change format
  private def getDecayedCountsAsync(policy: Counter,
                                    items: Seq[String],
                                    timeRange: (TimedQualifier, TimedQualifier),
                                    dimension: Map[String, String],
                                    qsSum: Option[String]): Future[Seq[(ExactKeyTrait, Double)]] = {
    exactCounter(policy.version).getDecayedCountsAsync(policy, items, Seq(timeRange), dimension.mapValues(s => Set(s)), qsSum).map { seq =>
      for {
        DecayedCounts(exactKey, qcMap) <- seq
        value <- qcMap.values
      } yield {
        exactKey -> value
      }
    }
  }

  def getSumRankCounterResultAsync(policy: Counter,
                                   dimKeys: Seq[(Map[String, String], Seq[RankingKey])],
                                   kValue: Int,
                                   qsSum: Option[String]): Future[RankCounterResult] = {
    val futures = {
      for {
        (dimension, keys) <- dimKeys
      } yield {
        val tqs = keys.map(rk => rk.eq.tq)
        val (tqFrom, tqTo) = (tqs.last, tqs.head)
        val items = rankingCounter(policy.version).getAllItems(keys, kValue)
//        Logger.warn(s"item count: ${items.length}")
        val future = {
          if (policy.isRateCounter) {
            val actionPolicy = policy.rateActionId.flatMap(counterModel.findById(_)).get
            val basePolicy = policy.rateBaseId.flatMap(counterModel.findById(_)).get

            val futureAction = getDecayedCountsAsync(actionPolicy, items, (tqFrom, tqTo), dimension, qsSum).map { seq =>
              seq.map { case (k, score) =>
                ExactKey(policy, k.itemKey, checkItemType = false) -> RateRankingValue(score, -1)
              }.toMap
            }
            val futureBase = getDecayedCountsAsync(basePolicy, items, (tqFrom, tqTo), dimension, qsSum).map { seq =>
              seq.map { case (k, score) =>
                ExactKey(policy, k.itemKey, checkItemType = false) -> RateRankingValue(-1, score)
              }.toMap
            }
            futureAction.zip(futureBase).map { case (actionScores, baseScores) =>
              reduceRateRankingValue(actionScores, baseScores).map { case (k, rrv) =>
//                Logger.warn(s"$k -> $rrv")
                k -> rrv.rankingValue.score
              }.toSeq
            }
          }
          else if (policy.isTrendCounter) {
            val actionPolicy = policy.rateActionId.flatMap(counterModel.findById(_)).get
            val basePolicy = policy.rateBaseId.flatMap(counterModel.findById(_)).get

            val futureAction = getDecayedCountsAsync(actionPolicy, items, (tqFrom, tqTo), dimension, qsSum).map { seq =>
              seq.map { case (k, score) =>
                ExactKey(policy, k.itemKey, checkItemType = false) -> RateRankingValue(score, -1)
              }.toMap
            }
            val futureBase = getDecayedCountsAsync(basePolicy, items, (tqFrom.add(-1), tqTo.add(-1)), dimension, qsSum).map { seq =>
              seq.map { case (k, score) =>
                ExactKey(policy, k.itemKey, checkItemType = false) -> RateRankingValue(-1, score)
              }.toMap
            }
            futureAction.zip(futureBase).map { case (actionScores, baseScores) =>
              reduceRateRankingValue(actionScores, baseScores).map { case (k, rrv) =>
//                Logger.warn(s"$k -> $rrv")
                k -> rrv.rankingValue.score
              }.toSeq
            }
          }
          else {
            getDecayedCountsAsync(policy, items, (tqFrom, tqTo), dimension, qsSum)
          }
        }
        future.map { keyWithScore =>
          val ranking = keyWithScore.sortBy(-_._2).take(kValue)
          val rankCounterItems = {
            for {
              idx <- ranking.indices
              (exactKey, score) = ranking(idx)
            } yield {
              val realId = policy.itemType match {
                case ItemType.BLOB => exactCounter(policy.version).getBlobValue(policy, exactKey.itemKey)
                  .getOrElse(throw new Exception(s"not found blob id. ${policy.service}.${policy.action} ${exactKey.itemKey}"))
                case _ => exactKey.itemKey
              }
              RankCounterItem(idx + 1, realId, score)
            }
          }

          val eq = ExactQualifier(tqFrom, dimension)
          RankCounterDimensionItem(eq.tq.q.toString, eq.tq.ts, eq.dimension, -1, rankCounterItems)
        }
      }
    }

    Future.sequence(futures).map { dimensionResultList =>
      RankCounterResult(RankCounterResultMeta(policy.service, policy.action), dimensionResultList)
    }
  }

  def getRankCounterResult(policy: Counter, dimKeys: Seq[(Map[String, String], Seq[RankingKey])], kValue: Int): RankCounterResult = {
    val dimensionResultList = {
      for {
        (dimension, keys) <- dimKeys
        key <- keys
      } yield {
        val rankingValue = rankingCounter(policy.version).getTopK(key, kValue)
        val ranks = {
          for {
            rValue <- rankingValue.toSeq
            idx <- rValue.values.indices
            rank = idx + 1
          } yield {
            val (id, score) = rValue.values(idx)
            val realId = policy.itemType match {
              case ItemType.BLOB =>
                exactCounter(policy.version)
                  .getBlobValue(policy, id)
                  .getOrElse(throw new Exception(s"not found blob id. ${policy.service}.${policy.action} $id"))
              case _ => id
            }
            RankCounterItem(rank, realId, score)
          }
        }
        val eq = key.eq
        val tq = eq.tq
        RankCounterDimensionItem(tq.q.toString, tq.ts, eq.dimension, rankingValue.map(v => v.totalScore).getOrElse(0d), ranks)
      }
    }

    RankCounterResult(RankCounterResultMeta(policy.service, policy.action), dimensionResultList)
  }

  type Record = ProducerRecord[String, String]

  def incrementCount(service: String, action: String, item: String) = Action.async(s2parse.json) { request =>
    Future {
      /*
       * {
       * timestamp: Long
       * property: {}
       * value: Int
       * }
       */
      lazy val metaMap = Map("service" -> service, "action" -> action, "item" -> item)
      counterModel.findByServiceAction(service, action).map { policy =>
        val body = request.body
        try {
          val ts = (body \ "timestamp").asOpt[Long].getOrElse(System.currentTimeMillis()).toString
          val dimension = (body \ "dimension").asOpt[JsValue].getOrElse(Json.obj())
          val property = (body \ "property").asOpt[JsValue].getOrElse(Json.obj())

          val msg = List(ts, service, action, item, dimension, property).mkString("\t")

          // produce to kafka
          // hash partitioner by key
          walLogHandler.enqueue(KafkaMessage(new Record(CounterConfig.KAFKA_TOPIC_COUNTER, s"$ts.$item", msg)))

          Ok(Json.toJson(
            Map(
              "meta" -> metaMap
            )
          ))
        }
        catch {
          case e: JsResultException =>
            BadRequest(Json.toJson(
              Map("msg" -> s"need timestamp.")
            ))
        }
      }.getOrElse {
        NotFound(Json.toJson(
          Map("msg" -> s"$service.$action not found")
        ))
      }
    }
  }
}
