(TWILL-225) Allow configurations overridable per TwillPreprer

- Also increased the vmen-pmen ration in TwillTester to avoid test failure due to reduced container size
  - There is no way to disable vmen-pmen ratio check in Hadoop 2.0.

This closes #39 on Github.

Signed-off-by: Terence Yim <chtyim@apache.org>
diff --git a/twill-api/src/main/java/org/apache/twill/api/TwillPreparer.java b/twill-api/src/main/java/org/apache/twill/api/TwillPreparer.java
index fd568b3..43b751b 100644
--- a/twill-api/src/main/java/org/apache/twill/api/TwillPreparer.java
+++ b/twill-api/src/main/java/org/apache/twill/api/TwillPreparer.java
@@ -30,6 +30,14 @@
 public interface TwillPreparer {
 
   /**
+   * Overrides the default configuration with the given set of configurations.
+   *
+   * @param config set of configurations to override
+   * @return This {@link TwillPreparer}
+   */
+  TwillPreparer withConfiguration(Map<String, String> config);
+
+  /**
    * Adds a {@link LogHandler} for receiving an application log.
    * @param handler The {@link LogHandler}.
    * @return This {@link TwillPreparer}.
diff --git a/twill-yarn/src/main/java/org/apache/twill/yarn/YarnTwillPreparer.java b/twill-yarn/src/main/java/org/apache/twill/yarn/YarnTwillPreparer.java
index de03a7a..2d1edd0 100644
--- a/twill-yarn/src/main/java/org/apache/twill/yarn/YarnTwillPreparer.java
+++ b/twill-yarn/src/main/java/org/apache/twill/yarn/YarnTwillPreparer.java
@@ -40,6 +40,7 @@
 import com.google.gson.Gson;
 import com.google.gson.GsonBuilder;
 import joptsimple.OptionSpec;
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.security.Credentials;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.security.token.Token;
@@ -80,6 +81,7 @@
 import org.apache.twill.internal.utils.Dependencies;
 import org.apache.twill.internal.utils.Paths;
 import org.apache.twill.internal.utils.Resources;
+import org.apache.twill.internal.yarn.VersionDetectYarnAppClientFactory;
 import org.apache.twill.internal.yarn.YarnAppClient;
 import org.apache.twill.internal.yarn.YarnApplicationReport;
 import org.apache.twill.internal.yarn.YarnUtils;
@@ -126,9 +128,8 @@
     }
   };
 
-  private final YarnConfiguration yarnConfig;
+  private final Configuration config;
   private final TwillSpecification twillSpec;
-  private final YarnAppClient yarnAppClient;
   private final String zkConnectString;
   private final Location appLocation;
   private final YarnTwillControllerFactory controllerFactory;
@@ -143,9 +144,6 @@
   private final Map<String, Map<String, String>> environments = Maps.newHashMap();
   private final List<String> applicationClassPaths = Lists.newArrayList();
   private final Credentials credentials;
-  private final int reservedMemory;
-  private final double minHeapRatio;
-  private final File localStagingDir;
   private final Map<String, Map<String, String>> logLevels = Maps.newHashMap();
   private final LocationCache locationCache;
   private final Set<URL> twillClassPaths;
@@ -155,25 +153,16 @@
   private ClassAcceptor classAcceptor;
   private final Map<String, Integer> maxRetries = Maps.newHashMap();
 
-  YarnTwillPreparer(YarnConfiguration yarnConfig, TwillSpecification twillSpec, RunId runId,
-                    YarnAppClient yarnAppClient, String zkConnectString, Location appLocation, Set<URL> twillClassPaths,
+  YarnTwillPreparer(Configuration config, TwillSpecification twillSpec, RunId runId,
+                    String zkConnectString, Location appLocation, Set<URL> twillClassPaths,
                     String extraOptions, LocationCache locationCache, YarnTwillControllerFactory controllerFactory) {
-    this.yarnConfig = yarnConfig;
+    this.config = config;
     this.twillSpec = twillSpec;
     this.runId = runId;
-    this.yarnAppClient = yarnAppClient;
     this.zkConnectString = zkConnectString;
     this.appLocation = appLocation;
     this.controllerFactory = controllerFactory;
     this.credentials = createCredentials();
-    this.reservedMemory = yarnConfig.getInt(Configs.Keys.JAVA_RESERVED_MEMORY_MB,
-                                            Configs.Defaults.JAVA_RESERVED_MEMORY_MB);
-    // doing this way to support hadoop-2.0 profile
-    String minHeapRatioStr = yarnConfig.get(Configs.Keys.HEAP_RESERVED_MIN_RATIO);
-    this.minHeapRatio = (minHeapRatioStr == null) ?
-            Configs.Defaults.HEAP_RESERVED_MIN_RATIO : Double.parseDouble(minHeapRatioStr);
-    this.localStagingDir = new File(yarnConfig.get(Configs.Keys.LOCAL_STAGING_DIRECTORY,
-                                                   Configs.Defaults.LOCAL_STAGING_DIRECTORY));
     this.extraOptions = extraOptions;
     this.classAcceptor = new ClassAcceptor();
     this.locationCache = locationCache;
@@ -187,6 +176,14 @@
   }
 
   @Override
+  public TwillPreparer withConfiguration(Map<String, String> config) {
+    for (Map.Entry<String, String> entry : config.entrySet()) {
+      this.config.set(entry.getKey(), entry.getValue());
+    }
+    return this;
+  }
+
+  @Override
   public TwillPreparer addLogHandler(LogHandler handler) {
     logHandlers.add(handler);
     return this;
@@ -362,6 +359,7 @@
   @Override
   public TwillController start(long timeout, TimeUnit timeoutUnit) {
     try {
+      final YarnAppClient yarnAppClient = new VersionDetectYarnAppClientFactory().create(config);
       final ProcessLauncher<ApplicationMasterInfo> launcher = yarnAppClient.createLauncher(twillSpec, schedulerQueue);
       final ApplicationMasterInfo appMasterInfo = launcher.getContainerInfo();
       Callable<ProcessController<YarnApplicationReport>> submitTask =
@@ -373,11 +371,11 @@
             Map<String, LocalFile> localFiles = Maps.newHashMap();
 
             createLauncherJar(localFiles);
-            createTwillJar(createBundler(classAcceptor), localFiles);
+            createTwillJar(createBundler(classAcceptor), yarnAppClient, localFiles);
             createApplicationJar(createApplicationJarBundler(classAcceptor), localFiles);
             createResourcesJar(createBundler(classAcceptor), localFiles);
 
-            Path runtimeConfigDir = Files.createTempDirectory(localStagingDir.toPath(),
+            Path runtimeConfigDir = Files.createTempDirectory(getLocalStagingDir().toPath(),
                                                               Constants.Files.RUNTIME_CONFIG_JAR);
             try {
               saveSpecification(twillSpec, runtimeConfigDir.resolve(Constants.Files.TWILL_SPEC));
@@ -401,12 +399,11 @@
             //     org.apache.twill.internal.appmaster.ApplicationMasterMain
             //     false
 
-            int reservedMemoryMB = yarnConfig.getInt(Configs.Keys.YARN_AM_RESERVED_MEMORY_MB,
-                                                     Configs.Defaults.YARN_AM_RESERVED_MEMORY_MB);
-            int memory = Resources.computeMaxHeapSize(appMasterInfo.getMemoryMB(),
-                                                      reservedMemoryMB,
-                                                      minHeapRatio);
-            return launcher.prepareLaunch(ImmutableMap.<String, String>of(), localFiles.values(), credentials)
+            int reservedMemoryMB = config.getInt(Configs.Keys.YARN_AM_RESERVED_MEMORY_MB,
+                                                 Configs.Defaults.YARN_AM_RESERVED_MEMORY_MB);
+            int memory = Resources.computeMaxHeapSize(appMasterInfo.getMemoryMB(), reservedMemoryMB, getMinHeapRatio());
+            return launcher.prepareLaunch(ImmutableMap.<String, String>of(), localFiles.values(),
+                                          createSubmissionCredentials())
               .addCommand(
                 "$JAVA_HOME/bin/java",
                 "-Djava.io.tmpdir=tmp",
@@ -431,6 +428,29 @@
     }
   }
 
+  /**
+   * Returns the minimum heap ratio based on the configuration.
+   */
+  private double getMinHeapRatio() {
+    // doing this way to support hadoop-2.0 profile
+    String minHeapRatioStr = config.get(Configs.Keys.HEAP_RESERVED_MIN_RATIO);
+    return (minHeapRatioStr == null) ? Configs.Defaults.HEAP_RESERVED_MIN_RATIO : Double.parseDouble(minHeapRatioStr);
+  }
+
+  /**
+   * Returns the reserved memory size in MB based on the configuration.
+   */
+  private int getReservedMemory() {
+    return config.getInt(Configs.Keys.JAVA_RESERVED_MEMORY_MB, Configs.Defaults.JAVA_RESERVED_MEMORY_MB);
+  }
+
+  /**
+   * Returns the local staging directory based on the configuration.
+   */
+  private File getLocalStagingDir() {
+    return new File(config.get(Configs.Keys.LOCAL_STAGING_DIRECTORY, Configs.Defaults.LOCAL_STAGING_DIRECTORY));
+  }
+
   private void setEnv(String runnableName, Map<String, String> env, boolean overwrite) {
     Map<String, String> environment = environments.get(runnableName);
     if (environment == null) {
@@ -455,21 +475,40 @@
     this.logLevels.put(runnableName, newLevels);
   }
 
+  /**
+   * Creates an {@link Credentials} by copying the {@link Credentials} of the current user.
+   */
   private Credentials createCredentials() {
     Credentials credentials = new Credentials();
 
     try {
       credentials.addAll(UserGroupInformation.getCurrentUser().getCredentials());
+    } catch (IOException e) {
+      LOG.warn("Failed to get current user UGI. Current user credentials not added.", e);
+    }
+    return credentials;
+  }
 
-      List<Token<?>> tokens = YarnUtils.addDelegationTokens(yarnConfig, appLocation.getLocationFactory(), credentials);
+  /**
+   * Creates a {@link Credentials} for the application submission.
+   */
+  private Credentials createSubmissionCredentials() {
+    Credentials credentials = new Credentials();
+    try {
+      // Acquires delegation token for the location
+      List<Token<?>> tokens = YarnUtils.addDelegationTokens(config, appLocation.getLocationFactory(), credentials);
       if (LOG.isDebugEnabled()) {
         for (Token<?> token : tokens) {
           LOG.debug("Delegation token acquired for {}, {}", appLocation, token);
         }
       }
     } catch (IOException e) {
-      LOG.warn("Failed to check for secure login type. Not gathering any delegation token.", e);
+      LOG.warn("Failed to acquire delegation token for location {}", appLocation);
     }
+
+    // Copy the user provided credentials.
+    // It will override the location delegation tokens acquired above if user supplies it.
+    credentials.addAll(this.credentials);
     return credentials;
   }
 
@@ -481,7 +520,9 @@
     return new DefaultLocalFile(name, location.toURI(), location.lastModified(), location.length(), archive, null);
   }
 
-  private void createTwillJar(final ApplicationBundler bundler, Map<String, LocalFile> localFiles) throws IOException {
+  private void createTwillJar(final ApplicationBundler bundler,
+                              final YarnAppClient yarnAppClient,
+                              Map<String, LocalFile> localFiles) throws IOException {
     LOG.debug("Create and copy {}", Constants.Files.TWILL_JAR);
     Location location = locationCache.get(Constants.Files.TWILL_JAR, new LocationCache.Loader() {
       @Override
@@ -633,8 +674,8 @@
       TwillRuntimeSpecificationAdapter.create().toJson(
         new TwillRuntimeSpecification(newTwillSpec, appLocation.getLocationFactory().getHomeLocation().getName(),
                                       appLocation.toURI(), zkConnectString, runId, twillSpec.getName(),
-                                      reservedMemory, yarnConfig.get(YarnConfiguration.RM_SCHEDULER_ADDRESS),
-                                      logLevels, maxRetries, minHeapRatio), writer);
+                                      getReservedMemory(), config.get(YarnConfiguration.RM_SCHEDULER_ADDRESS),
+                                      logLevels, maxRetries, getMinHeapRatio()), writer);
     }
     LOG.debug("Done {}", targetFile);
   }
@@ -787,7 +828,7 @@
   }
 
   private ApplicationBundler createBundler(ClassAcceptor classAcceptor) {
-    return new ApplicationBundler(classAcceptor).setTempDir(localStagingDir);
+    return new ApplicationBundler(classAcceptor).setTempDir(getLocalStagingDir());
   }
 
   /**
diff --git a/twill-yarn/src/main/java/org/apache/twill/yarn/YarnTwillRunnerService.java b/twill-yarn/src/main/java/org/apache/twill/yarn/YarnTwillRunnerService.java
index 405eb24..d8e48de 100644
--- a/twill-yarn/src/main/java/org/apache/twill/yarn/YarnTwillRunnerService.java
+++ b/twill-yarn/src/main/java/org/apache/twill/yarn/YarnTwillRunnerService.java
@@ -126,7 +126,6 @@
   };
 
   private final YarnConfiguration yarnConfig;
-  private final YarnAppClient yarnAppClient;
   private final ZKClientService zkClientService;
   private final LocationFactory locationFactory;
   private final Table<String, RunId, YarnTwillController> controllers;
@@ -162,7 +161,6 @@
    */
   public YarnTwillRunnerService(YarnConfiguration config, String zkConnect, LocationFactory locationFactory) {
     this.yarnConfig = config;
-    this.yarnAppClient = new VersionDetectYarnAppClientFactory().create(config);
     this.locationFactory = locationFactory;
     this.zkClientService = getZKClientService(zkConnect);
     this.controllers = HashBasedTable.create();
@@ -288,8 +286,9 @@
       locationCache = new NoCachingLocationCache(appLocation);
     }
 
-    return new YarnTwillPreparer(yarnConfig, twillSpec, runId, yarnAppClient,
-                                 zkClientService.getConnectString(), appLocation, twillClassPaths, jvmOptions,
+    Configuration config = new Configuration(yarnConfig);
+    return new YarnTwillPreparer(config, twillSpec, runId, zkClientService.getConnectString(),
+                                 appLocation, twillClassPaths, jvmOptions,
                                  locationCache, new YarnTwillControllerFactory() {
       @Override
       public YarnTwillController create(RunId runId, Iterable<LogHandler> logHandlers,
@@ -596,6 +595,8 @@
         synchronized (YarnTwillRunnerService.this) {
           if (!controllers.contains(appName, runId)) {
             ZKClient zkClient = ZKClients.namespace(zkClientService, "/" + appName);
+            YarnAppClient yarnAppClient = new VersionDetectYarnAppClientFactory().create(new Configuration(yarnConfig));
+
             YarnTwillController controller = listenController(
               new YarnTwillController(appName, runId, zkClient, amLiveNodeData, yarnAppClient));
             controllers.put(appName, runId, controller);
diff --git a/twill-yarn/src/test/java/org/apache/twill/yarn/ContainerSizeTestRun.java b/twill-yarn/src/test/java/org/apache/twill/yarn/ContainerSizeTestRun.java
index c6f7b9a..f5143ce 100644
--- a/twill-yarn/src/test/java/org/apache/twill/yarn/ContainerSizeTestRun.java
+++ b/twill-yarn/src/test/java/org/apache/twill/yarn/ContainerSizeTestRun.java
@@ -20,6 +20,8 @@
 
 import com.google.common.collect.ImmutableMap;
 import org.apache.twill.api.AbstractTwillRunnable;
+import org.apache.twill.api.Configs;
+import org.apache.twill.api.ResourceReport;
 import org.apache.twill.api.ResourceSpecification;
 import org.apache.twill.api.TwillApplication;
 import org.apache.twill.api.TwillController;
@@ -33,6 +35,7 @@
 import org.slf4j.LoggerFactory;
 
 import java.io.PrintWriter;
+import java.util.Collections;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
@@ -65,12 +68,19 @@
   public void testMaxHeapSize() throws InterruptedException, TimeoutException, ExecutionException {
     TwillRunner runner = getTwillRunner();
     TwillController controller = runner.prepare(new MaxHeapApp())
+      // Alter the AM container size
+      .withConfiguration(Collections.singletonMap(Configs.Keys.YARN_AM_MEMORY_MB, "256"))
       .addLogHandler(new PrinterLogHandler(new PrintWriter(System.out, true)))
       .start();
 
     try {
       ServiceDiscovered discovered = controller.discoverService("sleep");
       Assert.assertTrue(waitForSize(discovered, 1, 120));
+
+      // Verify the AM container size
+      ResourceReport resourceReport = controller.getResourceReport();
+      Assert.assertNotNull(resourceReport);
+      Assert.assertEquals(256, resourceReport.getAppMasterResources().getMemoryMB());
     } finally {
       controller.terminate().get(120, TimeUnit.SECONDS);
     }
diff --git a/twill-yarn/src/test/java/org/apache/twill/yarn/TwillTester.java b/twill-yarn/src/test/java/org/apache/twill/yarn/TwillTester.java
index 9daf06c..a141176 100644
--- a/twill-yarn/src/test/java/org/apache/twill/yarn/TwillTester.java
+++ b/twill-yarn/src/test/java/org/apache/twill/yarn/TwillTester.java
@@ -138,7 +138,7 @@
                "org.apache.hadoop.yarn.util.resource.DominantResourceCalculator");
       conf.setBoolean("yarn.scheduler.include-port-in-node-name", true);
     }
-    conf.set("yarn.nodemanager.vmem-pmem-ratio", "20.1");
+    conf.set("yarn.nodemanager.vmem-pmem-ratio", "100.1");
     conf.set("yarn.nodemanager.vmem-check-enabled", "false");
     conf.set("yarn.scheduler.minimum-allocation-mb", "128");
     conf.set("yarn.nodemanager.delete.debug-delay-sec", "3600");