blob: 2785a0529fb825b03a0f6c1f5c94917fedc01831 [file] [log] [blame]
/*
* Copyright 2019 WeBank
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.webank.wedatasphere.linkis.resourcemanager.service.impl
import java.util
import java.util.UUID
import java.util.concurrent.TimeUnit
import com.google.common.collect.Lists
import com.webank.wedatasphere.linkis.common.ServiceInstance
import com.webank.wedatasphere.linkis.common.utils.{Logging, Utils}
import com.webank.wedatasphere.linkis.governance.common.conf.GovernanceCommonConf
import com.webank.wedatasphere.linkis.manager.common.entity.node.{AMEMNode, AMEngineNode, InfoRMNode}
import com.webank.wedatasphere.linkis.manager.common.entity.resource.{CommonNodeResource, NodeResource, Resource, ResourceType}
import com.webank.wedatasphere.linkis.manager.common.utils.ResourceUtils
import com.webank.wedatasphere.linkis.manager.label.builder.factory.LabelBuilderFactoryContext
import com.webank.wedatasphere.linkis.manager.label.entity.Label
import com.webank.wedatasphere.linkis.manager.label.entity.em.EMInstanceLabel
import com.webank.wedatasphere.linkis.manager.label.entity.engine.EngineInstanceLabel
import com.webank.wedatasphere.linkis.manager.label.service.NodeLabelService
import com.webank.wedatasphere.linkis.manager.persistence.{NodeManagerPersistence, ResourceManagerPersistence}
import com.webank.wedatasphere.linkis.resourcemanager._
import com.webank.wedatasphere.linkis.resourcemanager.domain.RMLabelContainer
import com.webank.wedatasphere.linkis.resourcemanager.errorcode.RMErrorConstants
import com.webank.wedatasphere.linkis.resourcemanager.exception.{RMErrorException, RMWarnException}
import com.webank.wedatasphere.linkis.resourcemanager.external.service.ExternalResourceService
import com.webank.wedatasphere.linkis.resourcemanager.protocol.{TimeoutEMEngineRequest, TimeoutEMEngineResponse}
import com.webank.wedatasphere.linkis.resourcemanager.service._
import com.webank.wedatasphere.linkis.resourcemanager.utils.{RMConfiguration, RMUtils}
import com.webank.wedatasphere.linkis.rpc.Sender
import org.springframework.beans.factory.InitializingBean
import org.springframework.beans.factory.annotation.Autowired
import org.springframework.stereotype.Component
import scala.collection.JavaConversions._
@Component
class DefaultResourceManager extends ResourceManager with Logging with InitializingBean {
@Autowired
var resourceManagerPersistence : ResourceManagerPersistence = _
@Autowired
var nodeManagerPersistence: NodeManagerPersistence = _
@Autowired
var resourceLockService: ResourceLockService = _
@Autowired
var labelResourceService: LabelResourceService = _
@Autowired
var externalResourceService: ExternalResourceService = _
/*@Autowired
var nodeHeartbeatMonitor: NodeHeartbeatMonitor = _*/
@Autowired
private var nodeLabelService: NodeLabelService = _
var requestResourceServices: Array[RequestResourceService] = _
override def afterPropertiesSet(): Unit = {
requestResourceServices = Array(
new DefaultReqResourceService(labelResourceService),
new DriverAndYarnReqResourceService(labelResourceService, externalResourceService)
)
}
/**
* The registration method is mainly used to notify all RM nodes (including the node)
* 该注册方法,主要是用于通知所有的RM节点(包括本节点)
*
*
*/
override def register(serviceInstance: ServiceInstance, resource: NodeResource): Unit = {
info(s"Start processing registration of ServiceInstance: ${serviceInstance}")
val eMInstanceLabel = LabelBuilderFactoryContext.getLabelBuilderFactory.createLabel(classOf[EMInstanceLabel])
eMInstanceLabel.setServiceName(serviceInstance.getApplicationName)
eMInstanceLabel.setInstance(serviceInstance.getInstance)
val labelContainer = new RMLabelContainer(Lists.newArrayList(eMInstanceLabel))
Utils.tryFinally {
// lock labels
labelContainer.getLabels.toArray.foreach {
case label: Label[_] =>
labelContainer.setCurrentLabel(label.asInstanceOf[Label[_]])
resourceLockService.tryLock(labelContainer)
case _ =>
}
val emResource = labelResourceService.getLabelResource(eMInstanceLabel)
if(emResource != null){
warn(s"${serviceInstance} has been registered, now update resource.")
if(!emResource.getResourceType.equals(resource.getResourceType)){
throw new RMErrorException(110019, s"${serviceInstance} has been registered in ${emResource.getResourceType}, cannot be updated to ${resource.getResourceType}")
}
}
// TODO get ID Label set label resource
labelResourceService.setLabelResource(eMInstanceLabel, resource)
} {
//5.Release lock(释放锁)
resourceLockService.unLock(labelContainer)
info(s"finished processing registration of ${serviceInstance}")
}
}
/**
* The registration method is mainly used to notify all RM nodes (including the node), and the instance is offline.
* 该注册方法,主要是用于通知所有的RM节点(包括本节点),下线该实例
*/
override def unregister(serviceInstance: ServiceInstance): Unit = {
// TODO get ID Label
val eMInstanceLabel = LabelBuilderFactoryContext.getLabelBuilderFactory.createLabel(classOf[EMInstanceLabel])
eMInstanceLabel.setServiceName(serviceInstance.getApplicationName)
eMInstanceLabel.setInstance(serviceInstance.getInstance)
val labelContainer = new RMLabelContainer(Lists.newArrayList(eMInstanceLabel))
Utils.tryFinally {
// lock labels
labelContainer.getLabels.toArray.foreach {
case label: Label[_] =>
labelContainer.setCurrentLabel(label.asInstanceOf[Label[_]])
resourceLockService.tryLock(labelContainer)
case _ =>
}
// clear EM related engine resource records
nodeManagerPersistence.getEngineNodeByEM(serviceInstance).foreach{ node =>
val engineInstanceLabel = LabelBuilderFactoryContext.getLabelBuilderFactory.createLabel(classOf[EngineInstanceLabel])
engineInstanceLabel.setInstance(node.getServiceInstance.getInstance)
engineInstanceLabel.setServiceName(node.getServiceInstance.getApplicationName)
labelResourceService.removeResourceByLabel(engineInstanceLabel)
}
labelResourceService.removeResourceByLabel(eMInstanceLabel)
} {
//5.Release lock(释放锁)
resourceLockService.unLock(labelContainer)
info(s"finished processing unregistration of ${serviceInstance}")
}
}
/**
* Request resources, if not successful, return directly
* 请求资源,如果不成功,直接返回
*
* @param labels
* @param resource
* @return
*/
override def requestResource(labels: util.List[Label[_]], resource: NodeResource): ResultResource = {
requestResource(labels, resource, -1)
}
/**
* Request resources and wait for a certain amount of time until the requested resource is met
* 请求资源,并等待一定的时间,直到满足请求的资源
*
* @param labels
* @param resource
* @param wait
* @return
*/
override def requestResource(labels: util.List[Label[_]], resource: NodeResource, wait: Long): ResultResource = {
val startTime = System.currentTimeMillis
val labelContainer = labelResourceService.enrichLabels(new RMLabelContainer(labels))
info("start processing request resource for labels [" + labelContainer + "] and resource [" + resource + "]")
// pre-check without lock
val requestResourceService = getRequestResourceService(resource.getResourceType)
try {
labelContainer.getLabels.toArray.foreach{ label =>
labelContainer.setCurrentLabel(label.asInstanceOf[Label[_]])
if (!requestResourceService.canRequest(labelContainer, resource)){
return NotEnoughResource(s"Labels:$labels not enough resource")
}
}
} catch {
case warn: RMWarnException => return NotEnoughResource(warn.getMessage)
}
Utils.tryFinally {
// lock labels
labelContainer.getLabels.toArray.foreach {
case label: Label[_] =>
labelContainer.setCurrentLabel(label)
var locked = resourceLockService.tryLock(labelContainer)
if(wait <= 0){
locked = resourceLockService.tryLock(labelContainer)
} else {
locked = resourceLockService.tryLock(labelContainer, wait)
}
if (!locked){
return NotEnoughResource("Wait for lock time out")
}
// check again with lock
try {
if (!requestResourceService.canRequest(labelContainer, resource))
return NotEnoughResource(s"Labels:$labels not enough resource")
} catch {
case warn: RMWarnException => return NotEnoughResource(warn.getMessage)
}
case _ =>
}
resource.setLockedResource(resource.getMinResource)
// lock label resources
labelContainer.getLabels.toArray.foreach {
case label: Label[_] =>
var labelResource = labelResourceService.getLabelResource(label)
if(labelResource == null){
if(label.equals(labelContainer.getCombinedUserCreatorEngineTypeLabel)){
labelResource = CommonNodeResource.initNodeResource(resource.getResourceType)
labelResource.setLockedResource(resource.getMinResource)
}
} else {
labelResource.setLeftResource(labelResource.getLeftResource - resource.getLockedResource)
labelResource.setLockedResource(labelResource.getLockedResource + resource.getLockedResource)
labelResourceService.setLabelResource(label, labelResource)
}
case _ =>
}
// record engine locked resource
val tickedId = UUID.randomUUID().toString
val emNode = new AMEMNode
emNode.setServiceInstance(labelContainer.getEMInstanceLabel.getServiceInstance)
val engineNode = new AMEngineNode
engineNode.setEMNode(emNode)
engineNode.setServiceInstance(ServiceInstance(labelContainer.getEngineServiceName, tickedId))
engineNode.setNodeResource(resource)
nodeManagerPersistence.addEngineNode(engineNode)
val engineInstanceLabel = LabelBuilderFactoryContext.getLabelBuilderFactory.createLabel(classOf[EngineInstanceLabel])
engineInstanceLabel.setServiceName(labelContainer.getEngineServiceName)
engineInstanceLabel.setInstance(tickedId)
nodeLabelService.addLabelToNode(engineNode.getServiceInstance,engineInstanceLabel)
labelResourceService.setEngineConnLabelResource(engineInstanceLabel, resource)
// fire timeout check scheduled job
if (RMConfiguration.RM_WAIT_EVENT_TIME_OUT.getValue > 0) Utils.defaultScheduler.schedule(
new UnlockTimeoutResourceRunnable(labels, engineInstanceLabel),
RMConfiguration.RM_WAIT_EVENT_TIME_OUT.getValue,
TimeUnit.MILLISECONDS
)
info(s"labels $labels succeed to request resource:$resource")
AvailableResource(tickedId)
} {
//5.Release lock(释放锁)
resourceLockService.unLock(labelContainer)
info(s"finished processing requestResource of labels ${labels} and resource ${resource}")
}
}
/**
* When the resource is instantiated, the total amount of resources actually occupied is returned.
* 当资源被实例化后,返回实际占用的资源总量
*
* @param labels
* In general, resourceReleased will release the resources occupied by the user, but if the process that uses the resource does not have time to call the resourceReleased method to die, you need to unregister to release the resource.
* @param usedResource
*/
override def resourceUsed(labels: util.List[Label[_]], usedResource: NodeResource): Unit = {
val labelContainer = labelResourceService.enrichLabels(new RMLabelContainer(labels))
var lockedResource: NodeResource = null
try {
lockedResource = labelResourceService.getLabelResource(labelContainer.getEngineInstanceLabel)
} catch {
case e: NullPointerException =>
error(s"EngineInstanceLabel [${labelContainer.getEngineInstanceLabel}] cause NullPointerException")
throw e
}
if (lockedResource == null) throw new RMErrorException(RMErrorConstants.RM_ERROR, s"No " +
s"locked " +
s"resource found by engine ${labelContainer.getEngineInstanceLabel}")
if (!lockedResource.getLockedResource.equalsTo(usedResource.getUsedResource)) throw new RMErrorException(110022, s"Locked ${lockedResource.getLockedResource}, but want to use ${usedResource.getUsedResource}")
Utils.tryFinally {
// lock labels
labelContainer.getLabels.toArray.foreach {
case label: Label[_] =>
labelContainer.setCurrentLabel(label.asInstanceOf[Label[_]])
resourceLockService.tryLock(labelContainer)
case _ =>
}
labelContainer.getLabels.toArray.foreach {
case engineInstanceLabel: EngineInstanceLabel =>
lockedResource.setUsedResource(lockedResource.getLockedResource)
lockedResource.setLockedResource(Resource.getZeroResource(lockedResource.getLockedResource))
labelResourceService.setLabelResource(engineInstanceLabel, lockedResource)
case label: Label[_] =>
val labelResource = labelResourceService.getLabelResource(label)
if(labelResource != null){
labelResource.setLockedResource(labelResource.getLockedResource - usedResource.getUsedResource)
if(null == labelResource.getUsedResource) labelResource.setUsedResource(Resource.initResource(labelResource.getResourceType))
labelResource.setUsedResource(labelResource.getUsedResource + usedResource.getUsedResource)
labelResourceService.setLabelResource(label, labelResource)
}
case _ =>
}
}{
resourceLockService.unLock(labelContainer)
info(s"Labels:$labels resourceInited Processed(处理完毕)")
}
}
/**
* Method called when the resource usage is released
* 当资源使用完成释放后,调用的方法
*
* @param labels
*/
override def resourceReleased(labels: util.List[Label[_]]): Unit = {
val labelContainer = labelResourceService.enrichLabels(new RMLabelContainer(labels))
val usedResource = labelResourceService.getLabelResource(labelContainer.getEngineInstanceLabel)
if(usedResource == null) throw new RMErrorException(RMErrorConstants.RM_ERROR, s"No used resource " +
s"found by engine " +
s"${labelContainer.getEngineInstanceLabel}")
Utils.tryFinally {
// lock labels
labelContainer.getLabels.toArray.foreach {
case label: Label[_] =>
labelContainer.setCurrentLabel(label.asInstanceOf[Label[_]])
resourceLockService.tryLock(labelContainer)
case _ =>
}
labelContainer.getLabels.toArray.foreach {
case engineInstanceLabel: EngineInstanceLabel =>
labelResourceService.removeResourceByLabel(engineInstanceLabel)
case label: Label[_] =>
val labelResource = labelResourceService.getLabelResource(label)
if(labelResource != null){
if(null != usedResource.getUsedResource){
labelResource.setUsedResource(labelResource.getUsedResource - usedResource.getUsedResource)
labelResource.setLeftResource(labelResource.getLeftResource + usedResource.getUsedResource)
}
if(null != usedResource.getLockedResource){
labelResource.setLockedResource(labelResource.getLockedResource - usedResource.getLockedResource)
labelResource.setLeftResource(labelResource.getLeftResource + usedResource.getLockedResource)
}
labelResourceService.setLabelResource(label, labelResource)
}
case _ =>
}
} {
resourceLockService.unLock(labelContainer)
info(s"Labels:$labels resourceReleased Processed(处理完毕)")
}
}
/**
* If the IP and port are empty, return the resource status of all modules of a module
*   * Return the use of this instance resource if there is an IP and port
*
* @param serviceInstances
* @return
*/
override def getResourceInfo(serviceInstances: Array[ServiceInstance]): ResourceInfo = {
val resourceInfo = new ResourceInfo(Lists.newArrayList())
serviceInstances.map { serviceInstance =>
val rmNode = new InfoRMNode
var aggregatedResource: NodeResource = null
serviceInstance.getApplicationName match {
case GovernanceCommonConf.ENGINE_CONN_SPRING_NAME.getValue =>{
val engineInstanceLabel = LabelBuilderFactoryContext.getLabelBuilderFactory.createLabel(classOf[EngineInstanceLabel])
engineInstanceLabel.setServiceName(serviceInstance.getApplicationName)
engineInstanceLabel.setInstance(serviceInstance.getInstance)
aggregatedResource = labelResourceService.getLabelResource(engineInstanceLabel)
}
case GovernanceCommonConf.ENGINE_CONN_MANAGER_SPRING_NAME.getValue =>{
val emInstanceLabel = LabelBuilderFactoryContext.getLabelBuilderFactory.createLabel(classOf[EMInstanceLabel])
emInstanceLabel.setServiceName(serviceInstance.getApplicationName)
emInstanceLabel.setInstance(serviceInstance.getInstance)
aggregatedResource = labelResourceService.getLabelResource(emInstanceLabel)
}
}
rmNode.setServiceInstance(serviceInstance)
rmNode.setNodeResource(aggregatedResource)
resourceInfo.resourceInfo.add(rmNode)
}
resourceInfo
}
def getRequestResourceService(resourceType: ResourceType): RequestResourceService = {
val requestResourceService = requestResourceServices.find(_.resourceType == resourceType)
requestResourceService.getOrElse(requestResourceServices.find(_.resourceType == ResourceType.Default).get)
}
class UnlockTimeoutResourceRunnable(labels: util.List[Label[_]], engineInstanceLabel: EngineInstanceLabel) extends Runnable {
override def run(): Unit = {
info(s"check locked resource of ${engineInstanceLabel}")
val resource = labelResourceService.getLabelResource(engineInstanceLabel)
if(resource != null
&& resource.getLockedResource != null
&& resource.getLockedResource > Resource.getZeroResource(resource.getLockedResource)){
info(s"start to unlock timeout resource for ${engineInstanceLabel}")
val labelContainer = labelResourceService.enrichLabels(new RMLabelContainer(labels))
val timeoutEMEngineRequest = new TimeoutEMEngineRequest
timeoutEMEngineRequest.setTicketId(engineInstanceLabel.getInstance())
timeoutEMEngineRequest.setUser(labelContainer.getUserCreatorLabel.getUser)
val timeoutEMEngineResponse = Sender.getSender(labelContainer.getEMInstanceLabel.getServiceInstance).ask(timeoutEMEngineRequest).asInstanceOf[TimeoutEMEngineResponse]
if(timeoutEMEngineResponse.getCanReleaseResource){
Utils.tryFinally {
// lock labels
labelContainer.getLabels.toArray.foreach {
case label: Label[_] =>
labelContainer.setCurrentLabel(label.asInstanceOf[Label[_]])
resourceLockService.tryLock(labelContainer)
case _ =>
}
labelContainer.getLabels.toArray.foreach {
case engineInstanceLabel: EngineInstanceLabel =>
labelResourceService.removeResourceByLabel(engineInstanceLabel)
case label: Label[_] =>
val labelResource = labelResourceService.getLabelResource(label)
if(labelResource != null){
labelResource.setLockedResource(labelResource.getLockedResource - resource.getLockedResource)
labelResource.setLeftResource(labelResource.getLeftResource + resource.getLockedResource)
labelResourceService.setLabelResource(label, labelResource)
}
case _ =>
}
} {
resourceLockService.unLock(labelContainer)
info(s"Timeout resource unlocked for ${engineInstanceLabel}")
}
} else {
askAgainAfter(timeoutEMEngineResponse.getNextAskInterval)
}
}
info(s"finished check locked resource of ${engineInstanceLabel}")
}
private def askAgainAfter(interval: Long): Unit ={
val realInterval = if (interval == null) RMConfiguration.RM_WAIT_EVENT_TIME_OUT.getValue else interval
Utils.defaultScheduler.schedule(
new UnlockTimeoutResourceRunnable(labels, engineInstanceLabel),
realInterval,
TimeUnit.MILLISECONDS
)
info(s"delayed resource unlocked for ${engineInstanceLabel}")
}
}
override def resourceReport(labels: util.List[Label[_]], reportResource: NodeResource): Unit = {
//todo to complete resource report
return
}
}