blob: d1244a9f5792a6dcda0c9939e66fd2c0f1c0abbd [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.linkis.manager.common.entity.resource
import org.apache.linkis.common.utils.{ByteTimeUtils, Logging}
import org.apache.linkis.manager.common.entity.resource.ResourceType._
import org.apache.linkis.manager.common.errorcode.ManagerCommonErrorCodeSummary._
import org.apache.linkis.manager.common.exception.ResourceWarnException
import org.apache.commons.lang3.StringUtils
import java.text.MessageFormat
import scala.collection.JavaConverters._
import org.json4s.{CustomSerializer, DefaultFormats, Extraction}
import org.json4s.JsonAST.JObject
import org.json4s.JsonDSL._
import org.json4s.jackson.Serialization
abstract class Resource {
def add(r: Resource): Resource
def minus(r: Resource): Resource
def multiplied(r: Resource): Resource
def multiplied(rate: Float): Resource
def divide(r: Resource): Resource
def divide(rate: Int): Resource
def moreThan(r: Resource): Boolean
/**
* Part is greater than(部分大于)
*
* @param r
* @return
*/
def caseMore(r: Resource): Boolean
def equalsTo(r: Resource): Boolean
def notLess(r: Resource): Boolean
def +(r: Resource): Resource = add(r)
def -(r: Resource): Resource = minus(r)
def *(rate: Float): Resource = multiplied(rate)
def *(rate: Double): Resource = multiplied(rate.toFloat)
def /(rate: Int): Resource = divide(rate)
def ==(r: Resource): Boolean = equalsTo(r)
def >(r: Resource): Boolean = moreThan(r)
def >=(r: Resource): Boolean = notLess(r)
def <(r: Resource): Boolean = ! >=(r)
def <=(r: Resource): Boolean = ! >(r)
def toJson: String
}
object Resource extends Logging {
def initResource(resourceType: ResourceType): Resource = resourceType match {
case CPU => new CPUResource(0)
case Memory => new MemoryResource(0)
case Load => new LoadResource(0, 0)
case Instance => new InstanceResource(0)
case LoadInstance => new LoadInstanceResource(0, 0, 0)
case Yarn => new YarnResource(0, 0, 0)
case DriverAndYarn =>
new DriverAndYarnResource(new LoadInstanceResource(0, 0, 0), new YarnResource(0, 0, 0))
case Special => new SpecialResource(new java.util.HashMap[String, AnyVal]())
case Default => new LoadResource(0, 0)
case _ =>
throw new ResourceWarnException(
NOT_RESOURCE_POLICY.getErrorCode,
NOT_RESOURCE_POLICY.getErrorDesc
)
}
def getZeroResource(resource: Resource): Resource = resource match {
case m: MemoryResource => new MemoryResource(0)
case i: InstanceResource => new InstanceResource(0)
case c: CPUResource => new CPUResource(0)
case l: LoadResource => new LoadResource(0, 0)
case li: LoadInstanceResource => new LoadInstanceResource(0, 0, 0)
case yarn: YarnResource => new YarnResource(0, 0, 0)
case dy: DriverAndYarnResource =>
if (dy.yarnResource != null && dy.yarnResource.queueName != null) {
new DriverAndYarnResource(
new LoadInstanceResource(0, 0, 0),
new YarnResource(0, 0, 0, dy.yarnResource.queueName)
)
} else {
new DriverAndYarnResource(new LoadInstanceResource(0, 0, 0), new YarnResource(0, 0, 0))
}
case s: SpecialResource => new SpecialResource(new java.util.HashMap[String, AnyVal]())
case r: Resource =>
throw new ResourceWarnException(
NOT_RESOURCE_TYPE.getErrorCode,
MessageFormat.format(NOT_RESOURCE_TYPE.getErrorDesc, r.getClass)
)
}
}
case class UserAvailableResource(moduleName: String, resource: Resource)
class MemoryResource(val memory: Long) extends Resource {
private implicit def toMemoryResource(r: Resource): MemoryResource = r match {
case t: MemoryResource => t
case _ => new MemoryResource(Long.MaxValue)
}
override def add(r: Resource): Resource = new MemoryResource(memory + r.memory)
override def minus(r: Resource): Resource = new MemoryResource(memory - r.memory)
override def multiplied(r: Resource): Resource = new MemoryResource(memory * r.memory)
override def multiplied(rate: Float): Resource = new MemoryResource((memory * rate).toLong)
override def divide(r: Resource): Resource = new MemoryResource(memory / r.memory)
override def divide(rate: Int): Resource = new MemoryResource(memory / rate)
override def moreThan(r: Resource): Boolean = memory > r.memory
override def notLess(r: Resource): Boolean = memory >= r.memory
/**
* Part is greater than(部分大于)
*
* @param r
* @return
*/
override def caseMore(r: Resource): Boolean = moreThan(r)
override def equalsTo(r: Resource): Boolean = memory == r.memory
override def toJson: String = s""" "memory":"${ByteTimeUtils.bytesToString(memory)}" """
override def toString: String = toJson
}
class CPUResource(val cores: Int) extends Resource {
private implicit def toCPUResource(r: Resource): CPUResource = r match {
case t: CPUResource => t
case _ => new CPUResource(Integer.MAX_VALUE)
}
protected def toResource(cores: Int): Resource = new CPUResource(cores)
override def add(r: Resource): Resource = toResource(cores + r.cores)
override def minus(r: Resource): Resource = toResource(cores - r.cores)
override def multiplied(r: Resource): Resource = toResource(cores * r.cores)
override def multiplied(rate: Float): Resource = toResource((cores * rate).toInt)
override def divide(r: Resource): Resource = toResource(cores / r.cores)
override def divide(rate: Int): Resource = toResource(cores / rate)
override def moreThan(r: Resource): Boolean = cores > r.cores
/**
* Part is greater than(部分大于)
*
* @param r
* @return
*/
override def caseMore(r: Resource): Boolean = moreThan(r)
override def notLess(r: Resource): Boolean = cores >= r.cores
override def equalsTo(r: Resource): Boolean = cores == r.cores
override def toJson: String = s""" "cpu":$cores """
override def toString: String = toJson
}
class LoadResource(val memory: Long, val cores: Int) extends Resource {
private implicit def toLoadResource(r: Resource): LoadResource = r match {
case t: LoadResource => t
case m: MemoryResource => new LoadResource(m.memory, 0)
case c: CPUResource => new LoadResource(0, c.cores)
case _ => new LoadResource(Long.MaxValue, Integer.MAX_VALUE)
}
def add(r: Resource): LoadResource =
new LoadResource(memory + r.memory, cores + r.cores)
def minus(r: Resource): LoadResource =
new LoadResource(memory - r.memory, cores - r.cores)
def multiplied(r: Resource): LoadResource =
new LoadResource(memory * r.memory, cores * r.cores)
def multiplied(rate: Float): LoadResource =
new LoadResource((memory * rate).toLong, math.round(cores * rate))
def divide(r: Resource): LoadResource =
new LoadResource(memory / r.memory, cores / r.cores)
def divide(rate: Int): LoadResource =
new LoadResource(memory / rate, cores / rate)
def moreThan(r: Resource): Boolean =
memory > r.memory && cores > r.cores
def caseMore(r: Resource): Boolean =
memory > r.memory || cores > r.cores
def equalsTo(r: Resource): Boolean =
memory == r.memory && cores == r.cores
override def notLess(r: Resource): Boolean =
memory >= r.memory && cores >= r.cores
override def toJson: String =
s"""{"memory":"${ByteTimeUtils.bytesToString(memory)}","cpu":$cores}"""
override def toString: String = toJson
}
class LoadInstanceResource(val memory: Long, val cores: Int, val instances: Int) extends Resource {
implicit def toLoadInstanceResource(r: Resource): LoadInstanceResource = r match {
case t: LoadInstanceResource => t
case l: LoadResource => new LoadInstanceResource(l.memory, l.cores, 0)
case m: MemoryResource => new LoadInstanceResource(m.memory, 0, 0)
case c: CPUResource => new LoadInstanceResource(0, c.cores, 0)
case d: DriverAndYarnResource => d.loadInstanceResource // yarn resource has special logic
case _ => new LoadInstanceResource(Long.MaxValue, Integer.MAX_VALUE, Integer.MAX_VALUE)
}
def add(r: Resource): LoadInstanceResource =
new LoadInstanceResource(memory + r.memory, cores + r.cores, instances + r.instances)
def minus(r: Resource): LoadInstanceResource =
new LoadInstanceResource(memory - r.memory, cores - r.cores, instances - r.instances)
def multiplied(r: Resource): LoadInstanceResource =
new LoadInstanceResource(memory * r.memory, cores * r.cores, instances * r.instances)
def multiplied(rate: Float): LoadInstanceResource =
new LoadInstanceResource(
(memory * rate).toLong,
math.round(cores * rate),
(instances * rate).toInt
)
def divide(r: Resource): LoadInstanceResource =
new LoadInstanceResource(memory / r.memory, cores / r.cores, instances / r.instances)
def divide(rate: Int): LoadInstanceResource =
new LoadInstanceResource(memory / rate, cores / rate, instances * rate)
def moreThan(r: Resource): Boolean =
memory > r.memory && cores > r.cores && instances > r.instances
def caseMore(r: Resource): Boolean =
memory > r.memory || cores > r.cores || instances > r.instances
def equalsTo(r: Resource): Boolean =
memory == r.memory && cores == r.cores && instances == r.instances
override def notLess(r: Resource): Boolean =
memory >= r.memory && cores >= r.cores && instances >= r.instances
override def toJson: String =
s"""{"instance":$instances,"memory":"${ByteTimeUtils.bytesToString(memory)}","cpu":$cores}"""
override def toString: String =
s"Number of instances(实例数):$instances,(RAM)内存:${ByteTimeUtils.bytesToString(memory)},cpu:$cores"
}
class InstanceResource(val instances: Int) extends CPUResource(instances) {
override protected def toResource(cores: Int): Resource = new InstanceResource(cores)
override def toJson: String = s"Instance: $instances"
override def toString: String = toJson
}
/**
* Queue resource information, no initial registration, only user resource usage limit
* 队列资源信息,无初始化注册,只有用户资源使用上限
*
* @param queueName
* @param queueMemory
* @param queueCores
* @param queueInstances
*/
class YarnResource(
val queueMemory: Long,
val queueCores: Int,
val queueInstances: Int,
val queueName: String = "default",
val applicationId: String = ""
) extends Resource {
implicit def toYarnResource(r: Resource): YarnResource = r match {
case t: YarnResource => t
case _ => new YarnResource(Long.MaxValue, Integer.MAX_VALUE, Integer.MAX_VALUE, "default")
}
def add(r: Resource): YarnResource = {
new YarnResource(
queueMemory + r.queueMemory,
queueCores + r.queueCores,
queueInstances + r.queueInstances,
r.queueName
)
}
def minus(r: Resource): YarnResource = {
new YarnResource(
queueMemory - r.queueMemory,
queueCores - r.queueCores,
queueInstances - r.queueInstances,
r.queueName
)
}
def multiplied(r: Resource): YarnResource = {
new YarnResource(
queueMemory * r.queueMemory,
queueCores * r.queueCores,
queueInstances * r.queueInstances,
r.queueName
)
}
def multiplied(rate: Float): YarnResource =
new YarnResource(
(queueMemory * rate).toInt,
(queueCores * rate).toInt,
(queueInstances * rate).toInt,
queueName
)
def divide(r: Resource): YarnResource = {
new YarnResource(
queueMemory / r.queueMemory,
queueCores / r.queueCores,
queueInstances / r.queueInstances,
r.queueName
)
}
def divide(rate: Int): YarnResource =
new YarnResource(queueMemory / rate, queueCores / rate, queueInstances / rate, queueName)
def moreThan(r: Resource): Boolean = {
queueMemory > r.queueMemory && queueCores > r.queueCores
}
def caseMore(r: Resource): Boolean = {
queueMemory > r.queueMemory || queueCores > r.queueCores
}
def equalsTo(r: Resource): Boolean =
queueName == r.queueName && queueMemory == r.queueMemory && queueCores == r.queueCores
override def notLess(r: Resource): Boolean =
queueMemory >= r.queueMemory && queueCores >= r.queueCores
override def toJson: String =
s"""{"queueName":"$queueName","queueMemory":"${ByteTimeUtils.bytesToString(
queueMemory
)}", "queueCpu":$queueCores, "instance":$queueInstances}"""
override def toString: String =
s"Queue name(队列名):$queueName,Queue memory(队列内存):${ByteTimeUtils.bytesToString(queueMemory)},Queue core number(队列核数):$queueCores, Number of queue instances(队列实例数):${queueInstances}"
}
class DriverAndYarnResource(
val loadInstanceResource: LoadInstanceResource,
val yarnResource: YarnResource
) extends Resource
with Logging {
private implicit def DriverAndYarnResource(r: Resource): DriverAndYarnResource = r match {
case t: DriverAndYarnResource => t
case y: YarnResource => new DriverAndYarnResource(new LoadInstanceResource(0, 0, 0), y)
case t: LoadInstanceResource => new DriverAndYarnResource(t, new YarnResource(0, 0, 0))
case l: LoadResource =>
new DriverAndYarnResource(
new LoadInstanceResource(l.memory, l.cores, 0),
new YarnResource(0, 0, 0)
)
case m: MemoryResource =>
new DriverAndYarnResource(new LoadInstanceResource(m.memory, 0, 0), new YarnResource(0, 0, 0))
case c: CPUResource =>
new DriverAndYarnResource(new LoadInstanceResource(0, c.cores, 0), new YarnResource(0, 0, 0))
case _ =>
new DriverAndYarnResource(
new LoadInstanceResource(Long.MaxValue, Integer.MAX_VALUE, Integer.MAX_VALUE),
new YarnResource(Long.MaxValue, Integer.MAX_VALUE, Integer.MAX_VALUE)
)
}
def isModuleOperate(r: Resource): Boolean = {
// TODO This method needs to return false by default, and this method needs to be removed later
false
}
def isModuleOperate: Boolean = {
if (this.yarnResource != null && StringUtils.isNotEmpty(this.yarnResource.queueName)) {
false
} else {
true
}
}
override def add(r: Resource): DriverAndYarnResource = {
if (isModuleOperate(r)) {
new DriverAndYarnResource(
this.loadInstanceResource.add(r.loadInstanceResource),
this.yarnResource
)
} else {
new DriverAndYarnResource(
this.loadInstanceResource.add(r.loadInstanceResource),
this.yarnResource.add(r.yarnResource)
)
}
}
override def minus(r: Resource): Resource = {
if (isModuleOperate(r)) {
new DriverAndYarnResource(
this.loadInstanceResource.minus(r.loadInstanceResource),
this.yarnResource
)
} else {
new DriverAndYarnResource(
this.loadInstanceResource.minus(r.loadInstanceResource),
this.yarnResource.minus(r.yarnResource)
)
}
}
override def multiplied(r: Resource): Resource = {
throw new ResourceWarnException(
OPERATION_MULTIPLIED.getErrorCode,
OPERATION_MULTIPLIED.getErrorDesc
)
}
override def multiplied(rate: Float): Resource = {
if (isModuleOperate) {
new DriverAndYarnResource(this.loadInstanceResource.multiplied(rate), this.yarnResource)
} else {
new DriverAndYarnResource(
this.loadInstanceResource.multiplied(rate),
this.yarnResource.multiplied(rate)
)
}
}
override def divide(r: Resource): Resource =
throw new ResourceWarnException(
OPERATION_MULTIPLIED.getErrorCode,
OPERATION_MULTIPLIED.getErrorDesc
)
override def divide(rate: Int): Resource = if (isModuleOperate) {
new DriverAndYarnResource(this.loadInstanceResource.divide(rate), this.yarnResource)
} else {
new DriverAndYarnResource(
this.loadInstanceResource.divide(rate),
this.yarnResource.divide(rate)
)
}
override def moreThan(r: Resource): Boolean = {
if (isModuleOperate(r)) {
this.loadInstanceResource.moreThan(r.loadInstanceResource)
} else {
this.loadInstanceResource.moreThan(r.loadInstanceResource) && this.yarnResource.moreThan(
r.yarnResource
)
}
}
override def caseMore(r: Resource): Boolean = if (isModuleOperate(r)) {
this.loadInstanceResource.caseMore(r.loadInstanceResource)
} else {
this.loadInstanceResource.caseMore(r.loadInstanceResource) || this.yarnResource.caseMore(
r.yarnResource
)
}
override def equalsTo(r: Resource): Boolean = if (isModuleOperate(r)) {
this.loadInstanceResource.equalsTo(r.loadInstanceResource)
} else {
this.loadInstanceResource.equalsTo(r.loadInstanceResource) && this.yarnResource.equalsTo(
r.yarnResource
)
}
override def notLess(r: Resource): Boolean = if (isModuleOperate(r)) {
this.loadInstanceResource.notLess(r.loadInstanceResource)
} else {
this.loadInstanceResource.notLess(r.loadInstanceResource) && this.yarnResource.notLess(
r.yarnResource
)
}
override def toJson: String = {
var load = "null"
var yarn = "null"
if (loadInstanceResource != null) load = loadInstanceResource.toJson
if (yarnResource != null) yarn = yarnResource.toJson
s"""{"driver":${load}, "yarn":${yarn}}"""
}
override def toString: String =
s"Driver resources(Driver资源):$loadInstanceResource,Queue resource(队列资源):$yarnResource"
}
class SpecialResource(val resources: java.util.Map[String, AnyVal]) extends Resource {
def this(resources: Map[String, AnyVal]) = this(resources.asJava)
private def specialResourceOperator(
r: Resource,
op: (AnyVal, AnyVal) => AnyVal
): SpecialResource = r match {
case s: SpecialResource =>
val rs = s.resources
new SpecialResource(resources.asScala.map { case (k, v) =>
val v1 = rs.get(k)
k -> op(v, v1)
}.toMap)
case _ => new SpecialResource(Map.empty[String, AnyVal])
}
override def add(r: Resource): Resource = specialResourceOperator(
r,
(v1, v2) =>
v1 match {
case i: Int => i + v2.asInstanceOf[Int]
case d: Double => d + v2.asInstanceOf[Double]
case l: Long => l + v2.asInstanceOf[Long]
case f: Float => f + v2.asInstanceOf[Float]
case s: Short => s + v2.asInstanceOf[Short]
case _ =>
throw new ResourceWarnException(
NOT_RESOURCE_TYPE.getErrorCode,
MessageFormat.format(NOT_RESOURCE_TYPE.getErrorDesc, r.getClass)
)
}
)
override def minus(r: Resource): Resource = specialResourceOperator(
r,
(v1, v2) =>
v1 match {
case i: Int => i - v2.asInstanceOf[Int]
case d: Double => d - v2.asInstanceOf[Double]
case l: Long => l - v2.asInstanceOf[Long]
case f: Float => f - v2.asInstanceOf[Float]
case s: Short => s - v2.asInstanceOf[Short]
case _ =>
throw new ResourceWarnException(
NOT_RESOURCE_TYPE.getErrorCode,
MessageFormat.format(NOT_RESOURCE_TYPE.getErrorDesc, r.getClass)
)
}
)
override def multiplied(r: Resource): Resource = specialResourceOperator(
r,
(v1, v2) =>
v1 match {
case i: Int => i * v2.asInstanceOf[Int]
case d: Double => d * v2.asInstanceOf[Double]
case l: Long => l * v2.asInstanceOf[Long]
case f: Float => f * v2.asInstanceOf[Float]
case s: Short => s * v2.asInstanceOf[Short]
case _ =>
throw new ResourceWarnException(
NOT_RESOURCE_TYPE.getErrorCode,
MessageFormat.format(NOT_RESOURCE_TYPE.getErrorDesc, r.getClass)
)
}
)
override def multiplied(rate: Float): Resource = new SpecialResource(
resources.asScala
.map {
case (k, i: Int) => k -> (i * rate).toInt
case (k, d: Double) => k -> d * rate
case (k, l: Long) => k -> (l * rate).toLong
case (k, f: Float) => k -> f * rate
case (k, s: Short) => k -> (s * rate).toShort
case (k, v) =>
throw new ResourceWarnException(
NOT_RESOURCE_TYPE.getErrorCode,
MessageFormat.format(NOT_RESOURCE_TYPE.getErrorDesc, v.getClass)
)
}
.toMap[String, AnyVal]
)
override def divide(r: Resource): Resource = specialResourceOperator(
r,
(v1, v2) =>
v1 match {
case i: Int => i / v2.asInstanceOf[Int]
case d: Double => d / v2.asInstanceOf[Double]
case l: Long => l / v2.asInstanceOf[Long]
case f: Float => f / v2.asInstanceOf[Float]
case s: Short => s / v2.asInstanceOf[Short]
case _ =>
throw new ResourceWarnException(
NOT_RESOURCE_TYPE.getErrorCode,
MessageFormat.format(NOT_RESOURCE_TYPE.getErrorDesc, r.getClass)
)
}
)
override def divide(rate: Int): Resource = new SpecialResource(
resources.asScala
.map {
case (k, i: Int) => k -> i / rate
case (k, d: Double) => k -> d / rate
case (k, l: Long) => k -> l / rate
case (k, f: Float) => k -> f / rate
case (k, s: Short) => k -> (s / rate).toShort
case (k, v) =>
throw new ResourceWarnException(
NOT_RESOURCE_TYPE.getErrorCode,
MessageFormat.format(NOT_RESOURCE_TYPE.getErrorDesc, v.getClass)
)
}
.toMap[String, AnyVal]
)
override def moreThan(r: Resource): Boolean = r match {
case s: SpecialResource =>
val rs = s.resources
!resources.asScala.exists {
case (k, i: Int) => i <= rs.get(k).asInstanceOf[Int]
case (k, d: Double) => d <= rs.get(k).asInstanceOf[Double]
case (k, l: Long) => l <= rs.get(k).asInstanceOf[Long]
case (k, f: Float) => f <= rs.get(k).asInstanceOf[Float]
case (k, s: Short) => s <= rs.get(k).asInstanceOf[Short]
case (k, v) =>
throw new ResourceWarnException(
NOT_RESOURCE_TYPE.getErrorCode,
MessageFormat.format(NOT_RESOURCE_TYPE.getErrorDesc, v.getClass)
)
}
case _ => true
}
/**
* Part is greater than(部分大于)
*
* @param r
* @return
*/
override def caseMore(r: Resource): Boolean = r match {
case s: SpecialResource =>
val rs = s.resources
resources.asScala.exists {
case (k, i: Int) => i > rs.get(k).asInstanceOf[Int]
case (k, d: Double) => d > rs.get(k).asInstanceOf[Double]
case (k, l: Long) => l > rs.get(k).asInstanceOf[Long]
case (k, f: Float) => f > rs.get(k).asInstanceOf[Float]
case (k, s: Short) => s > rs.get(k).asInstanceOf[Short]
case (k, v) =>
throw new ResourceWarnException(
NOT_RESOURCE_TYPE.getErrorCode,
MessageFormat.format(NOT_RESOURCE_TYPE.getErrorDesc, v.getClass)
)
}
case _ => true
}
override def equalsTo(r: Resource): Boolean = r match {
case s: SpecialResource =>
val rs = s.resources
!resources.asScala.exists {
case (k, i: Int) => i != rs.get(k).asInstanceOf[Int]
case (k, d: Double) => d != rs.get(k).asInstanceOf[Double]
case (k, l: Long) => l != rs.get(k).asInstanceOf[Long]
case (k, f: Float) => f != rs.get(k).asInstanceOf[Float]
case (k, s: Short) => s != rs.get(k).asInstanceOf[Short]
case (k, v) =>
throw new ResourceWarnException(
NOT_RESOURCE_TYPE.getErrorCode,
MessageFormat.format(NOT_RESOURCE_TYPE.getErrorDesc, v.getClass)
)
}
case _ => true
}
override def notLess(r: Resource): Boolean = r match {
case s: SpecialResource =>
val rs = s.resources
!resources.asScala.exists {
case (k, i: Int) => i < rs.get(k).asInstanceOf[Int]
case (k, d: Double) => d < rs.get(k).asInstanceOf[Double]
case (k, l: Long) => l < rs.get(k).asInstanceOf[Long]
case (k, f: Float) => f < rs.get(k).asInstanceOf[Float]
case (k, s: Short) => s < rs.get(k).asInstanceOf[Short]
case (k, v) =>
throw new ResourceWarnException(
NOT_RESOURCE_TYPE.getErrorCode,
MessageFormat.format(NOT_RESOURCE_TYPE.getErrorDesc, v.getClass)
)
}
case _ => true
}
override def toJson: String = s"Special:$resources"
}
object ResourceSerializer
extends CustomSerializer[Resource](implicit formats =>
(
{
case JObject(List(("memory", memory))) => new MemoryResource(memory.extract[Long])
case JObject(List(("cores", cores))) => new CPUResource(cores.extract[Int])
case JObject(List(("instance", instances))) =>
new InstanceResource(instances.extract[Int])
case JObject(List(("memory", memory), ("cores", cores))) =>
new LoadResource(memory.extract[Long], cores.extract[Int])
case JObject(List(("memory", memory), ("cores", cores), ("instance", instances))) =>
new LoadInstanceResource(
memory.extract[Long],
cores.extract[Int],
instances.extract[Int]
)
case JObject(
List(
("applicationId", applicationId),
("queueName", queueName),
("queueMemory", queueMemory),
("queueCores", queueCores),
("queueInstances", queueInstances)
)
) =>
new YarnResource(
queueMemory.extract[Long],
queueCores.extract[Int],
queueInstances.extract[Int],
queueName.extract[String],
applicationId.extract[String]
)
case JObject(
List(
(
"DriverAndYarnResource",
JObject(
List(
("loadInstanceResource", loadInstanceResource),
("yarnResource", yarnResource)
)
)
)
)
) =>
implicit val formats = DefaultFormats
new DriverAndYarnResource(
loadInstanceResource.extract[LoadInstanceResource],
yarnResource.extract[YarnResource]
)
case JObject(List(("resources", resources))) =>
new SpecialResource(resources.extract[Map[String, AnyVal]])
case JObject(list) =>
throw new ResourceWarnException(
NOT_RESOURCE_STRING.getErrorCode,
NOT_RESOURCE_STRING.getErrorDesc + list
)
},
{
case m: MemoryResource => ("memory", m.memory)
case c: CPUResource => ("cores", c.cores)
case i: InstanceResource => ("instance", i.instances)
case l: LoadResource => ("memory", l.memory) ~ ("cores", l.cores)
case li: LoadInstanceResource =>
("memory", li.memory) ~ ("cores", li.cores) ~ ("instance", li.instances)
case yarn: YarnResource =>
(
"applicationId",
yarn.applicationId
) ~ ("queueName", yarn.queueName) ~ ("queueMemory", yarn.queueMemory) ~ ("queueCores", yarn.queueCores) ~ ("queueInstances", yarn.queueInstances)
case dy: DriverAndYarnResource =>
implicit val formats = DefaultFormats
(
"DriverAndYarnResource",
new JObject(
List(
("loadInstanceResource", Extraction.decompose(dy.loadInstanceResource)),
("yarnResource", Extraction.decompose(dy.yarnResource))
)
)
)
case s: SpecialResource =>
("resources", Serialization.write(s.resources.asScala.toMap))
case r: Resource =>
throw new ResourceWarnException(
NOT_RESOURCE_TYPE.getErrorCode,
MessageFormat.format(NOT_RESOURCE_TYPE.getErrorDesc, r.getClass)
)
}
)
)