blob: 123aa5995bcbecdd534a229f315229028b7a7ecb [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.linkis.manager.rm.utils;
import org.apache.linkis.common.conf.Configuration;
import org.apache.linkis.governance.common.protocol.conf.RequestQueryEngineConfigWithGlobalConfig;
import org.apache.linkis.governance.common.protocol.conf.RequestQueryGlobalConfig;
import org.apache.linkis.governance.common.protocol.conf.ResponseQueryConfig;
import org.apache.linkis.manager.common.conf.RMConfiguration;
import org.apache.linkis.manager.common.entity.resource.*;
import org.apache.linkis.manager.common.errorcode.ManagerCommonErrorCodeSummary;
import org.apache.linkis.manager.common.exception.RMWarnException;
import org.apache.linkis.manager.label.builder.factory.LabelBuilderFactory;
import org.apache.linkis.manager.label.builder.factory.LabelBuilderFactoryContext;
import org.apache.linkis.manager.label.entity.engine.EngineTypeLabel;
import org.apache.linkis.manager.label.entity.engine.UserCreatorLabel;
import org.apache.linkis.manager.label.utils.EngineTypeLabelCreator;
import org.apache.linkis.manager.label.utils.LabelUtils;
import org.apache.linkis.protocol.CacheableProtocol;
import org.apache.linkis.rpc.RPCMapCache;
import java.util.HashMap;
import java.util.Map;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class UserConfiguration {
private static final Logger logger = LoggerFactory.getLogger(UserConfiguration.class);
private static final LabelBuilderFactory labelFactory =
LabelBuilderFactoryContext.getLabelBuilderFactory();
private static final RPCMapCache<String, String, String> globalMapCache =
new RPCMapCache<String, String, String>(
Configuration.CLOUD_CONSOLE_CONFIGURATION_SPRING_APPLICATION_NAME().getValue()) {
@Override
public CacheableProtocol createRequest(String user) {
return new RequestQueryGlobalConfig(user);
}
@Override
public Map<String, String> createMap(Object any) {
ResponseQueryConfig response = (ResponseQueryConfig) any;
return response.getKeyAndValue();
}
};
private static final RPCMapCache<Map<UserCreatorLabel, EngineTypeLabel>, String, String>
engineMapCache =
new RPCMapCache<Map<UserCreatorLabel, EngineTypeLabel>, String, String>(
Configuration.CLOUD_CONSOLE_CONFIGURATION_SPRING_APPLICATION_NAME().getValue()) {
@Override
public CacheableProtocol createRequest(
Map<UserCreatorLabel, EngineTypeLabel> labelTuple) {
UserCreatorLabel userCreatorLabel = null;
EngineTypeLabel engineTypeLabel = null;
for (Map.Entry<UserCreatorLabel, EngineTypeLabel> entry : labelTuple.entrySet()) {
userCreatorLabel = entry.getKey();
engineTypeLabel = entry.getValue();
}
return new RequestQueryEngineConfigWithGlobalConfig(
userCreatorLabel, engineTypeLabel, null);
}
@Override
public Map<String, String> createMap(Object any) {
ResponseQueryConfig response = (ResponseQueryConfig) any;
return response.getKeyAndValue();
}
};
public static Map<String, String> getGlobalConfig(String user) {
return globalMapCache.getCacheMap(user);
}
public static int getUserGlobalInstanceLimit(String user) {
Map<String, String> globalConfig = getGlobalConfig(user);
return (int) RMConfiguration.USER_AVAILABLE_INSTANCE.getValue(globalConfig);
}
private static Map<UserCreatorLabel, EngineTypeLabel> buildRequestLabel(
String user, String creator, String engineType) {
UserCreatorLabel userCreateLabel = labelFactory.createLabel(UserCreatorLabel.class);
userCreateLabel.setUser(user);
userCreateLabel.setCreator(LabelUtils.COMMON_VALUE);
EngineTypeLabel engineTypeLabel =
EngineTypeLabelCreator.createEngineTypeLabel(LabelUtils.COMMON_VALUE);
if (engineType != null) {
engineTypeLabel.setEngineType(engineType);
}
if (creator != null) {
userCreateLabel.setCreator(creator);
}
Map<UserCreatorLabel, EngineTypeLabel> labelTuple =
new HashMap<UserCreatorLabel, EngineTypeLabel>();
labelTuple.put(userCreateLabel, engineTypeLabel);
return labelTuple;
}
public static Resource getUserConfiguredResource(
ResourceType resourceType,
UserCreatorLabel userCreatorLabel,
EngineTypeLabel engineTypeLabel) {
try {
Map<UserCreatorLabel, EngineTypeLabel> labelTuple =
new HashMap<UserCreatorLabel, EngineTypeLabel>();
labelTuple.put(userCreatorLabel, engineTypeLabel);
Resource userCreatorAvailableResource =
generateResource(resourceType, engineMapCache.getCacheMap(labelTuple));
logger.info(
userCreatorLabel.getUser()
+ "on creator "
+ userCreatorLabel.getCreator()
+ " available engine "
+ engineTypeLabel.getEngineType()
+ " resource:"
+ userCreatorAvailableResource);
return userCreatorAvailableResource;
} catch (RMWarnException e) {
logger.error("Failed to get user configured resource due to: " + e.getMessage(), e);
throw e;
}
}
public Resource getUserConfiguredResource(
ResourceType resourceType, String engineType, String user, String creator) {
try {
Map<String, String> userConfiguration =
engineMapCache.getCacheMap(buildRequestLabel(user, creator, engineType));
Resource userCreatorAvailableResource = generateResource(resourceType, userConfiguration);
logger.info(user + " on creator available resource:" + userCreatorAvailableResource);
return userCreatorAvailableResource;
} catch (Exception e) {
logger.warn(e.getMessage());
throw e;
}
}
public static Resource generateResource(
ResourceType policy, Map<String, String> userConfiguration) {
switch (policy) {
case CPU:
return new CPUResource(RMConfiguration.USER_AVAILABLE_CPU.getValue(userConfiguration));
case Memory:
return new MemoryResource(
RMConfiguration.USER_AVAILABLE_MEMORY.getValue(userConfiguration).toLong());
case Load:
return new LoadResource(
RMConfiguration.USER_AVAILABLE_MEMORY.getValue(userConfiguration).toLong(),
RMConfiguration.USER_AVAILABLE_CPU.getValue(userConfiguration));
case Instance:
return new InstanceResource(
RMConfiguration.USER_AVAILABLE_INSTANCE.getValue(userConfiguration));
case LoadInstance:
return new LoadInstanceResource(
RMConfiguration.USER_AVAILABLE_MEMORY.getValue(userConfiguration).toLong(),
RMConfiguration.USER_AVAILABLE_CPU.getValue(userConfiguration),
RMConfiguration.USER_AVAILABLE_INSTANCE.getValue(userConfiguration));
case Yarn:
return new YarnResource(
RMConfiguration.USER_AVAILABLE_YARN_INSTANCE_MEMORY
.getValue(userConfiguration)
.toLong(),
RMConfiguration.USER_AVAILABLE_YARN_INSTANCE_CPU.getValue(userConfiguration),
RMConfiguration.USER_AVAILABLE_YARN_INSTANCE.getValue(userConfiguration),
RMConfiguration.USER_AVAILABLE_YARN_QUEUE_NAME.getValue(userConfiguration));
case DriverAndYarn:
LoadInstanceResource loadInstanceResource =
new LoadInstanceResource(
RMConfiguration.USER_AVAILABLE_MEMORY.getValue(userConfiguration).toLong(),
RMConfiguration.USER_AVAILABLE_CPU.getValue(userConfiguration),
RMConfiguration.USER_AVAILABLE_INSTANCE.getValue(userConfiguration));
YarnResource yarnResource =
new YarnResource(
RMConfiguration.USER_AVAILABLE_YARN_INSTANCE_MEMORY
.getValue(userConfiguration)
.toLong(),
RMConfiguration.USER_AVAILABLE_YARN_INSTANCE_CPU.getValue(userConfiguration),
RMConfiguration.USER_AVAILABLE_YARN_INSTANCE.getValue(userConfiguration),
RMConfiguration.USER_AVAILABLE_YARN_QUEUE_NAME.getValue(userConfiguration));
return new DriverAndYarnResource(loadInstanceResource, yarnResource);
case Kubernetes:
return new KubernetesResource(
RMConfiguration.USER_AVAILABLE_KUBERNETES_INSTANCE_MEMORY
.getValue(userConfiguration)
.toLong(),
RMConfiguration.USER_AVAILABLE_KUBERNETES_INSTANCE_CPU.getValue(userConfiguration),
RMConfiguration.USER_AVAILABLE_KUBERNETES_INSTANCE_NAMESPACE.getValue(
userConfiguration));
case DriverAndKubernetes:
return new DriverAndKubernetesResource(
new LoadInstanceResource(
RMConfiguration.USER_AVAILABLE_MEMORY.getValue(userConfiguration).toLong(),
RMConfiguration.USER_AVAILABLE_CPU.getValue(userConfiguration),
RMConfiguration.USER_AVAILABLE_INSTANCE.getValue(userConfiguration)),
new KubernetesResource(
RMConfiguration.USER_AVAILABLE_KUBERNETES_INSTANCE_MEMORY
.getValue(userConfiguration)
.toLong(),
RMConfiguration.USER_AVAILABLE_KUBERNETES_INSTANCE_CPU.getValue(
userConfiguration)));
case Special:
return new SpecialResource(new HashMap<String, Object>());
default:
throw new RMWarnException(
ManagerCommonErrorCodeSummary.NOT_RESOURCE_RESULT_TYPE.getErrorCode(),
ManagerCommonErrorCodeSummary.NOT_RESOURCE_RESULT_TYPE.getErrorDesc());
}
}
}