blob: de98472a3dab9146dd956cc91433798ac1863de6 [file] [log] [blame]
/*
* Copyright 2019 WeBank
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.webank.wedatasphere.linkis.resourcemanager.service.metadata
import java.util
import java.util.Map.Entry
import com.webank.wedatasphere.linkis.common.utils.Logging
import com.webank.wedatasphere.linkis.protocol.config.{RequestQueryAppConfigWithGlobal, ResponseQueryConfig}
import com.webank.wedatasphere.linkis.protocol.utils.ProtocolUtils
import com.webank.wedatasphere.linkis.resourcemanager.ResourceRequestPolicy._
import com.webank.wedatasphere.linkis.resourcemanager._
import com.webank.wedatasphere.linkis.resourcemanager.exception.RMWarnException
import com.webank.wedatasphere.linkis.resourcemanager.utils.RMConfiguration
import com.webank.wedatasphere.linkis.resourcemanager.utils.RMConfiguration._
import org.springframework.beans.factory.annotation.Autowired
import org.springframework.stereotype.Component
import scala.collection.mutable
/**
* Created by shanhuang on 9/11/18.
*/
@Component
class DefaultUserMetaData extends UserMetaData with Logging {
@Autowired
var moduleResourceRecordService: ModuleResourceRecordService = _
override def getUserAvailableResource(moduleName: String, user: String, creator: String): (UserAvailableResource, UserAvailableResource) = {
val policy = moduleResourceRecordService.getModulePolicy(moduleName)
val appName = ProtocolUtils.getAppName(moduleName).getOrElse(moduleName)
val userModuleAvailableResource = UserAvailableResource(moduleName, generateResource(policy, UserConfiguration.getCacheMap(RequestQueryAppConfigWithGlobal(user, null, appName, true))))
val userCreatorAvailableResource = UserAvailableResource(moduleName, generateResource(policy, UserConfiguration.getCacheMap(RequestQueryAppConfigWithGlobal(user, creator, appName, true))))
info(s"$user available resource of module:$userModuleAvailableResource,on creator available resource:$userCreatorAvailableResource")
(userModuleAvailableResource, userCreatorAvailableResource)
}
override def getUserGlobalInstanceLimit(user: String): Int = {
val userConfiguration = UserConfiguration.getCacheMap(RequestQueryAppConfigWithGlobal(user, null, null, true))
USER_AVAILABLE_INSTANCE.getValue(userConfiguration)
}
def generateResource(policy: ResourceRequestPolicy, userConfiguration: util.Map[String, String]): Resource = policy match {
case CPU => new CPUResource(USER_AVAILABLE_CPU.getValue(userConfiguration))
case Memory => new MemoryResource(USER_AVAILABLE_MEMORY.getValue(userConfiguration).toLong)
case Load => new LoadResource(USER_AVAILABLE_MEMORY.getValue(userConfiguration).toLong, USER_AVAILABLE_CPU.getValue(userConfiguration))
case Instance => new InstanceResource(USER_AVAILABLE_INSTANCE.getValue(userConfiguration))
case LoadInstance => new LoadInstanceResource(USER_AVAILABLE_MEMORY.getValue(userConfiguration).toLong, USER_AVAILABLE_CPU.getValue(userConfiguration), USER_AVAILABLE_INSTANCE.getValue(userConfiguration))
case Yarn => new YarnResource(USER_AVAILABLE_YARN_INSTANCE_MEMORY.getValue(userConfiguration).toLong,
USER_AVAILABLE_YARN_INSTANCE_CPU.getValue(userConfiguration),
USER_AVAILABLE_YARN_INSTANCE.getValue(userConfiguration), USER_AVAILABLE_YARN_QUEUE_NAME.getValue(userConfiguration))
case DriverAndYarn => new DriverAndYarnResource(new LoadInstanceResource(USER_AVAILABLE_MEMORY.getValue(userConfiguration).toLong, USER_AVAILABLE_CPU.getValue(userConfiguration), USER_AVAILABLE_INSTANCE.getValue(userConfiguration)),
new YarnResource(USER_AVAILABLE_YARN_INSTANCE_MEMORY.getValue(userConfiguration).toLong,
USER_AVAILABLE_YARN_INSTANCE_CPU.getValue(userConfiguration),
USER_AVAILABLE_YARN_INSTANCE.getValue(userConfiguration), USER_AVAILABLE_YARN_QUEUE_NAME.getValue(userConfiguration)))
case Special => new SpecialResource(new java.util.HashMap[String, AnyVal]())
case _ => throw new RMWarnException(111003, "not supported resource result policy ")
}
override def getUserModuleInfo(moduleName: String, user: String): Map[String, Any] = {
val appName = ProtocolUtils.getAppName(moduleName).getOrElse(moduleName)
val userConfiguration = UserConfiguration.getCacheMap(RequestQueryAppConfigWithGlobal(user, null, appName, true))
val userModuleInfo = new mutable.HashMap[String, Any]()
userModuleInfo.put("waitUsed", USER_MODULE_WAIT_USED.getValue(userConfiguration))
userModuleInfo.put("waitReleased", USER_MODULE_WAIT_RELEASE.getValue(userConfiguration))
userModuleInfo.toMap
}
}