| /** |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| |
| package org.apache.oozie.service; |
| |
| import com.google.common.base.Preconditions; |
| import com.google.common.base.Strings; |
| |
| import org.apache.commons.lang3.StringUtils; |
| import org.apache.hadoop.fs.FileStatus; |
| import org.apache.hadoop.io.Text; |
| import org.apache.hadoop.mapred.Master; |
| import org.apache.hadoop.mapred.JobClient; |
| import org.apache.hadoop.mapred.JobConf; |
| import org.apache.hadoop.fs.FileSystem; |
| import org.apache.hadoop.fs.Path; |
| import org.apache.hadoop.conf.Configuration; |
| import org.apache.hadoop.mapreduce.counters.Limits; |
| import org.apache.hadoop.mapreduce.v2.api.HSClientProtocol; |
| import org.apache.hadoop.mapreduce.v2.api.MRClientProtocol; |
| import org.apache.hadoop.mapreduce.v2.api.protocolrecords.GetDelegationTokenRequest; |
| import org.apache.hadoop.mapreduce.v2.jobhistory.JHAdminConfig; |
| import org.apache.hadoop.security.Credentials; |
| import org.apache.hadoop.security.token.Token; |
| import org.apache.hadoop.net.NetUtils; |
| import org.apache.hadoop.security.SecurityUtil; |
| import org.apache.hadoop.security.UserGroupInformation; |
| import org.apache.hadoop.security.token.TokenIdentifier; |
| import org.apache.hadoop.yarn.api.records.LocalResource; |
| import org.apache.hadoop.yarn.api.records.LocalResourceType; |
| import org.apache.hadoop.yarn.api.records.LocalResourceVisibility; |
| import org.apache.hadoop.yarn.client.ClientRMProxy; |
| import org.apache.hadoop.yarn.client.api.YarnClient; |
| import org.apache.hadoop.yarn.exceptions.YarnException; |
| import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider; |
| import org.apache.hadoop.yarn.ipc.YarnRPC; |
| import org.apache.hadoop.yarn.util.ConverterUtils; |
| import org.apache.hadoop.yarn.util.Records; |
| import org.apache.oozie.ErrorCode; |
| import org.apache.oozie.action.ActionExecutorException; |
| import org.apache.oozie.action.hadoop.JavaActionExecutor; |
| import org.apache.oozie.util.IOUtils; |
| import org.apache.oozie.util.ParamChecker; |
| import org.apache.oozie.util.XConfiguration; |
| import org.apache.oozie.util.XLog; |
| import org.apache.oozie.util.JobUtils; |
| import org.apache.oozie.workflow.lite.LiteWorkflowAppParser; |
| |
| import java.io.File; |
| import java.io.FileInputStream; |
| import java.io.FilenameFilter; |
| import java.io.IOException; |
| import java.io.InputStream; |
| import java.io.OutputStream; |
| import java.lang.reflect.Method; |
| import java.net.InetAddress; |
| import java.net.URI; |
| import java.net.URISyntaxException; |
| import java.security.PrivilegedAction; |
| import java.security.PrivilegedExceptionAction; |
| import java.util.Arrays; |
| import java.util.Comparator; |
| import java.util.HashMap; |
| import java.util.Map; |
| import java.util.Properties; |
| import java.util.Set; |
| import java.util.HashSet; |
| import java.util.concurrent.ConcurrentHashMap; |
| |
| |
| /** |
| * The HadoopAccessorService returns HadoopAccessor instances configured to work on behalf of a user-group. <p> The |
| * default accessor used is the base accessor which just injects the UGI into the configuration instance used to |
| * create/obtain JobClient and FileSystem instances. |
| */ |
| public class HadoopAccessorService implements Service { |
| |
| private static XLog LOG = XLog.getLog(HadoopAccessorService.class); |
| |
| public static final String CONF_PREFIX = Service.CONF_PREFIX + "HadoopAccessorService."; |
| public static final String JOB_TRACKER_WHITELIST = CONF_PREFIX + "jobTracker.whitelist"; |
| public static final String NAME_NODE_WHITELIST = CONF_PREFIX + "nameNode.whitelist"; |
| public static final String HADOOP_CONFS = CONF_PREFIX + "hadoop.configurations"; |
| public static final String ACTION_CONFS = CONF_PREFIX + "action.configurations"; |
| public static final String ACTION_CONFS_LOAD_DEFAULT_RESOURCES = ACTION_CONFS + ".load.default.resources"; |
| public static final String KERBEROS_AUTH_ENABLED = CONF_PREFIX + "kerberos.enabled"; |
| public static final String KERBEROS_KEYTAB = CONF_PREFIX + "keytab.file"; |
| public static final String KERBEROS_PRINCIPAL = CONF_PREFIX + "kerberos.principal"; |
| protected static final String FS_PROP_PATTERN = CONF_PREFIX + "fs.%s"; |
| |
| private static final String OOZIE_HADOOP_ACCESSOR_SERVICE_CREATED = "oozie.HadoopAccessorService.created"; |
| private static final String DEFAULT_ACTIONNAME = "default"; |
| private static Configuration cachedConf; |
| |
| private Set<String> jobTrackerWhitelist = new HashSet<String>(); |
| private Set<String> nameNodeWhitelist = new HashSet<String>(); |
| private Map<String, Configuration> hadoopConfigs = new HashMap<String, Configuration>(); |
| private Map<String, File> actionConfigDirs = new HashMap<String, File>(); |
| private Map<String, Map<String, XConfiguration>> actionConfigs = new HashMap<String, Map<String, XConfiguration>>(); |
| |
| private UserGroupInformationService ugiService; |
| |
| /** |
| * Supported filesystem schemes for namespace federation |
| */ |
| public static final String SUPPORTED_FILESYSTEMS = CONF_PREFIX + "supported.filesystems"; |
| private Set<String> supportedSchemes; |
| private boolean allSchemesSupported; |
| |
| public void init(Services services) throws ServiceException { |
| this.ugiService = services.get(UserGroupInformationService.class); |
| init(services.getConf()); |
| } |
| |
| //for testing purposes, see XFsTestCase |
| public void init(Configuration conf) throws ServiceException { |
| for (String name : ConfigurationService.getStrings(conf, JOB_TRACKER_WHITELIST)) { |
| String tmp = name.toLowerCase().trim(); |
| if (tmp.length() == 0) { |
| continue; |
| } |
| jobTrackerWhitelist.add(tmp); |
| } |
| LOG.info( |
| "JOB_TRACKER_WHITELIST :" + jobTrackerWhitelist.toString() |
| + ", Total entries :" + jobTrackerWhitelist.size()); |
| for (String name : ConfigurationService.getStrings(conf, NAME_NODE_WHITELIST)) { |
| String tmp = name.toLowerCase().trim(); |
| if (tmp.length() == 0) { |
| continue; |
| } |
| nameNodeWhitelist.add(tmp); |
| } |
| LOG.info( |
| "NAME_NODE_WHITELIST :" + nameNodeWhitelist.toString() |
| + ", Total entries :" + nameNodeWhitelist.size()); |
| |
| boolean kerberosAuthOn = ConfigurationService.getBoolean(conf, KERBEROS_AUTH_ENABLED); |
| LOG.info("Oozie Kerberos Authentication [{0}]", (kerberosAuthOn) ? "enabled" : "disabled"); |
| if (kerberosAuthOn) { |
| kerberosInit(conf); |
| } |
| else { |
| Configuration ugiConf = new Configuration(); |
| ugiConf.set("hadoop.security.authentication", "simple"); |
| UserGroupInformation.setConfiguration(ugiConf); |
| } |
| |
| if (ugiService == null) { //for testing purposes, see XFsTestCase |
| this.ugiService = new UserGroupInformationService(); |
| } |
| |
| loadHadoopConfigs(conf); |
| preLoadActionConfigs(conf); |
| |
| supportedSchemes = new HashSet<String>(); |
| String[] schemesFromConf = ConfigurationService.getStrings(conf, SUPPORTED_FILESYSTEMS); |
| if(schemesFromConf != null) { |
| for (String scheme: schemesFromConf) { |
| scheme = scheme.trim(); |
| // If user gives "*", supportedSchemes will be empty, so that checking is not done i.e. all schemes allowed |
| if(scheme.equals("*")) { |
| if(schemesFromConf.length > 1) { |
| throw new ServiceException(ErrorCode.E0100, getClass().getName(), |
| SUPPORTED_FILESYSTEMS + " should contain either only wildcard or explicit list, not both"); |
| } |
| allSchemesSupported = true; |
| } |
| supportedSchemes.add(scheme); |
| } |
| } |
| |
| setConfigForHadoopSecurityUtil(conf); |
| initializeMRLimits(conf); |
| } |
| |
| /** |
| * This method initializes the MapReduce's Limits class so the user can define more than 120 counters |
| * in their map reduce action. For more details please see OOZIE-3578. |
| * |
| * @param conf the loaded hadoop configurations including mapred-site.xml |
| */ |
| private void initializeMRLimits(Configuration conf) { |
| Limits.init(conf); |
| } |
| |
| private void setConfigForHadoopSecurityUtil(Configuration conf) { |
| // Prior to HADOOP-12954 (2.9.0+), Hadoop sets hadoop.security.token.service.use_ip on startup in a static block with no |
| // way for Oozie to change it because Oozie doesn't load *-site.xml files on the classpath. HADOOP-12954 added a way to |
| // set this property via a setConfiguration method. Ideally, this would be part of JobClient so Oozie wouldn't have to |
| // worry about it and we could have different values for different clusters, but we can't; so we have to use the same value |
| // for every cluster Oozie is configured for. To that end, we'll use the default NN's configs. If that's not defined, |
| // we'll use the wildcard's configs. And if that's not defined, we'll use an arbitrary cluster's configs. In any case, |
| // if the version of Hadoop we're using doesn't include HADOOP-12954, we'll do nothing (there's no workaround), and |
| // hadoop.security.token.service.use_ip will have the default value. |
| String nameNode = conf.get(LiteWorkflowAppParser.DEFAULT_NAME_NODE); |
| if (nameNode != null) { |
| nameNode = nameNode.trim(); |
| if (nameNode.isEmpty()) { |
| nameNode = null; |
| } |
| } |
| if (nameNode == null && hadoopConfigs.containsKey("*")) { |
| nameNode = "*"; |
| } |
| if (nameNode == null) { |
| for (String nn : hadoopConfigs.keySet()) { |
| nn = nn.trim(); |
| if (!nn.isEmpty()) { |
| nameNode = nn; |
| break; |
| } |
| } |
| } |
| if (nameNode != null) { |
| Configuration hConf = getConfiguration(nameNode); |
| try { |
| Method setConfigurationMethod = SecurityUtil.class.getMethod("setConfiguration", Configuration.class); |
| setConfigurationMethod.invoke(null, hConf); |
| LOG.debug("Setting Hadoop SecurityUtil Configuration to that of {0}", nameNode); |
| } catch (NoSuchMethodException e) { |
| LOG.debug("Not setting Hadoop SecurityUtil Configuration because this version of Hadoop doesn't support it"); |
| } catch (Exception e) { |
| LOG.error("An Exception occurred while trying to call setConfiguration on {0} via Reflection. It won't be called.", |
| SecurityUtil.class.getName(), e); |
| } |
| } |
| } |
| |
| private void kerberosInit(Configuration serviceConf) throws ServiceException { |
| try { |
| String keytabFile = ConfigurationService.get(serviceConf, KERBEROS_KEYTAB).trim(); |
| if (keytabFile.length() == 0) { |
| throw new ServiceException(ErrorCode.E0026, KERBEROS_KEYTAB); |
| } |
| String principal = SecurityUtil.getServerPrincipal( |
| serviceConf.get(KERBEROS_PRINCIPAL, "oozie/localhost@LOCALHOST"), |
| InetAddress.getLocalHost().getCanonicalHostName()); |
| if (principal.length() == 0) { |
| throw new ServiceException(ErrorCode.E0026, KERBEROS_PRINCIPAL); |
| } |
| Configuration conf = new Configuration(); |
| conf.set("hadoop.security.authentication", "kerberos"); |
| UserGroupInformation.setConfiguration(conf); |
| UserGroupInformation.loginUserFromKeytab(principal, keytabFile); |
| LOG.info("Got Kerberos ticket, keytab [{0}], Oozie principal principal [{1}]", |
| keytabFile, principal); |
| } |
| catch (ServiceException ex) { |
| throw ex; |
| } |
| catch (Exception ex) { |
| throw new ServiceException(ErrorCode.E0100, getClass().getName(), ex.getMessage(), ex); |
| } |
| } |
| |
| private static final String[] HADOOP_CONF_FILES = |
| {"core-site.xml", "hdfs-site.xml", "mapred-site.xml", "yarn-site.xml", "hadoop-site.xml", "ssl-client.xml"}; |
| |
| |
| private Configuration loadHadoopConf(File dir) throws IOException { |
| Configuration hadoopConf = new XConfiguration(); |
| for (String file : HADOOP_CONF_FILES) { |
| File f = new File(dir, file); |
| if (f.exists()) { |
| InputStream is = new FileInputStream(f); |
| Configuration conf = new XConfiguration(is, false); |
| is.close(); |
| XConfiguration.copy(conf, hadoopConf); |
| } |
| } |
| return hadoopConf; |
| } |
| |
| private Map<String, File> parseConfigDirs(String[] confDefs, String type) throws ServiceException, IOException { |
| Map<String, File> map = new HashMap<String, File>(); |
| File configDir = new File(ConfigurationService.getConfigurationDirectory()); |
| for (String confDef : confDefs) { |
| if (confDef.trim().length() > 0) { |
| String[] parts = confDef.split("="); |
| if (parts.length == 2) { |
| String hostPort = parts[0]; |
| String confDir = parts[1]; |
| File dir = new File(confDir); |
| if (!dir.isAbsolute()) { |
| dir = new File(configDir, confDir); |
| } |
| if (dir.exists()) { |
| map.put(hostPort.toLowerCase(), dir); |
| } |
| else { |
| throw new ServiceException(ErrorCode.E0100, getClass().getName(), |
| "could not find " + type + " configuration directory: " + |
| dir.getAbsolutePath()); |
| } |
| } |
| else { |
| throw new ServiceException(ErrorCode.E0100, getClass().getName(), |
| "Incorrect " + type + " configuration definition: " + confDef); |
| } |
| } |
| } |
| return map; |
| } |
| |
| private void loadHadoopConfigs(Configuration serviceConf) throws ServiceException { |
| try { |
| Map<String, File> map = parseConfigDirs(ConfigurationService.getStrings(serviceConf, HADOOP_CONFS), |
| "hadoop"); |
| for (Map.Entry<String, File> entry : map.entrySet()) { |
| hadoopConfigs.put(entry.getKey(), loadHadoopConf(entry.getValue())); |
| } |
| } |
| catch (ServiceException ex) { |
| throw ex; |
| } |
| catch (Exception ex) { |
| throw new ServiceException(ErrorCode.E0100, getClass().getName(), ex.getMessage(), ex); |
| } |
| } |
| |
| private void preLoadActionConfigs(Configuration serviceConf) throws ServiceException { |
| try { |
| actionConfigDirs = parseConfigDirs(ConfigurationService.getStrings(serviceConf, ACTION_CONFS), "action"); |
| for (String hostport : actionConfigDirs.keySet()) { |
| actionConfigs.put(hostport, new ConcurrentHashMap<String, XConfiguration>()); |
| } |
| } |
| catch (ServiceException ex) { |
| throw ex; |
| } |
| catch (Exception ex) { |
| throw new ServiceException(ErrorCode.E0100, getClass().getName(), ex.getMessage(), ex); |
| } |
| } |
| |
| public void destroy() { |
| } |
| |
| public Class<? extends Service> getInterface() { |
| return HadoopAccessorService.class; |
| } |
| |
| UserGroupInformation getUGI(String user) throws IOException { |
| return ugiService.getProxyUser(user); |
| } |
| |
| /** |
| * Creates a Configuration using the site configuration for the specified hostname:port. |
| * <p> |
| * If the specified hostname:port is not defined it falls back to the '*' site |
| * configuration if available. If the '*' site configuration is not available, |
| * the JobConf has all Hadoop defaults. |
| * |
| * @param hostPort hostname:port to lookup Hadoop site configuration. |
| * @return a Configuration with the corresponding site configuration for hostPort. |
| */ |
| public Configuration createConfiguration(String hostPort) { |
| Configuration appConf = new Configuration(getCachedConf()); |
| XConfiguration.copy(getConfiguration(hostPort), appConf); |
| appConf.setBoolean(OOZIE_HADOOP_ACCESSOR_SERVICE_CREATED, true); |
| return appConf; |
| } |
| |
| public Configuration getCachedConf() { |
| if (cachedConf == null) { |
| loadCachedConf(); |
| } |
| return cachedConf; |
| } |
| |
| private void loadCachedConf() { |
| cachedConf = new Configuration(); |
| //for lazy loading |
| cachedConf.size(); |
| } |
| |
| private XConfiguration loadActionConf(String hostPort, String action) { |
| File dir = actionConfigDirs.get(hostPort); |
| XConfiguration actionConf = new XConfiguration(); |
| if (dir != null) { |
| // See if a dir with the action name exists. If so, load all the supported conf files in the dir |
| File actionConfDir = new File(dir, action); |
| |
| if (actionConfDir.exists() && actionConfDir.isDirectory()) { |
| LOG.info("Processing configuration files under [{0}]" |
| + " for action [{1}] and hostPort [{2}]", |
| actionConfDir.getAbsolutePath(), action, hostPort); |
| updateActionConfigWithDir(actionConf, actionConfDir); |
| } |
| } |
| |
| // Now check for <action.xml> This way <action.xml> has priority over <action-dir>/*.xml |
| File actionConfFile = new File(dir, action + ".xml"); |
| LOG.info("Processing configuration file [{0}] for action [{1}] and hostPort [{2}]", |
| actionConfFile.getAbsolutePath(), action, hostPort); |
| if (actionConfFile.exists()) { |
| updateActionConfigWithFile(actionConf, actionConfFile); |
| } |
| |
| return actionConf; |
| } |
| |
| private void updateActionConfigWithFile(Configuration actionConf, File actionConfFile) { |
| try { |
| Configuration conf = readActionConfFile(actionConfFile); |
| XConfiguration.copy(conf, actionConf); |
| } catch (IOException e) { |
| LOG.warn("Could not read file [{0}].", actionConfFile.getAbsolutePath()); |
| } |
| } |
| |
| private void updateActionConfigWithDir(Configuration actionConf, File actionConfDir) { |
| File[] actionConfFiles = actionConfDir.listFiles(new FilenameFilter() { |
| @Override |
| public boolean accept(File dir, String name) { |
| return ActionConfFileType.isSupportedFileType(name); |
| }}); |
| |
| if (actionConfFiles != null) { |
| Arrays.sort(actionConfFiles, new Comparator<File>() { |
| @Override |
| public int compare(File o1, File o2) { |
| return o1.getName().compareTo(o2.getName()); |
| } |
| }); |
| for (File f : actionConfFiles) { |
| if (f.isFile() && f.canRead()) { |
| updateActionConfigWithFile(actionConf, f); |
| } |
| } |
| } |
| } |
| |
| private Configuration readActionConfFile(File file) throws IOException { |
| InputStream fis = null; |
| try { |
| fis = new FileInputStream(file); |
| ActionConfFileType fileTyple = ActionConfFileType.getFileType(file.getName()); |
| switch (fileTyple) { |
| case XML: |
| return new XConfiguration(fis); |
| case PROPERTIES: |
| Properties properties = new Properties(); |
| properties.load(fis); |
| return new XConfiguration(properties); |
| default: |
| throw new UnsupportedOperationException( |
| String.format("Unable to parse action conf file of type %s", fileTyple)); |
| } |
| } finally { |
| IOUtils.closeSafely(fis); |
| } |
| } |
| |
| /** |
| * Returns a Configuration containing any defaults for an action for a particular cluster. |
| * <p> |
| * This configuration is used as default for the action configuration and enables cluster |
| * level default values per action. |
| * |
| * @param hostPort hostname"port to lookup the action default confiugration. |
| * @param action action name. |
| * @return the default configuration for the action for the specified cluster. |
| */ |
| public XConfiguration createActionDefaultConf(String hostPort, String action) { |
| hostPort = (hostPort != null) ? hostPort.toLowerCase() : null; |
| Map<String, XConfiguration> hostPortActionConfigs = actionConfigs.get(hostPort); |
| if (hostPortActionConfigs == null) { |
| hostPortActionConfigs = actionConfigs.get("*"); |
| hostPort = "*"; |
| } |
| XConfiguration actionConf = hostPortActionConfigs.get(action); |
| if (actionConf == null) { |
| // doing lazy loading as we don't know upfront all actions, no need to synchronize |
| // as it is a read operation an in case of a race condition loading and inserting |
| // into the Map is idempotent and the action-config Map is a ConcurrentHashMap |
| |
| // We first load a action of type default |
| // This allows for global configuration for all actions - for example |
| // all launchers in one queue and actions in another queue |
| // Are some configuration that applies to multiple actions - like |
| // config libraries path etc |
| actionConf = loadActionConf(hostPort, DEFAULT_ACTIONNAME); |
| |
| // Action specific default configuration will override the default action config |
| |
| XConfiguration.copy(loadActionConf(hostPort, action), actionConf); |
| hostPortActionConfigs.put(action, actionConf); |
| } |
| return new XConfiguration(actionConf.toProperties()); |
| } |
| |
| private Configuration getConfiguration(String hostPort) { |
| hostPort = (hostPort != null) ? hostPort.toLowerCase() : null; |
| Configuration conf = hadoopConfigs.get(hostPort); |
| if (conf == null) { |
| conf = hadoopConfigs.get("*"); |
| if (conf == null) { |
| conf = new XConfiguration(); |
| } |
| } |
| return conf; |
| } |
| |
| /** |
| * Return a JobClient created with the provided user/group. |
| * |
| * |
| * @param user user |
| * @param conf JobConf with all necessary information to create the |
| * JobClient. |
| * @return JobClient created with the provided user/group. |
| * @throws HadoopAccessorException if the client could not be created. |
| */ |
| public JobClient createJobClient(String user, final JobConf conf) throws HadoopAccessorException { |
| ParamChecker.notEmpty(user, "user"); |
| if (!conf.getBoolean(OOZIE_HADOOP_ACCESSOR_SERVICE_CREATED, false)) { |
| throw new HadoopAccessorException(ErrorCode.E0903); |
| } |
| String jobTracker = conf.get(JavaActionExecutor.HADOOP_YARN_RM); |
| validateJobTracker(jobTracker); |
| try { |
| UserGroupInformation ugi = getUGI(user); |
| JobClient jobClient = ugi.doAs(new PrivilegedExceptionAction<JobClient>() { |
| public JobClient run() throws Exception { |
| return new JobClient(conf); |
| } |
| }); |
| return jobClient; |
| } |
| catch (IOException | InterruptedException ex) { |
| throw new HadoopAccessorException(ErrorCode.E0902, ex.getMessage(), ex); |
| } |
| } |
| |
| /** |
| * Return a JobClient created with the provided user/group. |
| * |
| * |
| * @param user user |
| * @param conf Configuration with all necessary information to create the |
| * JobClient. |
| * @return JobClient created with the provided user/group. |
| * @throws HadoopAccessorException if the client could not be created. |
| */ |
| public JobClient createJobClient(String user, Configuration conf) throws HadoopAccessorException { |
| return createJobClient(user, new JobConf(conf)); |
| } |
| |
| /** |
| * Return a YarnClient created with the provided user and configuration. The caller is responsible for closing it when done. |
| * |
| * @param user The username to impersonate |
| * @param conf The conf |
| * @return a YarnClient with the provided user and configuration |
| * @throws HadoopAccessorException if the client could not be created. |
| */ |
| public YarnClient createYarnClient(String user, final Configuration conf) throws HadoopAccessorException { |
| ParamChecker.notEmpty(user, "user"); |
| if (!conf.getBoolean(OOZIE_HADOOP_ACCESSOR_SERVICE_CREATED, false)) { |
| throw new HadoopAccessorException(ErrorCode.E0903); |
| } |
| String rm = conf.get(JavaActionExecutor.HADOOP_YARN_RM); |
| validateJobTracker(rm); |
| try { |
| UserGroupInformation ugi = getUGI(user); |
| YarnClient yarnClient = ugi.doAs(new PrivilegedExceptionAction<YarnClient>() { |
| @Override |
| public YarnClient run() throws Exception { |
| YarnClient yarnClient = YarnClient.createYarnClient(); |
| yarnClient.init(conf); |
| yarnClient.start(); |
| return yarnClient; |
| } |
| }); |
| return yarnClient; |
| } catch (IOException | InterruptedException ex) { |
| throw new HadoopAccessorException(ErrorCode.E0902, ex.getMessage(), ex); |
| } |
| } |
| |
| /** |
| * Return a FileSystem created with the provided user for the specified URI. |
| * |
| * @param user The username to impersonate |
| * @param uri file system URI. |
| * @param conf Configuration with all necessary information to create the FileSystem. |
| * @return FileSystem created with the provided user/group. |
| * @throws HadoopAccessorException if the filesystem could not be created. |
| */ |
| public FileSystem createFileSystem(String user, final URI uri, final Configuration conf) |
| throws HadoopAccessorException { |
| return createFileSystem(user, uri, conf, true); |
| } |
| |
| private FileSystem createFileSystem(String user, final URI uri, final Configuration conf, boolean checkAccessorProperty) |
| throws HadoopAccessorException { |
| ParamChecker.notEmpty(user, "user"); |
| |
| if (checkAccessorProperty && !conf.getBoolean(OOZIE_HADOOP_ACCESSOR_SERVICE_CREATED, false)) { |
| throw new HadoopAccessorException(ErrorCode.E0903); |
| } |
| |
| checkSupportedFilesystem(uri); |
| |
| String nameNode = uri.getAuthority(); |
| if (nameNode == null) { |
| nameNode = conf.get("fs.default.name"); |
| if (nameNode != null) { |
| try { |
| nameNode = new URI(nameNode).getAuthority(); |
| } |
| catch (URISyntaxException ex) { |
| throw new HadoopAccessorException(ErrorCode.E0902, ex.getMessage(), ex); |
| } |
| } |
| } |
| validateNameNode(nameNode); |
| final Configuration fileSystemConf = extendWithFileSystemSpecificPropertiesIfAny(uri, conf); |
| try { |
| UserGroupInformation ugi = getUGI(user); |
| return ugi.doAs(new PrivilegedExceptionAction<FileSystem>() { |
| public FileSystem run() throws Exception { |
| return FileSystem.get(uri, fileSystemConf); |
| } |
| }); |
| } |
| catch (IOException | InterruptedException ex) { |
| throw new HadoopAccessorException(ErrorCode.E0902, ex.getMessage(), ex); |
| } |
| } |
| |
| Configuration extendWithFileSystemSpecificPropertiesIfAny(final URI uri, final Configuration conf) { |
| final String fsProps = String.format(FS_PROP_PATTERN, uri.getScheme()); |
| final String fsCustomProps = ConfigurationService.get(fsProps); |
| if (fsCustomProps == null || fsCustomProps.length() == 0) { |
| return conf; |
| } |
| |
| final Configuration result = new Configuration(); |
| XConfiguration.copy(conf, result); |
| for (final String entry : fsCustomProps.split(",")) { |
| final String[] nameAndValue = entry.trim().split("=", 2); |
| if (nameAndValue.length < 2) { |
| LOG.warn(String.format("Configuration for %s cannot be read: %s. Skipping...", |
| fsProps, Arrays.toString(nameAndValue))); |
| continue; |
| } |
| result.set(nameAndValue[0], nameAndValue[1]); |
| } |
| return result; |
| } |
| |
| /** |
| * Validate Job tracker |
| * @param jobTrackerUri job tracker uri |
| * @throws HadoopAccessorException if job tracker cannot be reached |
| */ |
| protected void validateJobTracker(String jobTrackerUri) throws HadoopAccessorException { |
| validate(jobTrackerUri, jobTrackerWhitelist, ErrorCode.E0900); |
| } |
| |
| /** |
| * Validate Namenode list |
| * @param nameNodeUri name node uri |
| * @throws HadoopAccessorException if NN cannot be reached |
| */ |
| protected void validateNameNode(String nameNodeUri) throws HadoopAccessorException { |
| validate(nameNodeUri, nameNodeWhitelist, ErrorCode.E0901); |
| } |
| |
| private void validate(String uri, Set<String> whitelist, ErrorCode error) throws HadoopAccessorException { |
| if (uri != null) { |
| uri = uri.toLowerCase().trim(); |
| if (whitelist.size() > 0 && !whitelist.contains(uri)) { |
| throw new HadoopAccessorException(error, uri, whitelist); |
| } |
| } |
| } |
| |
| public void addFileToClassPath(String user, final Path file, final Configuration conf) |
| throws IOException { |
| ParamChecker.notEmpty(user, "user"); |
| try { |
| UserGroupInformation ugi = getUGI(user); |
| ugi.doAs(new PrivilegedExceptionAction<Void>() { |
| @Override |
| public Void run() throws Exception { |
| JobUtils.addFileToClassPath(file, conf, null); |
| return null; |
| } |
| }); |
| |
| } |
| catch (InterruptedException ex) { |
| throw new IOException(ex); |
| } |
| |
| } |
| |
| /** |
| * checks configuration parameter if filesystem scheme is among the list of supported ones |
| * this makes system robust to filesystems other than HDFS also |
| * |
| * @param uri uri |
| * @throws HadoopAccessorException if scheme is not supported |
| */ |
| |
| public void checkSupportedFilesystem(URI uri) throws HadoopAccessorException { |
| if (allSchemesSupported) |
| return; |
| String uriScheme = uri.getScheme(); |
| if (uriScheme != null) { // skip the check if no scheme is given |
| if(!supportedSchemes.isEmpty()) { |
| LOG.debug("Checking if filesystem " + uriScheme + " is supported"); |
| if (!supportedSchemes.contains(uriScheme)) { |
| throw new HadoopAccessorException(ErrorCode.E0904, uriScheme, uri.toString()); |
| } |
| } |
| } |
| } |
| |
| public Set<String> getSupportedSchemes() { |
| return supportedSchemes; |
| } |
| |
| /** |
| * Creates a {@link LocalResource} for the Configuration to localize it for a Yarn Container. This involves also writing it |
| * to HDFS. |
| * Example usage: |
| * * <pre> |
| * {@code |
| * LocalResource res1 = createLocalResourceForConfigurationFile(filename1, user, conf, uri, dir); |
| * LocalResource res2 = createLocalResourceForConfigurationFile(filename2, user, conf, uri, dir); |
| * ... |
| * Map<String, LocalResource> localResources = new HashMap<String, LocalResource>(); |
| * localResources.put(filename1, res1); |
| * localResources.put(filename2, res2); |
| * ... |
| * containerLaunchContext.setLocalResources(localResources); |
| * } |
| * </pre> |
| * |
| * @param filename The filename to use on the remote filesystem and once it has been localized. |
| * @param user The user |
| * @param conf The configuration to process |
| * @param uri The URI of the remote filesystem (e.g. HDFS) |
| * @param dir The directory on the remote filesystem to write the file to |
| * @return localResource |
| * @throws IOException A problem occurred writing the file |
| * @throws HadoopAccessorException A problem occured with Hadoop |
| * @throws URISyntaxException A problem occurred parsing the URI |
| */ |
| public LocalResource createLocalResourceForConfigurationFile(String filename, String user, Configuration conf, URI uri, |
| Path dir) |
| throws IOException, HadoopAccessorException, URISyntaxException { |
| Path dst = new Path(dir, filename); |
| FileSystem fs = createFileSystem(user, uri, conf, false); |
| try (OutputStream os = fs.create(dst)){ |
| conf.writeXml(os); |
| } |
| LocalResource localResource = Records.newRecord(LocalResource.class); |
| localResource.setType(LocalResourceType.FILE); localResource.setVisibility(LocalResourceVisibility.APPLICATION); |
| localResource.setResource(ConverterUtils.getYarnUrlFromPath(dst)); |
| FileStatus destStatus = fs.getFileStatus(dst); |
| localResource.setTimestamp(destStatus.getModificationTime()); |
| localResource.setSize(destStatus.getLen()); |
| return localResource; |
| } |
| |
| } |