blob: ddad48ff1781fda60ef27c1b8a91279b84e6a471 [file] [log] [blame]
/*
* Copyright 2019 WeBank
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.webank.wedatasphere.linkis.resourcemanager.external.yarn
import com.webank.wedatasphere.linkis.common.utils.{Logging, Utils}
import com.webank.wedatasphere.linkis.manager.common.entity.resource.{CommonNodeResource, NodeResource, ResourceType, YarnResource}
import com.webank.wedatasphere.linkis.resourcemanager.exception.{RMErrorException, RMWarnException}
import com.webank.wedatasphere.linkis.resourcemanager.external.domain.{ExternalAppInfo, ExternalResourceIdentifier, ExternalResourceProvider}
import com.webank.wedatasphere.linkis.resourcemanager.external.request.ExternalResourceRequester
import org.apache.http.HttpHeaders
import org.apache.http.client.methods.HttpGet
import org.apache.http.impl.client.HttpClients
import org.apache.http.util.EntityUtils
import org.json4s.JValue
import org.json4s.JsonAST._
import org.json4s.jackson.JsonMethods.parse
import sun.misc.BASE64Encoder
import scala.collection.JavaConversions._
import scala.collection.mutable.ArrayBuffer
class YarnResourceRequester extends ExternalResourceRequester with Logging {
private var provider:ExternalResourceProvider = _
private def getAuthorizationStr = {
val user = this.provider.getConfigMap.getOrDefault("user","").asInstanceOf[String]
val pwd = this.provider.getConfigMap.getOrDefault("pwd","").asInstanceOf[String]
val authKey = user + ":" + pwd
val base64Encoder = new BASE64Encoder()
base64Encoder.encode(authKey.getBytes)
}
override def requestResourceInfo(identifier: ExternalResourceIdentifier, provider: ExternalResourceProvider): NodeResource = {
val rmWebAddress = provider.getConfigMap.get("rmWebAddress").asInstanceOf[String]
info(s"rmWebAddress: $rmWebAddress")
val queueName = identifier.asInstanceOf[YarnResourceIdentifier].getQueueName
this.provider = provider
def getHadoopVersion() = if(provider.getConfigMap.get("hadoopVersion") != null) provider.getConfigMap.get("hadoopVersion").asInstanceOf[String] else {
val resourceManagerVersion = getResponseByUrl("info", rmWebAddress) \ "clusterInfo" \ "resourceManagerVersion"
info(s"Hadoop version is $resourceManagerVersion")
resourceManagerVersion.values.asInstanceOf[String]
}
def getYarnResource(jValue: Option[JValue]) = jValue.map(r => new YarnResource((r \ "memory").asInstanceOf[JInt].values.toLong * 1024l * 1024l, (r \ "vCores").asInstanceOf[JInt].values.toInt, 0, queueName))
def maxEffectiveHandle(queueValue: Option[JValue]): Option[YarnResource] = {
val metrics = getResponseByUrl( "metrics", rmWebAddress)
val totalResouceInfoResponse = ((metrics \ "clusterMetrics" \ "totalMB").asInstanceOf[JInt].values.toLong, (metrics \ "clusterMetrics" \ "totalVirtualCores").asInstanceOf[JInt].values.toLong)
queueValue.map(r => {
val effectiveResource = (r \ "absoluteCapacity").asInstanceOf[JDouble].values - (r \ "absoluteUsedCapacity").asInstanceOf[JDouble].values
new YarnResource(math.floor(effectiveResource * totalResouceInfoResponse._1 * 1024l * 1024l/100).toLong, math.floor(effectiveResource * totalResouceInfoResponse._2/100).toInt, 0, queueName)
})
}
var realQueueName = "root." + queueName
def getQueue(queues: JValue): Option[JValue] = queues match {
case JArray(queue) =>
queue.foreach { q =>
val yarnQueueName = (q \ "queueName").asInstanceOf[JString].values
if (yarnQueueName == realQueueName) return Some(q)
else if (realQueueName.startsWith(yarnQueueName + ".")) return getQueue(getChildQueues(q))
}
None
case JObject(queue) =>
if (queue.find(_._1 == "queueName").exists(_._2.asInstanceOf[JString].values == realQueueName)) Some(queues)
else {
val childQueues = queue.find(_._1 == "childQueues")
if (childQueues.isEmpty) None
else getQueue(childQueues.map(_._2).get)
}
case JNull | JNothing => None
}
def getChildQueues(resp:JValue):JValue = {
val queues = resp \ "childQueues" \ "queue"
if(queues != null && queues != JNull && queues != JNothing && null != queues.children && queues.children.nonEmpty) {
info(s"queues:$queues")
queues
} else resp \ "childQueues"
}
def getQueueOfCapacity(queues: JValue): Option[JValue] = queues match {
case JArray(queue) =>
queue.foreach { q =>
val yarnQueueName = (q \ "queueName").asInstanceOf[JString].values
if (yarnQueueName == realQueueName) return Some(q)
else if ((q \ "queues").toOption.nonEmpty) {
val matchQueue = getQueueOfCapacity(getChildQueuesOfCapacity(q))
if (matchQueue.nonEmpty) return matchQueue
}
}
None
case JObject(queue) =>
if (queue.find(_._1 == "queueName").exists(_._2.asInstanceOf[JString].values == realQueueName)) return Some(queues)
else if ((queues \ "queues").toOption.nonEmpty) {
val matchQueue = getQueueOfCapacity(getChildQueuesOfCapacity(queues))
if (matchQueue.nonEmpty) return matchQueue
}
None
case JNull | JNothing => None
}
def getChildQueuesOfCapacity(resp: JValue) = resp \ "queues" \ "queue"
def getResources() = {
val resp = getResponseByUrl("scheduler", rmWebAddress)
val schedulerType = (resp \ "scheduler" \ "schedulerInfo" \ "type").asInstanceOf[JString].values
if ("capacityScheduler".equals(schedulerType)) {
realQueueName = queueName
val childQueues = getChildQueuesOfCapacity(resp \ "scheduler" \ "schedulerInfo")
val queue = getQueueOfCapacity(childQueues)
if (queue.isEmpty) {
debug(s"cannot find any information about queue $queueName, response: " + resp)
throw new RMWarnException(11006, s"queue $queueName is not exists in YARN.")
}
(maxEffectiveHandle(queue).get, getYarnResource(queue.map(_ \ "resourcesUsed")).get)
} else if ("fairScheduler".equals(schedulerType)) {
val childQueues = getChildQueues(resp \ "scheduler" \ "schedulerInfo" \ "rootQueue")
val queue = getQueue(childQueues)
if (queue.isEmpty) {
debug(s"cannot find any information about queue $queueName, response: " + resp)
throw new RMWarnException(11006, s"queue $queueName is not exists in YARN.")
}
(getYarnResource(queue.map(_ \ "maxResources")).get,
getYarnResource(queue.map(_ \ "usedResources")).get)
} else {
debug(s"only support fairScheduler or capacityScheduler, schedulerType: $schedulerType , response: " + resp)
throw new RMWarnException(11006, s"only support fairScheduler or capacityScheduler, schedulerType: $schedulerType")
}
}
Utils.tryCatch{
val yarnResource = getResources()
val nodeResource = new CommonNodeResource
nodeResource.setMaxResource(yarnResource._1)
nodeResource.setUsedResource(yarnResource._2)
nodeResource
}(t => {
throw new RMErrorException(11006, "Get the Yarn queue information exception" + ".(获取Yarn队列信息异常)", t)
})
}
override def requestAppInfo(identifier: ExternalResourceIdentifier, provider: ExternalResourceProvider): java.util.List[ExternalAppInfo] = {
val rmWebAddress = provider.getConfigMap.get("rmWebAddress").asInstanceOf[String]
val queueName = identifier.asInstanceOf[YarnResourceIdentifier].getQueueName
def getYarnResource(jValue: Option[JValue]) = jValue.map(r => new YarnResource((r \ "allocatedMB").asInstanceOf[JInt].values.toLong * 1024l * 1024l, (r \ "allocatedVCores").asInstanceOf[JInt].values.toInt, 0, queueName))
val realQueueName = "root." + queueName
def getAppInfos() = {
val resp = getResponseByUrl("apps", rmWebAddress)
resp \ "apps" \ "app" match {
case JArray(apps) =>
val appInfoBuffer = new ArrayBuffer[YarnAppInfo]()
apps.foreach { app =>
val yarnQueueName = (app \ "queue").asInstanceOf[JString].values
val state = (app \ "state").asInstanceOf[JString].values
if (yarnQueueName == realQueueName && (state == "RUNNING" || state == "ACCEPTED")) {
val appInfo = new YarnAppInfo(
(app \ "id").asInstanceOf[JString].values,
(app \ "user").asInstanceOf[JString].values,
state,
(app \ "applicationType").asInstanceOf[JString].values,
getYarnResource(Some(app)).get
)
appInfoBuffer.append(appInfo)
}
}
appInfoBuffer.toArray
case JNull | JNothing => new Array[YarnAppInfo](0)
}
}
Utils.tryCatch(getAppInfos().toList)(t => {
throw new RMErrorException(11006, "Get the Yarn Application information exception.(获取Yarn Application信息异常)", t)
})
}
override def getResourceType = ResourceType.Yarn
private def getResponseByUrl(url: String, rmWebAddress: String) = {
val httpGet = new HttpGet(rmWebAddress + "/ws/v1/cluster/" + url)
httpGet.addHeader("Accept", "application/json")
if (this.provider.getConfigMap.get("authorEnable").asInstanceOf[Boolean])
httpGet.addHeader(HttpHeaders.AUTHORIZATION, "Basic " + getAuthorizationStr)
val response = YarnResourceRequester.httpClient.execute(httpGet)
parse(EntityUtils.toString(response.getEntity()))
}
}
object YarnResourceRequester extends Logging {
private val httpClient = HttpClients.createDefault()
def init() = {
addShutdownHook()
}
def addShutdownHook(): Unit = {
info("Register shutdown hook to release httpclient connection")
Utils.addShutdownHook(httpClient.close())
}
}