blob: 80df607bb3b44210ecd090b8c4c2e61e602241dd [file] [log] [blame]
/*
* Copyright 2019 WeBank
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.webank.wedatasphere.linkis.resourcemanager.service.rm
import java.util.UUID
import java.util.concurrent.{ScheduledFuture, TimeUnit}
import com.webank.wedatasphere.linkis.common.ServiceInstance
import com.webank.wedatasphere.linkis.common.utils.{Logging, Utils}
import com.webank.wedatasphere.linkis.resourcemanager.ResourceRequestPolicy._
import com.webank.wedatasphere.linkis.resourcemanager._
import com.webank.wedatasphere.linkis.resourcemanager.domain._
import com.webank.wedatasphere.linkis.resourcemanager.event.EventScope
import com.webank.wedatasphere.linkis.resourcemanager.event.notify.{ModuleRegisterEvent, _}
import com.webank.wedatasphere.linkis.resourcemanager.exception.{RMErrorException, RMWarnException}
import com.webank.wedatasphere.linkis.resourcemanager.schedule.EventGroupFactory
import com.webank.wedatasphere.linkis.resourcemanager.service.metadata.{ModuleResourceRecordService, ResourceLockService, UserMetaData, UserResourceRecordService}
import com.webank.wedatasphere.linkis.resourcemanager.utils.RMConfiguration
import com.webank.wedatasphere.linkis.rpc.Sender
import org.springframework.beans.factory.InitializingBean
import org.springframework.beans.factory.annotation.Autowired
import org.springframework.stereotype.Component
/**
* Created by johnnwang on 9/12/18.
*/
@Component
class DefaultResourceManager extends ResourceManager with Logging with InitializingBean {
override protected val rmContext: RMContext = RMContext.getOrCreateRMContext()
@Autowired
var userResourceRecordService: UserResourceRecordService = _
@Autowired
var moduleResourceRecordService: ModuleResourceRecordService = _
@Autowired
var moduleResourceManager: ModuleResourceManager = _
@Autowired
var userResourceManager: UserResourceManager = _
@Autowired
var resourceLockService: ResourceLockService = _
@Autowired
var userMetaData: UserMetaData = _
var requestResourceServices: Array[RequestResourceService] = _
private var future: ScheduledFuture[_] = _
override def afterPropertiesSet(): Unit = {
requestResourceServices = Array(new DefaultReqResourceService(userMetaData, userResourceRecordService, moduleResourceRecordService, userResourceManager, moduleResourceManager),
new YarnReqResourceService(userMetaData, userResourceRecordService, moduleResourceRecordService, userResourceManager, moduleResourceManager),
new DriverAndYarnReqResourceService(userMetaData, userResourceRecordService, moduleResourceRecordService, userResourceManager, moduleResourceManager))
future = Utils.defaultScheduler.scheduleAtFixedRate(new Runnable {
override def run(): Unit = {
moduleResourceManager.getModuleNames().foreach { moduleName =>
val existsEngineInstances = Sender.getInstances(moduleName)
moduleResourceManager.getModuleInstancesByName(moduleName).foreach { moduleResourceRecord =>
val interval = System.currentTimeMillis() - moduleResourceRecord.getRegisterTime
//TODO Remove waiting(去掉等待)
val moduleInstance = ServiceInstance(moduleResourceRecord.getEmApplicationName, moduleResourceRecord.getEmInstance)
if (interval > RMConfiguration.RM_REGISTER_INTERVAL_TIME.getValue && !existsEngineInstances.contains(moduleInstance)) {
warn(s"${moduleInstance} has been killed, now remove it, and unregister it for RM.")
Utils.tryAndWarn(unregister(moduleInstance))
}
}
}
}
}, RMConfiguration.RM_COMPLETED_SCAN_INTERVAL.getValue, RMConfiguration.RM_COMPLETED_SCAN_INTERVAL.getValue.toLong, TimeUnit.MILLISECONDS)
Utils.defaultScheduler.scheduleAtFixedRate(new Runnable {
override def run(): Unit = {
info("Start scanning engines.")
val engineToRecordMap = userResourceRecordService.getAll().filter(_.getEngineApplicationName != null).groupBy[String](_.getEngineApplicationName)
engineToRecordMap.keySet.foreach { engineName =>
info(s"processing engine ${engineName}")
val existingEngineInstances = Sender.getInstances(engineName)
engineToRecordMap.get(engineName) match {
case Some(engineRecords) =>
engineRecords.foreach { engineRecord =>
val engineInstance = ServiceInstance(engineRecord.getEngineApplicationName, engineRecord.getEngineInstance)
if (System.currentTimeMillis - engineRecord.getUsedTime > RMConfiguration.RM_ENGINE_RELEASE_THRESHOLD.getValue.toLong
&& !existingEngineInstances.contains(engineInstance)) {
info(s"engine ${engineInstance} no longer exists in service list, releasing it.")
val engineResultResource = UserResultResource(engineRecord.getTicketId, engineRecord.getUser)
val moduleInstance = ServiceInstance(engineRecord.getEmApplicationName, engineRecord.getEmInstance)
resourceReleased(engineResultResource, moduleInstance)
}
}
case None =>
}
}
info("Complete scanning engines.")
}
}, 0, RMConfiguration.RM_ENGINE_SCAN_INTERVAL.getValue.toLong, TimeUnit.MILLISECONDS)
}
/**
* The registration method is mainly used to notify all RM nodes (including the node)
* 该注册方法,主要是用于通知所有的RM节点(包括本节点)
*
* @param moduleInfo
*/
override def register(moduleInfo: ModuleInfo): Unit = {
info(s"Get a module registration request: $moduleInfo")
if (moduleInfo.totalResource < Resource.getZeroResource(moduleInfo.totalResource) || moduleInfo.protectedResource < Resource.getZeroResource(moduleInfo.protectedResource))
throw new RMErrorException(11002, s"Failed to register module : $ModuleInfo,Resources cannot be negative")
if (moduleInfo.protectedResource > moduleInfo.totalResource)
throw new RMErrorException(11002, s"Failed to register module : $ModuleInfo,protectedResource must be less than totalResource")
moduleResourceManager.dealModuleRegisterEvent(new ModuleRegisterEvent(EventScope.Instance, moduleInfo))
}
/**
* The registration method is mainly used to notify all RM nodes (including the node), and the instance is offline.
* 该注册方法,主要是用于通知所有的RM节点(包括本节点),下线该实例
*/
override def unregister(moduleInstance: ServiceInstance): Unit = {
info(s"Get a module unregister request: $moduleInstance")
moduleResourceManager.dealModuleUnregisterEvent(new ModuleUnregisterEvent(EventScope.Instance, moduleInstance))
}
/**
* Whether the instance of the module has been overloaded
*   * TODO is not available when it is larger than the protection resource
* 是否该module的实例,已经使用超载了
* TODO 大于保护资源则不可用
*
* @param moduleInstance
* @return
*/
override def instanceCanService(moduleInstance: ServiceInstance): Boolean = {
val moduleResourceRecord = moduleResourceRecordService.getByEmInstance(moduleInstance.getApplicationName, moduleInstance.getInstance)
val leftResource = moduleResourceRecordService.deserialize(moduleResourceRecord.getLeftResource)
val protectedResource = moduleResourceRecordService.deserialize(moduleResourceRecord.getProtectedResource)
info(s"Module(模块): $moduleInstance Remaining resources(剩余资源):${leftResource} Protect resources(保护资源):${protectedResource}")
leftResource > protectedResource
}
/**
* Request resources, if not successful, return directly
* 请求资源,如果不成功,直接返回
*
* @param user
* @param resource
* @return
*/
override def requestResource(moduleInstance: ServiceInstance, user: String, creator: String, resource: Resource): ResultResource = {
info(s"Get resource request of user:$user,info:moduleInstance:$moduleInstance,creator:$creator,resource:$resource")
//1.Determine if there are still events(判断是否还有事件)
if (hasModuleInstanceEvent(moduleInstance)) {
warn(s"There are still unconsumed event of module: $moduleInstance")
return NotEnoughResource(s"There are still unconsumed event of module:$moduleInstance")
}
if (hasUserEvent(user, moduleInstance.getApplicationName)) {
warn(s"There are still unconsumed event of user: $user")
return NotEnoughResource(s"There are still unconsumed event of user: $user")
}
//2.Can I apply?(是否可以申请)
val reqService = getRequestResourceService(moduleInstance)
try {
if (!reqService.canRequest(moduleInstance, user, creator, resource))
return NotEnoughResource(s"user:$user not enough resource")
} catch {
case warn: RMWarnException => return NotEnoughResource(warn.getMessage)
}
//3.Lock engine, lock user(锁引擎,锁用户)
Utils.tryFinally {
resourceLockService.tryLock(null, moduleInstance.getApplicationName, moduleInstance.getInstance)
resourceLockService.tryLock(user, moduleInstance.getApplicationName, moduleInstance.getInstance)
try {
if (!reqService.canRequest(moduleInstance, user, creator, resource))
return NotEnoughResource(s"user:$user not enough resource")
} catch {
case warn: RMWarnException => return NotEnoughResource(warn.getMessage)
}
//4.Broadcast resource application(广播资源申请)
val tickedId = UUID.randomUUID().toString
val userPreUsedResource = UserPreUsedResource(tickedId, moduleInstance, resource)
userResourceManager.dealUserPreUsedEvent(new UserPreUsedEvent(EventScope.User, user, creator, userPreUsedResource))
if (RMConfiguration.RM_WAIT_EVENT_TIME_OUT.getValue > 0) Utils.defaultScheduler.schedule(
new ClearPreUsedRunnable(new ClearPrdUsedEvent(EventScope.User, user, userPreUsedResource)),
RMConfiguration.RM_WAIT_EVENT_TIME_OUT.getValue,
TimeUnit.MILLISECONDS
)
info(s"$user succeed to request resource:$resource")
AvailableResource(tickedId)
} {
//5.Release lock(释放锁)
resourceLockService.unLock(null, moduleInstance.getApplicationName, moduleInstance.getInstance)
resourceLockService.unLock(user, moduleInstance.getApplicationName, moduleInstance.getInstance)
}
}
/**
* Request resources and wait for a certain amount of time until the requested resource is met
* 请求资源,并等待一定的时间,直到满足请求的资源
*
* @param user
* @param resource
* @param wait
* @return
*/
override def requestResource(moduleInstance: ServiceInstance, user: String, creator: String, resource: Resource, wait: Long): ResultResource = {
info(s"Get resource request of user:$user,info:moduleInstance:$moduleInstance,creator:$creator,resource:$resource")
val start = System.currentTimeMillis
//1.Determine if there are any more events(判断是否还有事件)
if (hasModuleInstanceEvent(moduleInstance)) {
warn(s"There are still unconsumed event of module: $moduleInstance")
return NotEnoughResource(s"There are still unconsumed event of module:$moduleInstance")
}
if (hasUserEvent(user, moduleInstance.getApplicationName)) {
warn(s"There are still unconsumed event of user: $user")
return NotEnoughResource(s"There are still unconsumed event of user: $user")
}
//2.Can I apply?(是否可以申请)
val reqService = getRequestResourceService(moduleInstance)
try {
if (!reqService.canRequest(moduleInstance, user, creator, resource))
return NotEnoughResource(s"user:$user not enough resource")
} catch {
case warn: RMWarnException => return NotEnoughResource(warn.getMessage)
}
//3.Lock engine, lock user(锁引擎,锁用户)
Utils.tryFinally {
resourceLockService.tryLock(null, moduleInstance.getApplicationName, moduleInstance.getInstance, wait)
if (System.currentTimeMillis() - start > wait)
return NotEnoughResource("time out")
val newWait = wait - (System.currentTimeMillis() - start)
resourceLockService.tryLock(user, moduleInstance.getApplicationName, moduleInstance.getInstance, newWait)
if (System.currentTimeMillis() - start > wait)
return NotEnoughResource("time out")
try {
if (!reqService.canRequest(moduleInstance, user, creator, resource))
return NotEnoughResource(s"user:$user not enough resource")
} catch {
case warn: RMWarnException => return NotEnoughResource(warn.getMessage)
}
//4.Broadcast resource application(广播资源申请)
val tickedId = UUID.randomUUID().toString
val userPreUsedResource = UserPreUsedResource(tickedId, moduleInstance, resource)
userResourceManager.dealUserPreUsedEvent(new UserPreUsedEvent(EventScope.User, user, creator, userPreUsedResource))
if (RMConfiguration.RM_WAIT_EVENT_TIME_OUT.getValue > 0) Utils.defaultScheduler.schedule(
new ClearPreUsedRunnable(new ClearPrdUsedEvent(EventScope.User, user, userPreUsedResource)),
RMConfiguration.RM_WAIT_EVENT_TIME_OUT.getValue,
TimeUnit.MILLISECONDS
)
info(s"$user succeed to request resource:$resource")
AvailableResource(tickedId)
} {
//5.Release lock(释放锁)
resourceLockService.unLock(null, moduleInstance.getApplicationName, moduleInstance.getInstance)
resourceLockService.unLock(user, moduleInstance.getApplicationName, moduleInstance.getInstance)
}
}
/**
* When the resource is instantiated, the total amount of resources actually occupied is returned.
* 当资源被实例化后,返回实际占用的资源总量
*
* @param resource
* In general, resourceReleased will release the resources occupied by the user, but if the process that uses the resource does not have time to call the resourceReleased method to die,
* you need to unregister to release the resource.
* @param moduleInstance 一般情况下,会由resourceReleased释放用户占用的资源,但是如果该使用资源的进程没来得及调用resourceReleased方法就死掉了,就需要unregister来释放了
* @param realUsed
*/
override def resourceInited(resource: ResultResource, moduleInstance: ServiceInstance, realUsed: Resource, engineInstance: ServiceInstance): Unit = {
val userResultResource = resource.asInstanceOf[UserResultResource]
val user = userResultResource.user
//1.Determine if there are any more events(判断是否还有事件)
if (hasModuleInstanceEvent(moduleInstance)) {
throw new RMWarnException(111004, s"There are still unconsumed event of module: $moduleInstance")
}
if (hasUserEvent(user, moduleInstance.getApplicationName)) {
throw new RMWarnException(111004, s"There are still unconsumed event of user: $user")
}
Utils.tryFinally {
//2.Lock engine, lock user(锁引擎,锁用户)
resourceLockService.tryLock(null, moduleInstance.getApplicationName, moduleInstance.getInstance)
resourceLockService.tryLock(user, moduleInstance.getApplicationName, moduleInstance.getInstance)
//3.Broadcast resource use(广播资源使用)
val tickedId = userResultResource.ticketId
val usedResource: UserUsedResource = UserUsedResource(tickedId, moduleInstance, realUsed, engineInstance)
val userUsedEvent = new UserUsedEvent(EventScope.User, user, usedResource)
userResourceManager.dealUserUsedEvent(userUsedEvent)
//notifyEngine.notifyToAll(new WaitReleasedEvent(new ClearUsedEvent(EventScope.User, user, usedResource), RMConfiguration.RM_WAIT_EVENT_TIME_OUT.getValue))
} {
//4.Release lock(释放锁)
resourceLockService.unLock(null, moduleInstance.getApplicationName, moduleInstance.getInstance)
resourceLockService.unLock(user, moduleInstance.getApplicationName, moduleInstance.getInstance)
info(s"resource: $resource resourceInited Processed(处理完毕)")
}
}
/**
* Method called when the resource usage is released
* 当资源使用完成释放后,调用的方法
*
* @param resultResource
*/
override def resourceReleased(resultResource: ResultResource, moduleInstance: ServiceInstance): Unit = {
val userResultResource = resultResource.asInstanceOf[UserResultResource]
val user = userResultResource.user
//1.Determine if there are any more events(判断是否还有事件)
if (hasModuleInstanceEvent(moduleInstance)) {
throw new RMWarnException(111004, s"There are still unconsumed event of module: $moduleInstance")
}
if (hasUserEvent(user, moduleInstance.getApplicationName)) {
throw new RMWarnException(111004, s"There are still unconsumed event of user: $user")
}
//2.Lock engine, lock user(锁引擎,锁用户)
Utils.tryFinally {
resourceLockService.tryLock(null, moduleInstance.getApplicationName, moduleInstance.getInstance)
resourceLockService.tryLock(user, moduleInstance.getApplicationName, moduleInstance.getInstance)
//3.Broadcast resource use(广播资源使用)
val tickedId = userResultResource.ticketId
val userReleasedResource = UserReleasedResource(tickedId, moduleInstance)
val userReleasedEvent = new UserReleasedEvent(EventScope.User, user, userReleasedResource)
userResourceManager.dealUserReleasedEvent(userReleasedEvent)
} {
//4.Release lock(释放锁)
resourceLockService.unLock(null, moduleInstance.getApplicationName, moduleInstance.getInstance)
resourceLockService.unLock(user, moduleInstance.getApplicationName, moduleInstance.getInstance)
info(s"resultResource:$resultResource resourceReleased Processed(处理完毕)")
}
}
/**
* If the IP and port are empty, return the resource status of all modules of a module
*   * Return the use of this instance resource if there is an IP and port
* 如果IP 和 端口传空,则返回某个module全部实例的资源情况
* 如果存在IP 和端口 则返回该实例资源的使用
*
* @param moduleInstance
* @return
*/
override def getModuleResourceInfo(moduleInstance: ServiceInstance): Array[ModuleResourceInfo] = {
moduleResourceManager.getModuleResourceInfo(moduleInstance)
}
def hasModuleInstanceEvent(moduleInstance: ServiceInstance): Boolean = {
val groupFactory = rmContext.getScheduler.getSchedulerContext.getOrCreateGroupFactory.asInstanceOf[EventGroupFactory]
val groupName = groupFactory.getGroupNameByModule(moduleInstance)
val consumer = rmContext.getScheduler.getSchedulerContext.getOrCreateConsumerManager.getOrCreateConsumer(groupName)
val hasRunning = consumer.getRunningEvents.exists {
case registerEvent: ModuleRegisterEvent => registerEvent.moduleName == moduleInstance.getApplicationName
case unregisterEvent: ModuleUnregisterEvent => unregisterEvent.moduleName == moduleInstance.getApplicationName
case _ => false
}
if (hasRunning) return true
val hasWaiting = consumer.getConsumeQueue.getWaitingEvents.exists {
case registerEvent: ModuleRegisterEvent => registerEvent.moduleName == moduleInstance.getApplicationName
case unregisterEvent: ModuleUnregisterEvent => unregisterEvent.moduleName == moduleInstance.getApplicationName
case _ => false
}
hasWaiting
}
def hasUserEvent(user: String, moduleName: String): Boolean = {
val groupFactory = rmContext.getScheduler.getSchedulerContext.getOrCreateGroupFactory.asInstanceOf[EventGroupFactory]
val groupName = groupFactory.getGroupNameByUser(user)
val consumer = rmContext.getScheduler.getSchedulerContext.getOrCreateConsumerManager.getOrCreateConsumer(groupName)
val hasRunning = consumer.getRunningEvents.exists {
case userPreUsedEvent: UserPreUsedEvent => userPreUsedEvent.moduleName == moduleName && userPreUsedEvent.user == user
case userUsedEvent: UserUsedEvent => userUsedEvent.moduleName == moduleName && userUsedEvent.user == user
case userReleasedEvent: UserReleasedEvent => userReleasedEvent.moduleName == moduleName && userReleasedEvent.user == user
case clearPrdUsedEvent: ClearPrdUsedEvent => clearPrdUsedEvent.moduleName == moduleName && clearPrdUsedEvent.user == user
case clearUsedEvent: ClearUsedEvent => clearUsedEvent.moduleName == moduleName && clearUsedEvent.user == user
case _ => false
}
if (hasRunning) return true
val hasWaiting = consumer.getConsumeQueue.getWaitingEvents.exists {
case userPreUsedEvent: UserPreUsedEvent => userPreUsedEvent.moduleName == moduleName && userPreUsedEvent.user == user
case userUsedEvent: UserUsedEvent => userUsedEvent.moduleName == moduleName && userUsedEvent.user == user
case userReleasedEvent: UserReleasedEvent => userReleasedEvent.moduleName == moduleName && userReleasedEvent.user == user
case clearPrdUsedEvent: ClearPrdUsedEvent => clearPrdUsedEvent.moduleName == moduleName && clearPrdUsedEvent.user == user
case clearUsedEvent: ClearUsedEvent => clearUsedEvent.moduleName == moduleName && clearUsedEvent.user == user
case _ => false
}
hasWaiting
}
def getRequestResourceService(moduleInstance: ServiceInstance): RequestResourceService = {
//2.Can I apply?(是否可以申请)
val requestResourceService = moduleResourceRecordService.getModulePolicy(moduleInstance.getApplicationName) match {
case Yarn => requestResourceServices.find(requestResourceService => requestResourceService != null && requestResourceService.requestPolicy == Yarn)
case DriverAndYarn => requestResourceServices.find(requestResourceService => requestResourceService != null && requestResourceService.requestPolicy == DriverAndYarn)
case _ => requestResourceServices.find(requestResourceService => requestResourceService != null && requestResourceService.requestPolicy == Default)
}
if (requestResourceService.isEmpty)
new RMErrorException(111004, s"The module:$moduleInstance not request Resource Service")
requestResourceService.get
}
class ClearPreUsedRunnable(clearPrdUsedEvent: ClearPrdUsedEvent) extends Runnable {
override def run(): Unit = {
val userResourceRecord = userResourceRecordService.getUserModuleRecord(clearPrdUsedEvent.user, clearPrdUsedEvent.ticketId)
if (userResourceRecord != null && userResourceRecord.getUserUsedResource == null) {
userResourceManager.dealClearPrdUsedEvent(clearPrdUsedEvent)
}
}
}
}