/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.service.provider;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.permission.FsAction;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.LocalResource;
import org.apache.hadoop.yarn.api.records.LocalResourceType;
import org.apache.hadoop.yarn.service.ServiceContext;
import org.apache.hadoop.yarn.service.api.records.Component;
import org.apache.hadoop.yarn.service.api.records.ConfigFile;
import org.apache.hadoop.yarn.service.api.records.ConfigFormat;
import org.apache.hadoop.yarn.service.component.instance.ComponentInstance;
import org.apache.hadoop.yarn.service.conf.YarnServiceConstants;
import org.apache.hadoop.yarn.service.containerlaunch.AbstractLauncher;
import org.apache.hadoop.yarn.service.exceptions.BadCommandArgumentsException;
import org.apache.hadoop.yarn.service.exceptions.SliderException;
import org.apache.hadoop.yarn.service.utils.PublishedConfiguration;
import org.apache.hadoop.yarn.service.utils.PublishedConfigurationOutputter;
import org.apache.hadoop.yarn.service.utils.ServiceUtils;
import org.apache.hadoop.yarn.service.utils.SliderFileSystem;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.File;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.OutputStream;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ExecutionException;
import static org.apache.hadoop.yarn.service.api.ServiceApiConstants.*;
/**
 * A factoring out of methods handy for providers. All methods are static;
 * logging goes to the class's shared logger.
 */
public class ProviderUtils implements YarnServiceConstants {
protected static final Logger log =
LoggerFactory.getLogger(ProviderUtils.class);
/**
* Add oneself to the classpath. This does not work
* on minicluster test runs where the JAR is not built up.
* @param providerResources map of provider resources to add these entries to
* @param providerClass provider to add
* @param jarName name of the jar to use
* @param sliderFileSystem target filesystem
* @param tempPath path in the cluster FS for temp files
* @param libdir relative directory to place resources
* @param miniClusterTestRun true if minicluster is being used
   * @return true if the JAR was found and added; false if it was not found
   * on a minicluster test run
*
* @throws FileNotFoundException if the JAR was not found and this is NOT
* a mini cluster test run
* @throws IOException IO problems
* @throws SliderException any Slider problem
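   *
   * <p>Usage sketch (hypothetical caller; {@code fs}, {@code tempPath} and
   * the jar name are illustrative, not values defined in this class):
   * <pre>{@code
   * Map<String, LocalResource> resources = new HashMap<>();
   * boolean added = ProviderUtils.addProviderJar(resources, MyProvider.class,
   *     "my-provider.jar", fs, tempPath, "lib", false);
   * }</pre>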
*/
  public static boolean addProviderJar(
      Map<String, LocalResource> providerResources,
      Class<?> providerClass,
      String jarName,
      SliderFileSystem sliderFileSystem,
      Path tempPath,
      String libdir,
      boolean miniClusterTestRun) throws IOException, SliderException {
try {
ServiceUtils.putJar(providerResources,
sliderFileSystem,
providerClass,
tempPath,
libdir,
jarName);
return true;
} catch (FileNotFoundException e) {
if (miniClusterTestRun) {
return false;
} else {
throw e;
}
}
}
/**
   * Copies all dependency JARs onto the cluster filesystem, from an
   * explicitly supplied local directory if given, else from the default path.
* @param providerResources map of provider resources to add these entries to
* @param sliderFileSystem target filesystem
* @param tempPath path in the cluster FS for temp files
* @param libDir relative directory to place resources
* @param libLocalSrcDir explicitly supplied local libs dir
* @throws IOException trouble copying to HDFS
   * @throws SliderException if the supplied local lib directory is not valid
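   *
   * <p>Usage sketch ({@code fs}, {@code tempPath} and the local lib path are
   * illustrative assumptions):
   * <pre>{@code
   * Map<String, LocalResource> resources = new HashMap<>();
   * ProviderUtils.addAllDependencyJars(resources, fs, tempPath, "lib",
   *     "/opt/myservice/lib");
   * }</pre>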
*/
public static void addAllDependencyJars(
Map<String, LocalResource> providerResources,
SliderFileSystem sliderFileSystem,
Path tempPath,
String libDir,
String libLocalSrcDir)
throws IOException, SliderException {
if (ServiceUtils.isSet(libLocalSrcDir)) {
File file = new File(libLocalSrcDir);
if (!file.exists() || !file.isDirectory()) {
throw new BadCommandArgumentsException(
"Supplied lib src dir %s is not valid", libLocalSrcDir);
}
}
ServiceUtils.putAllJars(providerResources, sliderFileSystem, tempPath,
libDir, libLocalSrcDir);
}
  public static String substituteStrWithTokens(String content,
      Map<String, String> tokensForSubstitution) {
    for (Map.Entry<String, String> token : tokensForSubstitution.entrySet()) {
      // Literal replacement: unlike replaceAll(), this cannot misinterpret
      // "$" or "\" in the token value as regex replacement syntax.
      content = content.replace(token.getKey(), token.getValue());
    }
    return content;
  }
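  // Worked example (illustrative values): with a token map containing
  // {"${COMPONENT_NAME}" -> "worker"},
  //   substituteStrWithTokens("conf-${COMPONENT_NAME}.xml", tokens)
  // returns "conf-worker.xml".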
  // Substitute tokens from tokenMap into every config value, in place.
  public static void substituteMapWithTokens(Map<String, String> configs,
      Map<String, String> tokenMap) {
    for (Map.Entry<String, String> entry : configs.entrySet()) {
      String value = entry.getValue();
      if (tokenMap != null) {
        for (Map.Entry<String, String> token : tokenMap.entrySet()) {
          // literal replacement, consistent with substituteStrWithTokens
          value = value.replace(token.getKey(), token.getValue());
        }
      }
      entry.setValue(value);
    }
  }
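  // Worked example (illustrative values): with
  //   configs  = {"fs.defaultFS" -> "hdfs://${SERVICE_NAME}"}
  //   tokenMap = {"${SERVICE_NAME}" -> "ns1"}
  // configs is updated in place to {"fs.defaultFS" -> "hdfs://ns1"}.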
public static Path initCompInstanceDir(SliderFileSystem fs,
ComponentInstance instance) {
Path compDir = new Path(new Path(fs.getAppDir(), "components"),
instance.getCompName());
Path compInstanceDir = new Path(compDir, instance.getCompInstanceName());
instance.setCompInstanceDir(compInstanceDir);
return compInstanceDir;
}
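  // Resulting layout (illustrative): <app dir>/components/<component name>/
  // <instance name>, e.g. .../components/worker/worker-0.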
  // 1. Create all config files for a component on HDFS for localization.
  // 2. Add the config files to the container launcher's local resources.
public static synchronized void createConfigFileAndAddLocalResource(
AbstractLauncher launcher, SliderFileSystem fs, Component component,
Map<String, String> tokensForSubstitution, ComponentInstance instance,
ServiceContext context) throws IOException {
Path compInstanceDir = initCompInstanceDir(fs, instance);
if (!fs.getFileSystem().exists(compInstanceDir)) {
log.info(instance.getCompInstanceId() + ": Creating dir on hdfs: " + compInstanceDir);
fs.getFileSystem().mkdirs(compInstanceDir,
new FsPermission(FsAction.ALL, FsAction.NONE, FsAction.NONE));
} else {
log.info("Component instance conf dir already exists: " + compInstanceDir);
}
if (log.isDebugEnabled()) {
log.debug("Tokens substitution for component instance: " + instance
.getCompInstanceName() + System.lineSeparator()
+ tokensForSubstitution);
}
for (ConfigFile originalFile : component.getConfiguration().getFiles()) {
ConfigFile configFile = originalFile.copy();
String fileName = new Path(configFile.getDestFile()).getName();
      // substitute tokens in the dest file path
      for (Map.Entry<String, String> token : tokensForSubstitution.entrySet()) {
        configFile.setDestFile(configFile.getDestFile()
            .replace(token.getKey(), token.getValue()));
      }
Path remoteFile = new Path(compInstanceDir, fileName);
if (!fs.getFileSystem().exists(remoteFile)) {
log.info("Saving config file on hdfs for component " + instance
.getCompInstanceName() + ": " + configFile);
if (configFile.getSrcFile() != null) {
// Load config file template
switch (configFile.getType()) {
case HADOOP_XML:
// Hadoop_xml_template
resolveHadoopXmlTemplateAndSaveOnHdfs(fs.getFileSystem(),
tokensForSubstitution, configFile, remoteFile, context);
break;
case TEMPLATE:
// plain-template
resolvePlainTemplateAndSaveOnHdfs(fs.getFileSystem(),
tokensForSubstitution, configFile, remoteFile, context);
break;
        default:
          log.info("Loading src_file is not supported for " + configFile);
          break;
}
} else {
// If src_file is not specified
resolvePropsInConfigFileAndSaveOnHdfs(fs, tokensForSubstitution,
instance, configFile, fileName, remoteFile);
}
}
// Add resource for localization
LocalResource configResource =
fs.createAmResource(remoteFile, LocalResourceType.FILE);
File destFile = new File(configFile.getDestFile());
String symlink = APP_CONF_DIR + "/" + fileName;
if (destFile.isAbsolute()) {
launcher.addLocalResource(symlink, configResource,
configFile.getDestFile());
log.info("Add config file for localization: " + symlink + " -> "
+ configResource.getResource().getFile() + ", dest mount path: "
+ configFile.getDestFile());
} else {
launcher.addLocalResource(symlink, configResource);
log.info("Add config file for localization: " + symlink + " -> "
+ configResource.getResource().getFile());
}
}
}
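  // Illustrative service-spec snippet for a config file that the method
  // above would resolve and localize (values are assumptions; field names
  // follow the ConfigFile API record):
  //
  //   "files" : [ {
  //     "type" : "TEMPLATE",
  //     "src_file" : "app.conf.template",
  //     "dest_file" : "/etc/myapp/app.conf"
  //   } ]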
private static void resolvePropsInConfigFileAndSaveOnHdfs(SliderFileSystem fs,
Map<String, String> tokensForSubstitution, ComponentInstance instance,
ConfigFile configFile, String fileName, Path remoteFile)
throws IOException {
// substitute non-template configs
substituteMapWithTokens(configFile.getProperties(), tokensForSubstitution);
// write configs onto hdfs
PublishedConfiguration publishedConfiguration =
new PublishedConfiguration(fileName,
configFile.getProperties().entrySet());
if (!fs.getFileSystem().exists(remoteFile)) {
PublishedConfigurationOutputter configurationOutputter =
PublishedConfigurationOutputter.createOutputter(
ConfigFormat.resolve(configFile.getType().toString()),
publishedConfiguration);
try (FSDataOutputStream os = fs.getFileSystem().create(remoteFile)) {
configurationOutputter.save(os);
os.flush();
}
} else {
log.info("Component instance = " + instance.getCompInstanceName()
+ ", config file already exists: " + remoteFile);
}
}
  // 1. Substitute tokens into the config template (HADOOP_XML format only).
  // 2. Save the result on HDFS.
@SuppressWarnings("unchecked")
private static void resolveHadoopXmlTemplateAndSaveOnHdfs(FileSystem fs,
Map<String, String> tokensForSubstitution, ConfigFile configFile,
Path remoteFile, ServiceContext context) throws IOException {
Map<String, String> conf;
try {
conf = (Map<String, String>) context.configCache.get(configFile);
} catch (ExecutionException e) {
log.info("Failed to load config file: " + configFile, e);
return;
}
// make a copy for substitution
org.apache.hadoop.conf.Configuration confCopy =
new org.apache.hadoop.conf.Configuration(false);
for (Map.Entry<String, String> entry : conf.entrySet()) {
confCopy.set(entry.getKey(), entry.getValue());
}
    // override with properties explicitly set in the config file spec
    for (Map.Entry<String, String> entry
        : configFile.getProperties().entrySet()) {
      confCopy.set(entry.getKey(), entry.getValue());
    }
    // substitute tokens in all values
    for (Map.Entry<String, String> entry : confCopy) {
      String val = entry.getValue();
      if (val != null) {
        for (Map.Entry<String, String> token
            : tokensForSubstitution.entrySet()) {
          // literal replacement, consistent with substituteStrWithTokens
          val = val.replace(token.getKey(), token.getValue());
        }
        confCopy.set(entry.getKey(), val);
      }
    }
// save on hdfs
try (OutputStream output = fs.create(remoteFile)) {
confCopy.writeXml(output);
log.info("Reading config from: " + configFile.getSrcFile()
+ ", writing to: " + remoteFile);
}
}
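  // Illustrative HADOOP_XML template fragment handled above: a value such as
  //   <property>
  //     <name>app.master.address</name>
  //     <value>${COMPONENT_INSTANCE_NAME}.${SERVICE_NAME}</value>
  //   </property>
  // has its ${...} tokens replaced before the XML is written to HDFS
  // (property name and tokens are assumptions for the example).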
// 1) read the template as a string
// 2) do token substitution
// 3) save on hdfs
private static void resolvePlainTemplateAndSaveOnHdfs(FileSystem fs,
Map<String, String> tokensForSubstitution, ConfigFile configFile,
Path remoteFile, ServiceContext context) {
String content;
try {
content = (String) context.configCache.get(configFile);
} catch (ExecutionException e) {
log.info("Failed to load config file: " + configFile, e);
return;
}
// substitute tokens
content = substituteStrWithTokens(content, tokensForSubstitution);
    try (OutputStream output = fs.create(remoteFile)) {
      org.apache.commons.io.IOUtils.write(content, output,
          java.nio.charset.StandardCharsets.UTF_8);
    } catch (IOException e) {
      log.info("Failed to create " + remoteFile, e);
    }
}
  /**
   * Get the initial component token map to be substituted into config values.
   * @param instance component instance the tokens describe
   * @param container container allocated to the instance
   * @return tokens to replace
   */
public static Map<String, String> initCompTokensForSubstitute(
ComponentInstance instance, Container container) {
Map<String, String> tokens = new HashMap<>();
tokens.put(COMPONENT_NAME, instance.getCompSpec().getName());
    tokens.put(COMPONENT_NAME_LC,
        instance.getCompSpec().getName().toLowerCase());
tokens.put(COMPONENT_INSTANCE_NAME, instance.getCompInstanceName());
tokens.put(CONTAINER_ID, container.getId().toString());
tokens.put(COMPONENT_ID,
String.valueOf(instance.getCompInstanceId().getId()));
tokens.putAll(instance.getComponent().getDependencyHostIpTokens());
return tokens;
}
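  // Example result (illustrative): for instance "worker-0" of component
  // "worker", the returned map contains the ${...} placeholder keys from
  // ServiceApiConstants mapped to values such as "worker" (name), "worker-0"
  // (instance name), the container ID string, the component ID, plus any
  // dependency host/IP tokens.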
}