blob: 7ab52c28ffb1dd3fe405286df2778a8cad34d74c [file] [log] [blame]
/*
* Copyright 2019 WeBank
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.webank.wedatasphere.linkis.resourcemanager.service
import com.webank.wedatasphere.linkis.common.ServiceInstance
import com.webank.wedatasphere.linkis.common.utils.Logging
import com.webank.wedatasphere.linkis.manager.common.entity.resource._
import com.webank.wedatasphere.linkis.resourcemanager.domain.RMLabelContainer
import com.webank.wedatasphere.linkis.resourcemanager.exception.RMWarnException
import com.webank.wedatasphere.linkis.resourcemanager.utils.RMUtils.aggregateResource
import com.webank.wedatasphere.linkis.resourcemanager.utils.{AlertUtils, RMConfiguration, UserConfiguration}
/**
 * Base service that checks whether a resource request fits into what is left for a label,
 * and builds the (error code, message) pair reported when it does not.
 *
 * @param labelResourceService used to look up the resource currently recorded for a label
 */
abstract class RequestResourceService(labelResourceService: LabelResourceService) extends Logging {

  /** Resource type handled by this service; `ResourceType.Default` unless overridden (presumably by subclasses). */
  val resourceType: ResourceType = ResourceType.Default

  /**
   * Checks whether the current label of `labelContainer` has enough available resource for `resource`.
   *
   * When the current label is the combined user-creator-engineType label, its max resource is set
   * from the user's configured resource, its min resource is reset to the initial value, and its left
   * resource is recomputed as `max - used - locked`. If nothing is recorded for that label yet, a fresh
   * [[CommonNodeResource]] with initial used/locked resource is created first.
   *
   * NOTE(review): in the combined-label case this mutates the object returned by
   * `labelResourceService.getLabelResource`; whether that object is shared/cached is not visible here.
   *
   * @param labelContainer labels of the request; its current label is the one being checked
   * @param resource       requested resource; its min resource is compared with the label's available resource
   * @return `true` when the check passes, including when no resource is recorded for a non-combined label
   * @throws RMWarnException when the label's available resource (`left - min`) is less than the
   *                         requested min resource; code and message come from [[generateNotEnoughMessage]]
   */
  def canRequest(labelContainer: RMLabelContainer, resource: NodeResource): Boolean = {
    var labelResource = labelResourceService.getLabelResource(labelContainer.getCurrentLabel)
    // The combined user-creator-engineType label is bounded by the user's configured resource,
    // so its max/min/left values are (re)derived from the configuration on every check.
    if (labelContainer.getCurrentLabel.equals(labelContainer.getCombinedUserCreatorEngineTypeLabel)) {
      if (labelResource == null) {
        labelResource = new CommonNodeResource
        labelResource.setResourceType(resource.getResourceType)
        labelResource.setUsedResource(Resource.initResource(resource.getResourceType))
        labelResource.setLockedResource(Resource.initResource(resource.getResourceType))
      }
      val configuredResource = UserConfiguration.getUserConfiguredResource(
        resource.getResourceType, labelContainer.getUserCreatorLabel, labelContainer.getEngineTypeLabel)
      info(s"Get configured resource ${configuredResource} for [${labelContainer.getUserCreatorLabel}] and [${labelContainer.getEngineTypeLabel}]")
      labelResource.setMaxResource(configuredResource)
      labelResource.setMinResource(Resource.initResource(labelResource.getResourceType))
      labelResource.setLeftResource(labelResource.getMaxResource - labelResource.getUsedResource - labelResource.getLockedResource)
    }
    info(s"Label [${labelContainer.getCurrentLabel}] has resource [${labelResource}]")
    if (labelResource != null) {
      val labelAvailableResource = labelResource.getLeftResource - labelResource.getMinResource
      if (labelAvailableResource < resource.getMinResource) {
        info(s"Failed check: ${labelContainer.getUserCreatorLabel.getUser} want to use label [${labelContainer.getCurrentLabel}] resource[${resource.getMinResource}] > label available resource[${labelAvailableResource}]")
        // TODO sendAlert(moduleInstance, user, creator, requestResource, moduleAvailableResource.resource, moduleLeftResource)
        // Diagnose with the requested resource: it is what exceeded the available resource, so this
        // identifies the dimension (CPU / memory / instances / queue) that actually caused the failure.
        val (errorCode, errorMessage) = generateNotEnoughMessage(resource.getMinResource, labelAvailableResource)
        throw new RMWarnException(errorCode, errorMessage)
      }
      info(s"Passed check: ${labelContainer.getUserCreatorLabel.getUser} want to use label [${labelContainer.getCurrentLabel}] resource[${resource.getMinResource}] <= label available resource[${labelAvailableResource}]")
    } else {
      // Nothing is recorded for this (non-combined) label, so there is nothing to check against.
      warn(s"No resource available found for label ${labelContainer.getCurrentLabel}")
    }
    true
  }

  /**
   * Sends an asynchronous alert about a failed resource request, but only when
   * `RMConfiguration.ALERT_ENABLED` is set; otherwise does nothing.
   *
   * The contact is looked up by yarn queue name for yarn-based requests and falls back to
   * `RMConfiguration.ALERT_DEFAULT_CONTACT` for every other resource type.
   *
   * @param moduleInstance     engine manager instance the request was made against (used in the title)
   * @param user               requesting user
   * @param creator            requesting creator (currently not included in the alert)
   * @param requestResource    resource that was requested
   * @param availableResource  resource that was available to the user
   * @param moduleLeftResource resource left on the engine manager
   */
  def sendAlert(moduleInstance: ServiceInstance, user: String, creator: String, requestResource: Resource,
                availableResource: Resource, moduleLeftResource: Resource): Unit = {
    if (RMConfiguration.ALERT_ENABLED.getValue) {
      info("start sending alert")
      val title = s"user ${user} failed to request resource on EM(${moduleInstance.getApplicationName},${moduleInstance.getInstance})"
      val queueContact = requestResource match {
        case d: DriverAndYarnResource => AlertUtils.getContactByQueue(d.yarnResource.queueName)
        case y: YarnResource => AlertUtils.getContactByQueue(y.queueName)
        case _ => RMConfiguration.ALERT_DEFAULT_CONTACT.getValue
      }
      // First line reads: "Please contact user [...] or related personnel [...]".
      val detail =
        s"请联系用户[${user}]或相关人员[${queueContact}]\n" +
          s"user request resource: ${requestResource}\n " +
          s"user available resource: ${availableResource}\n " +
          s"EM left resource: ${moduleLeftResource}\n "
      AlertUtils.sendAlertAsync(title, detail)
      info("finished sending alert")
    }
  }

  /**
   * Builds the error code and bilingual message describing why `requestResource` does not fit
   * into `availableResource`.
   *
   * For composite resource types, the first dimension in which the request exceeds the available
   * amount picks the message. The order is cores then memory for load resources, cores then memory
   * then instances for yarn queues, and the server part before the yarn part for driver+yarn.
   *
   * NOTE(review): `availableResource` is cast to the same concrete type as `requestResource`;
   * a mismatched pair raises `ClassCastException`.
   *
   * @param requestResource   the resource that did not fit
   * @param availableResource the resource that was available, of the same concrete type
   * @return `(errorCode, message)`, with codes 11011-11016
   * @throws RMWarnException with code 11003 for `SpecialResource` or any other unsupported type
   */
  def generateNotEnoughMessage(requestResource: Resource, availableResource: Resource): (Int, String) = {
    requestResource match {
      case _: MemoryResource =>
        (11011, s"The remote server is out of memory resources(远程服务器内存资源不足。)")
      case _: CPUResource =>
        (11012, s"Insufficient CPU resources on remote server (远程服务器CPU资源不足。)")
      case _: InstanceResource =>
        (11013, s"The remote server is out of resources (远程服务器资源不足。)")
      case l: LoadResource =>
        val loadAvailable = availableResource.asInstanceOf[LoadResource]
        if (l.cores > loadAvailable.cores) {
          (11012, s"Insufficient CPU resources on remote server (远程服务器CPU资源不足。)")
        } else {
          (11011, s"The remote server is out of memory resources(远程服务器内存资源不足。)")
        }
      case li: LoadInstanceResource =>
        val loadInstanceAvailable = availableResource.asInstanceOf[LoadInstanceResource]
        if (li.cores > loadInstanceAvailable.cores) {
          (11012, s"Insufficient CPU resources on remote server (远程服务器CPU资源不足。)")
        } else if (li.memory > loadInstanceAvailable.memory) {
          (11011, s"The remote server is out of memory resources(远程服务器内存资源不足。)")
        } else {
          (11013, s"The remote server is out of resources (远程服务器资源不足。)")
        }
      case yarn: YarnResource =>
        val yarnAvailable = availableResource.asInstanceOf[YarnResource]
        if (yarn.queueCores > yarnAvailable.queueCores) {
          (11014, s"The queue CPU resources are insufficient, it is recommended to reduce the number of executors(队列CPU资源不足,建议调小执行器个数。)")
        } else if (yarn.queueMemory > yarnAvailable.queueMemory) {
          (11015, s"Insufficient queue memory resources, it is recommended to reduce the actuator memory (队列内存资源不足,建议调小执行器内存。)")
        } else {
          (11016, s"Number of queue instances exceeds limit (队列实例数超过限制。)")
        }
      case dy: DriverAndYarnResource =>
        val dyAvailable = availableResource.asInstanceOf[DriverAndYarnResource]
        val serverShort =
          dy.loadInstanceResource.memory > dyAvailable.loadInstanceResource.memory ||
            dy.loadInstanceResource.cores > dyAvailable.loadInstanceResource.cores ||
            dy.loadInstanceResource.instances > dyAvailable.loadInstanceResource.instances
        if (serverShort) {
          // Server (driver) side is insufficient; Chinese suffix reads "when requesting server resources".
          val (code, message) = generateNotEnoughMessage(dy.loadInstanceResource, dyAvailable.loadInstanceResource)
          (code, s"Request a server resource, ${message} 请求服务器资源时,${message}")
        } else {
          // Yarn queue side is insufficient; Chinese suffix reads "when requesting queue resources".
          val (code, message) = generateNotEnoughMessage(dy.yarnResource, dyAvailable.yarnResource)
          (code, s"Request a queue resource, ${message} 请求队列资源时,${message}")
        }
      case s: SpecialResource => throw new RMWarnException(11003, "not supported resource type " + s.getClass)
      case r: Resource => throw new RMWarnException(11003, "not supported resource type " + r.getClass)
    }
  }
}