blob: 581d7225a32aec0fff0648b638be417e096c54a8 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.tez.client;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
import java.io.File;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.net.URL;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Set;
import org.apache.commons.io.FileUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.hdfs.DistributedFileSystem;
import org.apache.hadoop.io.DataInputByteBuffer;
import org.apache.hadoop.security.Credentials;
import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.yarn.api.ApplicationConstants;
import org.apache.hadoop.yarn.api.ApplicationConstants.Environment;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
import org.apache.hadoop.yarn.api.records.LocalResource;
import org.apache.hadoop.yarn.api.records.LocalResourceVisibility;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.util.Records;
import org.apache.tez.common.TezClassLoader;
import org.apache.tez.common.security.JobTokenIdentifier;
import org.apache.tez.common.security.JobTokenSecretManager;
import org.apache.tez.common.security.TokenCache;
import org.apache.tez.dag.api.DAG;
import org.apache.tez.dag.api.ProcessorDescriptor;
import org.apache.tez.dag.api.TezConfiguration;
import org.apache.tez.dag.api.TezConstants;
import org.apache.tez.dag.api.TezException;
import org.apache.tez.dag.api.TezUncheckedException;
import org.apache.tez.dag.api.Vertex;
import org.apache.tez.dag.api.records.DAGProtos.ConfigurationProto;
import org.apache.tez.dag.api.records.DAGProtos.PlanKeyValuePair;
import org.apache.tez.serviceplugins.api.ServicePluginsDescriptor;
import org.junit.Assert;
import org.junit.Test;
/**
*
*/
public class TestTezClientUtils {
private static String TEST_ROOT_DIR = "target" + Path.SEPARATOR
+ TestTezClientUtils.class.getName() + "-tmpDir";
/**
*
*/
@Test (timeout=5000)
public void validateSetTezJarLocalResourcesNotDefined() throws Exception {
TezConfiguration conf = new TezConfiguration(false);
Credentials credentials = new Credentials();
try {
Map<String,LocalResource> resources = new HashMap<String, LocalResource>();
TezClientUtils.setupTezJarsLocalResources(conf, credentials, resources);
Assert.fail("Expected TezUncheckedException");
} catch (TezUncheckedException e) {
Assert.assertTrue(e.getMessage().contains("Invalid configuration of tez jars"));
}
}
@Test (timeout=5000)
public void validateSetTezJarLocalResourcesDefinedButEmpty() throws Exception {
File emptyDir = new File(TEST_ROOT_DIR, "emptyDir");
emptyDir.deleteOnExit();
Assert.assertTrue(emptyDir.mkdirs());
TezConfiguration conf = new TezConfiguration();
conf.set(TezConfiguration.TEZ_LIB_URIS, emptyDir.toURI().toURL().toString());
Credentials credentials = new Credentials();
try {
Map<String,LocalResource> resources = new HashMap<String, LocalResource>();
TezClientUtils.setupTezJarsLocalResources(conf, credentials, resources);
Assert.fail("Expected TezUncheckedException");
} catch (TezUncheckedException e) {
Assert.assertTrue(e.getMessage().contains("No files found in locations"));
}
}
/**
*
*/
@Test(expected=FileNotFoundException.class, timeout=5000)
public void validateSetTezJarLocalResourcesDefinedNonExistingDirectory() throws Exception {
TezConfiguration conf = new TezConfiguration();
conf.set(TezConfiguration.TEZ_LIB_URIS, "file:///foo");
Credentials credentials = new Credentials();
Map<String,LocalResource> resources = new HashMap<String, LocalResource>();
TezClientUtils.setupTezJarsLocalResources(conf, credentials, resources);
}
/**
*
*/
@Test (timeout=20000)
public void validateSetTezJarLocalResourcesDefinedExistingDirectory() throws Exception {
URL[] cp = TezClassLoader.getInstance().getURLs();
StringBuffer buffer = new StringBuffer();
for (URL url : cp) {
buffer.append(url.toExternalForm());
buffer.append(",");
}
TezConfiguration conf = new TezConfiguration();
conf.set(TezConfiguration.TEZ_LIB_URIS, buffer.toString());
Credentials credentials = new Credentials();
Map<String, LocalResource> localizedMap = new HashMap<String, LocalResource>();
boolean usingArchive = TezClientUtils.setupTezJarsLocalResources(conf, credentials,
localizedMap);
Assert.assertFalse(usingArchive);
Set<String> resourceNames = localizedMap.keySet();
for (URL url : cp) {
File file = FileUtils.toFile(url);
if (file.isDirectory()){
String[] firList = file.list();
for (String fileNme : firList) {
File innerFile = new File(file, fileNme);
if (!innerFile.isDirectory()){
assertTrue(resourceNames.contains(innerFile.getName()));
}
// not supporting deep hierarchies
}
}
else {
assertTrue(resourceNames.contains(file.getName()));
}
}
}
/**
*
* @throws Exception
*/
@Test (timeout=5000)
public void validateSetTezJarLocalResourcesDefinedExistingDirectoryIgnored() throws Exception {
URL[] cp = TezClassLoader.getInstance().getURLs();
StringBuffer buffer = new StringBuffer();
for (URL url : cp) {
buffer.append(url.toExternalForm());
buffer.append(",");
}
TezConfiguration conf = new TezConfiguration();
conf.set(TezConfiguration.TEZ_LIB_URIS, buffer.toString());
conf.setBoolean(TezConfiguration.TEZ_IGNORE_LIB_URIS, true);
Credentials credentials = new Credentials();
Map<String, LocalResource> localizedMap = new HashMap<String, LocalResource>();
Assert.assertFalse(TezClientUtils.setupTezJarsLocalResources(conf, credentials, localizedMap));
assertTrue(localizedMap.isEmpty());
}
/**
*
* @throws Exception
*/
@Test (timeout=20000)
public void validateSetTezJarLocalResourcesDefinedExistingDirectoryIgnoredSetToFalse() throws Exception {
URL[] cp = TezClassLoader.getInstance().getURLs();
StringBuffer buffer = new StringBuffer();
for (URL url : cp) {
buffer.append(url.toExternalForm());
buffer.append(",");
}
TezConfiguration conf = new TezConfiguration();
conf.set(TezConfiguration.TEZ_LIB_URIS, buffer.toString());
conf.setBoolean(TezConfiguration.TEZ_IGNORE_LIB_URIS, false);
Credentials credentials = new Credentials();
Map<String, LocalResource> localizedMap = new HashMap<String, LocalResource>();
Assert.assertFalse(TezClientUtils.setupTezJarsLocalResources(conf, credentials, localizedMap));
assertFalse(localizedMap.isEmpty());
}
/**
*
*/
@Test (timeout=5000)
public void testTezDefaultFS() throws Exception {
FileSystem localFs = FileSystem.getLocal(new Configuration());
StringBuilder tezLibUris = new StringBuilder();
Path topDir = new Path(TEST_ROOT_DIR, "testTezDefaultFS");
if (localFs.exists(topDir)) {
localFs.delete(topDir, true);
}
localFs.mkdirs(topDir);
Path file = new Path(topDir, "file.jar");
Assert.assertTrue(localFs.createNewFile(file));
tezLibUris.append(localFs.makeQualified(file).toString());
TezConfiguration conf = new TezConfiguration();
conf.set(TezConfiguration.TEZ_LIB_URIS, tezLibUris.toString());
conf.set("fs.defaultFS", "hdfs:///localhost:1234");
Credentials credentials = new Credentials();
Map<String, LocalResource> localizedMap = new HashMap<String, LocalResource>();
TezClientUtils.setupTezJarsLocalResources(conf, credentials, localizedMap);
Assert.assertTrue(localFs.delete(file, true));
Assert.assertTrue(localFs.delete(topDir, true));
}
/**
*
*/
@Test (timeout=5000)
public void validateSetTezJarLocalResourcesMultipleTarballs() throws Exception {
FileSystem localFs = FileSystem.getLocal(new Configuration());
StringBuilder tezLibUris = new StringBuilder();
// Create 2 files
Path topDir = new Path(TEST_ROOT_DIR, "validatemultipletarballs");
if (localFs.exists(topDir)) {
localFs.delete(topDir, true);
}
localFs.mkdirs(topDir);
Path tarFile1 = new Path(topDir, "f1.tar.gz");
Path tarFile2 = new Path(topDir, "f2.tar.gz");
Assert.assertTrue(localFs.createNewFile(tarFile1));
Assert.assertTrue(localFs.createNewFile(tarFile2));
tezLibUris.append(localFs.makeQualified(tarFile1).toString()).append("#tar1").append(",");
tezLibUris.append(localFs.makeQualified(tarFile2).toString()).append("#tar2").append(",");
TezConfiguration conf = new TezConfiguration();
conf.set(TezConfiguration.TEZ_LIB_URIS, tezLibUris.toString());
Credentials credentials = new Credentials();
Map<String, LocalResource> localizedMap = new HashMap<String, LocalResource>();
TezClientUtils.setupTezJarsLocalResources(conf, credentials, localizedMap);
Set<String> resourceNames = localizedMap.keySet();
Assert.assertEquals(2, resourceNames.size());
Assert.assertTrue(resourceNames.contains("tar1"));
Assert.assertTrue(resourceNames.contains("tar2"));
Assert.assertFalse(resourceNames.contains("f1.tar.gz"));
Assert.assertFalse(resourceNames.contains("f2.tar.gz"));
Assert.assertTrue(localFs.delete(tarFile1, true));
Assert.assertTrue(localFs.delete(tarFile2, true));
Assert.assertTrue(localFs.delete(topDir, true));
}
/**
*
*/
@Test (timeout=5000)
public void validateSetTezJarLocalResourcesMixTarballAndJar() throws Exception {
FileSystem localFs = FileSystem.getLocal(new Configuration());
StringBuilder tezLibUris = new StringBuilder();
// Create 2 jars and 1 archive
Path topDir = new Path(TEST_ROOT_DIR, "validatetarballandjar");
if (localFs.exists(topDir)) {
localFs.delete(topDir, true);
}
localFs.mkdirs(topDir);
Path tarFile1 = new Path(topDir, "f1.tar.gz");
Path jarFile2 = new Path(topDir, "f2.jar");
Path jarFile3 = new Path(topDir, "f3.jar");
Assert.assertTrue(localFs.createNewFile(tarFile1));
Assert.assertTrue(localFs.createNewFile(jarFile2));
Assert.assertTrue(localFs.createNewFile(jarFile3));
tezLibUris.append(localFs.makeQualified(topDir).toString()).append(",");
tezLibUris.append(localFs.makeQualified(tarFile1).toString()).append("#tar1").append(",");
TezConfiguration conf = new TezConfiguration();
conf.set(TezConfiguration.TEZ_LIB_URIS, tezLibUris.toString());
Credentials credentials = new Credentials();
Map<String, LocalResource> localizedMap = new HashMap<String, LocalResource>();
TezClientUtils.setupTezJarsLocalResources(conf, credentials, localizedMap);
Set<String> resourceNames = localizedMap.keySet();
Assert.assertEquals(4, resourceNames.size());
Assert.assertTrue(resourceNames.contains("tar1"));
Assert.assertTrue(resourceNames.contains("f1.tar.gz"));
Assert.assertTrue(resourceNames.contains("f2.jar"));
Assert.assertTrue(resourceNames.contains("f3.jar"));
Assert.assertTrue(localFs.delete(tarFile1, true));
Assert.assertTrue(localFs.delete(jarFile2, true));
Assert.assertTrue(localFs.delete(jarFile3, true));
Assert.assertTrue(localFs.delete(topDir, true));
}
@Test(timeout = 2000)
// this test checks if the priority field is set properly in the
// ApplicationSubmissionContext
public void testAppSubmissionContextForPriority() throws Exception {
TezConfiguration tezConf = new TezConfiguration();
int testpriority = 999;
ApplicationId appId = ApplicationId.newInstance(1000, 1);
Credentials credentials = new Credentials();
TezClientUtils.createSessionToken(appId.toString(),
new JobTokenSecretManager(), credentials);
tezConf.setBoolean(TezConfiguration.TEZ_IGNORE_LIB_URIS, true);
Map<String, LocalResource> m = new HashMap<String, LocalResource>();
tezConf.setInt(TezConfiguration.TEZ_AM_APPLICATION_PRIORITY, testpriority);
AMConfiguration amConf =
new AMConfiguration(tezConf, new HashMap<String, LocalResource>(), credentials);
ApplicationSubmissionContext appcontext;
appcontext = TezClientUtils.createApplicationSubmissionContext(
appId, null, "dagname",
amConf, m,
credentials, false,
new TezApiVersionInfo(), null, null);
assertEquals(testpriority, appcontext.getPriority().getPriority());
}
@Test(timeout=1000)
// when tez config property for app priority not set,
// tez should not set app priority and let YARN deal with it.
public void testSetApplicationPriority() {
TezConfiguration conf = new TezConfiguration(false);
AMConfiguration amconfig = new AMConfiguration(conf, null, null);
ApplicationSubmissionContext appContext = Records
.newRecord(ApplicationSubmissionContext.class);
TezClientUtils.setApplicationPriority(appContext, amconfig);
assertNull(appContext.getPriority());
}
@Test(timeout=1000)
public void testSetApplicationTags() {
TezConfiguration conf = new TezConfiguration(false);
conf.set(TezConfiguration.TEZ_APPLICATION_TAGS, "foo,bar");
AMConfiguration amconfig = new AMConfiguration(conf, null, null);
ApplicationSubmissionContext appContext = Records
.newRecord(ApplicationSubmissionContext.class);
Collection<String> tagsFromConf =
amconfig.getTezConfiguration().getTrimmedStringCollection(
TezConfiguration.TEZ_APPLICATION_TAGS);
appContext.setApplicationTags(new HashSet<String>(tagsFromConf));
assertTrue(appContext.getApplicationTags().contains("foo"));
assertTrue(appContext.getApplicationTags().contains("bar"));
}
@Test(timeout = 5000)
public void testSessionTokenInAmClc() throws IOException, YarnException {
TezConfiguration tezConf = new TezConfiguration();
ApplicationId appId = ApplicationId.newInstance(1000, 1);
DAG dag = DAG.create("testdag");
dag.addVertex(Vertex.create("testVertex", ProcessorDescriptor.create("processorClassname"), 1)
.setTaskLaunchCmdOpts("initialLaunchOpts"));
Credentials credentials = new Credentials();
JobTokenSecretManager jobTokenSecretManager = new JobTokenSecretManager();
TezClientUtils.createSessionToken(appId.toString(), jobTokenSecretManager, credentials);
Token<JobTokenIdentifier> jobToken = TokenCache.getSessionToken(credentials);
assertNotNull(jobToken);
AMConfiguration amConf =
new AMConfiguration(tezConf, new HashMap<String, LocalResource>(), credentials);
ApplicationSubmissionContext appSubmissionContext =
TezClientUtils.createApplicationSubmissionContext(appId, dag, "amName", amConf,
new HashMap<String, LocalResource>(), credentials, false, new TezApiVersionInfo(),
null, null);
ContainerLaunchContext amClc = appSubmissionContext.getAMContainerSpec();
Map<String, ByteBuffer> amServiceData = amClc.getServiceData();
assertNotNull(amServiceData);
assertEquals(1, amServiceData.size());
DataInputByteBuffer dibb = new DataInputByteBuffer();
dibb.reset(amServiceData.values().iterator().next());
Token<JobTokenIdentifier> jtSent = new Token<JobTokenIdentifier>();
jtSent.readFields(dibb);
assertTrue(Arrays.equals(jobToken.getIdentifier(), jtSent.getIdentifier()));
}
@Test(timeout = 5000)
public void testAMLoggingOptsSimple() throws IOException, YarnException {
TezConfiguration tezConf = new TezConfiguration();
tezConf.set(TezConfiguration.TEZ_AM_LOG_LEVEL, "WARN");
ApplicationId appId = ApplicationId.newInstance(1000, 1);
Credentials credentials = new Credentials();
JobTokenSecretManager jobTokenSecretManager = new JobTokenSecretManager();
TezClientUtils.createSessionToken(appId.toString(), jobTokenSecretManager, credentials);
DAG dag = DAG.create("testdag");
dag.addVertex(Vertex.create("testVertex", ProcessorDescriptor.create("processorClassname"), 1)
.setTaskLaunchCmdOpts("initialLaunchOpts"));
AMConfiguration amConf =
new AMConfiguration(tezConf, new HashMap<String, LocalResource>(), credentials);
ApplicationSubmissionContext appSubmissionContext =
TezClientUtils.createApplicationSubmissionContext(appId, dag, "amName", amConf,
new HashMap<String, LocalResource>(), credentials, false, new TezApiVersionInfo(),
null, null);
List<String> expectedCommands = new LinkedList<String>();
expectedCommands.add("-Dlog4j.configuratorClass=org.apache.tez.common.TezLog4jConfigurator");
expectedCommands.add("-Dlog4j.configuration=" + TezConstants.TEZ_CONTAINER_LOG4J_PROPERTIES_FILE);
expectedCommands.add("-D" + YarnConfiguration.YARN_APP_CONTAINER_LOG_DIR + "=" +
ApplicationConstants.LOG_DIR_EXPANSION_VAR);
expectedCommands.add("-D" + TezConstants.TEZ_ROOT_LOGGER_NAME + "=" + "WARN" + "," +
TezConstants.TEZ_CONTAINER_LOGGER_NAME);
List<String> commands = appSubmissionContext.getAMContainerSpec().getCommands();
assertEquals(1, commands.size());
for (String expectedCmd : expectedCommands) {
assertTrue(commands.get(0).contains(expectedCmd));
}
Map<String, String> environment = appSubmissionContext.getAMContainerSpec().getEnvironment();
String logEnv = environment.get(TezConstants.TEZ_CONTAINER_LOG_PARAMS);
assertNull(logEnv);
}
@Test(timeout = 5000)
public void testAMLoggingOptsPerLogger() throws IOException, YarnException {
TezConfiguration tezConf = new TezConfiguration();
tezConf.set(TezConfiguration.TEZ_AM_LOG_LEVEL,
"WARN;org.apache.hadoop.ipc=DEBUG;org.apache.hadoop.security=DEBUG");
ApplicationId appId = ApplicationId.newInstance(1000, 1);
Credentials credentials = new Credentials();
JobTokenSecretManager jobTokenSecretManager = new JobTokenSecretManager();
TezClientUtils.createSessionToken(appId.toString(), jobTokenSecretManager, credentials);
DAG dag = DAG.create("testdag");
dag.addVertex(Vertex.create("testVertex", ProcessorDescriptor.create("processorClassname"), 1)
.setTaskLaunchCmdOpts("initialLaunchOpts"));
AMConfiguration amConf =
new AMConfiguration(tezConf, new HashMap<String, LocalResource>(), credentials);
ApplicationSubmissionContext appSubmissionContext =
TezClientUtils.createApplicationSubmissionContext(appId, dag, "amName", amConf,
new HashMap<String, LocalResource>(), credentials, false, new TezApiVersionInfo(),
null, null);
List<String> expectedCommands = new LinkedList<String>();
expectedCommands.add("-Dlog4j.configuratorClass=org.apache.tez.common.TezLog4jConfigurator");
expectedCommands.add(
"-Dlog4j.configuration=" + TezConstants.TEZ_CONTAINER_LOG4J_PROPERTIES_FILE);
expectedCommands.add("-D" + YarnConfiguration.YARN_APP_CONTAINER_LOG_DIR + "=" +
ApplicationConstants.LOG_DIR_EXPANSION_VAR);
expectedCommands.add("-D" + TezConstants.TEZ_ROOT_LOGGER_NAME + "=" + "WARN" + "," +
TezConstants.TEZ_CONTAINER_LOGGER_NAME);
List<String> commands = appSubmissionContext.getAMContainerSpec().getCommands();
assertEquals(1, commands.size());
for (String expectedCmd : expectedCommands) {
assertTrue(commands.get(0).contains(expectedCmd));
}
Map<String, String> environment = appSubmissionContext.getAMContainerSpec().getEnvironment();
String logEnv = environment.get(TezConstants.TEZ_CONTAINER_LOG_PARAMS);
assertEquals("org.apache.hadoop.ipc=DEBUG;org.apache.hadoop.security=DEBUG", logEnv);
}
@Test(timeout = 5000)
public void testAMCommandOpts() {
Path tmpDir = new Path(Environment.PWD.$(),
YarnConfiguration.DEFAULT_CONTAINER_TEMP_DIR);
String tmpOpts = "-Djava.io.tmpdir=" + tmpDir;
TezConfiguration tezConf = new TezConfiguration();
String amCommandOpts = "-Xmx 200m -Dtest.property";
tezConf.set(TezConfiguration.TEZ_AM_LAUNCH_CMD_OPTS, amCommandOpts);
// Test1: Rely on defaults for cluster-default opts
String amOptsConstructed =
TezClientUtils.constructAMLaunchOpts(tezConf, Resource.newInstance(1024, 1));
assertEquals(tmpOpts + " "
+ TezConfiguration.TEZ_AM_LAUNCH_CLUSTER_DEFAULT_CMD_OPTS_DEFAULT + " "
+ amCommandOpts,
amOptsConstructed);
// Test2: Setup cluster-default command opts explicitly
String clusterDefaultCommandOpts =
"-server -Djava.net.preferIPv4Stack=true -XX:+PrintGCDetails -verbose:gc ";
tezConf.set(TezConfiguration.TEZ_AM_LAUNCH_CLUSTER_DEFAULT_CMD_OPTS, clusterDefaultCommandOpts);
amOptsConstructed =
TezClientUtils.constructAMLaunchOpts(tezConf, Resource.newInstance(1024, 1));
assertEquals(tmpOpts + " " + clusterDefaultCommandOpts + " " + amCommandOpts, amOptsConstructed);
// Test3: Don't setup Xmx explicitly
final double factor = 0.8;
amCommandOpts = "-Dtest.property";
tezConf.set(TezConfiguration.TEZ_AM_LAUNCH_CMD_OPTS, amCommandOpts);
amOptsConstructed =
TezClientUtils.constructAMLaunchOpts(tezConf, Resource.newInstance(1024, 1));
// It's OK for the Xmx value to show up before cluster default options, since Xmx will not be replaced if it already exists.
assertEquals(
" -Xmx" + ((int) (1024 * factor)) + "m" + " " + tmpOpts + " " + clusterDefaultCommandOpts + " " +
amCommandOpts,
amOptsConstructed);
// Test4: Ensure admin options with Xmx does not cause them to be overridden. This should almost never be done though.
clusterDefaultCommandOpts =
"-server -Djava.net.preferIPv4Stack=true -XX:+PrintGCDetails -verbose:gc -Xmx200m";
tezConf.set(TezConfiguration.TEZ_AM_LAUNCH_CLUSTER_DEFAULT_CMD_OPTS, clusterDefaultCommandOpts);
amOptsConstructed =
TezClientUtils.constructAMLaunchOpts(tezConf, Resource.newInstance(1024, 1));
assertEquals(tmpOpts + " " + clusterDefaultCommandOpts + " " + amCommandOpts, amOptsConstructed);
}
@Test(timeout = 5000)
public void testTaskCommandOpts() throws TezException {
TezConfiguration tezConf = new TezConfiguration();
String taskCommandOpts = "-Xmx 200m -Dtest.property";
tezConf.set(TezConfiguration.TEZ_TASK_LAUNCH_CMD_OPTS, taskCommandOpts);
String expected = null;
// Test1: Rely on defaults for cluster default opts
String taskOptsConstructed = TezClientUtils.addDefaultsToTaskLaunchCmdOpts("", tezConf);
expected =
TezConfiguration.TEZ_TASK_LAUNCH_CLUSTER_DEFAULT_CMD_OPTS_DEFAULT + " " + taskCommandOpts;
assertTrue(
"Did not find Expected prefix: [" + expected + "] in string [" + taskOptsConstructed +
"]", taskOptsConstructed.startsWith(expected));
// Test2: Setup cluster-default command opts explicitly
String taskClusterDefaultCommandOpts =
"-server -Djava.net.preferIPv4Stack=true -XX:+PrintGCDetails -verbose:gc ";
tezConf.set(TezConfiguration.TEZ_TASK_LAUNCH_CLUSTER_DEFAULT_CMD_OPTS,
taskClusterDefaultCommandOpts);
taskOptsConstructed =
TezClientUtils.addDefaultsToTaskLaunchCmdOpts("", tezConf);
expected = taskClusterDefaultCommandOpts + " " + taskCommandOpts;
assertTrue(
"Did not find Expected prefix: [" + expected + "] in string [" + taskOptsConstructed +
"]", taskOptsConstructed.startsWith(expected));
// Test3: Don't setup Xmx explicitly
taskCommandOpts = "-Dtest.property";
tezConf.set(TezConfiguration.TEZ_TASK_LAUNCH_CMD_OPTS, taskCommandOpts);
taskOptsConstructed =
TezClientUtils.addDefaultsToTaskLaunchCmdOpts("", tezConf);
expected = taskClusterDefaultCommandOpts + " " + taskCommandOpts;
assertTrue(
"Did not find Expected prefix: [" + expected + "] in string [" + taskOptsConstructed +
"]", taskOptsConstructed.startsWith(expected));
// Test4: Pass in a dag-configured value.
String programmaticTaskOpts = "-Dset.programatically=true -Djava.net.preferIPv4Stack=false";
taskOptsConstructed =
TezClientUtils.addDefaultsToTaskLaunchCmdOpts(programmaticTaskOpts, tezConf);
// Container logging is always added at the end, if it's required.
expected = taskClusterDefaultCommandOpts + " " + taskCommandOpts + " " + programmaticTaskOpts;
assertTrue(
"Did not find Expected prefix: [" + expected + "] in string [" + taskOptsConstructed +
"]", taskOptsConstructed.startsWith(expected));
}
@Test (timeout=5000)
public void testDefaultMemoryJavaOpts() {
final double factor = 0.8;
String origJavaOpts = "-Xmx";
String javaOpts = TezClientUtils.maybeAddDefaultMemoryJavaOpts(origJavaOpts,
Resource.newInstance(1000, 1), factor);
Assert.assertEquals(origJavaOpts, javaOpts);
origJavaOpts = "";
javaOpts = TezClientUtils.maybeAddDefaultMemoryJavaOpts(origJavaOpts,
Resource.newInstance(1000, 1), factor);
Assert.assertTrue(javaOpts.contains("-Xmx800m"));
origJavaOpts = "";
javaOpts = TezClientUtils.maybeAddDefaultMemoryJavaOpts(origJavaOpts,
Resource.newInstance(1, 1), factor);
Assert.assertTrue(javaOpts.contains("-Xmx1m"));
origJavaOpts = "";
javaOpts = TezClientUtils.maybeAddDefaultMemoryJavaOpts(origJavaOpts,
Resource.newInstance(-1, 1), factor);
Assert.assertEquals(origJavaOpts, javaOpts);
origJavaOpts = "";
javaOpts = TezClientUtils.maybeAddDefaultMemoryJavaOpts(origJavaOpts,
Resource.newInstance(355, 1), factor);
Assert.assertTrue(javaOpts.contains("-Xmx284m"));
origJavaOpts = " -Xms100m ";
javaOpts = TezClientUtils.maybeAddDefaultMemoryJavaOpts(origJavaOpts,
Resource.newInstance(355, 1), factor);
Assert.assertFalse(javaOpts.contains("-Xmx284m"));
Assert.assertTrue(javaOpts.contains("-Xms100m"));
origJavaOpts = "";
javaOpts = TezClientUtils.maybeAddDefaultMemoryJavaOpts(origJavaOpts,
Resource.newInstance(355, 1), 0);
Assert.assertEquals(origJavaOpts, javaOpts);
origJavaOpts = "";
javaOpts = TezClientUtils.maybeAddDefaultMemoryJavaOpts(origJavaOpts,
Resource.newInstance(355, 1), 100);
Assert.assertEquals(origJavaOpts, javaOpts);
origJavaOpts = "";
javaOpts = TezClientUtils.maybeAddDefaultMemoryJavaOpts(origJavaOpts,
Resource.newInstance(1000, 1), -1);
Assert.assertTrue(javaOpts.contains("-Xmx700m"));
origJavaOpts = "";
javaOpts = TezClientUtils.maybeAddDefaultMemoryJavaOpts(origJavaOpts,
Resource.newInstance(5000, 1), -1);
Assert.assertTrue(javaOpts.contains("-Xmx4000m"));
}
@Test (timeout=5000)
public void testDefaultLoggingJavaOpts() {
String origJavaOpts = null;
String javaOpts = TezClientUtils.maybeAddDefaultLoggingJavaOpts("FOOBAR", origJavaOpts);
Assert.assertNotNull(javaOpts);
Assert.assertTrue(javaOpts.contains("-D" + TezConstants.TEZ_ROOT_LOGGER_NAME + "=FOOBAR")
&& javaOpts.contains(TezConstants.TEZ_CONTAINER_LOG4J_PROPERTIES_FILE)
&&
javaOpts.contains("-Dlog4j.configuratorClass=org.apache.tez.common.TezLog4jConfigurator"));
}
@Test (timeout = 5000)
public void testConfSerializationForAm() {
Configuration conf =new Configuration(false);
String val1 = "fixedProperty";
String val2 = "parametrizedProperty/${user.name}";
String expVal2 = "parametrizedProperty/" + System.getProperty("user.name");
conf.set("property1", val1);
conf.set("property2", val2);
Map<String, String> expected = new HashMap<String, String>();
expected.put("property1", val1);
expected.put("property2", expVal2);
ConfigurationProto confProto = TezClientUtils.createFinalConfProtoForApp(conf, null);
for (PlanKeyValuePair kvPair : confProto.getConfKeyValuesList()) {
String v = expected.remove(kvPair.getKey());
assertEquals(v, kvPair.getValue());
}
assertTrue(expected.isEmpty());
}
// To run this test case see TestTezCommonUtils::testLocalResourceVisibility
// We do not have much control over the directory structure, cannot mock as the functions are
// static and do not want to spin up a minitez cluster just for this.
public static void testLocalResourceVisibility(DistributedFileSystem remoteFs, Configuration conf)
throws Exception {
Path topLevelDir = null;
try {
FsPermission publicDirPerms = new FsPermission((short) 0755); // rwxr-xr-x
FsPermission privateDirPerms = new FsPermission((short) 0754); // rwxr-xr--
FsPermission publicFilePerms = new FsPermission((short) 0554); // r-xr-xr--
FsPermission privateFilePerms = new FsPermission((short) 0550); // r-xr-x---
String fsURI = remoteFs.getUri().toString();
topLevelDir = new Path(fsURI, "/testLRVisibility");
Assert.assertTrue(remoteFs.mkdirs(topLevelDir, publicDirPerms));
Path publicSubDir = new Path(topLevelDir, "public_sub_dir");
Assert.assertTrue(remoteFs.mkdirs(publicSubDir, publicDirPerms));
Path privateSubDir = new Path(topLevelDir, "private_sub_dir");
Assert.assertTrue(remoteFs.mkdirs(privateSubDir, privateDirPerms));
Path publicFile = new Path(publicSubDir, "public_file");
Assert.assertTrue(remoteFs.createNewFile(publicFile));
remoteFs.setPermission(publicFile, publicFilePerms);
Path privateFile = new Path(publicSubDir, "private_file");
Assert.assertTrue(remoteFs.createNewFile(privateFile));
remoteFs.setPermission(privateFile, privateFilePerms);
Path publicFileInPrivateSubdir = new Path(privateSubDir, "public_file_in_private_subdir");
Assert.assertTrue(remoteFs.createNewFile(publicFileInPrivateSubdir));
remoteFs.setPermission(publicFileInPrivateSubdir, publicFilePerms);
TezConfiguration tezConf = new TezConfiguration(conf);
String tmpTezLibUris = String.format("%s,%s,%s,%s", topLevelDir, publicSubDir, privateSubDir,
conf.get(TezConfiguration.TEZ_LIB_URIS, ""));
tezConf.set(TezConfiguration.TEZ_LIB_URIS, tmpTezLibUris);
Map<String, LocalResource> lrMap = new HashMap<String, LocalResource>();
TezClientUtils.setupTezJarsLocalResources(tezConf, new Credentials(), lrMap);
Assert.assertEquals(publicFile.getName(), LocalResourceVisibility.PUBLIC,
lrMap.get(publicFile.getName()).getVisibility());
Assert.assertEquals(privateFile.getName(), LocalResourceVisibility.PRIVATE,
lrMap.get(privateFile.getName()).getVisibility());
Assert.assertEquals(publicFileInPrivateSubdir.getName(), LocalResourceVisibility.PRIVATE,
lrMap.get(publicFileInPrivateSubdir.getName()).getVisibility());
// test tar.gz
tezConf = new TezConfiguration(conf);
Path tarFile = new Path(topLevelDir, "foo.tar.gz");
Assert.assertTrue(remoteFs.createNewFile(tarFile));
//public
remoteFs.setPermission(tarFile, publicFilePerms);
tezConf.set(TezConfiguration.TEZ_LIB_URIS, tarFile.toString());
lrMap.clear();
Assert.assertTrue(
TezClientUtils.setupTezJarsLocalResources(tezConf, new Credentials(), lrMap));
Assert.assertEquals(LocalResourceVisibility.PUBLIC,
lrMap.get(TezConstants.TEZ_TAR_LR_NAME).getVisibility());
//private
remoteFs.setPermission(tarFile, privateFilePerms);
lrMap.clear();
TezClientUtils.setupTezJarsLocalResources(tezConf, new Credentials(), lrMap);
Assert.assertEquals(LocalResourceVisibility.PRIVATE,
lrMap.get(TezConstants.TEZ_TAR_LR_NAME).getVisibility());
} finally {
if (topLevelDir != null) {
remoteFs.delete(topLevelDir, true);
}
}
}
@Test(timeout = 5000)
public void testConfigurationAllowAll() {
Configuration srcConf = new Configuration(false);
Map<String, String> confMap = new HashMap<String, String>();
confMap.put("foo.property", "2000");
confMap.put("tez.property", "tezProperty");
confMap.put("yarn.property", "yarnProperty");
for (Map.Entry<String, String> entry : confMap.entrySet()) {
srcConf.set(entry.getKey(), entry.getValue());
}
ConfigurationProto confProto = TezClientUtils.createFinalConfProtoForApp(srcConf, null);
for (PlanKeyValuePair kvPair : confProto.getConfKeyValuesList()) {
String val = confMap.remove(kvPair.getKey());
assertNotNull(val);
assertEquals(val, kvPair.getValue());
}
assertTrue(confMap.isEmpty());
}
private Path createFile(FileSystem fs, Path dir, String fileName) throws IOException {
Path f1 = new Path(dir, fileName);
FSDataOutputStream outputStream = fs.create(f1, true);
outputStream.write(1);
outputStream.close();
return fs.makeQualified(f1);
}
@Test (timeout=5000)
public void validateSetTezAuxLocalResourcesWithFilesAndFolders() throws Exception {
FileSystem localFs = FileSystem.getLocal(new Configuration());
List<String> resources = new ArrayList<String>();
StringBuilder auxUriStr = new StringBuilder();
// Create 2 files
Path topDir = new Path(TEST_ROOT_DIR, "validateauxwithfiles");
if (localFs.exists(topDir)) {
localFs.delete(topDir, true);
}
localFs.mkdirs(topDir);
resources.add(createFile(localFs, topDir, "f1.txt").toString());
auxUriStr.append(localFs.makeQualified(topDir).toString()).append(",");
Path dir2 = new Path(topDir, "dir2");
localFs.mkdirs(dir2);
Path nestedDir = new Path(dir2, "nestedDir");
localFs.mkdirs(nestedDir);
createFile(localFs, nestedDir, "nested-f.txt");
resources.add(createFile(localFs, dir2, "dir2-f.txt").toString());
auxUriStr.append(localFs.makeQualified(dir2).toString()).append(",");
Path dir3 = new Path(topDir, "dir3");
localFs.mkdirs(dir3);
auxUriStr.append(localFs.makeQualified(dir3).toString()).append(",");
TezConfiguration conf = new TezConfiguration();
conf.setBoolean(TezConfiguration.TEZ_IGNORE_LIB_URIS, true);
conf.set(TezConfiguration.TEZ_AUX_URIS, auxUriStr.toString());
Credentials credentials = new Credentials();
Map<String, LocalResource> localizedMap = new HashMap<String, LocalResource>();
TezClientUtils.setupTezJarsLocalResources(conf, credentials, localizedMap);
Set<String> resourceNames = localizedMap.keySet();
Assert.assertEquals(2, resourceNames.size());
Assert.assertTrue(resourceNames.contains("f1.txt"));
Assert.assertTrue(resourceNames.contains("dir2-f.txt"));
}
@Test(timeout = 5000)
public void testServiceDescriptorSerializationForAM() {
Configuration conf = new Configuration(false);
ServicePluginsDescriptor servicePluginsDescriptor = ServicePluginsDescriptor.create(true);
ConfigurationProto confProto = TezClientUtils.createFinalConfProtoForApp(conf,
servicePluginsDescriptor);
assertTrue(confProto.hasAmPluginDescriptor());
assertTrue(confProto.getAmPluginDescriptor().getUberEnabled());
}
@Test(timeout = 5000)
public void testTaskLaunchCmdOptsSetup() throws TezException {
Configuration conf = new Configuration(false);
String vOpts = "";
String opts = TezClientUtils.addDefaultsToTaskLaunchCmdOpts(vOpts, conf);
Assert.assertEquals(opts,
TezConfiguration.TEZ_TASK_LAUNCH_CLUSTER_DEFAULT_CMD_OPTS_DEFAULT + " "
+ TezConfiguration.TEZ_TASK_LAUNCH_CMD_OPTS_DEFAULT + " " + vOpts);
vOpts = "foo";
opts = TezClientUtils.addDefaultsToTaskLaunchCmdOpts(vOpts, conf);
Assert.assertEquals(opts,
TezConfiguration.TEZ_TASK_LAUNCH_CLUSTER_DEFAULT_CMD_OPTS_DEFAULT + " " + vOpts);
String taskOpts = "taskOpts";
conf.set(TezConfiguration.TEZ_TASK_LAUNCH_CMD_OPTS, taskOpts);
opts = TezClientUtils.addDefaultsToTaskLaunchCmdOpts(vOpts, conf);
Assert.assertEquals(opts,
TezConfiguration.TEZ_TASK_LAUNCH_CLUSTER_DEFAULT_CMD_OPTS_DEFAULT
+ " " + taskOpts + " " + vOpts);
}
@Test(timeout = 5000)
public void testClusterTaskLaunchCmdOptsSetup() throws TezException {
Configuration conf = new Configuration(false);
String adminOpts = "adminOpts";
conf.set(TezConfiguration.TEZ_TASK_LAUNCH_CLUSTER_DEFAULT_CMD_OPTS, adminOpts);
String vOpts = "";
String opts = TezClientUtils.addDefaultsToTaskLaunchCmdOpts(vOpts, conf);
Assert.assertEquals(opts,
adminOpts + " "
+ TezConfiguration.TEZ_TASK_LAUNCH_CMD_OPTS_DEFAULT + " " + vOpts);
vOpts = "foo";
opts = TezClientUtils.addDefaultsToTaskLaunchCmdOpts(vOpts, conf);
Assert.assertEquals(opts, adminOpts + " " + vOpts);
String taskOpts = "taskOpts";
conf.set(TezConfiguration.TEZ_TASK_LAUNCH_CMD_OPTS, taskOpts);
opts = TezClientUtils.addDefaultsToTaskLaunchCmdOpts(vOpts, conf);
Assert.assertEquals(opts, adminOpts + " " + taskOpts + " " + vOpts);
}
}