blob: 1e13cfbb531e835938524e5717ae52a63b17dfa9 [file] [log] [blame]
/*
* Copyright 2019 WeBank
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.webank.wedatasphere.linkis.resourcemanager
import com.webank.wedatasphere.linkis.common.utils.{ByteTimeUtils, Logging}
import com.webank.wedatasphere.linkis.resourcemanager.ResourceRequestPolicy._
import com.webank.wedatasphere.linkis.resourcemanager.exception.RMWarnException
import com.webank.wedatasphere.linkis.resourcemanager.utils.RMByteTimeUtils
import org.apache.commons.lang.StringUtils
import org.json4s.JsonAST.JObject
import org.json4s.JsonDSL._
import org.json4s.jackson.Serialization
import org.json4s.{CustomSerializer, DefaultFormats, Extraction}
import scala.collection.JavaConversions
/**
 * Base type for every kind of resource quantity handled by the resource manager.
 *
 * Subclasses provide the arithmetic and comparison primitives; the symbolic operators
 * declared here are plain aliases that forward to those primitives.
 *
 * Created by shanhuang on 9/11/18.
 */
abstract class Resource {

  /** Addition primitive; `+` forwards here. */
  def add(r: Resource): Resource

  /** Subtraction primitive; `-` forwards here. */
  def minus(r: Resource): Resource

  /** Multiplication by another resource. */
  def multiplied(r: Resource): Resource

  /** Scaling by a factor; both `*` overloads forward here. */
  def multiplied(rate: Float): Resource

  /** Division by another resource. */
  def divide(r: Resource): Resource

  /** Division by an integer; `/` forwards here. */
  def divide(rate: Int): Resource

  /** "Greater than" primitive; `>` forwards here. */
  def moreThan(r: Resource): Boolean

  /**
   * Partially greater than: the multi-field implementations in this file return true
   * as soon as any single field exceeds the matching field of `r`.
   *
   * @param r the resource to compare against
   * @return whether this resource is partially greater than `r`
   */
  def caseMore(r: Resource): Boolean

  /** Equality primitive; the `==(Resource)` overload forwards here. */
  def equalsTo(r: Resource): Boolean

  def +(r: Resource): Resource = add(r)

  def -(r: Resource): Resource = minus(r)

  def *(rate: Float): Resource = multiplied(rate)

  /** The factor is narrowed to Float before scaling. */
  def *(rate: Double): Resource = multiplied(rate.toFloat)

  def /(rate: Int): Resource = divide(rate)

  def ==(r: Resource): Boolean = equalsTo(r)

  def >(r: Resource): Boolean = moreThan(r)

  def >=(r: Resource): Boolean = equalsTo(r) || moreThan(r)

  /** Defined as the negation of `>=`. */
  def <(r: Resource): Boolean = !(this >= r)

  /** Defined as the negation of `>`. */
  def <=(r: Resource): Boolean = !(this > r)

  /** Textual representation; the exact shape is chosen by each subclass. */
  def toJson: String
}
/**
 * Factory helpers that build all-zero [[Resource]] values.
 */
object Resource extends Logging {

  /**
   * Creates the zero-valued resource matching a request policy.
   *
   * @param policy the resource request policy
   * @return a new all-zero resource of the concrete type associated with `policy`
   * @throws RMWarnException (code 111003) when the policy is not one of the handled values
   */
  def initResource(policy: ResourceRequestPolicy): Resource = policy match {
    case CPU => new CPUResource(0)
    case Memory => new MemoryResource(0)
    case Load => new LoadResource(0, 0)
    case Instance => new InstanceResource(0)
    case LoadInstance => new LoadInstanceResource(0, 0, 0)
    case Yarn => new YarnResource(0, 0, 0)
    case DriverAndYarn => new DriverAndYarnResource(new LoadInstanceResource(0, 0, 0), new YarnResource(0, 0, 0))
    case Special => new SpecialResource(new java.util.HashMap[String, AnyVal]())
    case Default => new LoadResource(0, 0)
    case _ => throw new RMWarnException(111003, "not supported resource result policy ")
  }

  /**
   * Creates an all-zero resource of the same concrete type as `resource`.
   *
   * For a [[DriverAndYarnResource]] whose yarn part carries a non-null queue name, that
   * queue name is kept on the zero value.
   *
   * @param resource the resource whose type should be mirrored
   * @return a new zero-valued resource
   * @throws RMWarnException (code 111003) for a resource type not listed here
   */
  def getZeroResource(resource: Resource): Resource = resource match {
    case _: MemoryResource => new MemoryResource(0)
    // InstanceResource extends CPUResource, so it has to be matched first; otherwise the
    // CPUResource case would swallow it and an InstanceResource would get a CPUResource zero.
    case _: InstanceResource => new InstanceResource(0)
    case _: CPUResource => new CPUResource(0)
    case _: LoadResource => new LoadResource(0, 0)
    case _: LoadInstanceResource => new LoadInstanceResource(0, 0, 0)
    case _: YarnResource => new YarnResource(0, 0, 0)
    case dy: DriverAndYarnResource =>
      val zeroYarn = Option(dy.yarnResource).flatMap(y => Option(y.queueName)) match {
        case Some(queue) => new YarnResource(0, 0, 0, queue)
        case None => new YarnResource(0, 0, 0)
      }
      new DriverAndYarnResource(new LoadInstanceResource(0, 0, 0), zeroYarn)
    case _: SpecialResource => new SpecialResource(new java.util.HashMap[String, AnyVal]())
    case r: Resource => throw new RMWarnException(111003, "not supported resource type " + r.getClass)
  }

  /* implicit def toYarnResource(r: Resource): YarnResource = r match {
  case t: YarnResource => t
  case _ => new YarnResource(0, 0, 0, "default")
  }
  implicit def toLoadInstanceResource(r: Resource): LoadInstanceResource = r match {
  case t: LoadInstanceResource => t
  case _ => new LoadInstanceResource(0, 0, 0)
  }*/
}
/**
 * Pairs a module name with a [[Resource]] value.
 *
 * @param moduleName name of the module
 * @param resource   the resource associated with that module (presumably what the user may
 *                   still use there; confirm against callers)
 */
case class UserAvailableResource(
  moduleName: String,
  resource: Resource
)
/**
 * A resource that consists of a single memory amount.
 *
 * Whenever the other operand is not a [[MemoryResource]] it is treated as zero memory.
 *
 * @param memory the memory amount (formatted with `ByteTimeUtils.bytesToString`, so presumably bytes)
 */
class MemoryResource(val memory: Long) extends Resource {

  /** Memory carried by `r`, or 0 when `r` is some other kind of resource. */
  private def memoryOf(r: Resource): Long = r match {
    case other: MemoryResource => other.memory
    case _ => 0L
  }

  override def add(r: Resource): Resource = {
    val total = memory + memoryOf(r)
    new MemoryResource(total)
  }

  override def minus(r: Resource): Resource = {
    val remaining = memory - memoryOf(r)
    new MemoryResource(remaining)
  }

  override def multiplied(r: Resource): Resource = {
    val product = memory * memoryOf(r)
    new MemoryResource(product)
  }

  /** The scaled value is truncated toward zero. */
  override def multiplied(rate: Float): Resource = {
    val scaled = (memory * rate).toLong
    new MemoryResource(scaled)
  }

  /** Integer division; a zero divisor (including any non-memory operand) raises ArithmeticException. */
  override def divide(r: Resource): Resource = {
    val quotient = memory / memoryOf(r)
    new MemoryResource(quotient)
  }

  override def divide(rate: Int): Resource = {
    val quotient = memory / rate
    new MemoryResource(quotient)
  }

  override def moreThan(r: Resource): Boolean = memoryOf(r) < memory

  /**
   * Partially greater than; with a single field this is the same as [[moreThan]].
   *
   * @param r the resource to compare against
   * @return whether this memory amount exceeds that of `r`
   */
  override def caseMore(r: Resource): Boolean = moreThan(r)

  override def equalsTo(r: Resource): Boolean = memoryOf(r) == memory

  /** Note: this is a bare `"memory":"…"` fragment surrounded by spaces, not a full JSON object. */
  override def toJson: String = s""" "memory":"${ByteTimeUtils.bytesToString(memory)}" """

  override def toString: String = toJson
}
/**
 * A resource that consists of a single core count.
 *
 * Whenever the other operand is not a [[CPUResource]] (or a subclass) it is treated as zero cores.
 *
 * @param cores number of cores
 */
class CPUResource(val cores: Int) extends Resource {

  /** Core count carried by `r`, or 0 when `r` is not a CPUResource. */
  private def coresOf(r: Resource): Int = r match {
    case cpu: CPUResource => cpu.cores
    case _ => 0
  }

  /** Builds the result of an arithmetic operation; subclasses override it to keep their own type. */
  protected def toResource(cores: Int): Resource = new CPUResource(cores)

  override def add(r: Resource): Resource = {
    val total = cores + coresOf(r)
    toResource(total)
  }

  override def minus(r: Resource): Resource = {
    val remaining = cores - coresOf(r)
    toResource(remaining)
  }

  override def multiplied(r: Resource): Resource = {
    val product = cores * coresOf(r)
    toResource(product)
  }

  /** The scaled value is truncated toward zero. */
  override def multiplied(rate: Float): Resource = {
    val scaled = (cores * rate).toInt
    toResource(scaled)
  }

  /** Integer division; a zero divisor (including any non-CPU operand) raises ArithmeticException. */
  override def divide(r: Resource): Resource = {
    val quotient = cores / coresOf(r)
    toResource(quotient)
  }

  override def divide(rate: Int): Resource = {
    val quotient = cores / rate
    toResource(quotient)
  }

  override def moreThan(r: Resource): Boolean = coresOf(r) < cores

  /**
   * Partially greater than; with a single field this is the same as [[moreThan]].
   *
   * @param r the resource to compare against
   * @return whether this core count exceeds that of `r`
   */
  override def caseMore(r: Resource): Boolean = moreThan(r)

  override def equalsTo(r: Resource): Boolean = coresOf(r) == cores

  /** Note: this is a bare `"cpu":n` fragment surrounded by spaces, not a full JSON object. */
  override def toJson: String = s""" "cpu":$cores """

  override def toString: String = toJson
}
/**
 * A resource made of a memory amount and a core count.
 *
 * Whenever the other operand is not a [[LoadResource]] it is treated as (0 memory, 0 cores).
 *
 * @param memory the memory amount (formatted with `ByteTimeUtils.bytesToString`, so presumably bytes)
 * @param cores  number of cores
 */
class LoadResource(val memory: Long, val cores: Int) extends Resource {

  /** Views `r` as a LoadResource, substituting an all-zero value for any other type. */
  private def asLoad(r: Resource): LoadResource = r match {
    case load: LoadResource => load
    case _ => new LoadResource(0, 0)
  }

  def add(r: Resource): LoadResource = {
    val other = asLoad(r)
    new LoadResource(memory + other.memory, cores + other.cores)
  }

  def minus(r: Resource): LoadResource = {
    val other = asLoad(r)
    new LoadResource(memory - other.memory, cores - other.cores)
  }

  def multiplied(r: Resource): LoadResource = {
    val other = asLoad(r)
    new LoadResource(memory * other.memory, cores * other.cores)
  }

  /** Memory is truncated toward zero, cores are rounded to the nearest integer. */
  def multiplied(rate: Float): LoadResource = {
    val scaledMemory = (memory * rate).toLong
    val scaledCores = math.round(cores * rate)
    new LoadResource(scaledMemory, scaledCores)
  }

  /** Field-wise integer division; a zero field in the divisor raises ArithmeticException. */
  def divide(r: Resource): LoadResource = {
    val other = asLoad(r)
    new LoadResource(memory / other.memory, cores / other.cores)
  }

  def divide(rate: Int): LoadResource = {
    val dividedMemory = memory / rate
    val dividedCores = cores / rate
    new LoadResource(dividedMemory, dividedCores)
  }

  /** True only when both memory and cores are strictly greater. */
  def moreThan(r: Resource): Boolean = {
    val other = asLoad(r)
    memory > other.memory && cores > other.cores
  }

  /** True when memory or cores (at least one of them) is strictly greater. */
  def caseMore(r: Resource): Boolean = {
    val other = asLoad(r)
    memory > other.memory || cores > other.cores
  }

  def equalsTo(r: Resource): Boolean = {
    val other = asLoad(r)
    memory == other.memory && cores == other.cores
  }

  override def toJson: String = s"""{"memory":"${ByteTimeUtils.bytesToString(memory)}","cpu":$cores}"""

  override def toString: String = toJson
}
/**
 * A resource made of a memory amount, a core count and an instance count.
 *
 * Whenever the other operand is not a [[LoadInstanceResource]] it is treated as all zeros.
 *
 * @param memory    the memory amount (formatted with `ByteTimeUtils.bytesToString`, so presumably bytes)
 * @param cores     number of cores
 * @param instances number of instances
 */
class LoadInstanceResource(val memory: Long, val cores: Int, val instances: Int) extends Resource {

  /** Views any resource as a LoadInstanceResource; other types become an all-zero value. */
  implicit def toLoadInstanceResource(r: Resource): LoadInstanceResource = r match {
    case t: LoadInstanceResource => t
    case _ => new LoadInstanceResource(0, 0, 0)
  }

  def add(r: Resource): LoadInstanceResource =
    new LoadInstanceResource(memory + r.memory, cores + r.cores, instances + r.instances)

  def minus(r: Resource): LoadInstanceResource =
    new LoadInstanceResource(memory - r.memory, cores - r.cores, instances - r.instances)

  def multiplied(r: Resource): LoadInstanceResource =
    new LoadInstanceResource(memory * r.memory, cores * r.cores, instances * r.instances)

  /** Memory and instances are truncated toward zero, cores are rounded to the nearest integer. */
  def multiplied(rate: Float): LoadInstanceResource =
    new LoadInstanceResource((memory * rate).toLong, math.round(cores * rate), (instances * rate).toInt)

  /** Field-wise integer division; a zero field in the divisor raises ArithmeticException. */
  def divide(r: Resource): LoadInstanceResource =
    new LoadInstanceResource(memory / r.memory, cores / r.cores, instances / r.instances)

  /**
   * Divides every field by `rate` using integer division.
   *
   * The instance count is divided like the other two fields; it was previously
   * multiplied by `rate`, which made the result grow in one dimension while shrinking
   * in the others.
   */
  def divide(rate: Int): LoadInstanceResource =
    new LoadInstanceResource(memory / rate, cores / rate, instances / rate)

  /** True only when memory, cores and instances are all strictly greater. */
  def moreThan(r: Resource): Boolean =
    memory > r.memory && cores > r.cores && instances > r.instances

  /** True when at least one of memory, cores or instances is strictly greater. */
  def caseMore(r: Resource): Boolean =
    memory > r.memory || cores > r.cores || instances > r.instances

  def equalsTo(r: Resource): Boolean =
    memory == r.memory && cores == r.cores && instances == r.instances

  override def toJson: String = s"""{"instance":$instances,"memory":"${ByteTimeUtils.bytesToString(memory)}","cpu":$cores}"""

  override def toString: String = s"Number of instances(实例数):$instances,(RAM)内存:${ByteTimeUtils.bytesToString(memory)},cpu:$cores"
}
/**
 * A [[CPUResource]] whose single counter is an instance count; the count is passed to the
 * parent as its `cores` value, so all arithmetic and comparisons are inherited.
 *
 * @param instances number of instances
 */
class InstanceResource(val instances: Int) extends CPUResource(instances) {

  /** Keeps the results of inherited arithmetic typed as InstanceResource. */
  override protected def toResource(cores: Int): Resource = {
    new InstanceResource(cores)
  }

  /** Note: plain `Instance:n` text, not a JSON object. */
  override def toJson: String = "Instance:" + instances

  override def toString: String = toJson
}
/**
 * Queue resource information. There is no initial registration for it; it only expresses
 * the upper limit of what a user may use in a queue.
 *
 * Whenever the other operand is not a [[YarnResource]] it is treated as an all-zero resource
 * on queue "default". A queue-name equality check used to guard the binary operations and
 * comparisons but is disabled, so operands from different queues are combined without error.
 * The binary arithmetic operations take the queue name of the other operand; the
 * `applicationId` is not carried over by any operation.
 *
 * @param queueMemory    memory of the queue (formatted with `bytesToString`, so presumably bytes)
 * @param queueCores     number of cores of the queue
 * @param queueInstances number of instances of the queue
 * @param queueName      name of the queue, "default" when omitted
 * @param applicationId  application id, empty when omitted
 */
class YarnResource(val queueMemory: Long, val queueCores: Int, val queueInstances: Int, val queueName: String = "default", val applicationId: String = "") extends Resource {

  /** Views any resource as a YarnResource; other types become an all-zero "default" queue. */
  implicit def toYarnResource(r: Resource): YarnResource = r match {
    case t: YarnResource => t
    case _ => new YarnResource(0, 0, 0, "default")
  }

  def add(r: Resource): YarnResource =
    new YarnResource(queueMemory + r.queueMemory, queueCores + r.queueCores, queueInstances + r.queueInstances, r.queueName)

  def minus(r: Resource): YarnResource =
    new YarnResource(queueMemory - r.queueMemory, queueCores - r.queueCores, queueInstances - r.queueInstances, r.queueName)

  def multiplied(r: Resource): YarnResource =
    new YarnResource(queueMemory * r.queueMemory, queueCores * r.queueCores, queueInstances * r.queueInstances, r.queueName)

  /**
   * Scales every field by `rate`, truncating toward zero, and keeps this queue's name.
   *
   * The memory is narrowed to Long rather than Int: narrowing to Int capped any scaled
   * value above Int.MaxValue (roughly 2 GiB when the unit is bytes).
   */
  def multiplied(rate: Float): YarnResource =
    new YarnResource((queueMemory * rate).toLong, (queueCores * rate).toInt, (queueInstances * rate).toInt, queueName)

  /** Field-wise integer division; a zero field in the divisor raises ArithmeticException. */
  def divide(r: Resource): YarnResource =
    new YarnResource(queueMemory / r.queueMemory, queueCores / r.queueCores, queueInstances / r.queueInstances, r.queueName)

  /** Divides every field by `rate` and keeps this queue's name. */
  def divide(rate: Int): YarnResource =
    new YarnResource(queueMemory / rate, queueCores / rate, queueInstances / rate, queueName)

  /** Memory and cores must be strictly greater; for instances greater-or-equal is enough. */
  def moreThan(r: Resource): Boolean =
    queueMemory > r.queueMemory && queueCores > r.queueCores && queueInstances >= r.queueInstances

  /** True when memory or cores is strictly greater, or instances is greater-or-equal. */
  def caseMore(r: Resource): Boolean =
    queueMemory > r.queueMemory || queueCores > r.queueCores || queueInstances >= r.queueInstances

  /** Unlike the other comparisons, equality also requires the queue names to match. */
  def equalsTo(r: Resource): Boolean =
    queueName == r.queueName && queueMemory == r.queueMemory && queueCores == r.queueCores && queueInstances == r.queueInstances

  override def toJson: String = s"""{"queueName":"$queueName","queueMemory":"${ByteTimeUtils.bytesToString(queueMemory)}", "queueCpu":$queueCores, "instance":$queueInstances}"""

  override def toString: String = s"Queue name(队列名):$queueName,Queue memory(队列内存):${RMByteTimeUtils.bytesToString(queueMemory)},Queue core number(队列核数):$queueCores, Number of queue instances(队列实例数):${queueInstances}"
}
/**
 * A composite of a driver-side [[LoadInstanceResource]] and a queue-side [[YarnResource]].
 *
 * Binary operations and comparisons always act on the driver part. The yarn part takes part
 * only when [[isModuleOperate(r:*]] is false, i.e. when both operands name the same queue;
 * otherwise this instance's yarn part is passed through unchanged (arithmetic) or ignored
 * (comparisons). Whenever the other operand is not a [[DriverAndYarnResource]] it is treated
 * as an all-zero value whose yarn part is on queue "default".
 *
 * @param loadInstanceResource the driver-side resource
 * @param yarnResource         the queue-side resource
 */
class DriverAndYarnResource(val loadInstanceResource: LoadInstanceResource, val yarnResource: YarnResource) extends Resource with Logging {

  /** Views any resource as a DriverAndYarnResource; other types become an all-zero value. */
  private implicit def DriverAndYarnResource(r: Resource): DriverAndYarnResource = r match {
    case t: DriverAndYarnResource => t
    case _ => new DriverAndYarnResource(new LoadInstanceResource(0, 0, 0), new YarnResource(0, 0, 0))
  }

  /**
   * Decides whether an operation against `r` should touch the driver part only.
   *
   * @param r the other operand
   * @return false when both yarn parts are non-null and carry the same non-empty queue name
   *         (this case is also logged); true in every other case
   */
  def isModuleOperate(r: Resource): Boolean = {
    val sameQueue = this.yarnResource != null && r.yarnResource != null &&
      StringUtils.isNotEmpty(this.yarnResource.queueName) &&
      StringUtils.isNotEmpty(r.yarnResource.queueName) &&
      this.yarnResource.queueName.equals(r.yarnResource.queueName)
    if (sameQueue) info(s"Not module operate this:$this other:$r")
    !sameQueue
  }

  /**
   * Single-operand variant used by the scaling operations.
   *
   * @return false when this instance has a yarn part with a non-empty queue name, true otherwise
   */
  def isModuleOperate: Boolean =
    !(this.yarnResource != null && StringUtils.isNotEmpty(this.yarnResource.queueName))

  override def add(r: Resource): DriverAndYarnResource = {
    val driver = this.loadInstanceResource.add(r.loadInstanceResource)
    if (isModuleOperate(r)) new DriverAndYarnResource(driver, this.yarnResource)
    else new DriverAndYarnResource(driver, this.yarnResource.add(r.yarnResource))
  }

  override def minus(r: Resource): Resource = {
    val driver = this.loadInstanceResource.minus(r.loadInstanceResource)
    if (isModuleOperate(r)) new DriverAndYarnResource(driver, this.yarnResource)
    else new DriverAndYarnResource(driver, this.yarnResource.minus(r.yarnResource))
  }

  /** Not supported for this type; always throws RMWarnException (code 111002). */
  override def multiplied(r: Resource): Resource = {
    throw new RMWarnException(111002, "Unsupported operation: multiplied")
  }

  override def multiplied(rate: Float): Resource = {
    val driver = this.loadInstanceResource.multiplied(rate)
    if (isModuleOperate) new DriverAndYarnResource(driver, this.yarnResource)
    else new DriverAndYarnResource(driver, this.yarnResource.multiplied(rate))
  }

  /**
   * Not supported for this type; always throws RMWarnException (code 111002).
   * The message names the divide operation (it previously said "multiplied").
   */
  override def divide(r: Resource): Resource = throw new RMWarnException(111002, "Unsupported operation: divide")

  override def divide(rate: Int): Resource = {
    val driver = this.loadInstanceResource.divide(rate)
    if (isModuleOperate) new DriverAndYarnResource(driver, this.yarnResource)
    else new DriverAndYarnResource(driver, this.yarnResource.divide(rate))
  }

  override def moreThan(r: Resource): Boolean = {
    if (isModuleOperate(r)) {
      this.loadInstanceResource.moreThan(r.loadInstanceResource)
    } else {
      this.loadInstanceResource.moreThan(r.loadInstanceResource) && this.yarnResource.moreThan(r.yarnResource)
    }
  }

  override def caseMore(r: Resource): Boolean = {
    if (isModuleOperate(r)) {
      this.loadInstanceResource.caseMore(r.loadInstanceResource)
    } else {
      this.loadInstanceResource.caseMore(r.loadInstanceResource) || this.yarnResource.caseMore(r.yarnResource)
    }
  }

  override def equalsTo(r: Resource): Boolean = {
    if (isModuleOperate(r)) {
      this.loadInstanceResource.equalsTo(r.loadInstanceResource)
    } else {
      this.loadInstanceResource.equalsTo(r.loadInstanceResource) && this.yarnResource.equalsTo(r.yarnResource)
    }
  }

  /** A null part is rendered as the JSON literal `null`. */
  override def toJson: String = {
    val load = if (loadInstanceResource != null) loadInstanceResource.toJson else "null"
    val yarn = if (yarnResource != null) yarnResource.toJson else "null"
    s"""{"driver":${load}, "yarn":${yarn}}"""
  }

  override def toString: String = s"Driver resources(Driver资源):$loadInstanceResource,Queue resource(队列资源):$yarnResource"
}
/**
 * A free-form resource: a map from resource name to a numeric value
 * (Int, Double, Long, Float or Short; any other value type is rejected with
 * RMWarnException code 111003 by the operations below).
 *
 * All operations iterate over the keys of THIS map only; keys that exist only in the other
 * operand are ignored. A key missing from the other operand yields null, which Scala
 * unboxes as zero. The value in the other map is cast to the type of the value in this
 * map, so both maps are expected to use the same numeric type per key.
 *
 * @param resources the named numeric values
 */
class SpecialResource(val resources: java.util.Map[String, AnyVal]) extends Resource {

  /** Convenience constructor taking a Scala map. */
  def this(resources: Map[String, AnyVal]) = this(JavaConversions.mapAsJavaMap(resources))

  /**
   * Applies `op` key by key to this map and the map of `r`.
   *
   * @param r  the other operand; anything that is not a SpecialResource produces an EMPTY result
   * @param op receives (value from this map, value from the other map or null)
   */
  private def specialResourceOperator(r: Resource, op: (AnyVal, AnyVal) => AnyVal): SpecialResource = r match {
    case s: SpecialResource =>
      val rs = s.resources
      new SpecialResource(JavaConversions.mapAsScalaMap(resources).map {
        case (k, v) =>
          val v1 = rs.get(k)
          k -> op(v, v1)
      }.toMap)
    case _ => new SpecialResource(Map.empty[String, AnyVal])
  }

  // In the four binary operations below the Short branch narrows its result back to Short.
  // Short arithmetic widens to Int, and leaving the result as Int changed the stored value
  // type, so a later operation against a Short-valued map failed with ClassCastException.

  override def add(r: Resource): Resource = specialResourceOperator(r, (v1, v2) => v1 match {
    case i: Int => i + v2.asInstanceOf[Int]
    case d: Double => d + v2.asInstanceOf[Double]
    case l: Long => l + v2.asInstanceOf[Long]
    case f: Float => f + v2.asInstanceOf[Float]
    case s: Short => (s + v2.asInstanceOf[Short]).toShort
    case _ => throw new RMWarnException(111003, "not supported resource type: " + v1.getClass)
  })

  override def minus(r: Resource): Resource = specialResourceOperator(r, (v1, v2) => v1 match {
    case i: Int => i - v2.asInstanceOf[Int]
    case d: Double => d - v2.asInstanceOf[Double]
    case l: Long => l - v2.asInstanceOf[Long]
    case f: Float => f - v2.asInstanceOf[Float]
    case s: Short => (s - v2.asInstanceOf[Short]).toShort
    case _ => throw new RMWarnException(111003, "not supported resource type: " + v1.getClass)
  })

  override def multiplied(r: Resource): Resource = specialResourceOperator(r, (v1, v2) => v1 match {
    case i: Int => i * v2.asInstanceOf[Int]
    case d: Double => d * v2.asInstanceOf[Double]
    case l: Long => l * v2.asInstanceOf[Long]
    case f: Float => f * v2.asInstanceOf[Float]
    case s: Short => (s * v2.asInstanceOf[Short]).toShort
    case _ => throw new RMWarnException(111003, "not supported resource type: " + v1.getClass)
  })

  /** Scales every value by `rate`; integral types are truncated back to their own type. */
  override def multiplied(rate: Float): Resource = new SpecialResource(JavaConversions.mapAsScalaMap(resources).map {
    case (k, i: Int) => k -> (i * rate).toInt
    case (k, d: Double) => k -> d * rate
    case (k, l: Long) => k -> (l * rate).toLong
    case (k, f: Float) => k -> f * rate
    case (k, s: Short) => k -> (s * rate).toShort
    case (k, v) => throw new RMWarnException(111003, "not supported resource type: " + v.getClass)
  }.toMap[String, AnyVal])

  /** For integral types a zero (or missing) divisor raises ArithmeticException. */
  override def divide(r: Resource): Resource = specialResourceOperator(r, (v1, v2) => v1 match {
    case i: Int => i / v2.asInstanceOf[Int]
    case d: Double => d / v2.asInstanceOf[Double]
    case l: Long => l / v2.asInstanceOf[Long]
    case f: Float => f / v2.asInstanceOf[Float]
    case s: Short => (s / v2.asInstanceOf[Short]).toShort
    case _ => throw new RMWarnException(111003, "not supported resource type: " + v1.getClass)
  })

  override def divide(rate: Int): Resource = new SpecialResource(JavaConversions.mapAsScalaMap(resources).map {
    case (k, i: Int) => k -> i / rate
    case (k, d: Double) => k -> d / rate
    case (k, l: Long) => k -> l / rate
    case (k, f: Float) => k -> f / rate
    case (k, s: Short) => k -> (s / rate).toShort
    case (k, v) => throw new RMWarnException(111003, "not supported resource type: " + v.getClass)
  }.toMap[String, AnyVal])

  /**
   * True when no value of this map is less than or equal to the matching value of `r`
   * (so an empty map is "more than" anything). Returns true for any non-special `r`.
   */
  override def moreThan(r: Resource): Boolean = r match {
    case s: SpecialResource =>
      val rs = s.resources
      !JavaConversions.mapAsScalaMap(resources).exists {
        case (k, i: Int) => i <= rs.get(k).asInstanceOf[Int]
        case (k, d: Double) => d <= rs.get(k).asInstanceOf[Double]
        case (k, l: Long) => l <= rs.get(k).asInstanceOf[Long]
        case (k, f: Float) => f <= rs.get(k).asInstanceOf[Float]
        case (k, s: Short) => s <= rs.get(k).asInstanceOf[Short]
        case (k, v) => throw new RMWarnException(111003, "not supported resource type: " + v.getClass)
      }
    case _ => true
  }

  /**
   * Partially greater than: true when at least one value of this map is strictly greater
   * than the matching value of `r`. Returns true for any non-special `r`.
   *
   * @param r the resource to compare against
   * @return whether any single value exceeds its counterpart
   */
  override def caseMore(r: Resource): Boolean = r match {
    case s: SpecialResource =>
      val rs = s.resources
      JavaConversions.mapAsScalaMap(resources).exists {
        case (k, i: Int) => i > rs.get(k).asInstanceOf[Int]
        case (k, d: Double) => d > rs.get(k).asInstanceOf[Double]
        case (k, l: Long) => l > rs.get(k).asInstanceOf[Long]
        case (k, f: Float) => f > rs.get(k).asInstanceOf[Float]
        case (k, s: Short) => s > rs.get(k).asInstanceOf[Short]
        case (k, v) => throw new RMWarnException(111003, "not supported resource type: " + v.getClass)
      }
    case _ => true
  }

  /**
   * True when no value of this map differs from the matching value of `r`.
   * Returns true for any non-special `r`.
   * NOTE(review): treating every non-special resource as equal looks surprising; confirm
   * whether callers rely on it.
   */
  override def equalsTo(r: Resource): Boolean = r match {
    case s: SpecialResource =>
      val rs = s.resources
      !JavaConversions.mapAsScalaMap(resources).exists {
        case (k, i: Int) => i != rs.get(k).asInstanceOf[Int]
        case (k, d: Double) => d != rs.get(k).asInstanceOf[Double]
        case (k, l: Long) => l != rs.get(k).asInstanceOf[Long]
        case (k, f: Float) => f != rs.get(k).asInstanceOf[Float]
        case (k, s: Short) => s != rs.get(k).asInstanceOf[Short]
        case (k, v) => throw new RMWarnException(111003, "not supported resource type: " + v.getClass)
      }
    case _ => true
  }

  /** Note: plain `Special:` text followed by the map's toString, not a JSON object. */
  override def toJson: String = s"Special:$resources"
}
/**
 * json4s serializer for the [[Resource]] hierarchy.
 *
 * The first partial function rebuilds a resource from a JSON object and picks the concrete
 * type purely from the object's field names, in exactly the order listed in each pattern.
 * The second one turns a resource into JSON. Both throw RMWarnException (code 111003) for
 * input they do not recognise.
 */
object ResourceSerializer extends CustomSerializer[Resource](implicit formats => ( {
  case JObject(List(("memory", memory))) => new MemoryResource(memory.extract[Long])
  case JObject(List(("cores", cores))) => new CPUResource(cores.extract[Int])
  case JObject(List(("instance", instances))) => new InstanceResource(instances.extract[Int])
  case JObject(List(("memory", memory), ("cores", cores))) => new LoadResource(memory.extract[Long], cores.extract[Int])
  case JObject(List(("memory", memory), ("cores", cores), ("instance", instances))) =>
    new LoadInstanceResource(memory.extract[Long], cores.extract[Int], instances.extract[Int])
  case JObject(List(("applicationId", applicationId), ("queueName", queueName), ("queueMemory", queueMemory), ("queueCores", queueCores), ("queueInstances", queueInstances))) =>
    new YarnResource(queueMemory.extract[Long], queueCores.extract[Int], queueInstances.extract[Int], queueName.extract[String], applicationId.extract[String])
  case JObject(List(("DriverAndYarnResource", JObject(List(("loadInstanceResource", loadInstanceResource), ("yarnResource", yarnResource)))))) =>
    // The two parts are extracted with the default formats instead of the enclosing ones.
    implicit val formats = DefaultFormats
    new DriverAndYarnResource(loadInstanceResource.extract[LoadInstanceResource],
      yarnResource.extract[YarnResource])
  case JObject(List(("resources", resources))) =>
    new SpecialResource(resources.extract[Map[String, AnyVal]])
  case JObject(list) => throw new RMWarnException(111003, "not supported resource serializable string " + list)
}, {
  case m: MemoryResource => ("memory", m.memory)
  // InstanceResource extends CPUResource, so it has to be matched first; otherwise it would
  // be written as {"cores": n} and come back as a plain CPUResource.
  case i: InstanceResource => ("instance", i.instances)
  case c: CPUResource => ("cores", c.cores)
  case l: LoadResource => ("memory", l.memory) ~ ("cores", l.cores)
  case li: LoadInstanceResource => ("memory", li.memory) ~ ("cores", li.cores) ~ ("instance", li.instances)
  case yarn: YarnResource => ("applicationId", yarn.applicationId) ~ ("queueName", yarn.queueName) ~ ("queueMemory", yarn.queueMemory) ~ ("queueCores", yarn.queueCores) ~ ("queueInstances", yarn.queueInstances)
  case dy: DriverAndYarnResource =>
    // The two parts are decomposed with the default formats instead of the enclosing ones.
    implicit val formats = DefaultFormats
    ("DriverAndYarnResource", new JObject(List(("loadInstanceResource", Extraction.decompose(dy.loadInstanceResource)), ("yarnResource", Extraction.decompose(dy.yarnResource)))))
  // NOTE(review): the map is written as a JSON *string* here, while the reading side extracts
  // "resources" as a map; confirm that SpecialResource actually round-trips.
  case s: SpecialResource => ("resources", Serialization.write(JavaConversions.mapAsScalaMap(s.resources).toMap))
  case r: Resource => throw new RMWarnException(111003, "not supported resource type " + r.getClass)
}
)
)