(TWILL-225) Allow configurations overridable per TwillPreprer
- Also increased the vmen-pmen ration in TwillTester to avoid test failure due to reduced container size
- There is no way to disable vmen-pmen ratio check in Hadoop 2.0.
This closes #39 on Github.
Signed-off-by: Terence Yim <chtyim@apache.org>
diff --git a/twill-api/src/main/java/org/apache/twill/api/TwillPreparer.java b/twill-api/src/main/java/org/apache/twill/api/TwillPreparer.java
index fd568b3..43b751b 100644
--- a/twill-api/src/main/java/org/apache/twill/api/TwillPreparer.java
+++ b/twill-api/src/main/java/org/apache/twill/api/TwillPreparer.java
@@ -30,6 +30,14 @@
public interface TwillPreparer {
/**
+ * Overrides the default configuration with the given set of configurations.
+ *
+ * @param config set of configurations to override
+ * @return This {@link TwillPreparer}
+ */
+ TwillPreparer withConfiguration(Map<String, String> config);
+
+ /**
* Adds a {@link LogHandler} for receiving an application log.
* @param handler The {@link LogHandler}.
* @return This {@link TwillPreparer}.
diff --git a/twill-yarn/src/main/java/org/apache/twill/yarn/YarnTwillPreparer.java b/twill-yarn/src/main/java/org/apache/twill/yarn/YarnTwillPreparer.java
index de03a7a..2d1edd0 100644
--- a/twill-yarn/src/main/java/org/apache/twill/yarn/YarnTwillPreparer.java
+++ b/twill-yarn/src/main/java/org/apache/twill/yarn/YarnTwillPreparer.java
@@ -40,6 +40,7 @@
import com.google.gson.Gson;
import com.google.gson.GsonBuilder;
import joptsimple.OptionSpec;
+import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.security.Credentials;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.token.Token;
@@ -80,6 +81,7 @@
import org.apache.twill.internal.utils.Dependencies;
import org.apache.twill.internal.utils.Paths;
import org.apache.twill.internal.utils.Resources;
+import org.apache.twill.internal.yarn.VersionDetectYarnAppClientFactory;
import org.apache.twill.internal.yarn.YarnAppClient;
import org.apache.twill.internal.yarn.YarnApplicationReport;
import org.apache.twill.internal.yarn.YarnUtils;
@@ -126,9 +128,8 @@
}
};
- private final YarnConfiguration yarnConfig;
+ private final Configuration config;
private final TwillSpecification twillSpec;
- private final YarnAppClient yarnAppClient;
private final String zkConnectString;
private final Location appLocation;
private final YarnTwillControllerFactory controllerFactory;
@@ -143,9 +144,6 @@
private final Map<String, Map<String, String>> environments = Maps.newHashMap();
private final List<String> applicationClassPaths = Lists.newArrayList();
private final Credentials credentials;
- private final int reservedMemory;
- private final double minHeapRatio;
- private final File localStagingDir;
private final Map<String, Map<String, String>> logLevels = Maps.newHashMap();
private final LocationCache locationCache;
private final Set<URL> twillClassPaths;
@@ -155,25 +153,16 @@
private ClassAcceptor classAcceptor;
private final Map<String, Integer> maxRetries = Maps.newHashMap();
- YarnTwillPreparer(YarnConfiguration yarnConfig, TwillSpecification twillSpec, RunId runId,
- YarnAppClient yarnAppClient, String zkConnectString, Location appLocation, Set<URL> twillClassPaths,
+ YarnTwillPreparer(Configuration config, TwillSpecification twillSpec, RunId runId,
+ String zkConnectString, Location appLocation, Set<URL> twillClassPaths,
String extraOptions, LocationCache locationCache, YarnTwillControllerFactory controllerFactory) {
- this.yarnConfig = yarnConfig;
+ this.config = config;
this.twillSpec = twillSpec;
this.runId = runId;
- this.yarnAppClient = yarnAppClient;
this.zkConnectString = zkConnectString;
this.appLocation = appLocation;
this.controllerFactory = controllerFactory;
this.credentials = createCredentials();
- this.reservedMemory = yarnConfig.getInt(Configs.Keys.JAVA_RESERVED_MEMORY_MB,
- Configs.Defaults.JAVA_RESERVED_MEMORY_MB);
- // doing this way to support hadoop-2.0 profile
- String minHeapRatioStr = yarnConfig.get(Configs.Keys.HEAP_RESERVED_MIN_RATIO);
- this.minHeapRatio = (minHeapRatioStr == null) ?
- Configs.Defaults.HEAP_RESERVED_MIN_RATIO : Double.parseDouble(minHeapRatioStr);
- this.localStagingDir = new File(yarnConfig.get(Configs.Keys.LOCAL_STAGING_DIRECTORY,
- Configs.Defaults.LOCAL_STAGING_DIRECTORY));
this.extraOptions = extraOptions;
this.classAcceptor = new ClassAcceptor();
this.locationCache = locationCache;
@@ -187,6 +176,14 @@
}
@Override
+ public TwillPreparer withConfiguration(Map<String, String> config) {
+ for (Map.Entry<String, String> entry : config.entrySet()) {
+ this.config.set(entry.getKey(), entry.getValue());
+ }
+ return this;
+ }
+
+ @Override
public TwillPreparer addLogHandler(LogHandler handler) {
logHandlers.add(handler);
return this;
@@ -362,6 +359,7 @@
@Override
public TwillController start(long timeout, TimeUnit timeoutUnit) {
try {
+ final YarnAppClient yarnAppClient = new VersionDetectYarnAppClientFactory().create(config);
final ProcessLauncher<ApplicationMasterInfo> launcher = yarnAppClient.createLauncher(twillSpec, schedulerQueue);
final ApplicationMasterInfo appMasterInfo = launcher.getContainerInfo();
Callable<ProcessController<YarnApplicationReport>> submitTask =
@@ -373,11 +371,11 @@
Map<String, LocalFile> localFiles = Maps.newHashMap();
createLauncherJar(localFiles);
- createTwillJar(createBundler(classAcceptor), localFiles);
+ createTwillJar(createBundler(classAcceptor), yarnAppClient, localFiles);
createApplicationJar(createApplicationJarBundler(classAcceptor), localFiles);
createResourcesJar(createBundler(classAcceptor), localFiles);
- Path runtimeConfigDir = Files.createTempDirectory(localStagingDir.toPath(),
+ Path runtimeConfigDir = Files.createTempDirectory(getLocalStagingDir().toPath(),
Constants.Files.RUNTIME_CONFIG_JAR);
try {
saveSpecification(twillSpec, runtimeConfigDir.resolve(Constants.Files.TWILL_SPEC));
@@ -401,12 +399,11 @@
// org.apache.twill.internal.appmaster.ApplicationMasterMain
// false
- int reservedMemoryMB = yarnConfig.getInt(Configs.Keys.YARN_AM_RESERVED_MEMORY_MB,
- Configs.Defaults.YARN_AM_RESERVED_MEMORY_MB);
- int memory = Resources.computeMaxHeapSize(appMasterInfo.getMemoryMB(),
- reservedMemoryMB,
- minHeapRatio);
- return launcher.prepareLaunch(ImmutableMap.<String, String>of(), localFiles.values(), credentials)
+ int reservedMemoryMB = config.getInt(Configs.Keys.YARN_AM_RESERVED_MEMORY_MB,
+ Configs.Defaults.YARN_AM_RESERVED_MEMORY_MB);
+ int memory = Resources.computeMaxHeapSize(appMasterInfo.getMemoryMB(), reservedMemoryMB, getMinHeapRatio());
+ return launcher.prepareLaunch(ImmutableMap.<String, String>of(), localFiles.values(),
+ createSubmissionCredentials())
.addCommand(
"$JAVA_HOME/bin/java",
"-Djava.io.tmpdir=tmp",
@@ -431,6 +428,29 @@
}
}
+ /**
+ * Returns the minimum heap ratio based on the configuration.
+ */
+ private double getMinHeapRatio() {
+ // doing this way to support hadoop-2.0 profile
+ String minHeapRatioStr = config.get(Configs.Keys.HEAP_RESERVED_MIN_RATIO);
+ return (minHeapRatioStr == null) ? Configs.Defaults.HEAP_RESERVED_MIN_RATIO : Double.parseDouble(minHeapRatioStr);
+ }
+
+ /**
+ * Returns the reserved memory size in MB based on the configuration.
+ */
+ private int getReservedMemory() {
+ return config.getInt(Configs.Keys.JAVA_RESERVED_MEMORY_MB, Configs.Defaults.JAVA_RESERVED_MEMORY_MB);
+ }
+
+ /**
+ * Returns the local staging directory based on the configuration.
+ */
+ private File getLocalStagingDir() {
+ return new File(config.get(Configs.Keys.LOCAL_STAGING_DIRECTORY, Configs.Defaults.LOCAL_STAGING_DIRECTORY));
+ }
+
private void setEnv(String runnableName, Map<String, String> env, boolean overwrite) {
Map<String, String> environment = environments.get(runnableName);
if (environment == null) {
@@ -455,21 +475,40 @@
this.logLevels.put(runnableName, newLevels);
}
+ /**
+ * Creates an {@link Credentials} by copying the {@link Credentials} of the current user.
+ */
private Credentials createCredentials() {
Credentials credentials = new Credentials();
try {
credentials.addAll(UserGroupInformation.getCurrentUser().getCredentials());
+ } catch (IOException e) {
+ LOG.warn("Failed to get current user UGI. Current user credentials not added.", e);
+ }
+ return credentials;
+ }
- List<Token<?>> tokens = YarnUtils.addDelegationTokens(yarnConfig, appLocation.getLocationFactory(), credentials);
+ /**
+ * Creates a {@link Credentials} for the application submission.
+ */
+ private Credentials createSubmissionCredentials() {
+ Credentials credentials = new Credentials();
+ try {
+ // Acquires delegation token for the location
+ List<Token<?>> tokens = YarnUtils.addDelegationTokens(config, appLocation.getLocationFactory(), credentials);
if (LOG.isDebugEnabled()) {
for (Token<?> token : tokens) {
LOG.debug("Delegation token acquired for {}, {}", appLocation, token);
}
}
} catch (IOException e) {
- LOG.warn("Failed to check for secure login type. Not gathering any delegation token.", e);
+ LOG.warn("Failed to acquire delegation token for location {}", appLocation);
}
+
+ // Copy the user provided credentials.
+ // It will override the location delegation tokens acquired above if user supplies it.
+ credentials.addAll(this.credentials);
return credentials;
}
@@ -481,7 +520,9 @@
return new DefaultLocalFile(name, location.toURI(), location.lastModified(), location.length(), archive, null);
}
- private void createTwillJar(final ApplicationBundler bundler, Map<String, LocalFile> localFiles) throws IOException {
+ private void createTwillJar(final ApplicationBundler bundler,
+ final YarnAppClient yarnAppClient,
+ Map<String, LocalFile> localFiles) throws IOException {
LOG.debug("Create and copy {}", Constants.Files.TWILL_JAR);
Location location = locationCache.get(Constants.Files.TWILL_JAR, new LocationCache.Loader() {
@Override
@@ -633,8 +674,8 @@
TwillRuntimeSpecificationAdapter.create().toJson(
new TwillRuntimeSpecification(newTwillSpec, appLocation.getLocationFactory().getHomeLocation().getName(),
appLocation.toURI(), zkConnectString, runId, twillSpec.getName(),
- reservedMemory, yarnConfig.get(YarnConfiguration.RM_SCHEDULER_ADDRESS),
- logLevels, maxRetries, minHeapRatio), writer);
+ getReservedMemory(), config.get(YarnConfiguration.RM_SCHEDULER_ADDRESS),
+ logLevels, maxRetries, getMinHeapRatio()), writer);
}
LOG.debug("Done {}", targetFile);
}
@@ -787,7 +828,7 @@
}
private ApplicationBundler createBundler(ClassAcceptor classAcceptor) {
- return new ApplicationBundler(classAcceptor).setTempDir(localStagingDir);
+ return new ApplicationBundler(classAcceptor).setTempDir(getLocalStagingDir());
}
/**
diff --git a/twill-yarn/src/main/java/org/apache/twill/yarn/YarnTwillRunnerService.java b/twill-yarn/src/main/java/org/apache/twill/yarn/YarnTwillRunnerService.java
index 405eb24..d8e48de 100644
--- a/twill-yarn/src/main/java/org/apache/twill/yarn/YarnTwillRunnerService.java
+++ b/twill-yarn/src/main/java/org/apache/twill/yarn/YarnTwillRunnerService.java
@@ -126,7 +126,6 @@
};
private final YarnConfiguration yarnConfig;
- private final YarnAppClient yarnAppClient;
private final ZKClientService zkClientService;
private final LocationFactory locationFactory;
private final Table<String, RunId, YarnTwillController> controllers;
@@ -162,7 +161,6 @@
*/
public YarnTwillRunnerService(YarnConfiguration config, String zkConnect, LocationFactory locationFactory) {
this.yarnConfig = config;
- this.yarnAppClient = new VersionDetectYarnAppClientFactory().create(config);
this.locationFactory = locationFactory;
this.zkClientService = getZKClientService(zkConnect);
this.controllers = HashBasedTable.create();
@@ -288,8 +286,9 @@
locationCache = new NoCachingLocationCache(appLocation);
}
- return new YarnTwillPreparer(yarnConfig, twillSpec, runId, yarnAppClient,
- zkClientService.getConnectString(), appLocation, twillClassPaths, jvmOptions,
+ Configuration config = new Configuration(yarnConfig);
+ return new YarnTwillPreparer(config, twillSpec, runId, zkClientService.getConnectString(),
+ appLocation, twillClassPaths, jvmOptions,
locationCache, new YarnTwillControllerFactory() {
@Override
public YarnTwillController create(RunId runId, Iterable<LogHandler> logHandlers,
@@ -596,6 +595,8 @@
synchronized (YarnTwillRunnerService.this) {
if (!controllers.contains(appName, runId)) {
ZKClient zkClient = ZKClients.namespace(zkClientService, "/" + appName);
+ YarnAppClient yarnAppClient = new VersionDetectYarnAppClientFactory().create(new Configuration(yarnConfig));
+
YarnTwillController controller = listenController(
new YarnTwillController(appName, runId, zkClient, amLiveNodeData, yarnAppClient));
controllers.put(appName, runId, controller);
diff --git a/twill-yarn/src/test/java/org/apache/twill/yarn/ContainerSizeTestRun.java b/twill-yarn/src/test/java/org/apache/twill/yarn/ContainerSizeTestRun.java
index c6f7b9a..f5143ce 100644
--- a/twill-yarn/src/test/java/org/apache/twill/yarn/ContainerSizeTestRun.java
+++ b/twill-yarn/src/test/java/org/apache/twill/yarn/ContainerSizeTestRun.java
@@ -20,6 +20,8 @@
import com.google.common.collect.ImmutableMap;
import org.apache.twill.api.AbstractTwillRunnable;
+import org.apache.twill.api.Configs;
+import org.apache.twill.api.ResourceReport;
import org.apache.twill.api.ResourceSpecification;
import org.apache.twill.api.TwillApplication;
import org.apache.twill.api.TwillController;
@@ -33,6 +35,7 @@
import org.slf4j.LoggerFactory;
import java.io.PrintWriter;
+import java.util.Collections;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
@@ -65,12 +68,19 @@
public void testMaxHeapSize() throws InterruptedException, TimeoutException, ExecutionException {
TwillRunner runner = getTwillRunner();
TwillController controller = runner.prepare(new MaxHeapApp())
+ // Alter the AM container size
+ .withConfiguration(Collections.singletonMap(Configs.Keys.YARN_AM_MEMORY_MB, "256"))
.addLogHandler(new PrinterLogHandler(new PrintWriter(System.out, true)))
.start();
try {
ServiceDiscovered discovered = controller.discoverService("sleep");
Assert.assertTrue(waitForSize(discovered, 1, 120));
+
+ // Verify the AM container size
+ ResourceReport resourceReport = controller.getResourceReport();
+ Assert.assertNotNull(resourceReport);
+ Assert.assertEquals(256, resourceReport.getAppMasterResources().getMemoryMB());
} finally {
controller.terminate().get(120, TimeUnit.SECONDS);
}
diff --git a/twill-yarn/src/test/java/org/apache/twill/yarn/TwillTester.java b/twill-yarn/src/test/java/org/apache/twill/yarn/TwillTester.java
index 9daf06c..a141176 100644
--- a/twill-yarn/src/test/java/org/apache/twill/yarn/TwillTester.java
+++ b/twill-yarn/src/test/java/org/apache/twill/yarn/TwillTester.java
@@ -138,7 +138,7 @@
"org.apache.hadoop.yarn.util.resource.DominantResourceCalculator");
conf.setBoolean("yarn.scheduler.include-port-in-node-name", true);
}
- conf.set("yarn.nodemanager.vmem-pmem-ratio", "20.1");
+ conf.set("yarn.nodemanager.vmem-pmem-ratio", "100.1");
conf.set("yarn.nodemanager.vmem-check-enabled", "false");
conf.set("yarn.scheduler.minimum-allocation-mb", "128");
conf.set("yarn.nodemanager.delete.debug-delay-sec", "3600");