blob: ba0f6acec35403261afab8eea5c2722400a492a0 [file] [log] [blame]
/*
* Copyright 2019 WeBank
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.webank.wedatasphere.linkis.resourcemanager.service.metadata
import java.util
import com.webank.wedatasphere.linkis.common.ServiceInstance
import com.webank.wedatasphere.linkis.common.utils.Logging
import com.webank.wedatasphere.linkis.resourcemanager.dao.{EmMetaDataDao, EmResourceMetaDataDao}
import com.webank.wedatasphere.linkis.resourcemanager.domain.{EmMetaData, EmResourceMetaData, ModuleInfo, ModuleResourceRecord}
import com.webank.wedatasphere.linkis.resourcemanager.exception.RMErrorException
import com.webank.wedatasphere.linkis.resourcemanager.{Resource, ResourceRequestPolicy, ResourceSerializer}
import org.json4s.DefaultFormats
import org.json4s.jackson.Serialization.{read, write}
import org.springframework.beans.factory.annotation.Autowired
import org.springframework.stereotype.Component
import scala.collection.JavaConversions
/**
* Created by shanhuang on 9/11/18.
*/
@Component
class ModuleResourceRecordService extends Logging {
implicit val formats = DefaultFormats + ResourceSerializer
@Autowired
var emResourceMetaDataDao: EmResourceMetaDataDao = _
@Autowired
var emMetaDataDao: EmMetaDataDao = _
def putModulePolicy(moduleName: String, policy: ResourceRequestPolicy.ResourceRequestPolicy): Unit = {
var emMetaData = emMetaDataDao.getByEmName(moduleName)
if (emMetaData == null) {
emMetaData = new EmMetaData(moduleName, policy.toString)
emMetaDataDao.insert(emMetaData)
} else {
emMetaData.setResourceRequestPolicy(policy.toString)
emMetaDataDao.update(emMetaData)
}
}
def getModulePolicy(moduleName: String): ResourceRequestPolicy.ResourceRequestPolicy = {
val emMetaData = emMetaDataDao.getByEmName(moduleName)
if (emMetaData == null) throw new RMErrorException(110005, s"Module(模块): $moduleName Not registered in the resource manager(并没有在资源管理器进行注册)") else ResourceRequestPolicy.withName(emMetaData.getResourceRequestPolicy)
}
def getModuleName: Array[String] = JavaConversions.asScalaBuffer(emMetaDataDao.getAll).toArray.map(_.getEmName)
/**
* Obtain resource usage records for the same type of module by module name
* 通过模块名获得同一类模块的资源使用记录
*
* @param moduleName
* @return
*/
def getModuleResourceRecords(moduleName: String): Array[EmResourceMetaData] = {
val records = getByEmName(moduleName)
if (records.size() < 1) throw new RMErrorException(110005, s"Module(模块): $moduleName Not registered in the resource manager(并没有在资源管理器进行注册)")
JavaConversions.asScalaBuffer(records).toArray
}
def getModuleResourceRecord(moduleInstance: ServiceInstance): EmResourceMetaData = {
val m = getByEmInstance(moduleInstance.getApplicationName, moduleInstance.getInstance)
if (m == null)
throw new RMErrorException(110005, s"Module instance(模块实例): $moduleInstance Not registered in the resource manager(并没有在资源管理器进行注册)")
m
}
def putModuleRegisterRecord(moduleInfo: ModuleInfo): Unit = synchronized {
val existing = getByEmInstance(moduleInfo.moduleInstance.getApplicationName, moduleInfo.moduleInstance.getInstance)
if (existing != null)
throw new RMErrorException(110005, s"Module instance(模块实例): ${moduleInfo.moduleInstance} Already registered, if you need to re-register, please log out and then register(已经注册,如果需要重新注册请注销后再进行注册)")
ModuleResourceRecord(moduleInfo, Resource.initResource(moduleInfo.resourceRequestPolicy),
moduleInfo.totalResource, Resource.initResource(moduleInfo.resourceRequestPolicy))
val newRecord = new EmResourceMetaData(
moduleInfo.moduleInstance.getApplicationName,
moduleInfo.moduleInstance.getInstance,
serialize(moduleInfo.totalResource),
serialize(moduleInfo.protectedResource),
moduleInfo.resourceRequestPolicy.toString,
serialize(Resource.initResource(moduleInfo.resourceRequestPolicy)),
serialize(moduleInfo.totalResource),
serialize(Resource.initResource(moduleInfo.resourceRequestPolicy)),
System.currentTimeMillis()
)
insert(newRecord)
info(s"Succeed to register module ${moduleInfo.moduleInstance}")
}
def moduleLockedUserResource(moduleInstance: ServiceInstance, resource: Resource): Unit = synchronized {
val existing = getByEmInstance(moduleInstance.getApplicationName, moduleInstance.getInstance)
if (existing == null)
throw new RMErrorException(110005, s"Module instance(模块实例): $moduleInstance Not registered in the resource manager(并没有在资源管理器进行注册)")
existing.setLeftResource(serialize(deserialize(existing.getLeftResource) - resource))
existing.setLockedResource(serialize(deserialize(existing.getLockedResource) + resource))
update(existing)
}
def moduleUsedUserResource(moduleInstance: ServiceInstance, usedResource: Resource, lockedResource: Resource): Unit = synchronized {
val existing = getByEmInstance(moduleInstance.getApplicationName, moduleInstance.getInstance)
if (existing == null)
throw new RMErrorException(110005, s"Module instance(模块实例): $moduleInstance Not registered in the resource manager(并没有在资源管理器进行注册)")
existing.setUsedResource(serialize(deserialize(existing.getUsedResource) + usedResource))
existing.setLeftResource(serialize(deserialize(existing.getLeftResource) + lockedResource - usedResource))
existing.setLockedResource(serialize(deserialize(existing.getLockedResource) - lockedResource))
update(existing)
}
def moduleReleasedUserResource(moduleInstance: ServiceInstance, usedResource: Resource): Unit = synchronized {
val existing = getByEmInstance(moduleInstance.getApplicationName, moduleInstance.getInstance)
if (existing == null)
throw new RMErrorException(110005, s"Module instance(模块实例): $moduleInstance Not registered in the resource manager(并没有在资源管理器进行注册)")
existing.setUsedResource(serialize(deserialize(existing.getUsedResource) - usedResource))
existing.setLeftResource(serialize(deserialize(existing.getLeftResource) + usedResource))
update(existing)
}
def moduleClearLockedResource(moduleInstance: ServiceInstance, resource: Resource): Unit = synchronized {
val existing = getByEmInstance(moduleInstance.getApplicationName, moduleInstance.getInstance)
if (existing == null)
throw new RMErrorException(110005, s"Module instance(模块实例): $moduleInstance Not registered in the resource manager(并没有在资源管理器进行注册)")
existing.setLeftResource(serialize(deserialize(existing.getLeftResource) + resource))
existing.setLockedResource(serialize(deserialize(existing.getLockedResource) - resource))
update(existing)
}
def moduleClearUsedResource(moduleInstance: ServiceInstance, resource: Resource): Unit = synchronized {
val existing = getByEmInstance(moduleInstance.getApplicationName, moduleInstance.getInstance)
if (existing == null)
throw new RMErrorException(110005, s"Module instance(模块实例): $moduleInstance Not registered in the resource manager(并没有在资源管理器进行注册)")
existing.setUsedResource(serialize(deserialize(existing.getUsedResource) - resource))
existing.setLeftResource(serialize(deserialize(existing.getLeftResource) + resource))
}
def serialize(resource: Resource) = write(resource)
def deserialize(plainData: String) = read[Resource](plainData)
def getAll(): util.List[EmResourceMetaData] = emResourceMetaDataDao.getAll
def getByEmName(emApplicationName: String): util.List[EmResourceMetaData] = emResourceMetaDataDao.getByEmName(emApplicationName)
def getByEmInstance(emApplicationName: String, emInstance: String): EmResourceMetaData = emResourceMetaDataDao.getByEmInstance(emApplicationName, emInstance)
def insert(emResourceMetaData: EmResourceMetaData): Unit = {
emResourceMetaDataDao.insert(emResourceMetaData)
}
def update(emResourceMetaData: EmResourceMetaData): Unit = {
emResourceMetaDataDao.update(emResourceMetaData)
}
def delete(id: Integer) = {
emResourceMetaDataDao.deleteById(id)
}
}