blob: fc1b45b7473a383f51e6bd3696db89ba7cb45eb5 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.service.utils;
import com.google.common.annotations.VisibleForTesting;
import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.registry.client.api.RegistryConstants;
import org.apache.hadoop.registry.client.binding.RegistryUtils;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.service.api.records.Artifact;
import org.apache.hadoop.yarn.service.api.records.Component;
import org.apache.hadoop.yarn.service.api.records.Configuration;
import org.apache.hadoop.yarn.service.api.records.PlacementConstraint;
import org.apache.hadoop.yarn.service.api.records.Resource;
import org.apache.hadoop.yarn.service.api.records.Service;
import org.apache.hadoop.yarn.service.exceptions.SliderException;
import org.apache.hadoop.yarn.service.conf.RestApiConstants;
import org.apache.hadoop.yarn.service.exceptions.RestApiErrorMessages;
import org.apache.hadoop.yarn.service.monitor.probe.MonitorUtils;
import org.apache.hadoop.yarn.service.provider.AbstractClientProvider;
import org.apache.hadoop.yarn.service.provider.ProviderFactory;
import org.codehaus.jackson.map.PropertyNamingStrategy;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
public class ServiceApiUtil {
private static final Logger LOG =
LoggerFactory.getLogger(ServiceApiUtil.class);
public static JsonSerDeser<Service> jsonSerDeser =
new JsonSerDeser<>(Service.class,
PropertyNamingStrategy.CAMEL_CASE_TO_LOWER_CASE_WITH_UNDERSCORES);
private static final PatternValidator namePattern
= new PatternValidator("[a-z][a-z0-9-]*");
private static final PatternValidator userNamePattern
= new PatternValidator("[a-z][a-z0-9-.]*");
@VisibleForTesting
public static void setJsonSerDeser(JsonSerDeser jsd) {
jsonSerDeser = jsd;
}
@VisibleForTesting
public static void validateAndResolveService(Service service,
SliderFileSystem fs, org.apache.hadoop.conf.Configuration conf) throws
IOException {
boolean dnsEnabled = conf.getBoolean(RegistryConstants.KEY_DNS_ENABLED,
RegistryConstants.DEFAULT_DNS_ENABLED);
if (dnsEnabled) {
if (RegistryUtils.currentUser().length()
> RegistryConstants.MAX_FQDN_LABEL_LENGTH) {
throw new IllegalArgumentException(
RestApiErrorMessages.ERROR_USER_NAME_INVALID);
}
userNamePattern.validate(RegistryUtils.currentUser());
}
if (StringUtils.isEmpty(service.getName())) {
throw new IllegalArgumentException(
RestApiErrorMessages.ERROR_APPLICATION_NAME_INVALID);
}
if (StringUtils.isEmpty(service.getVersion())) {
throw new IllegalArgumentException(String.format(
RestApiErrorMessages.ERROR_APPLICATION_VERSION_INVALID,
service.getName()));
}
validateNameFormat(service.getName(), conf);
// If the service has no components, throw error
if (!hasComponent(service)) {
throw new IllegalArgumentException(
"No component specified for " + service.getName());
}
if (UserGroupInformation.isSecurityEnabled()) {
if (!StringUtils.isEmpty(service.getKerberosPrincipal().getKeytab())) {
try {
// validate URI format
new URI(service.getKerberosPrincipal().getKeytab());
} catch (URISyntaxException e) {
throw new IllegalArgumentException(e);
}
}
}
// Validate there are no component name collisions (collisions are not
// currently supported) and add any components from external services
Configuration globalConf = service.getConfiguration();
Set<String> componentNames = new HashSet<>();
List<Component> componentsToRemove = new ArrayList<>();
List<Component> componentsToAdd = new ArrayList<>();
for (Component comp : service.getComponents()) {
int maxCompLength = RegistryConstants.MAX_FQDN_LABEL_LENGTH;
maxCompLength = maxCompLength - Long.toString(Long.MAX_VALUE).length();
if (dnsEnabled && comp.getName().length() > maxCompLength) {
throw new IllegalArgumentException(String.format(RestApiErrorMessages
.ERROR_COMPONENT_NAME_INVALID, maxCompLength, comp.getName()));
}
if (componentNames.contains(comp.getName())) {
throw new IllegalArgumentException("Component name collision: " +
comp.getName());
}
// If artifact is of type SERVICE (which cannot be filled from
// global), read external service and add its components to this
// service
if (comp.getArtifact() != null && comp.getArtifact().getType() ==
Artifact.TypeEnum.SERVICE) {
if (StringUtils.isEmpty(comp.getArtifact().getId())) {
throw new IllegalArgumentException(
RestApiErrorMessages.ERROR_ARTIFACT_ID_INVALID);
}
LOG.info("Marking {} for removal", comp.getName());
componentsToRemove.add(comp);
List<Component> externalComponents = getComponents(fs,
comp.getArtifact().getId());
for (Component c : externalComponents) {
Component override = service.getComponent(c.getName());
if (override != null && override.getArtifact() == null) {
// allow properties from external components to be overridden /
// augmented by properties in this component, except for artifact
// which must be read from external component
override.mergeFrom(c);
LOG.info("Merging external component {} from external {}", c
.getName(), comp.getName());
} else {
if (componentNames.contains(c.getName())) {
throw new IllegalArgumentException("Component name collision: " +
c.getName());
}
componentNames.add(c.getName());
componentsToAdd.add(c);
LOG.info("Adding component {} from external {}", c.getName(),
comp.getName());
}
}
} else {
// otherwise handle as a normal component
componentNames.add(comp.getName());
// configuration
comp.getConfiguration().mergeFrom(globalConf);
}
}
service.getComponents().removeAll(componentsToRemove);
service.getComponents().addAll(componentsToAdd);
// Validate components and let global values take effect if component level
// values are not provided
Artifact globalArtifact = service.getArtifact();
Resource globalResource = service.getResource();
for (Component comp : service.getComponents()) {
// fill in global artifact unless it is type SERVICE
if (comp.getArtifact() == null && service.getArtifact() != null
&& service.getArtifact().getType() != Artifact.TypeEnum
.SERVICE) {
comp.setArtifact(globalArtifact);
}
// fill in global resource
if (comp.getResource() == null) {
comp.setResource(globalResource);
}
// validate dependency existence
if (comp.getDependencies() != null) {
for (String dependency : comp.getDependencies()) {
if (!componentNames.contains(dependency)) {
throw new IllegalArgumentException(String.format(
RestApiErrorMessages.ERROR_DEPENDENCY_INVALID, dependency,
comp.getName()));
}
}
}
validateComponent(comp, fs.getFileSystem(), conf);
}
validatePlacementPolicy(service.getComponents(), componentNames);
// validate dependency tree
sortByDependencies(service.getComponents());
// Service lifetime if not specified, is set to unlimited lifetime
if (service.getLifetime() == null) {
service.setLifetime(RestApiConstants.DEFAULT_UNLIMITED_LIFETIME);
}
}
private static void validateComponent(Component comp, FileSystem fs,
org.apache.hadoop.conf.Configuration conf)
throws IOException {
validateNameFormat(comp.getName(), conf);
AbstractClientProvider compClientProvider = ProviderFactory
.getClientProvider(comp.getArtifact());
compClientProvider.validateArtifact(comp.getArtifact(), fs);
if (comp.getLaunchCommand() == null && (comp.getArtifact() == null || comp
.getArtifact().getType() != Artifact.TypeEnum.DOCKER)) {
throw new IllegalArgumentException(RestApiErrorMessages
.ERROR_ABSENT_LAUNCH_COMMAND);
}
validateServiceResource(comp.getResource(), comp);
if (comp.getNumberOfContainers() == null
|| comp.getNumberOfContainers() < 0) {
throw new IllegalArgumentException(String.format(
RestApiErrorMessages.ERROR_CONTAINERS_COUNT_FOR_COMP_INVALID
+ ": " + comp.getNumberOfContainers(), comp.getName()));
}
compClientProvider.validateConfigFiles(comp.getConfiguration()
.getFiles(), fs);
MonitorUtils.getProbe(comp.getReadinessCheck());
}
// Check component or service name format and transform to lower case.
public static void validateNameFormat(String name,
org.apache.hadoop.conf.Configuration conf) {
if (StringUtils.isEmpty(name)) {
throw new IllegalArgumentException("Name can not be empty!");
}
// validate component name
if (name.contains("_")) {
throw new IllegalArgumentException(
"Invalid format: " + name
+ ", can not use '_', as DNS hostname does not allow '_'. Use '-' Instead. ");
}
boolean dnsEnabled = conf.getBoolean(RegistryConstants.KEY_DNS_ENABLED,
RegistryConstants.DEFAULT_DNS_ENABLED);
if (dnsEnabled && name.length() > RegistryConstants.MAX_FQDN_LABEL_LENGTH) {
throw new IllegalArgumentException(String
.format("Invalid format %s, must be no more than 63 characters ",
name));
}
namePattern.validate(name);
}
private static void validatePlacementPolicy(List<Component> components,
Set<String> componentNames) {
for (Component comp : components) {
if (comp.getPlacementPolicy() != null) {
for (PlacementConstraint constraint : comp.getPlacementPolicy()
.getConstraints()) {
for (String targetTag : constraint.getTargetTags()) {
if (!comp.getName().equals(targetTag)) {
throw new IllegalArgumentException(String.format(
RestApiErrorMessages.ERROR_PLACEMENT_POLICY_TAG_NAME_NOT_SAME,
targetTag, comp.getName(), comp.getName(), comp.getName()));
}
}
}
}
}
}
@VisibleForTesting
public static List<Component> getComponents(SliderFileSystem
fs, String serviceName) throws IOException {
return loadService(fs, serviceName).getComponents();
}
public static Service loadService(SliderFileSystem fs, String
serviceName) throws IOException {
Path serviceJson = getServiceJsonPath(fs, serviceName);
LOG.info("Loading service definition from " + serviceJson);
return jsonSerDeser.load(fs.getFileSystem(), serviceJson);
}
public static Service loadServiceUpgrade(SliderFileSystem fs,
String serviceName, String version) throws IOException {
Path versionPath = fs.buildClusterUpgradeDirPath(serviceName, version);
Path versionedDef = new Path(versionPath, serviceName + ".json");
LOG.info("Loading service definition from {}", versionedDef);
return jsonSerDeser.load(fs.getFileSystem(), versionedDef);
}
public static Service loadServiceFrom(SliderFileSystem fs,
Path appDefPath) throws IOException {
LOG.info("Loading service definition from " + appDefPath);
return jsonSerDeser.load(fs.getFileSystem(), appDefPath);
}
public static Path getServiceJsonPath(SliderFileSystem fs, String serviceName) {
Path serviceDir = fs.buildClusterDirPath(serviceName);
return new Path(serviceDir, serviceName + ".json");
}
private static void validateServiceResource(Resource resource,
Component comp) {
// Only services/components of type SERVICE can skip resource requirement
if (resource == null) {
throw new IllegalArgumentException(
comp == null ? RestApiErrorMessages.ERROR_RESOURCE_INVALID : String
.format(RestApiErrorMessages.ERROR_RESOURCE_FOR_COMP_INVALID,
comp.getName()));
}
// One and only one of profile OR cpus & memory can be specified. Specifying
// both raises validation error.
if (StringUtils.isNotEmpty(resource.getProfile()) && (
resource.getCpus() != null || StringUtils
.isNotEmpty(resource.getMemory()))) {
throw new IllegalArgumentException(comp == null ?
RestApiErrorMessages.ERROR_RESOURCE_PROFILE_MULTIPLE_VALUES_NOT_SUPPORTED :
String.format(
RestApiErrorMessages.ERROR_RESOURCE_PROFILE_MULTIPLE_VALUES_FOR_COMP_NOT_SUPPORTED,
comp.getName()));
}
// Currently resource profile is not supported yet, so we will raise
// validation error if only resource profile is specified
if (StringUtils.isNotEmpty(resource.getProfile())) {
throw new IllegalArgumentException(
RestApiErrorMessages.ERROR_RESOURCE_PROFILE_NOT_SUPPORTED_YET);
}
String memory = resource.getMemory();
Integer cpus = resource.getCpus();
if (StringUtils.isEmpty(memory)) {
throw new IllegalArgumentException(
comp == null ? RestApiErrorMessages.ERROR_RESOURCE_MEMORY_INVALID :
String.format(
RestApiErrorMessages.ERROR_RESOURCE_MEMORY_FOR_COMP_INVALID,
comp.getName()));
}
if (cpus == null) {
throw new IllegalArgumentException(
comp == null ? RestApiErrorMessages.ERROR_RESOURCE_CPUS_INVALID :
String.format(
RestApiErrorMessages.ERROR_RESOURCE_CPUS_FOR_COMP_INVALID,
comp.getName()));
}
if (cpus <= 0) {
throw new IllegalArgumentException(comp == null ?
RestApiErrorMessages.ERROR_RESOURCE_CPUS_INVALID_RANGE : String
.format(
RestApiErrorMessages.ERROR_RESOURCE_CPUS_FOR_COMP_INVALID_RANGE,
comp.getName()));
}
}
// check if comp mem size exceeds cluster limit
public static void validateCompResourceSize(
org.apache.hadoop.yarn.api.records.Resource maxResource,
Service service) throws YarnException {
for (Component component : service.getComponents()) {
long mem = Long.parseLong(component.getResource().getMemory());
if (mem > maxResource.getMemorySize()) {
throw new YarnException(
"Component " + component.getName() + ": specified memory size ("
+ mem + ") is larger than configured max container memory " +
"size (" + maxResource.getMemorySize() + ")");
}
int cpu = component.getResource().getCpus();
if (cpu > maxResource.getVirtualCores()) {
throw new YarnException(
"Component " + component.getName() + ": specified number of " +
"virtual core (" + cpu + ") is larger than configured max " +
"virtual core size (" + maxResource.getVirtualCores() + ")");
}
}
}
private static boolean hasComponent(Service service) {
if (service.getComponents() == null || service.getComponents()
.isEmpty()) {
return false;
}
return true;
}
public static Collection<Component> sortByDependencies(List<Component>
components) {
Map<String, Component> sortedComponents =
sortByDependencies(components, null);
return sortedComponents.values();
}
/**
* Each internal call of sortByDependencies will identify all of the
* components with the same dependency depth (the lowest depth that has not
* been processed yet) and add them to the sortedComponents list, preserving
* their original ordering in the components list.
*
* So the first time it is called, all components with no dependencies
* (depth 0) will be identified. The next time it is called, all components
* that have dependencies only on the the depth 0 components will be
* identified (depth 1). This will be repeated until all components have
* been added to the sortedComponents list. If no new components are
* identified but the sortedComponents list is not complete, an error is
* thrown.
*/
private static Map<String, Component> sortByDependencies(List<Component>
components, Map<String, Component> sortedComponents) {
if (sortedComponents == null) {
sortedComponents = new LinkedHashMap<>();
}
Map<String, Component> componentsToAdd = new LinkedHashMap<>();
List<Component> componentsSkipped = new ArrayList<>();
for (Component component : components) {
String name = component.getName();
if (sortedComponents.containsKey(name)) {
continue;
}
boolean dependenciesAlreadySorted = true;
if (!ServiceUtils.isEmpty(component.getDependencies())) {
for (String dependency : component.getDependencies()) {
if (!sortedComponents.containsKey(dependency)) {
dependenciesAlreadySorted = false;
break;
}
}
}
if (dependenciesAlreadySorted) {
componentsToAdd.put(name, component);
} else {
componentsSkipped.add(component);
}
}
if (componentsToAdd.size() == 0) {
throw new IllegalArgumentException(String.format(RestApiErrorMessages
.ERROR_DEPENDENCY_CYCLE, componentsSkipped));
}
sortedComponents.putAll(componentsToAdd);
if (sortedComponents.size() == components.size()) {
return sortedComponents;
}
return sortByDependencies(components, sortedComponents);
}
public static void createDirAndPersistApp(SliderFileSystem fs, Path appDir,
Service service)
throws IOException, SliderException {
FsPermission appDirPermission = new FsPermission("750");
fs.createWithPermissions(appDir, appDirPermission);
Path appJson = writeAppDefinition(fs, appDir, service);
LOG.info("Persisted service {} version {} at {}", service.getName(),
service.getVersion(), appJson);
}
public static Path writeAppDefinition(SliderFileSystem fs, Path appDir,
Service service) throws IOException {
Path appJson = new Path(appDir, service.getName() + ".json");
jsonSerDeser.save(fs.getFileSystem(), appJson, service, true);
return appJson;
}
public static String $(String s) {
return "${" + s +"}";
}
}