| /** |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| |
| package org.apache.tez.client; |
| |
| import java.io.File; |
| import java.io.IOException; |
| import java.io.InputStream; |
| import java.net.InetSocketAddress; |
| import java.net.URI; |
| import java.net.URISyntaxException; |
| import java.nio.ByteBuffer; |
| import java.security.PrivilegedExceptionAction; |
| import java.util.ArrayList; |
| import java.util.Arrays; |
| import java.util.Collection; |
| import java.util.Collections; |
| import java.util.HashMap; |
| import java.util.HashSet; |
| import java.util.Iterator; |
| import java.util.List; |
| import java.util.Map; |
| import java.util.Set; |
| import java.util.TreeMap; |
| import java.util.Vector; |
| import java.util.Map.Entry; |
| import java.util.Objects; |
| |
| import com.google.common.base.Strings; |
| import org.apache.commons.codec.digest.DigestUtils; |
| import org.apache.commons.lang.StringUtils; |
| import org.apache.tez.common.JavaOptsChecker; |
| import org.apache.tez.dag.api.records.DAGProtos.AMPluginDescriptorProto; |
| import org.apache.tez.serviceplugins.api.ServicePluginsDescriptor; |
| import org.slf4j.Logger; |
| import org.slf4j.LoggerFactory; |
| import org.apache.hadoop.classification.InterfaceAudience.Private; |
| import org.apache.hadoop.conf.Configuration; |
| import org.apache.hadoop.fs.FSDataOutputStream; |
| import org.apache.hadoop.fs.FileStatus; |
| import org.apache.hadoop.fs.FileSystem; |
| import org.apache.hadoop.fs.LocalFileSystem; |
| import org.apache.hadoop.fs.Path; |
| import org.apache.hadoop.fs.permission.FsAction; |
| import org.apache.hadoop.io.DataOutputBuffer; |
| import org.apache.hadoop.io.Text; |
| import org.apache.hadoop.ipc.ProtobufRpcEngine; |
| import org.apache.hadoop.ipc.RPC; |
| import org.apache.hadoop.net.NetUtils; |
| import org.apache.hadoop.security.Credentials; |
| import org.apache.hadoop.security.UserGroupInformation; |
| import org.apache.hadoop.security.token.Token; |
| import org.apache.hadoop.util.Shell; |
| import org.apache.hadoop.yarn.api.ApplicationConstants; |
| import org.apache.hadoop.yarn.api.ApplicationConstants.Environment; |
| import org.apache.hadoop.yarn.api.records.ApplicationAccessType; |
| import org.apache.hadoop.yarn.api.records.ApplicationId; |
| import org.apache.hadoop.yarn.api.records.ApplicationReport; |
| import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext; |
| import org.apache.hadoop.yarn.api.records.ContainerLaunchContext; |
| import org.apache.hadoop.yarn.api.records.LocalResource; |
| import org.apache.hadoop.yarn.api.records.LocalResourceType; |
| import org.apache.hadoop.yarn.api.records.LocalResourceVisibility; |
| import org.apache.hadoop.yarn.api.records.Priority; |
| import org.apache.hadoop.yarn.api.records.Resource; |
| import org.apache.hadoop.yarn.api.records.URL; |
| import org.apache.hadoop.yarn.api.records.YarnApplicationState; |
| import org.apache.hadoop.yarn.conf.YarnConfiguration; |
| import org.apache.hadoop.yarn.exceptions.ApplicationNotFoundException; |
| import org.apache.hadoop.yarn.exceptions.YarnException; |
| import org.apache.hadoop.yarn.security.client.ClientToAMTokenIdentifier; |
| import org.apache.hadoop.yarn.util.ConverterUtils; |
| import org.apache.hadoop.yarn.util.Records; |
| import org.apache.log4j.Level; |
| import org.apache.tez.common.TezCommonUtils; |
| import org.apache.tez.common.TezYARNUtils; |
| import org.apache.tez.common.VersionInfo; |
| import org.apache.tez.common.security.ACLManager; |
| import org.apache.tez.common.security.JobTokenIdentifier; |
| import org.apache.tez.common.security.JobTokenSecretManager; |
| import org.apache.tez.common.security.TokenCache; |
| import org.apache.tez.dag.api.DAG; |
| import org.apache.tez.dag.api.DagTypeConverters; |
| import org.apache.tez.dag.api.DataSinkDescriptor; |
| import org.apache.tez.dag.api.DataSourceDescriptor; |
| import org.apache.tez.dag.api.SessionNotRunning; |
| import org.apache.tez.dag.api.TezConfiguration; |
| import org.apache.tez.dag.api.TezConstants; |
| import org.apache.tez.dag.api.TezException; |
| import org.apache.tez.dag.api.TezUncheckedException; |
| import org.apache.tez.dag.api.Vertex; |
| import org.apache.tez.dag.api.client.rpc.DAGClientAMProtocolBlockingPB; |
| import org.apache.tez.dag.api.records.DAGProtos; |
| import org.apache.tez.dag.api.records.DAGProtos.ConfigurationProto; |
| import org.apache.tez.dag.api.records.DAGProtos.DAGPlan; |
| import org.apache.tez.dag.api.records.DAGProtos.PlanKeyValuePair; |
| |
| import com.google.common.annotations.VisibleForTesting; |
| import com.google.common.base.Function; |
| import com.google.common.collect.Iterators; |
| import com.google.common.collect.Lists; |
| |
| @Private |
| public class TezClientUtils { |
| |
| private static Logger LOG = LoggerFactory.getLogger(TezClientUtils.class); |
| private static final int UTF8_CHUNK_SIZE = 16 * 1024; |
| |
| private static FileStatus[] getLRFileStatus(String fileName, Configuration conf) throws |
| IOException { |
| URI uri; |
| try { |
| uri = new URI(fileName); |
| } catch (URISyntaxException e) { |
| String message = "Invalid URI defined in configuration for" |
| + " location of TEZ jars. providedURI=" + fileName; |
| LOG.error(message); |
| throw new TezUncheckedException(message, e); |
| } |
| |
| Path p = new Path(uri); |
| FileSystem fs = p.getFileSystem(conf); |
| p = fs.resolvePath(p.makeQualified(fs.getUri(), |
| fs.getWorkingDirectory())); |
| FileSystem targetFS = p.getFileSystem(conf); |
| if (targetFS.isDirectory(p)) { |
| return targetFS.listStatus(p); |
| } else { |
| FileStatus fStatus = targetFS.getFileStatus(p); |
| return new FileStatus[]{fStatus}; |
| } |
| } |
| |
| /** |
| * Setup LocalResource map for Tez jars based on provided Configuration |
| * |
| * @param conf |
| * Configuration to use to access Tez jars' locations |
| * @param credentials |
| * a credentials instance into which tokens for the Tez local |
| * resources will be populated |
| * @param tezJarResources Map of LocalResources to use for AM and DAGs |
| * @return Whether the archive-based deployment of Tez was used. |
| * @throws IOException |
| */ |
| static boolean setupTezJarsLocalResources(TezConfiguration conf, |
| Credentials credentials, Map<String, LocalResource> tezJarResources) |
| throws IOException { |
| Objects.requireNonNull(credentials, "A non-null credentials object should be specified"); |
| boolean usingTezArchive = false; |
| |
| if (conf.getBoolean(TezConfiguration.TEZ_IGNORE_LIB_URIS, false)){ |
| LOG.info("Ignoring '" + TezConfiguration.TEZ_LIB_URIS + "' since '" + |
| TezConfiguration.TEZ_IGNORE_LIB_URIS + "' is set to true"); |
| } else { |
| // Add tez jars to local resource |
| String[] tezJarUris = conf.getStrings(TezConfiguration.TEZ_LIB_URIS); |
| |
| if (tezJarUris == null || tezJarUris.length == 0) { |
| throw new TezUncheckedException("Invalid configuration of tez jars" |
| + ", " + TezConfiguration.TEZ_LIB_URIS |
| + " is not defined in the configuration"); |
| } |
| |
| LOG.info("Using tez.lib.uris value from configuration: " |
| + conf.get(TezConfiguration.TEZ_LIB_URIS)); |
| LOG.info("Using tez.lib.uris.classpath value from configuration: " |
| + conf.get(TezConfiguration.TEZ_LIB_URIS_CLASSPATH)); |
| |
| usingTezArchive = addLocalResources(conf, tezJarUris, |
| tezJarResources, credentials); |
| |
| if (tezJarResources.isEmpty()) { |
| throw new TezUncheckedException( |
| "No files found in locations specified in " |
| + TezConfiguration.TEZ_LIB_URIS + " . Locations: " |
| + StringUtils.join(tezJarUris, ',')); |
| } |
| } |
| |
| // Add aux uris to local resources |
| addLocalResources(conf, conf.getStrings(TezConfiguration.TEZ_AUX_URIS), |
| tezJarResources, credentials); |
| |
| return usingTezArchive; |
| } |
| |
| private static boolean addLocalResources(Configuration conf, |
| String[] configUris, Map<String, LocalResource> tezJarResources, |
| Credentials credentials) throws IOException { |
| boolean usingTezArchive = false; |
| if (configUris == null || configUris.length == 0) { |
| return usingTezArchive; |
| } |
| List<Path> configuredPaths = Lists.newArrayListWithCapacity(configUris.length); |
| for (String configUri : configUris) { |
| URI u = null; |
| try { |
| u = new URI(configUri); |
| } catch (URISyntaxException e) { |
| throw new IOException("Unable to convert " + configUri + "to URI", e); |
| } |
| Path p = new Path(u); |
| FileSystem remoteFS = p.getFileSystem(conf); |
| p = remoteFS.resolvePath(p.makeQualified(remoteFS.getUri(), |
| remoteFS.getWorkingDirectory())); |
| |
| LocalResourceType type = null; |
| |
| //Check if path is an archive |
| if(p.getName().endsWith(".tar.gz") || |
| p.getName().endsWith(".tgz") || |
| p.getName().endsWith(".zip") || |
| p.getName().endsWith(".tar")) { |
| type = LocalResourceType.ARCHIVE; |
| } else { |
| type = LocalResourceType.FILE; |
| } |
| |
| FileStatus [] fileStatuses = getLRFileStatus(configUri, conf); |
| |
| for (FileStatus fStatus : fileStatuses) { |
| String linkName; |
| if (fStatus.isDirectory()) { |
| // Skip directories - no recursive search support. |
| continue; |
| } |
| // If the resource is an archive, we've already done this work |
| if(type != LocalResourceType.ARCHIVE) { |
| u = fStatus.getPath().toUri(); |
| p = new Path(u); |
| remoteFS = p.getFileSystem(conf); |
| p = remoteFS.resolvePath(p.makeQualified(remoteFS.getUri(), |
| remoteFS.getWorkingDirectory())); |
| if(null != u.getFragment()) { |
| LOG.warn("Fragment set for link being interpreted as a file," + |
| "URI: " + u.toString()); |
| } |
| } |
| |
| // Add URI fragment or just the filename |
| Path name = new Path((null == u.getFragment()) |
| ? p.getName() |
| : u.getFragment()); |
| if (name.isAbsolute()) { |
| throw new IllegalArgumentException("Resource name must be " |
| + "relative, not absolute: " + name |
| + " in URI: " + u.toString()); |
| } |
| |
| URL url = ConverterUtils.getYarnUrlFromURI(p.toUri()); |
| linkName = name.toUri().getPath(); |
| // For legacy reasons, set archive to tezlib if there is |
| // only a single archive and no fragment |
| if(type == LocalResourceType.ARCHIVE && |
| configUris.length == 1 && null == u.getFragment()) { |
| linkName = TezConstants.TEZ_TAR_LR_NAME; |
| usingTezArchive = true; |
| } |
| |
| LocalResourceVisibility lrVisibility; |
| if (checkAncestorPermissionsForAllUsers(conf, p, |
| FsAction.EXECUTE) && |
| fStatus.getPermission().getOtherAction().implies(FsAction.READ)) { |
| lrVisibility = LocalResourceVisibility.PUBLIC; |
| } else { |
| lrVisibility = LocalResourceVisibility.PRIVATE; |
| } |
| |
| if (tezJarResources.containsKey(linkName)) { |
| String message = "Duplicate resource found" |
| + ", resourceName=" + linkName |
| + ", existingPath=" + |
| tezJarResources.get(linkName).getResource().toString() |
| + ", newPath=" + fStatus.getPath(); |
| LOG.warn(message); |
| } |
| |
| tezJarResources.put(linkName, |
| LocalResource.newInstance( |
| url, |
| type, |
| lrVisibility, |
| fStatus.getLen(), |
| fStatus.getModificationTime())); |
| configuredPaths.add(fStatus.getPath()); |
| } |
| } |
| // Obtain credentials. |
| if (!configuredPaths.isEmpty()) { |
| TokenCache.obtainTokensForFileSystems(credentials, |
| configuredPaths.toArray(new Path[configuredPaths.size()]), conf); |
| } |
| return usingTezArchive; |
| } |
| |
| static void processTezLocalCredentialsFile(Credentials credentials, Configuration conf) |
| throws IOException { |
| String path = conf.get(TezConfiguration.TEZ_CREDENTIALS_PATH); |
| if (path == null) { |
| return; |
| } else { |
| TokenCache.mergeBinaryTokens(credentials, conf, path); |
| } |
| } |
| |
| /** |
| * Verify or create the Staging area directory on the configured Filesystem |
| * @param stagingArea Staging area directory path |
| * @return the FileSytem for the staging area directory |
| * @throws IOException |
| */ |
| public static FileSystem ensureStagingDirExists(Configuration conf, |
| Path stagingArea) |
| throws IOException { |
| FileSystem fs = stagingArea.getFileSystem(conf); |
| String realUser; |
| String currentUser; |
| UserGroupInformation ugi = UserGroupInformation.getLoginUser(); |
| realUser = ugi.getShortUserName(); |
| currentUser = UserGroupInformation.getCurrentUser().getShortUserName(); |
| if (fs.exists(stagingArea)) { |
| FileStatus fsStatus = fs.getFileStatus(stagingArea); |
| String owner = fsStatus.getOwner(); |
| if (!(owner.equals(currentUser) || owner.equals(realUser))) { |
| throw new IOException("The ownership on the staging directory " |
| + stagingArea + " is not as expected. " + "It is owned by " + owner |
| + ". The directory must " + "be owned by the submitter " |
| + currentUser + " or " + "by " + realUser); |
| } |
| if (!fsStatus.getPermission().equals(TezCommonUtils.TEZ_AM_DIR_PERMISSION)) { |
| LOG.info("Permissions on staging directory " + stagingArea + " are " |
| + "incorrect: " + fsStatus.getPermission() |
| + ". Fixing permissions " + "to correct value " |
| + TezCommonUtils.TEZ_AM_DIR_PERMISSION); |
| fs.setPermission(stagingArea, TezCommonUtils.TEZ_AM_DIR_PERMISSION); |
| } |
| } else { |
| TezCommonUtils.mkDirForAM(fs, stagingArea); |
| } |
| return fs; |
| } |
| |
| /** |
| * Populate {@link Credentials} for the URI's to access them from their {@link FileSystem}s |
| * @param uris URIs that need to be accessed |
| * @param credentials Credentials object into which to add the credentials |
| * @param conf Configuration to access the FileSystem |
| * @throws IOException |
| */ |
| public static void addFileSystemCredentialsFromURIs(Collection<URI> uris, Credentials credentials, |
| Configuration conf) throws IOException { |
| // Obtain Credentials for any paths that the user may have configured. |
| if (uris != null && !uris.isEmpty()) { |
| Iterator<Path> pathIter = Iterators.transform(uris.iterator(), new Function<URI, Path>() { |
| @Override |
| public Path apply(URI input) { |
| return new Path(input); |
| } |
| }); |
| |
| Path[] paths = Iterators.toArray(pathIter, Path.class); |
| TokenCache.obtainTokensForFileSystems(credentials, paths, conf); |
| } |
| } |
| |
| /** |
| * Obtains tokens for the DAG based on the list of URIs setup in the DAG. The |
| * fetched credentials are populated back into the DAG and can be retrieved |
| * via dag.getCredentials |
| * |
| * @param dag |
| * the dag for which credentials need to be setup |
| * @param sessionCredentials |
| * session credentials which have already been obtained, and will be |
| * required for the DAG |
| * @param conf |
| * @throws IOException |
| */ |
| @Private |
| static Credentials setupDAGCredentials(DAG dag, Credentials sessionCredentials, |
| Configuration conf) throws IOException { |
| |
| Objects.requireNonNull(sessionCredentials); |
| TezCommonUtils.logCredentials(LOG, sessionCredentials, "session"); |
| Credentials dagCredentials = new Credentials(); |
| // All session creds are required for the DAG. |
| dagCredentials.mergeAll(sessionCredentials); |
| |
| // Add additional credentials based on any URIs that the user may have specified. |
| |
| // Obtain Credentials for any paths that the user may have configured. |
| addFileSystemCredentialsFromURIs(dag.getURIsForCredentials(), dagCredentials, conf); |
| |
| // Obtain Credentials for the local resources configured on the DAG |
| try { |
| Set<Path> lrPaths = new HashSet<Path>(); |
| for (Vertex v: dag.getVertices()) { |
| for (LocalResource lr: v.getTaskLocalFiles().values()) { |
| lrPaths.add(ConverterUtils.getPathFromYarnURL(lr.getResource())); |
| } |
| List<DataSourceDescriptor> dataSources = v.getDataSources(); |
| for (DataSourceDescriptor dataSource : dataSources) { |
| addFileSystemCredentialsFromURIs(dataSource.getURIsForCredentials(), dagCredentials, conf); |
| } |
| List<DataSinkDescriptor> dataSinks = v.getDataSinks(); |
| for (DataSinkDescriptor dataSink : dataSinks) { |
| addFileSystemCredentialsFromURIs(dataSink.getURIsForCredentials(), dagCredentials, conf); |
| } |
| } |
| |
| for (LocalResource lr: dag.getTaskLocalFiles().values()) { |
| lrPaths.add(ConverterUtils.getPathFromYarnURL(lr.getResource())); |
| } |
| |
| Path[] paths = lrPaths.toArray(new Path[lrPaths.size()]); |
| TokenCache.obtainTokensForFileSystems(dagCredentials, paths, conf); |
| |
| } catch (URISyntaxException e) { |
| throw new IOException(e); |
| } |
| |
| return dagCredentials; |
| } |
| |
| /** |
| * Create an ApplicationSubmissionContext to launch a Tez AM |
| * @param appId Application Id |
| * @param dag DAG to be submitted |
| * @param amName Name for the application |
| * @param amConfig AM Configuration |
| * @param tezJarResources Resources to be used by the AM |
| * @param sessionCreds the credential object which will be populated with session specific |
| * @param servicePluginsDescriptor descriptor for services which may be running in the AM |
| * @return an ApplicationSubmissionContext to launch a Tez AM |
| * @throws IOException |
| * @throws YarnException |
| */ |
| @Private |
| @VisibleForTesting |
| public static ApplicationSubmissionContext createApplicationSubmissionContext( |
| ApplicationId appId, DAG dag, String amName, |
| AMConfiguration amConfig, Map<String, LocalResource> tezJarResources, |
| Credentials sessionCreds, boolean tezLrsAsArchive, |
| TezApiVersionInfo apiVersionInfo, |
| ServicePluginsDescriptor servicePluginsDescriptor, JavaOptsChecker javaOptsChecker) |
| throws IOException, YarnException { |
| |
| Objects.requireNonNull(sessionCreds); |
| TezConfiguration conf = amConfig.getTezConfiguration(); |
| |
| FileSystem fs = TezClientUtils.ensureStagingDirExists(conf, |
| TezCommonUtils.getTezBaseStagingPath(conf)); |
| String strAppId = appId.toString(); |
| Path tezSysStagingPath = TezCommonUtils.createTezSystemStagingPath(conf, strAppId); |
| Path binaryConfPath = TezCommonUtils.getTezConfStagingPath(tezSysStagingPath); |
| binaryConfPath = fs.makeQualified(binaryConfPath); |
| |
| // Setup resource requirements |
| Resource capability = Records.newRecord(Resource.class); |
| capability.setMemory( |
| amConfig.getTezConfiguration().getInt(TezConfiguration.TEZ_AM_RESOURCE_MEMORY_MB, |
| TezConfiguration.TEZ_AM_RESOURCE_MEMORY_MB_DEFAULT)); |
| capability.setVirtualCores( |
| amConfig.getTezConfiguration().getInt(TezConfiguration.TEZ_AM_RESOURCE_CPU_VCORES, |
| TezConfiguration.TEZ_AM_RESOURCE_CPU_VCORES_DEFAULT)); |
| LOG.debug("AppMaster capability = {}", capability); |
| |
| // Setup required Credentials for the AM launch. DAG specific credentials |
| // are handled separately. |
| ByteBuffer securityTokens = null; |
| Credentials amLaunchCredentials = |
| prepareAmLaunchCredentials(amConfig, sessionCreds, conf, binaryConfPath); |
| |
| DataOutputBuffer dob = new DataOutputBuffer(); |
| amLaunchCredentials.writeTokenStorageToStream(dob); |
| securityTokens = ByteBuffer.wrap(dob.getData(), 0, dob.getLength()); |
| |
| // Setup the command to run the AM |
| List<String> vargs = new ArrayList<String>(8); |
| vargs.add(Environment.JAVA_HOME.$() + "/bin/java"); |
| |
| String amOpts = constructAMLaunchOpts(amConfig.getTezConfiguration(), capability); |
| vargs.add(amOpts); |
| |
| String amLogLevelString = amConfig.getTezConfiguration().get( |
| TezConfiguration.TEZ_AM_LOG_LEVEL, |
| TezConfiguration.TEZ_AM_LOG_LEVEL_DEFAULT); |
| String[] amLogParams = parseLogParams(amLogLevelString); |
| |
| String amLogLevel = amLogParams[0]; |
| maybeAddDefaultLoggingJavaOpts(amLogLevel, vargs); |
| |
| |
| // FIX sun bug mentioned in TEZ-327 |
| vargs.add("-Dsun.nio.ch.bugLevel=''"); |
| |
| vargs.add(TezConstants.TEZ_APPLICATION_MASTER_CLASS); |
| if (dag == null) { |
| vargs.add("--" + TezConstants.TEZ_SESSION_MODE_CLI_OPTION); |
| } |
| |
| vargs.add("1>" + ApplicationConstants.LOG_DIR_EXPANSION_VAR + |
| File.separator + ApplicationConstants.STDOUT); |
| vargs.add("2>" + ApplicationConstants.LOG_DIR_EXPANSION_VAR + |
| File.separator + ApplicationConstants.STDERR); |
| |
| |
| Vector<String> vargsFinal = new Vector<String>(8); |
| // Final command |
| StringBuilder mergedCommand = new StringBuilder(); |
| for (CharSequence str : vargs) { |
| mergedCommand.append(str).append(" "); |
| } |
| vargsFinal.add(mergedCommand.toString()); |
| |
| LOG.debug("Command to launch container for ApplicationMaster is : {}", mergedCommand); |
| |
| Map<String, String> environment = new TreeMap<String, String>(); |
| TezYARNUtils.setupDefaultEnv(environment, conf, |
| TezConfiguration.TEZ_AM_LAUNCH_ENV, |
| TezConfiguration.TEZ_AM_LAUNCH_ENV_DEFAULT, |
| TezConfiguration.TEZ_AM_LAUNCH_CLUSTER_DEFAULT_ENV, |
| TezConfiguration.TEZ_AM_LAUNCH_CLUSTER_DEFAULT_ENV_DEFAULT, |
| tezLrsAsArchive); |
| |
| addVersionInfoToEnv(environment, apiVersionInfo); |
| addLogParamsToEnv(environment, amLogParams); |
| |
| Map<String, LocalResource> amLocalResources = |
| new TreeMap<String, LocalResource>(); |
| |
| // Not fetching credentials for AMLocalResources. Expect this to be provided via AMCredentials. |
| if (amConfig.getAMLocalResources() != null) { |
| amLocalResources.putAll(amConfig.getAMLocalResources()); |
| } |
| amLocalResources.putAll(tezJarResources); |
| |
| TezConfiguration tezConf = amConfig.getTezConfiguration(); |
| // Merge the dag access controls into tez am config. |
| if (dag != null && dag.getDagAccessControls() != null) { |
| // Merge updates the conf object passed. In non session mode, same client object can be used |
| // to submit multiple dags, copying this prevents ACL of one DAG from being used in another. |
| tezConf = new TezConfiguration(amConfig.getTezConfiguration()); |
| dag.getDagAccessControls().mergeIntoAmAcls(tezConf); |
| } |
| |
| // emit conf as PB file |
| // don't overwrite existing conf, needed for TezClient.getClient() so existing containers have stable resource fingerprints |
| if(!binaryConfPath.getFileSystem(tezConf).exists(binaryConfPath)) { |
| ConfigurationProto finalConfProto = createFinalConfProtoForApp(tezConf, |
| servicePluginsDescriptor); |
| |
| FSDataOutputStream amConfPBOutBinaryStream = null; |
| try { |
| amConfPBOutBinaryStream = TezCommonUtils.createFileForAM(fs, binaryConfPath); |
| finalConfProto.writeTo(amConfPBOutBinaryStream); |
| } finally { |
| if (amConfPBOutBinaryStream != null) { |
| amConfPBOutBinaryStream.close(); |
| } |
| } |
| } |
| |
| LocalResource binaryConfLRsrc = |
| TezClientUtils.createLocalResource(fs, |
| binaryConfPath, LocalResourceType.FILE, |
| LocalResourceVisibility.APPLICATION); |
| amConfig.setBinaryConfLR(binaryConfLRsrc); |
| amLocalResources.put(TezConstants.TEZ_PB_BINARY_CONF_NAME, |
| binaryConfLRsrc); |
| |
| // Create Session Jars definition to be sent to AM as a local resource |
| Path sessionJarsPath = TezCommonUtils.getTezAMJarStagingPath(tezSysStagingPath); |
| FSDataOutputStream sessionJarsPBOutStream = null; |
| try { |
| sessionJarsPBOutStream = TezCommonUtils.createFileForAM(fs, sessionJarsPath); |
| // Write out the initial list of resources which will be available in the AM |
| DAGProtos.PlanLocalResourcesProto amResourceProto; |
| if (amLocalResources != null && !amLocalResources.isEmpty()) { |
| amResourceProto = DagTypeConverters.convertFromLocalResources(amLocalResources); |
| } else { |
| amResourceProto = DAGProtos.PlanLocalResourcesProto.getDefaultInstance(); |
| } |
| amResourceProto.writeDelimitedTo(sessionJarsPBOutStream); |
| } finally { |
| if (sessionJarsPBOutStream != null) { |
| sessionJarsPBOutStream.close(); |
| } |
| } |
| |
| LocalResource sessionJarsPBLRsrc = |
| TezClientUtils.createLocalResource(fs, |
| sessionJarsPath, LocalResourceType.FILE, |
| LocalResourceVisibility.APPLICATION); |
| amLocalResources.put( |
| TezConstants.TEZ_AM_LOCAL_RESOURCES_PB_FILE_NAME, |
| sessionJarsPBLRsrc); |
| |
| String user = UserGroupInformation.getCurrentUser().getShortUserName(); |
| ACLManager aclManager = new ACLManager(user, amConfig.getTezConfiguration()); |
| Map<ApplicationAccessType, String> acls = aclManager.toYARNACls(); |
| |
| if(dag != null) { |
| |
| DAGPlan dagPB = prepareAndCreateDAGPlan(dag, amConfig, tezJarResources, tezLrsAsArchive, |
| sessionCreds, servicePluginsDescriptor, javaOptsChecker); |
| |
| // emit protobuf DAG file style |
| Path binaryPath = TezCommonUtils.getTezBinPlanStagingPath(tezSysStagingPath); |
| if (LOG.isDebugEnabled()) { |
| LOG.debug("Stage directory information for AppId :" + appId + " tezSysStagingPath :" |
| + tezSysStagingPath + " binaryConfPath :" + binaryConfPath + " sessionJarsPath :" |
| + sessionJarsPath + " binaryPlanPath :" + binaryPath); |
| } |
| |
| FSDataOutputStream dagPBOutBinaryStream = null; |
| |
| try { |
| //binary output |
| dagPBOutBinaryStream = TezCommonUtils.createFileForAM(fs, binaryPath); |
| dagPB.writeTo(dagPBOutBinaryStream); |
| } finally { |
| if(dagPBOutBinaryStream != null){ |
| dagPBOutBinaryStream.close(); |
| } |
| } |
| |
| amLocalResources.put(TezConstants.TEZ_PB_PLAN_BINARY_NAME, |
| TezClientUtils.createLocalResource(fs, |
| binaryPath, LocalResourceType.FILE, |
| LocalResourceVisibility.APPLICATION)); |
| |
| if (Level.DEBUG.isGreaterOrEqual(Level.toLevel(amLogLevel))) { |
| Path textPath = localizeDagPlanAsText(dagPB, fs, amConfig, strAppId, tezSysStagingPath); |
| amLocalResources.put(TezConstants.TEZ_PB_PLAN_TEXT_NAME, |
| TezClientUtils.createLocalResource(fs, |
| textPath, LocalResourceType.FILE, |
| LocalResourceVisibility.APPLICATION)); |
| } |
| } |
| |
| // Send the shuffle token as part of the AM launch context, so that the NM running the AM can |
| // provide this to AuxServices running on the AM node - in case tasks run within the AM, |
| // and no other task runs on this node. |
| Map<String, ByteBuffer> serviceData = new HashMap<String, ByteBuffer>(); |
| String auxiliaryService = conf.get(TezConfiguration.TEZ_AM_SHUFFLE_AUXILIARY_SERVICE_ID, |
| TezConfiguration.TEZ_AM_SHUFFLE_AUXILIARY_SERVICE_ID_DEFAULT); |
| serviceData.put(auxiliaryService, |
| TezCommonUtils.serializeServiceData(TokenCache.getSessionToken(amLaunchCredentials))); |
| |
| // Setup ContainerLaunchContext for AM container |
| ContainerLaunchContext amContainer = |
| ContainerLaunchContext.newInstance(amLocalResources, environment, |
| vargsFinal, serviceData, securityTokens, acls); |
| |
| // Set up the ApplicationSubmissionContext |
| ApplicationSubmissionContext appContext = Records |
| .newRecord(ApplicationSubmissionContext.class); |
| |
| Collection<String> tagsFromConf = |
| amConfig.getTezConfiguration().getTrimmedStringCollection( |
| TezConfiguration.TEZ_APPLICATION_TAGS); |
| |
| appContext.setApplicationType(TezConstants.TEZ_APPLICATION_TYPE); |
| if (tagsFromConf != null && !tagsFromConf.isEmpty()) { |
| appContext.setApplicationTags(new HashSet<String>(tagsFromConf)); |
| } |
| appContext.setApplicationId(appId); |
| appContext.setResource(capability); |
| String queueName = amConfig.getQueueName(); |
| if (queueName != null && !queueName.isEmpty()) { |
| appContext.setQueue(amConfig.getQueueName()); |
| } |
| // set the application priority |
| setApplicationPriority(appContext, amConfig); |
| appContext.setApplicationName(amName); |
| appContext.setCancelTokensWhenComplete(amConfig.getTezConfiguration().getBoolean( |
| TezConfiguration.TEZ_CANCEL_DELEGATION_TOKENS_ON_COMPLETION, |
| TezConfiguration.TEZ_CANCEL_DELEGATION_TOKENS_ON_COMPLETION_DEFAULT)); |
| appContext.setAMContainerSpec(amContainer); |
| |
| appContext.setMaxAppAttempts( |
| amConfig.getTezConfiguration().getInt(TezConfiguration.TEZ_AM_MAX_APP_ATTEMPTS, |
| TezConfiguration.TEZ_AM_MAX_APP_ATTEMPTS_DEFAULT)); |
| |
| return appContext; |
| |
| } |
| |
| static Credentials prepareAmLaunchCredentials(AMConfiguration amConfig, Credentials sessionCreds, |
| TezConfiguration conf, Path binaryConfPath) throws IOException { |
| // Setup security tokens |
| Credentials amLaunchCredentials = new Credentials(); |
| |
| // Add SimpleHistoryLoggingService logDir creds to the list of session credentials |
| // If it is on HDFS |
| String simpleHistoryLogDir = conf.get(TezConfiguration.TEZ_SIMPLE_HISTORY_LOGGING_DIR); |
| if (simpleHistoryLogDir != null && !simpleHistoryLogDir.isEmpty()) { |
| Path simpleHistoryLogDirPath = new Path(simpleHistoryLogDir); |
| TokenCache.obtainTokensForFileSystems(sessionCreds, new Path[] { simpleHistoryLogDirPath }, |
| conf); |
| } |
| |
| // Add Staging dir creds to the list of session credentials. |
| TokenCache.obtainTokensForFileSystems(sessionCreds, new Path[] {binaryConfPath }, conf); |
| |
| populateTokenCache(conf, sessionCreds); |
| |
| // Add session specific credentials to the AM credentials. |
| amLaunchCredentials.mergeAll(sessionCreds); |
| |
| if (amConfig.getCredentials() != null) { |
| amLaunchCredentials.mergeAll(amConfig.getCredentials()); |
| } |
| TezCommonUtils.logCredentials(LOG, amLaunchCredentials, "amLaunch"); |
| return amLaunchCredentials; |
| } |
| |
| //get secret keys and tokens and store them into TokenCache |
| private static void populateTokenCache(TezConfiguration conf, Credentials credentials) |
| throws IOException{ |
| // add the delegation tokens from configuration |
| String[] nameNodes = conf.getStrings(TezConfiguration.TEZ_JOB_FS_SERVERS); |
| LOG.debug("adding the following namenodes' delegation tokens:" + |
| Arrays.toString(nameNodes)); |
| if(nameNodes != null) { |
| Path[] ps = new Path[nameNodes.length]; |
| for(int i = 0; i < nameNodes.length; i++) { |
| ps[i] = new Path(nameNodes[i]); |
| } |
| TokenCache.obtainTokensForFileSystems(credentials, ps, conf); |
| } |
| } |
| |
| static DAGPlan prepareAndCreateDAGPlan(DAG dag, AMConfiguration amConfig, |
| Map<String, LocalResource> tezJarResources, boolean tezLrsAsArchive, |
| Credentials credentials, ServicePluginsDescriptor servicePluginsDescriptor, |
| JavaOptsChecker javaOptsChecker) throws IOException { |
| Credentials dagCredentials = setupDAGCredentials(dag, credentials, |
| amConfig.getTezConfiguration()); |
| TezCommonUtils.logCredentials(LOG, dagCredentials, "dagPlan"); |
| return dag.createDag(amConfig.getTezConfiguration(), dagCredentials, tezJarResources, |
| amConfig.getBinaryConfLR(), tezLrsAsArchive, servicePluginsDescriptor, javaOptsChecker); |
| } |
| |
| static void maybeAddDefaultLoggingJavaOpts(String logLevel, List<String> vargs) { |
| Objects.requireNonNull(vargs); |
| TezClientUtils.addLog4jSystemProperties(logLevel, vargs); |
| } |
| |
| @Private |
| public static String maybeAddDefaultLoggingJavaOpts(String logLevel, String javaOpts) { |
| List<String> vargs = new ArrayList<String>(5); |
| if (javaOpts != null) { |
| Collections.addAll(vargs, javaOpts.split(" ")); |
| } else { |
| vargs.add(""); |
| } |
| maybeAddDefaultLoggingJavaOpts(logLevel, vargs); |
| if (vargs.size() == 1) { |
| return vargs.get(0); |
| } |
| return StringUtils.join(vargs, " ").trim(); |
| } |
| |
| @Private |
| public static String addDefaultsToTaskLaunchCmdOpts(String vOpts, Configuration conf) |
| throws TezException { |
| return addDefaultsToTaskLaunchCmdOpts(vOpts, conf, null); |
| } |
| |
| @Private |
| public static String addDefaultsToTaskLaunchCmdOpts(String vOpts, Configuration conf, |
| JavaOptsChecker javaOptsChecker) throws TezException { |
| String vConfigOpts = ""; |
| String taskDefaultOpts = conf.get(TezConfiguration.TEZ_TASK_LAUNCH_CLUSTER_DEFAULT_CMD_OPTS, |
| TezConfiguration.TEZ_TASK_LAUNCH_CLUSTER_DEFAULT_CMD_OPTS_DEFAULT); |
| if (taskDefaultOpts != null && !taskDefaultOpts.isEmpty()) { |
| vConfigOpts = taskDefaultOpts + " "; |
| } |
| String defaultTaskCmdOpts = TezConfiguration.TEZ_TASK_LAUNCH_CMD_OPTS_DEFAULT; |
| if (vOpts != null && !vOpts.isEmpty()) { |
| // Only use defaults if nothing is specified by the user |
| defaultTaskCmdOpts = ""; |
| } |
| |
| vConfigOpts = vConfigOpts + conf.get(TezConfiguration.TEZ_TASK_LAUNCH_CMD_OPTS, |
| defaultTaskCmdOpts); |
| if (vConfigOpts != null && !vConfigOpts.isEmpty()) { |
| // Add options specified in the DAG at the end. |
| vOpts = vConfigOpts + " " + vOpts; |
| } |
| |
| if (javaOptsChecker != null) { |
| javaOptsChecker.checkOpts(vOpts); |
| } |
| |
| return vOpts; |
| } |
| |
| @Private |
| @VisibleForTesting |
| public static void addLog4jSystemProperties(String logLevel, |
| List<String> vargs) { |
| vargs.add("-Dlog4j.configuratorClass=org.apache.tez.common.TezLog4jConfigurator"); |
| vargs.add("-Dlog4j.configuration=" |
| + TezConstants.TEZ_CONTAINER_LOG4J_PROPERTIES_FILE); |
| vargs.add("-D" + YarnConfiguration.YARN_APP_CONTAINER_LOG_DIR + "=" |
| + ApplicationConstants.LOG_DIR_EXPANSION_VAR); |
| boolean isRootLoggerPresent = false; |
| String rootLoggerArg = "-D" + TezConstants.TEZ_ROOT_LOGGER_NAME + "=" + logLevel |
| + "," + TezConstants.TEZ_CONTAINER_LOGGER_NAME; |
| for (int i = 0; i < vargs.size(); i++) { |
| String arg = vargs.get(i); |
| if (arg.contains(TezConstants.TEZ_ROOT_LOGGER_NAME)) { |
| vargs.set(i, rootLoggerArg); |
| isRootLoggerPresent = true; |
| } |
| } |
| if (!isRootLoggerPresent) { |
| vargs.add(rootLoggerArg); |
| } |
| } |
| |
| static ConfigurationProto createFinalConfProtoForApp(Configuration amConf, |
| ServicePluginsDescriptor servicePluginsDescriptor) { |
| assert amConf != null; |
| ConfigurationProto.Builder builder = ConfigurationProto.newBuilder(); |
| for (Entry<String, String> entry : amConf) { |
| String key = entry.getKey(); |
| String val = amConf.get(key); |
| if(val != null) { |
| PlanKeyValuePair.Builder kvp = PlanKeyValuePair.newBuilder(); |
| kvp.setKey(key); |
| kvp.setValue(val); |
| builder.addConfKeyValues(kvp); |
| } else { |
| LOG.debug("null value in Configuration after replacement for key={}. Skipping.", key); |
| } |
| } |
| |
| AMPluginDescriptorProto pluginDescriptorProto = |
| DagTypeConverters.convertServicePluginDescriptorToProto(servicePluginsDescriptor); |
| builder.setAmPluginDescriptor(pluginDescriptorProto); |
| |
| return builder.build(); |
| } |
| |
| |
| |
| /** |
| * Helper function to create a YARN LocalResource |
| * @param fs FileSystem object |
| * @param p Path of resource to localize |
| * @param type LocalResource Type |
| * @return a YARN LocalResource for the given Path |
| * @throws IOException |
| */ |
| static LocalResource createLocalResource(FileSystem fs, Path p, |
| LocalResourceType type, |
| LocalResourceVisibility visibility) throws IOException { |
| LocalResource rsrc = Records.newRecord(LocalResource.class); |
| FileStatus rsrcStat = fs.getFileStatus(p); |
| rsrc.setResource(ConverterUtils.getYarnUrlFromPath(fs.resolvePath(rsrcStat |
| .getPath()))); |
| rsrc.setSize(rsrcStat.getLen()); |
| rsrc.setTimestamp(rsrcStat.getModificationTime()); |
| rsrc.setType(type); |
| rsrc.setVisibility(visibility); |
| return rsrc; |
| } |
| |
| private static Path localizeDagPlanAsText(DAGPlan dagPB, FileSystem fs, AMConfiguration amConfig, |
| String strAppId, Path tezSysStagingPath) throws IOException { |
| Path textPath = |
| TezCommonUtils.getTezTextPlanStagingPath(tezSysStagingPath, strAppId, dagPB.getName()); |
| FSDataOutputStream dagPBOutTextStream = null; |
| try { |
| dagPBOutTextStream = TezCommonUtils.createFileForAM(fs, textPath); |
| String dagPBStr = dagPB.toString(); |
| int dagPBStrLen = dagPBStr.length(); |
| if (dagPBStrLen <= UTF8_CHUNK_SIZE) { |
| dagPBOutTextStream.writeUTF(dagPBStr); |
| } else { |
| int startIndex = 0; |
| while (startIndex < dagPBStrLen) { |
| int endIndex = startIndex + UTF8_CHUNK_SIZE; |
| if (endIndex > dagPBStrLen) { |
| endIndex = dagPBStrLen; |
| } |
| dagPBOutTextStream.writeUTF(dagPBStr.substring(startIndex, endIndex)); |
| startIndex += UTF8_CHUNK_SIZE; |
| } |
| } |
| } finally { |
| if (dagPBOutTextStream != null) { |
| dagPBOutTextStream.close(); |
| } |
| } |
| return textPath; |
| } |
| |
| static DAGClientAMProtocolBlockingPB getAMProxy(FrameworkClient frameworkClient, |
| Configuration conf, ApplicationId applicationId, UserGroupInformation ugi) |
| throws TezException, IOException { |
| ApplicationReport appReport; |
| try { |
| appReport = frameworkClient.getApplicationReport(applicationId); |
| |
| if(appReport == null) { |
| throw new TezUncheckedException("Could not retrieve application report" |
| + " from YARN, applicationId=" + applicationId); |
| } |
| YarnApplicationState appState = appReport.getYarnApplicationState(); |
| if(appState != YarnApplicationState.RUNNING) { |
| if (appState == YarnApplicationState.FINISHED |
| || appState == YarnApplicationState.KILLED |
| || appState == YarnApplicationState.FAILED) { |
| String msg = "Application not running" |
| + ", applicationId=" + applicationId |
| + ", yarnApplicationState=" + appReport.getYarnApplicationState() |
| + ", finalApplicationStatus=" |
| + appReport.getFinalApplicationStatus() |
| + ", trackingUrl=" + appReport.getTrackingUrl() |
| + ", diagnostics=" |
| + (appReport.getDiagnostics() != null ? appReport.getDiagnostics() |
| : TezClient.NO_CLUSTER_DIAGNOSTICS_MSG); |
| LOG.info(msg); |
| throw new SessionNotRunning(msg); |
| } |
| return null; |
| } |
| } catch (ApplicationNotFoundException e) { |
| throw new SessionNotRunning(e); |
| } catch (YarnException e) { |
| throw new TezException(e); |
| } |
| |
| return getAMProxy(conf, appReport.getHost(), appReport.getRpcPort(), |
| appReport.getClientToAMToken(), ugi); |
| } |
| |
| @Private |
| public static DAGClientAMProtocolBlockingPB getAMProxy(final Configuration conf, String amHost, |
| int amRpcPort, org.apache.hadoop.yarn.api.records.Token clientToAMToken, |
| UserGroupInformation userUgi) throws IOException { |
| |
| final InetSocketAddress serviceAddr = NetUtils.createSocketAddrForHost(amHost, amRpcPort); |
| if (clientToAMToken != null) { |
| Token<ClientToAMTokenIdentifier> token = ConverterUtils.convertFromYarn(clientToAMToken, |
| serviceAddr); |
| userUgi.addToken(token); |
| } |
| LOG.debug("Connecting to Tez AM at {}", serviceAddr); |
| DAGClientAMProtocolBlockingPB proxy = null; |
| try { |
| proxy = userUgi.doAs(new PrivilegedExceptionAction<DAGClientAMProtocolBlockingPB>() { |
| @Override |
| public DAGClientAMProtocolBlockingPB run() throws IOException { |
| RPC.setProtocolEngine(conf, DAGClientAMProtocolBlockingPB.class, ProtobufRpcEngine.class); |
| return (DAGClientAMProtocolBlockingPB) RPC.getProxy(DAGClientAMProtocolBlockingPB.class, |
| 0, serviceAddr, conf); |
| } |
| }); |
| } catch (InterruptedException e) { |
| throw new IOException("Failed to connect to AM", e); |
| } |
| return proxy; |
| } |
| |
| static void createSessionToken(String tokenIdentifier, |
| JobTokenSecretManager jobTokenSecretManager, |
| Credentials credentials) { |
| JobTokenIdentifier identifier = new JobTokenIdentifier(new Text( |
| tokenIdentifier)); |
| Token<JobTokenIdentifier> sessionToken = new Token<JobTokenIdentifier>(identifier, |
| jobTokenSecretManager); |
| sessionToken.setService(identifier.getJobId()); |
| TokenCache.setSessionToken(sessionToken, credentials); |
| } |
| |
| @Private |
| /** |
| * Add computed Xmx value to java opts if both -Xms and -Xmx are not specified |
| * @param javaOpts Current java opts |
| * @param resource Resource capability based on which java opts will be computed |
| * @param maxHeapFactor Factor to size Xmx ( valid range is 0.0 < x < 1.0) |
| * @return Modified java opts with computed Xmx value |
| */ |
| public static String maybeAddDefaultMemoryJavaOpts(String javaOpts, Resource resource, |
| double maxHeapFactor) { |
| if ((javaOpts != null && !javaOpts.isEmpty() |
| && (javaOpts.contains("-Xmx") || javaOpts.contains("-Xms"))) |
| || (resource.getMemory() <= 0)) { |
| return javaOpts; |
| } |
| |
| if ((maxHeapFactor <= 0 && Double.valueOf("-1") != maxHeapFactor) || maxHeapFactor >= 1) { |
| return javaOpts; |
| } |
| |
| if (Double.valueOf("-1") == maxHeapFactor) { |
| maxHeapFactor = resource.getMemory() < TezConstants.TEZ_CONTAINER_SMALL_SLAB_BOUND_MB |
| ? TezConstants.TEZ_CONTAINER_MAX_JAVA_HEAP_FRACTION_SMALL_SLAB |
| : TezConstants.TEZ_CONTAINER_MAX_JAVA_HEAP_FRACTION_LARGE_SLAB; |
| } |
| int maxMemory = (int)(resource.getMemory() * maxHeapFactor); |
| maxMemory = maxMemory <= 0 ? 1 : maxMemory; |
| |
| return " -Xmx" + maxMemory + "m " |
| + ( javaOpts != null ? javaOpts : ""); |
| } |
| |
| private static boolean checkAncestorPermissionsForAllUsers(Configuration conf, Path pathComponent, |
| FsAction permission) throws IOException { |
| FileSystem fs = pathComponent.getFileSystem(conf); |
| |
| if (Shell.WINDOWS && fs instanceof LocalFileSystem) { |
| // Relax the requirement for public cache on LFS on Windows since default permissions are |
| // "700" all the way up to the drive letter. In this model, the only requirement for a user |
| // is to give EVERYONE group permission on the file and the file will be considered public. |
| // This code path is only hit when fs.default.name is file:/// (mainly in tests). |
| return true; |
| } |
| |
| if (fs.getFileStatus(pathComponent).isFile()) { |
| pathComponent = pathComponent.getParent(); |
| } |
| |
| while (pathComponent != null) { |
| if (!fs.getFileStatus(pathComponent).getPermission().getOtherAction().implies(permission)) { |
| return false; |
| } |
| pathComponent = pathComponent.getParent(); |
| } |
| |
| return true; |
| } |
| |
| @Private |
| @VisibleForTesting |
| static String constructAMLaunchOpts(TezConfiguration tezConf, Resource capability) { |
| String defaultOpts = tezConf.get(TezConfiguration.TEZ_AM_LAUNCH_CLUSTER_DEFAULT_CMD_OPTS, |
| TezConfiguration.TEZ_AM_LAUNCH_CLUSTER_DEFAULT_CMD_OPTS_DEFAULT); |
| Path tmpDir = new Path(Environment.PWD.$(), |
| YarnConfiguration.DEFAULT_CONTAINER_TEMP_DIR); |
| String amOpts = "-Djava.io.tmpdir=" + tmpDir + " "; |
| |
| if (defaultOpts != null && !defaultOpts.isEmpty()) { |
| amOpts = amOpts + defaultOpts + " "; |
| } |
| amOpts = amOpts + tezConf.get(TezConfiguration.TEZ_AM_LAUNCH_CMD_OPTS, |
| TezConfiguration.TEZ_AM_LAUNCH_CMD_OPTS_DEFAULT); |
| |
| amOpts = maybeAddDefaultMemoryJavaOpts(amOpts, capability, |
| tezConf.getDouble(TezConfiguration.TEZ_CONTAINER_MAX_JAVA_HEAP_FRACTION, |
| TezConfiguration.TEZ_CONTAINER_MAX_JAVA_HEAP_FRACTION_DEFAULT)); |
| |
| return amOpts; |
| } |
| |
| private static void addVersionInfoToEnv(Map<String, String> environment, |
| TezApiVersionInfo versionInfo) { |
| if (!versionInfo.getVersion().equals(VersionInfo.UNKNOWN)) { |
| TezYARNUtils.replaceInEnv(environment, TezConstants.TEZ_CLIENT_VERSION_ENV, |
| versionInfo.getVersion()); |
| } |
| } |
| |
| @Private |
| public static void addLogParamsToEnv(Map<String, String> environment, String[] logParams) { |
| if (logParams.length == 2 && !Strings.isNullOrEmpty(logParams[1])) { |
| TezYARNUtils.replaceInEnv(environment, TezConstants.TEZ_CONTAINER_LOG_PARAMS, logParams[1]); |
| } |
| } |
| |
| @Private |
| public static String[] parseLogParams(String logConfig) { |
| if (!Strings.isNullOrEmpty(logConfig)) { |
| int separatorIndex = logConfig.indexOf(TezConstants.TEZ_CONTAINER_LOG_PARAMS_SEPARATOR); |
| if (separatorIndex == -1) { |
| return new String[]{logConfig.trim()}; |
| } else { |
| return new String[]{logConfig.substring(0, separatorIndex), |
| logConfig.substring(separatorIndex + 1, logConfig.length()).trim()}; |
| } |
| } else { |
| return null; |
| } |
| } |
| |
| @VisibleForTesting |
| // this method will set the app priority only if the priority config has been defined |
| public static void setApplicationPriority(ApplicationSubmissionContext context, |
| AMConfiguration amConfig) { |
| if (amConfig.getTezConfiguration().get(TezConfiguration.TEZ_AM_APPLICATION_PRIORITY) != null) { |
| // since we already checked not null condition, we are guaranteed that default value |
| // (0 in this case for getInt) will not be returned/used. |
| // The idea is to not use any default priority from TEZ side, if none provided in config; |
| // let YARN determine the priority of the submitted application |
| int priority = amConfig.getTezConfiguration().getInt(TezConfiguration.TEZ_AM_APPLICATION_PRIORITY, 0); |
| context.setPriority(Priority.newInstance(priority)); |
| if (LOG.isDebugEnabled()) { |
| LOG.debug("Settting TEZ application priority, applicationId= " + context.getApplicationId() + |
| ", priority= " + context.getPriority().getPriority()); |
| } |
| } |
| } |
| |
| |
| public static byte[] getLocalSha(Path path, Configuration conf) throws IOException { |
| InputStream is = null; |
| try { |
| is = FileSystem.getLocal(conf).open(path); |
| return DigestUtils.sha384(is); |
| } finally { |
| if (is != null) { |
| is.close(); |
| } |
| } |
| } |
| |
| public static byte[] getResourceSha(URI uri, Configuration conf) throws IOException { |
| InputStream is = null; |
| try { |
| is = FileSystem.get(uri, conf).open(new Path(uri)); |
| return DigestUtils.sha384(is); |
| } finally { |
| if (is != null) { |
| is.close(); |
| } |
| } |
| } |
| } |