/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 * 
 *   http://www.apache.org/licenses/LICENSE-2.0
 * 
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied.  See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */

package org.apache.s2graph.counter.core.v2

import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
import com.typesafe.config.Config
import org.apache.http.HttpStatus
import org.apache.s2graph.core.mysqls.Label
import org.apache.s2graph.counter.config.S2CounterConfig
import org.apache.s2graph.counter.core
import org.apache.s2graph.counter.core.ExactCounter.ExactValueMap
import org.apache.s2graph.counter.core._
import org.apache.s2graph.counter.models.Counter
import org.apache.s2graph.counter.util.CartesianProduct
import org.asynchttpclient.DefaultAsyncHttpClientConfig
import org.slf4j.LoggerFactory
import play.api.libs.json._
import scala.concurrent.duration._
import scala.concurrent.{Await, ExecutionContext, Future}

object ExactStorageGraph {
  case class RespGraph(success: Boolean, result: Long)
  implicit val respGraphFormat = Json.format[RespGraph]

  // using play-ws without play app
  implicit val materializer = ActorMaterializer.create(ActorSystem(getClass.getSimpleName))
  private val builder = new DefaultAsyncHttpClientConfig.Builder()
  private val wsClient = new play.api.libs.ws.ning.NingWSClient(builder.build)
}

case class ExactStorageGraph(config: Config) extends ExactStorage {
  private val log = LoggerFactory.getLogger(this.getClass)
  private val s2config = new S2CounterConfig(config)

  private val SERVICE_NAME = "s2counter"
  private val COLUMN_NAME = "bucket"
  private val labelPostfix = "_counts"

  val s2graphUrl = s2config.GRAPH_URL
  val s2graphReadOnlyUrl = s2config.GRAPH_READONLY_URL
  val graphOp = new GraphOperation(config)

  import ExactStorageGraph._

  override def update(policy: Counter, counts: Seq[(ExactKeyTrait, ExactValueMap)]): Map[ExactKeyTrait, ExactValueMap] = {
    import scala.concurrent.ExecutionContext.Implicits.global

    val (keyWithEq, reqJsLs) = toIncrementCountRequests(policy, counts).unzip(x => ((x._1, x._2), x._3))

    val future = wsClient.url(s"$s2graphUrl/graphs/edges/incrementCount").post(Json.toJson(reqJsLs)).map { resp =>
      resp.status match {
        case HttpStatus.SC_OK =>
          val respSeq = resp.json.as[Seq[RespGraph]]

          val keyWithEqResult = {
            for {
              ((key, eq), RespGraph(success, result)) <- keyWithEq.zip(respSeq)
            } yield {
              (key, (eq, result))
            }
          }.groupBy(_._1).mapValues{ seq => seq.map(_._2).toMap }
          keyWithEqResult
        case _ =>
          throw new RuntimeException(s"update failed: $policy $counts")
      }
    }
    Await.result(future, 10 second)
  }

  def delete(policy: Counter, keys: Seq[ExactKeyTrait]): Unit = {

  }

  private def toIncrementCountRequests(policy: Counter,
                                       counts: Seq[(ExactKeyTrait, ExactValueMap)])
  : Seq[(ExactKeyTrait, core.ExactQualifier, JsValue)] = {
    val labelName = policy.action + labelPostfix
    val timestamp = System.currentTimeMillis()
    for {
      (exactKey, values) <- counts
      (eq, value) <- values
    } yield {
      val from = exactKey.itemKey
      val to = eq.dimension
      val json = Json.obj(
        "timestamp" -> timestamp,
        "operation" -> "incrementCount",
        "from" -> from,
        "to" -> to,
        "label" -> labelName,
        "props" -> Json.obj(
          "_count" -> value,
          "time_unit" -> eq.tq.q.toString,
          "time_value" -> eq.tq.ts
        )
      )
      (exactKey, eq, json)
    }
  }

  override def get(policy: Counter,
                   items: Seq[String],
                   timeRange: Seq[(core.TimedQualifier, core.TimedQualifier)],
                   dimQuery: Map[String, Set[String]])
                  (implicit ex: ExecutionContext): Future[Seq[FetchedCountsGrouped]] = {
    val labelName = policy.action + labelPostfix
    val label = Label.findByName(labelName).get
//    val label = labelModel.findByName(labelName).get

    val ids = Json.toJson(items)

    val dimensions = {
      for {
        values <- CartesianProduct(dimQuery.values.map(ss => ss.toList).toList)
      } yield {
        dimQuery.keys.zip(values).toMap
      }
    }

    val stepJsLs = {
      for {
        (tqFrom, tqTo) <- timeRange
        dimension <- dimensions
      } yield {
        val eqFrom = core.ExactQualifier(tqFrom, dimension)
        val eqTo = core.ExactQualifier(tqTo, dimension)
        val intervalJs =
          s"""
            |{
            |  "from": {
            |    "_to": "${eqFrom.dimension}",
            |    "time_unit": "${eqFrom.tq.q}",
            |    "time_value": ${eqFrom.tq.ts}
            |  },
            |  "to": {
            |    "_to": "${eqTo.dimension}",
            |    "time_unit": "${eqTo.tq.q}",
            |    "time_value": ${eqTo.tq.ts + 1}
            |  }
            |}
          """.stripMargin
        val stepJs =
          s"""
            |{
            |  "direction": "out",
            |  "limit": -1,
            |  "duplicate": "raw",
            |  "label": "$labelName",
            |  "interval": $intervalJs
            |}
           """.stripMargin
        stepJs
      }
    }

    val reqJsStr =
      s"""
        |{
        |  "srcVertices": [
        |    {"serviceName": "${policy.service}", "columnName": "${label.srcColumnName}", "ids": $ids}
        |  ],
        |  "steps": [
        |    {
        |      "step": [
        |        ${stepJsLs.mkString(",")}
        |      ]
        |    }
        |  ]
        |}
      """.stripMargin

    val reqJs = Json.parse(reqJsStr)
//    log.warn(s"query: ${reqJs.toString()}")

    wsClient.url(s"$s2graphReadOnlyUrl/graphs/getEdges").post(reqJs).map { resp =>
      resp.status match {
        case HttpStatus.SC_OK =>
          val respJs = resp.json
//          println(respJs)
          val keyWithValues = (respJs \ "results").as[Seq[JsValue]].map { result =>
//            println(s"result: $result")
            resultToExactKeyValues(policy, result)
          }.groupBy(_._1).mapValues(seq => seq.map(_._2).toMap.groupBy { case (eq, v) => (eq.tq.q, eq.dimKeyValues) })
          for {
            (k, v) <- keyWithValues.toSeq
          } yield {
            FetchedCountsGrouped(k, v)
          }
        case n: Int =>
          log.warn(s"getEdges status($n): $reqJsStr")
//          println(s"getEdges status($n): $reqJsStr")
          Nil
      }
    }
  }

  private def resultToExactKeyValues(policy: Counter, result: JsValue): (ExactKeyTrait, (core.ExactQualifier, Long)) = {
    val from = (result \ "from").get match {
      case s: JsString => s.as[String]
      case n: JsNumber => n.as[Long].toString
      case x => throw new RuntimeException(s"$x's type must be string or number")
    }
    val dimension = (result \ "to").as[String]
    val props = result \ "props"
    val count = (props \ "_count").as[Long]
    val timeUnit = (props \ "time_unit").as[String]
    val timeValue = (props \ "time_value").as[Long]
    (ExactKey(policy, from, checkItemType = true), (core.ExactQualifier(core.TimedQualifier(timeUnit, timeValue), dimension), count))
  }

  private def getInner(policy: Counter, key: ExactKeyTrait, eqs: Seq[core.ExactQualifier])
                      (implicit ex: ExecutionContext): Future[Seq[FetchedCounts]] = {
    val labelName = policy.action + labelPostfix

    Label.findByName(labelName) match {
      case Some(label) =>

        val src = Json.obj("serviceName" -> policy.service, "columnName" -> label.srcColumnName, "id" -> key.itemKey)
        val step = {
          val stepLs = {
            for {
              eq <- eqs
            } yield {
              val from = Json.obj("_to" -> eq.dimension, "time_unit" -> eq.tq.q.toString, "time_value" -> eq.tq.ts)
              val to = Json.obj("_to" -> eq.dimension, "time_unit" -> eq.tq.q.toString, "time_value" -> eq.tq.ts)
              val interval = Json.obj("from" -> from, "to" -> to)
              Json.obj("limit" -> 1, "label" -> labelName, "interval" -> interval)
            }
          }
          Json.obj("step" -> stepLs)
        }
        val query = Json.obj("srcVertices" -> Json.arr(src), "steps" -> Json.arr(step))
        //    println(s"query: ${query.toString()}")

        wsClient.url(s"$s2graphReadOnlyUrl/graphs/getEdges").post(query).map { resp =>
          resp.status match {
            case HttpStatus.SC_OK =>
              val respJs = resp.json
              val keyWithValues = (respJs \ "results").as[Seq[JsValue]].map { result =>
                resultToExactKeyValues(policy, result)
              }.groupBy(_._1).mapValues(seq => seq.map(_._2).toMap)
              for {
                (key, eqWithValues) <- keyWithValues.toSeq
              } yield {
                FetchedCounts(key, eqWithValues)
              }
            case _ =>
              Nil
          }
        }
      case None =>
        Future.successful(Nil)
    }
  }

  // for query exact qualifier
  override def get(policy: Counter, queries: Seq[(ExactKeyTrait, Seq[core.ExactQualifier])])
                  (implicit ex: ExecutionContext): Future[Seq[FetchedCounts]] = {
    val futures = {
      for {
        (key, eqs) <- queries
      } yield {
//        println(s"$key $eqs")
        getInner(policy, key, eqs)
      }
    }
    Future.sequence(futures).map(_.flatten)
  }

  override def getBlobValue(policy: Counter, blobId: String): Option[String] = {
    throw new RuntimeException("unsupported getBlobValue operation")
  }

  override def insertBlobValue(policy: Counter, keys: Seq[BlobExactKey]): Seq[Boolean] = {
    throw new RuntimeException("unsupported insertBlobValue operation")
  }

  private def existsLabel(policy: Counter, useCache: Boolean = true): Boolean = {
    val action = policy.action
    val counterLabelName = action + labelPostfix

    Label.findByName(counterLabelName, useCache).nonEmpty
  }

  override def prepare(policy: Counter): Unit = {
    val service = policy.service
    val action = policy.action

    val graphLabel = Label.findByName(action)
    if (graphLabel.isEmpty) {
      throw new Exception(s"label not found. $service.$action")
    }

    if (!existsLabel(policy, useCache = false)) {
      val label = Label.findByName(action, useCache = false).get

      val counterLabelName = action + labelPostfix
      val defaultJson =
        s"""
           |{
           |  "label": "$counterLabelName",
           |  "srcServiceName": "$service",
           |  "srcColumnName": "${label.tgtColumnName}",
           |  "srcColumnType": "${label.tgtColumnType}",
           |  "tgtServiceName": "$SERVICE_NAME",
           |  "tgtColumnName": "$COLUMN_NAME",
           |  "tgtColumnType": "string",
           |  "indices": [
           |    {"name": "time", "propNames": ["_to", "time_unit", "time_value"]}
           |  ],
           |  "props": [
           |    {"name": "time_unit", "dataType": "string", "defaultValue": ""},
           |    {"name": "time_value", "dataType": "long", "defaultValue": 0}
           |  ],
           |  "hTableName": "${policy.hbaseTable.get}"
           |}
        """.stripMargin
      val json = policy.dailyTtl.map(ttl => ttl * 24 * 60 * 60) match {
        case Some(ttl) =>
          Json.parse(defaultJson).as[JsObject] + ("hTableTTL" -> Json.toJson(ttl))
        case None =>
          Json.parse(defaultJson)
      }

      graphOp.createLabel(json)
    }
  }

  override def destroy(policy: Counter): Unit = {
    val action = policy.action

    if (existsLabel(policy, useCache = false)) {
      val counterLabelName = action + labelPostfix

      graphOp.deleteLabel(counterLabelName)
    }
  }

  override def ready(policy: Counter): Boolean = {
    existsLabel(policy)
  }

  // for range query
  override def get(policy: Counter, items: Seq[String], timeRange: Seq[(core.TimedQualifier, core.TimedQualifier)])
                  (implicit ec: ExecutionContext): Future[Seq[FetchedCounts]] = {
    throw new NotImplementedError("Not implemented")
  }
}
