/*
 * Copyright 2019 WeBank
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package com.webank.wedatasphere.linkis.resourcemanager.external.yarn

import com.webank.wedatasphere.linkis.common.utils.{Logging, Utils}
import com.webank.wedatasphere.linkis.manager.common.entity.resource.{CommonNodeResource, NodeResource, ResourceType, YarnResource}
import com.webank.wedatasphere.linkis.resourcemanager.exception.{RMErrorException, RMWarnException}
import com.webank.wedatasphere.linkis.resourcemanager.external.domain.{ExternalAppInfo, ExternalResourceIdentifier, ExternalResourceProvider}
import com.webank.wedatasphere.linkis.resourcemanager.external.request.ExternalResourceRequester
import org.apache.http.HttpHeaders
import org.apache.http.client.methods.HttpGet
import org.apache.http.impl.client.HttpClients
import org.apache.http.util.EntityUtils
import org.json4s.JValue
import org.json4s.JsonAST._
import org.json4s.jackson.JsonMethods.parse
import sun.misc.BASE64Encoder

import scala.collection.JavaConversions._
import scala.collection.mutable.ArrayBuffer

/**
 * [[ExternalResourceRequester]] that reads queue and application information from the
 * YARN ResourceManager REST endpoints located under `<rmWebAddress>/ws/v1/cluster/`.
 *
 * The following keys are read from the provider's config map:
 *  - `rmWebAddress`: base address the REST paths are appended to
 *  - `authorEnable`: when true, an HTTP Basic `Authorization` header is sent
 *  - `user` / `pwd`: credentials used for that header (both default to the empty string)
 *
 * The class holds no per-request state: the provider is handed explicitly to every helper,
 * so a request can never pick up the settings of a provider used by another call.
 */
class YarnResourceRequester extends ExternalResourceRequester with Logging {

  /**
   * Builds the credential part of an HTTP Basic `Authorization` header.
   *
   * @param provider provider whose config map supplies `user` and `pwd`
   * @return Base64 encoding of `user:pwd`
   */
  private def getAuthorizationStr(provider: ExternalResourceProvider): String = {
    import java.nio.charset.StandardCharsets
    import java.util.Base64

    val configMap = provider.getConfigMap
    val user = configMap.getOrDefault("user", "").asInstanceOf[String]
    val pwd = configMap.getOrDefault("pwd", "").asInstanceOf[String]
    // The basic java.util.Base64 encoder never adds line separators, so the value stays a
    // single header line however long the credentials are. The charset is explicit so the
    // encoded bytes do not depend on the JVM default.
    Base64.getEncoder.encodeToString((user + ":" + pwd).getBytes(StandardCharsets.UTF_8))
  }

  /**
   * Reports the max and used resources of the queue named by `identifier`.
   *
   * For `capacityScheduler` the max resource is derived from the cluster totals and the queue's
   * `absoluteCapacity - absoluteUsedCapacity`; for `fairScheduler` it is the queue's
   * `maxResources`. Any other scheduler type is rejected.
   *
   * @param identifier must be a [[YarnResourceIdentifier]]; supplies the queue name
   * @param provider   supplies `rmWebAddress` and the authentication settings
   * @return a [[CommonNodeResource]] with max and used resource set
   * @throws RMErrorException (code 11006) wrapping whatever failure happened while querying,
   *                          including an unknown queue or an unsupported scheduler type
   */
  override def requestResourceInfo(identifier: ExternalResourceIdentifier, provider: ExternalResourceProvider): NodeResource = {
    val rmWebAddress = provider.getConfigMap.get("rmWebAddress").asInstanceOf[String]
    info(s"rmWebAddress: $rmWebAddress")
    val queueName = identifier.asInstanceOf[YarnResourceIdentifier].getQueueName

    // Name of a queue node; fails with ClassCastException when the field is absent or not a string.
    def queueNameOf(queue: JValue): String = (queue \ "queueName").asInstanceOf[JString].values

    // True when the object's `queueName` field equals `expected`; a missing field is simply no match.
    def hasQueueName(fields: List[(String, JValue)], expected: String): Boolean =
      fields.find(_._1 == "queueName").exists(_._2.asInstanceOf[JString].values == expected)

    // Converts a `{memory, vCores}` node (memory multiplied by 1024 * 1024) into a YarnResource.
    def toYarnResource(resource: JValue): YarnResource = new YarnResource(
      (resource \ "memory").asInstanceOf[JInt].values.toLong * 1024L * 1024L,
      (resource \ "vCores").asInstanceOf[JInt].values.toInt,
      0,
      queueName
    )

    // Capacity scheduler: scales the cluster totals from the `metrics` endpoint by the queue's
    // (absoluteCapacity - absoluteUsedCapacity), which the division by 100 treats as a percentage.
    def remainingCapacityOf(queue: JValue): YarnResource = {
      val clusterMetrics = getResponseByUrl("metrics", rmWebAddress, provider) \ "clusterMetrics"
      val totalMB = (clusterMetrics \ "totalMB").asInstanceOf[JInt].values.toLong
      val totalVirtualCores = (clusterMetrics \ "totalVirtualCores").asInstanceOf[JInt].values.toLong
      val effectivePercent = (queue \ "absoluteCapacity").asInstanceOf[JDouble].values -
        (queue \ "absoluteUsedCapacity").asInstanceOf[JDouble].values
      new YarnResource(
        math.floor(effectivePercent * totalMB * 1024L * 1024L / 100).toLong,
        math.floor(effectivePercent * totalVirtualCores / 100).toInt,
        0,
        queueName
      )
    }

    // Fair scheduler: children are read from `childQueues.queue` when that node has content,
    // otherwise from `childQueues` itself.
    def getChildQueues(resp: JValue): JValue = {
      val nested = resp \ "childQueues" \ "queue"
      if (nested != JNull && nested != JNothing && nested.children.nonEmpty) {
        info(s"queues:$nested")
        nested
      } else resp \ "childQueues"
    }

    // Fair scheduler lookup. In an array, the first entry that either is the target or is a
    // dotted-name ancestor of it decides the result; siblings after it are not inspected.
    def findFairQueue(queues: JValue, target: String): Option[JValue] = queues match {
      case JArray(candidates) =>
        candidates.iterator
          .map(candidate => (candidate, queueNameOf(candidate)))
          .find { case (_, name) => name == target || target.startsWith(name + ".") }
          .flatMap { case (candidate, name) =>
            if (name == target) Some(candidate)
            else findFairQueue(getChildQueues(candidate), target)
          }
      case JObject(fields) =>
        if (hasQueueName(fields, target)) Some(queues)
        else fields.find(_._1 == "childQueues").flatMap(field => findFairQueue(field._2, target))
      case JNull | JNothing => None
    }

    def getChildQueuesOfCapacity(resp: JValue): JValue = resp \ "queues" \ "queue"

    // Capacity scheduler lookup: depth-first search, descending into every entry that has a
    // `queues` node until one subtree yields the target.
    def findCapacityQueue(queues: JValue, target: String): Option[JValue] = {
      def searchChildren(parent: JValue): Option[JValue] =
        if ((parent \ "queues").toOption.nonEmpty) findCapacityQueue(getChildQueuesOfCapacity(parent), target)
        else None

      queues match {
        case JArray(candidates) =>
          // The iterator is lazy, so the search stops at the first entry or subtree that matches.
          candidates.iterator
            .map(candidate => if (queueNameOf(candidate) == target) Some(candidate) else searchChildren(candidate))
            .collectFirst { case Some(found) => found }
        case JObject(fields) =>
          if (hasQueueName(fields, target)) Some(queues) else searchChildren(queues)
        case JNull | JNothing => None
      }
    }

    def queueNotFound(resp: JValue): Nothing = {
      debug(s"cannot find any information about queue $queueName, response: " + resp)
      throw new RMWarnException(11006, s"queue $queueName is not exists in YARN.")
    }

    // Returns (max resource, used resource) for the queue.
    def getResources(): (YarnResource, YarnResource) = {
      val resp = getResponseByUrl("scheduler", rmWebAddress, provider)
      val schedulerInfo = resp \ "scheduler" \ "schedulerInfo"
      val schedulerType = (schedulerInfo \ "type").asInstanceOf[JString].values
      schedulerType match {
        case "capacityScheduler" =>
          // Capacity scheduler queues are matched by the bare queue name.
          val queue = findCapacityQueue(getChildQueuesOfCapacity(schedulerInfo), queueName)
            .getOrElse(queueNotFound(resp))
          (remainingCapacityOf(queue), toYarnResource(queue \ "resourcesUsed"))
        case "fairScheduler" =>
          // Fair scheduler queues are matched by their name prefixed with "root.".
          val queue = findFairQueue(getChildQueues(schedulerInfo \ "rootQueue"), "root." + queueName)
            .getOrElse(queueNotFound(resp))
          (toYarnResource(queue \ "maxResources"), toYarnResource(queue \ "usedResources"))
        case _ =>
          debug(s"only support fairScheduler or capacityScheduler, schedulerType: $schedulerType , response: " + resp)
          throw new RMWarnException(11006, s"only support fairScheduler or capacityScheduler, schedulerType: $schedulerType")
      }
    }

    Utils.tryCatch {
      val (maxResource, usedResource) = getResources()
      val nodeResource = new CommonNodeResource
      nodeResource.setMaxResource(maxResource)
      nodeResource.setUsedResource(usedResource)
      nodeResource
    }(t => {
      throw new RMErrorException(11006, "Get the Yarn queue information exception" + ".(获取Yarn队列信息异常)", t)
    })
  }

  /**
   * Lists the applications in state `RUNNING` or `ACCEPTED` whose `queue` field equals
   * `"root." + queueName`.
   *
   * @param identifier must be a [[YarnResourceIdentifier]]; supplies the queue name
   * @param provider   supplies `rmWebAddress` and the authentication settings
   * @return one entry per matching application; empty when the response lists no applications
   * @throws RMErrorException (code 11006) wrapping whatever failure happened while querying
   */
  override def requestAppInfo(identifier: ExternalResourceIdentifier, provider: ExternalResourceProvider): java.util.List[ExternalAppInfo] = {
    val rmWebAddress = provider.getConfigMap.get("rmWebAddress").asInstanceOf[String]
    val queueName = identifier.asInstanceOf[YarnResourceIdentifier].getQueueName
    val realQueueName = "root." + queueName
    val activeStates = Set("RUNNING", "ACCEPTED")

    // Resources currently allocated to one application node (allocatedMB multiplied by 1024 * 1024).
    def allocatedResourceOf(app: JValue): YarnResource = new YarnResource(
      (app \ "allocatedMB").asInstanceOf[JInt].values.toLong * 1024L * 1024L,
      (app \ "allocatedVCores").asInstanceOf[JInt].values.toInt,
      0,
      queueName
    )

    def getAppInfos(): Array[YarnAppInfo] = {
      val resp = getResponseByUrl("apps", rmWebAddress, provider)
      resp \ "apps" \ "app" match {
        case JArray(apps) =>
          apps.flatMap { app =>
            val yarnQueueName = (app \ "queue").asInstanceOf[JString].values
            val state = (app \ "state").asInstanceOf[JString].values
            if (yarnQueueName == realQueueName && activeStates.contains(state)) {
              Some(new YarnAppInfo(
                (app \ "id").asInstanceOf[JString].values,
                (app \ "user").asInstanceOf[JString].values,
                state,
                (app \ "applicationType").asInstanceOf[JString].values,
                allocatedResourceOf(app)
              ))
            } else None
          }.toArray
        case JNull | JNothing => new Array[YarnAppInfo](0)
      }
    }

    // The Scala list is turned into a java.util.List by the implicit JavaConversions in scope.
    Utils.tryCatch(getAppInfos().toList)(t => {
      throw new RMErrorException(11006, "Get the Yarn Application information exception.(获取Yarn Application信息异常)", t)
    })
  }

  override def getResourceType = ResourceType.Yarn

  /**
   * Issues `GET <rmWebAddress>/ws/v1/cluster/<url>` asking for JSON and parses the response body.
   *
   * @param url          path below `/ws/v1/cluster/`, e.g. `scheduler`, `metrics` or `apps`
   * @param rmWebAddress base address the path is appended to
   * @param provider     provider whose config decides whether a Basic auth header is sent
   * @return the parsed response body
   */
  private def getResponseByUrl(url: String, rmWebAddress: String, provider: ExternalResourceProvider): JValue = {
    val httpGet = new HttpGet(rmWebAddress + "/ws/v1/cluster/" + url)
    httpGet.addHeader("Accept", "application/json")
    if (provider.getConfigMap.get("authorEnable").asInstanceOf[Boolean]) {
      httpGet.addHeader(HttpHeaders.AUTHORIZATION, "Basic " + getAuthorizationStr(provider))
    }
    val response = YarnResourceRequester.httpClient.execute(httpGet)
    // Always close the response so the connection is released even when reading or parsing fails.
    try parse(EntityUtils.toString(response.getEntity))
    finally response.close()
  }
}
/**
 * Companion object owning the single HTTP client that the [[YarnResourceRequester]] class
 * executes its requests with.
 */
object YarnResourceRequester extends Logging {

  // One client instance, created when this object is initialised and closed by the shutdown hook.
  private val httpClient = HttpClients.createDefault()

  /** Initialisation entry point; it only registers the shutdown hook. */
  def init(): Unit = addShutdownHook()

  /** Logs the registration and hands a hook that closes the HTTP client to `Utils.addShutdownHook`. */
  def addShutdownHook(): Unit = {
    info("Register shutdown hook to release httpclient connection")
    Utils.addShutdownHook {
      httpClient.close()
    }
  }
}
