/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.tez.client;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;

import java.io.File;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.net.URL;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Set;

import org.apache.commons.io.FileUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.hdfs.DistributedFileSystem;
import org.apache.hadoop.io.DataInputByteBuffer;
import org.apache.hadoop.security.Credentials;
import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.yarn.api.ApplicationConstants;
import org.apache.hadoop.yarn.api.ApplicationConstants.Environment;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
import org.apache.hadoop.yarn.api.records.LocalResource;
import org.apache.hadoop.yarn.api.records.LocalResourceVisibility;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.util.Records;
import org.apache.tez.common.TezClassLoader;
import org.apache.tez.common.security.JobTokenIdentifier;
import org.apache.tez.common.security.JobTokenSecretManager;
import org.apache.tez.common.security.TokenCache;
import org.apache.tez.dag.api.DAG;
import org.apache.tez.dag.api.ProcessorDescriptor;
import org.apache.tez.dag.api.TezConfiguration;
import org.apache.tez.dag.api.TezConstants;
import org.apache.tez.dag.api.TezException;
import org.apache.tez.dag.api.TezUncheckedException;
import org.apache.tez.dag.api.Vertex;
import org.apache.tez.dag.api.records.DAGProtos.ConfigurationProto;
import org.apache.tez.dag.api.records.DAGProtos.PlanKeyValuePair;
import org.apache.tez.serviceplugins.api.ServicePluginsDescriptor;
import org.junit.Assert;
import org.junit.Test;
/**
 * 
 */
public class TestTezClientUtils {
  private static String TEST_ROOT_DIR = "target" + Path.SEPARATOR
      + TestTezClientUtils.class.getName() + "-tmpDir";
  /**
   * 
   */
  @Test (timeout=5000)
  public void validateSetTezJarLocalResourcesNotDefined() throws Exception {

    TezConfiguration conf = new TezConfiguration(false);
    Credentials credentials = new Credentials();
    try {
      Map<String,LocalResource> resources = new HashMap<String, LocalResource>();
      TezClientUtils.setupTezJarsLocalResources(conf, credentials, resources);
      Assert.fail("Expected TezUncheckedException");
    } catch (TezUncheckedException e) {
      Assert.assertTrue(e.getMessage().contains("Invalid configuration of tez jars"));
    }
  }

  @Test (timeout=5000)
  public void validateSetTezJarLocalResourcesDefinedButEmpty() throws Exception {
    File emptyDir = new File(TEST_ROOT_DIR, "emptyDir");
    emptyDir.deleteOnExit();
    Assert.assertTrue(emptyDir.mkdirs());
    TezConfiguration conf = new TezConfiguration();
    conf.set(TezConfiguration.TEZ_LIB_URIS, emptyDir.toURI().toURL().toString());
    Credentials credentials = new Credentials();
    try {
      Map<String,LocalResource> resources = new HashMap<String, LocalResource>();
      TezClientUtils.setupTezJarsLocalResources(conf, credentials, resources);
      Assert.fail("Expected TezUncheckedException");
    } catch (TezUncheckedException e) {
      Assert.assertTrue(e.getMessage().contains("No files found in locations"));
    }
  }

  /**
   * 
   */
  @Test(expected=FileNotFoundException.class, timeout=5000)
  public void validateSetTezJarLocalResourcesDefinedNonExistingDirectory() throws Exception {

    TezConfiguration conf = new TezConfiguration();
    conf.set(TezConfiguration.TEZ_LIB_URIS, "file:///foo");
    Credentials credentials = new Credentials();
    Map<String,LocalResource> resources = new HashMap<String, LocalResource>();
    TezClientUtils.setupTezJarsLocalResources(conf, credentials, resources);
  }

  /**
   *
   */
  @Test (timeout=20000)
  public void validateSetTezJarLocalResourcesDefinedExistingDirectory() throws Exception {
    URL[] cp = TezClassLoader.getInstance().getURLs();
    StringBuffer buffer = new StringBuffer();
    for (URL url : cp) {
      buffer.append(url.toExternalForm());
      buffer.append(",");
    }
    TezConfiguration conf = new TezConfiguration();
    conf.set(TezConfiguration.TEZ_LIB_URIS, buffer.toString());
    Credentials credentials = new Credentials();
    Map<String, LocalResource> localizedMap = new HashMap<String, LocalResource>();
    boolean usingArchive = TezClientUtils.setupTezJarsLocalResources(conf, credentials,
        localizedMap);
    Assert.assertFalse(usingArchive);
    Set<String> resourceNames = localizedMap.keySet();
    for (URL url : cp) {
      File file = FileUtils.toFile(url);
      if (file.isDirectory()){
        String[] firList = file.list();
        for (String fileNme : firList) {
          File innerFile = new File(file, fileNme);
          if (!innerFile.isDirectory()){
            assertTrue(resourceNames.contains(innerFile.getName()));
          }
          // not supporting deep hierarchies 
        }
      }
      else {
        assertTrue(resourceNames.contains(file.getName()));
      }
    }
  }

  /**
   * 
   * @throws Exception
   */
  @Test (timeout=5000)
  public void validateSetTezJarLocalResourcesDefinedExistingDirectoryIgnored() throws Exception {
    URL[] cp = TezClassLoader.getInstance().getURLs();
    StringBuffer buffer = new StringBuffer();
    for (URL url : cp) {
      buffer.append(url.toExternalForm());
      buffer.append(",");
    }
    TezConfiguration conf = new TezConfiguration();
    conf.set(TezConfiguration.TEZ_LIB_URIS, buffer.toString());
    conf.setBoolean(TezConfiguration.TEZ_IGNORE_LIB_URIS, true);
    Credentials credentials = new Credentials();
    Map<String, LocalResource> localizedMap = new HashMap<String, LocalResource>();
    Assert.assertFalse(TezClientUtils.setupTezJarsLocalResources(conf, credentials, localizedMap));
    assertTrue(localizedMap.isEmpty());
  }

  /**
   * 
   * @throws Exception
   */
  @Test (timeout=20000)
  public void validateSetTezJarLocalResourcesDefinedExistingDirectoryIgnoredSetToFalse() throws Exception {
    URL[] cp = TezClassLoader.getInstance().getURLs();
    StringBuffer buffer = new StringBuffer();
    for (URL url : cp) {
      buffer.append(url.toExternalForm());
      buffer.append(",");
    }
    TezConfiguration conf = new TezConfiguration();
    conf.set(TezConfiguration.TEZ_LIB_URIS, buffer.toString());
    conf.setBoolean(TezConfiguration.TEZ_IGNORE_LIB_URIS, false);
    Credentials credentials = new Credentials();
    Map<String, LocalResource> localizedMap = new HashMap<String, LocalResource>();
    Assert.assertFalse(TezClientUtils.setupTezJarsLocalResources(conf, credentials, localizedMap));
    assertFalse(localizedMap.isEmpty());
  }


    /**
   *
   */
  @Test (timeout=5000)
  public void testTezDefaultFS() throws Exception {
    FileSystem localFs = FileSystem.getLocal(new Configuration());
    StringBuilder tezLibUris = new StringBuilder();

    Path topDir = new Path(TEST_ROOT_DIR, "testTezDefaultFS");
    if (localFs.exists(topDir)) {
      localFs.delete(topDir, true);
    }
    localFs.mkdirs(topDir);

    Path file = new Path(topDir, "file.jar");

    Assert.assertTrue(localFs.createNewFile(file));
    tezLibUris.append(localFs.makeQualified(file).toString());

    TezConfiguration conf = new TezConfiguration();
    conf.set(TezConfiguration.TEZ_LIB_URIS, tezLibUris.toString());
    conf.set("fs.defaultFS", "hdfs:///localhost:1234");
    Credentials credentials = new Credentials();
    Map<String, LocalResource> localizedMap = new HashMap<String, LocalResource>();
    TezClientUtils.setupTezJarsLocalResources(conf, credentials, localizedMap);

    Assert.assertTrue(localFs.delete(file, true));
    Assert.assertTrue(localFs.delete(topDir, true));
  }

    /**
   *
   */
  @Test (timeout=5000)
  public void validateSetTezJarLocalResourcesMultipleTarballs() throws Exception {
    FileSystem localFs = FileSystem.getLocal(new Configuration());
    StringBuilder tezLibUris = new StringBuilder();

    // Create 2 files
    Path topDir = new Path(TEST_ROOT_DIR, "validatemultipletarballs");
    if (localFs.exists(topDir)) {
      localFs.delete(topDir, true);
    }
    localFs.mkdirs(topDir);

    Path tarFile1 = new Path(topDir, "f1.tar.gz");
    Path tarFile2 = new Path(topDir, "f2.tar.gz");

    Assert.assertTrue(localFs.createNewFile(tarFile1));
    Assert.assertTrue(localFs.createNewFile(tarFile2));
    tezLibUris.append(localFs.makeQualified(tarFile1).toString()).append("#tar1").append(",");
    tezLibUris.append(localFs.makeQualified(tarFile2).toString()).append("#tar2").append(",");

    TezConfiguration conf = new TezConfiguration();
    conf.set(TezConfiguration.TEZ_LIB_URIS, tezLibUris.toString());
    Credentials credentials = new Credentials();
    Map<String, LocalResource> localizedMap = new HashMap<String, LocalResource>();
    TezClientUtils.setupTezJarsLocalResources(conf, credentials, localizedMap);
    Set<String> resourceNames = localizedMap.keySet();
    Assert.assertEquals(2, resourceNames.size());
    Assert.assertTrue(resourceNames.contains("tar1"));
    Assert.assertTrue(resourceNames.contains("tar2"));
    Assert.assertFalse(resourceNames.contains("f1.tar.gz"));
    Assert.assertFalse(resourceNames.contains("f2.tar.gz"));


    Assert.assertTrue(localFs.delete(tarFile1, true));
    Assert.assertTrue(localFs.delete(tarFile2, true));
    Assert.assertTrue(localFs.delete(topDir, true));
  }

    /**
   *
   */
  @Test (timeout=5000)
  public void validateSetTezJarLocalResourcesMixTarballAndJar() throws Exception {
    FileSystem localFs = FileSystem.getLocal(new Configuration());
    StringBuilder tezLibUris = new StringBuilder();

    // Create 2 jars and 1 archive
    Path topDir = new Path(TEST_ROOT_DIR, "validatetarballandjar");
    if (localFs.exists(topDir)) {
      localFs.delete(topDir, true);
    }
    localFs.mkdirs(topDir);

    Path tarFile1 = new Path(topDir, "f1.tar.gz");
    Path jarFile2 = new Path(topDir, "f2.jar");
    Path jarFile3 = new Path(topDir, "f3.jar");

    Assert.assertTrue(localFs.createNewFile(tarFile1));
    Assert.assertTrue(localFs.createNewFile(jarFile2));
    Assert.assertTrue(localFs.createNewFile(jarFile3));

    tezLibUris.append(localFs.makeQualified(topDir).toString()).append(",");
    tezLibUris.append(localFs.makeQualified(tarFile1).toString()).append("#tar1").append(",");

    TezConfiguration conf = new TezConfiguration();
    conf.set(TezConfiguration.TEZ_LIB_URIS, tezLibUris.toString());
    Credentials credentials = new Credentials();
    Map<String, LocalResource> localizedMap = new HashMap<String, LocalResource>();
    TezClientUtils.setupTezJarsLocalResources(conf, credentials, localizedMap);
    Set<String> resourceNames = localizedMap.keySet();
    Assert.assertEquals(4, resourceNames.size());
    Assert.assertTrue(resourceNames.contains("tar1"));
    Assert.assertTrue(resourceNames.contains("f1.tar.gz"));
    Assert.assertTrue(resourceNames.contains("f2.jar"));
    Assert.assertTrue(resourceNames.contains("f3.jar"));

    Assert.assertTrue(localFs.delete(tarFile1, true));
    Assert.assertTrue(localFs.delete(jarFile2, true));
    Assert.assertTrue(localFs.delete(jarFile3, true));
    Assert.assertTrue(localFs.delete(topDir, true));
  }

  @Test(timeout = 2000)
  // this test checks if the priority field is set properly in the
  // ApplicationSubmissionContext
  public void testAppSubmissionContextForPriority() throws Exception {
    TezConfiguration tezConf = new TezConfiguration();
    int testpriority = 999;
    ApplicationId appId = ApplicationId.newInstance(1000, 1);
    Credentials credentials = new Credentials();
    TezClientUtils.createSessionToken(appId.toString(),
        new JobTokenSecretManager(), credentials);
    tezConf.setBoolean(TezConfiguration.TEZ_IGNORE_LIB_URIS, true);
    Map<String, LocalResource> m = new HashMap<String, LocalResource>();
    tezConf.setInt(TezConfiguration.TEZ_AM_APPLICATION_PRIORITY, testpriority);
    AMConfiguration amConf =
        new AMConfiguration(tezConf, new HashMap<String, LocalResource>(), credentials);
    ApplicationSubmissionContext appcontext;
    appcontext = TezClientUtils.createApplicationSubmissionContext(
        appId, null, "dagname",
        amConf, m,
        credentials, false,
        new TezApiVersionInfo(), null, null);
    assertEquals(testpriority, appcontext.getPriority().getPriority());
  }

  @Test(timeout=1000)
  // when tez config property for app priority not set,
  // tez should not set app priority and let YARN deal with it.
  public void testSetApplicationPriority() {
    TezConfiguration conf = new TezConfiguration(false);
    AMConfiguration amconfig = new AMConfiguration(conf, null, null);
    ApplicationSubmissionContext appContext = Records
        .newRecord(ApplicationSubmissionContext.class);
    TezClientUtils.setApplicationPriority(appContext, amconfig);
    assertNull(appContext.getPriority());
  }

  @Test(timeout=1000)
  public void testSetApplicationTags() {
    TezConfiguration conf = new TezConfiguration(false);
    conf.set(TezConfiguration.TEZ_APPLICATION_TAGS, "foo,bar");
    AMConfiguration amconfig = new AMConfiguration(conf, null, null);
    ApplicationSubmissionContext appContext = Records
        .newRecord(ApplicationSubmissionContext.class);
    Collection<String> tagsFromConf =
        amconfig.getTezConfiguration().getTrimmedStringCollection(
        TezConfiguration.TEZ_APPLICATION_TAGS);
    appContext.setApplicationTags(new HashSet<String>(tagsFromConf));
    assertTrue(appContext.getApplicationTags().contains("foo"));
    assertTrue(appContext.getApplicationTags().contains("bar"));
  }

  @Test(timeout = 5000)
  public void testSessionTokenInAmClc() throws IOException, YarnException {

    TezConfiguration tezConf = new TezConfiguration();

    ApplicationId appId = ApplicationId.newInstance(1000, 1);
    DAG dag = DAG.create("testdag");
    dag.addVertex(Vertex.create("testVertex", ProcessorDescriptor.create("processorClassname"), 1)
        .setTaskLaunchCmdOpts("initialLaunchOpts"));

    Credentials credentials = new Credentials();
    JobTokenSecretManager jobTokenSecretManager = new JobTokenSecretManager();
    TezClientUtils.createSessionToken(appId.toString(), jobTokenSecretManager, credentials);
    Token<JobTokenIdentifier> jobToken = TokenCache.getSessionToken(credentials);
    assertNotNull(jobToken);

    AMConfiguration amConf =
        new AMConfiguration(tezConf, new HashMap<String, LocalResource>(), credentials);
    ApplicationSubmissionContext appSubmissionContext =
        TezClientUtils.createApplicationSubmissionContext(appId, dag, "amName", amConf,
            new HashMap<String, LocalResource>(), credentials, false, new TezApiVersionInfo(),
            null, null);

    ContainerLaunchContext amClc = appSubmissionContext.getAMContainerSpec();
    Map<String, ByteBuffer> amServiceData = amClc.getServiceData();
    assertNotNull(amServiceData);
    assertEquals(1, amServiceData.size());

    DataInputByteBuffer dibb = new DataInputByteBuffer();
    dibb.reset(amServiceData.values().iterator().next());
    Token<JobTokenIdentifier> jtSent = new Token<JobTokenIdentifier>();
    jtSent.readFields(dibb);

    assertTrue(Arrays.equals(jobToken.getIdentifier(), jtSent.getIdentifier()));
  }

  @Test(timeout = 5000)
  public void testAMLoggingOptsSimple() throws IOException, YarnException {

    TezConfiguration tezConf = new TezConfiguration();
    tezConf.set(TezConfiguration.TEZ_AM_LOG_LEVEL, "WARN");

    ApplicationId appId = ApplicationId.newInstance(1000, 1);
    Credentials credentials = new Credentials();
    JobTokenSecretManager jobTokenSecretManager = new JobTokenSecretManager();
    TezClientUtils.createSessionToken(appId.toString(), jobTokenSecretManager, credentials);
    DAG dag = DAG.create("testdag");
    dag.addVertex(Vertex.create("testVertex", ProcessorDescriptor.create("processorClassname"), 1)
        .setTaskLaunchCmdOpts("initialLaunchOpts"));
    AMConfiguration amConf =
        new AMConfiguration(tezConf, new HashMap<String, LocalResource>(), credentials);
    ApplicationSubmissionContext appSubmissionContext =
        TezClientUtils.createApplicationSubmissionContext(appId, dag, "amName", amConf,
            new HashMap<String, LocalResource>(), credentials, false, new TezApiVersionInfo(),
            null, null);

    List<String> expectedCommands = new LinkedList<String>();
    expectedCommands.add("-Dlog4j.configuratorClass=org.apache.tez.common.TezLog4jConfigurator");
    expectedCommands.add("-Dlog4j.configuration=" + TezConstants.TEZ_CONTAINER_LOG4J_PROPERTIES_FILE);
    expectedCommands.add("-D" + YarnConfiguration.YARN_APP_CONTAINER_LOG_DIR + "=" +
        ApplicationConstants.LOG_DIR_EXPANSION_VAR);
    expectedCommands.add("-D" + TezConstants.TEZ_ROOT_LOGGER_NAME + "=" + "WARN" + "," +
        TezConstants.TEZ_CONTAINER_LOGGER_NAME);

    List<String> commands = appSubmissionContext.getAMContainerSpec().getCommands();
    assertEquals(1, commands.size());
    for (String expectedCmd : expectedCommands) {
      assertTrue(commands.get(0).contains(expectedCmd));
    }

    Map<String, String> environment = appSubmissionContext.getAMContainerSpec().getEnvironment();
    String logEnv = environment.get(TezConstants.TEZ_CONTAINER_LOG_PARAMS);
    assertNull(logEnv);
  }

  @Test(timeout = 5000)
  public void testAMLoggingOptsPerLogger() throws IOException, YarnException {

    TezConfiguration tezConf = new TezConfiguration();
    tezConf.set(TezConfiguration.TEZ_AM_LOG_LEVEL,
        "WARN;org.apache.hadoop.ipc=DEBUG;org.apache.hadoop.security=DEBUG");

    ApplicationId appId = ApplicationId.newInstance(1000, 1);
    Credentials credentials = new Credentials();
    JobTokenSecretManager jobTokenSecretManager = new JobTokenSecretManager();
    TezClientUtils.createSessionToken(appId.toString(), jobTokenSecretManager, credentials);
    DAG dag = DAG.create("testdag");
    dag.addVertex(Vertex.create("testVertex", ProcessorDescriptor.create("processorClassname"), 1)
        .setTaskLaunchCmdOpts("initialLaunchOpts"));
    AMConfiguration amConf =
        new AMConfiguration(tezConf, new HashMap<String, LocalResource>(), credentials);
    ApplicationSubmissionContext appSubmissionContext =
        TezClientUtils.createApplicationSubmissionContext(appId, dag, "amName", amConf,
            new HashMap<String, LocalResource>(), credentials, false, new TezApiVersionInfo(),
            null, null);

    List<String> expectedCommands = new LinkedList<String>();
    expectedCommands.add("-Dlog4j.configuratorClass=org.apache.tez.common.TezLog4jConfigurator");
    expectedCommands.add(
        "-Dlog4j.configuration=" + TezConstants.TEZ_CONTAINER_LOG4J_PROPERTIES_FILE);
    expectedCommands.add("-D" + YarnConfiguration.YARN_APP_CONTAINER_LOG_DIR + "=" +
        ApplicationConstants.LOG_DIR_EXPANSION_VAR);
    expectedCommands.add("-D" + TezConstants.TEZ_ROOT_LOGGER_NAME + "=" + "WARN" + "," +
        TezConstants.TEZ_CONTAINER_LOGGER_NAME);

    List<String> commands = appSubmissionContext.getAMContainerSpec().getCommands();
    assertEquals(1, commands.size());
    for (String expectedCmd : expectedCommands) {
      assertTrue(commands.get(0).contains(expectedCmd));
    }

    Map<String, String> environment = appSubmissionContext.getAMContainerSpec().getEnvironment();
    String logEnv = environment.get(TezConstants.TEZ_CONTAINER_LOG_PARAMS);
    assertEquals("org.apache.hadoop.ipc=DEBUG;org.apache.hadoop.security=DEBUG", logEnv);
  }

  @Test(timeout = 5000)
  public void testAMCommandOpts() {
    Path tmpDir = new Path(Environment.PWD.$(),
        YarnConfiguration.DEFAULT_CONTAINER_TEMP_DIR);
    String tmpOpts = "-Djava.io.tmpdir=" + tmpDir;
    TezConfiguration tezConf = new TezConfiguration();
    String amCommandOpts = "-Xmx 200m -Dtest.property";
    tezConf.set(TezConfiguration.TEZ_AM_LAUNCH_CMD_OPTS, amCommandOpts);

    // Test1: Rely on defaults for cluster-default opts
    String amOptsConstructed =
        TezClientUtils.constructAMLaunchOpts(tezConf, Resource.newInstance(1024, 1));
    assertEquals(tmpOpts + " "
        + TezConfiguration.TEZ_AM_LAUNCH_CLUSTER_DEFAULT_CMD_OPTS_DEFAULT + " "
        + amCommandOpts,
        amOptsConstructed);

    // Test2: Setup cluster-default command opts explicitly
    String clusterDefaultCommandOpts =
        "-server -Djava.net.preferIPv4Stack=true -XX:+PrintGCDetails -verbose:gc ";
    tezConf.set(TezConfiguration.TEZ_AM_LAUNCH_CLUSTER_DEFAULT_CMD_OPTS, clusterDefaultCommandOpts);
    amOptsConstructed =
        TezClientUtils.constructAMLaunchOpts(tezConf, Resource.newInstance(1024, 1));
    assertEquals(tmpOpts + " " + clusterDefaultCommandOpts + " " + amCommandOpts, amOptsConstructed);


    // Test3: Don't setup Xmx explicitly
    final double factor = 0.8;
    amCommandOpts = "-Dtest.property";
    tezConf.set(TezConfiguration.TEZ_AM_LAUNCH_CMD_OPTS, amCommandOpts);
    amOptsConstructed =
        TezClientUtils.constructAMLaunchOpts(tezConf, Resource.newInstance(1024, 1));
    // It's OK for the Xmx value to show up before cluster default options, since Xmx will not be replaced if it already exists.
    assertEquals(
        " -Xmx" + ((int) (1024 * factor)) + "m" + " " + tmpOpts + " " + clusterDefaultCommandOpts + " " +
            amCommandOpts,
        amOptsConstructed);

    // Test4: Ensure admin options with Xmx does not cause them to be overridden. This should almost never be done though.
    clusterDefaultCommandOpts =
        "-server -Djava.net.preferIPv4Stack=true -XX:+PrintGCDetails -verbose:gc -Xmx200m";
    tezConf.set(TezConfiguration.TEZ_AM_LAUNCH_CLUSTER_DEFAULT_CMD_OPTS, clusterDefaultCommandOpts);
    amOptsConstructed =
        TezClientUtils.constructAMLaunchOpts(tezConf, Resource.newInstance(1024, 1));
    assertEquals(tmpOpts + " " + clusterDefaultCommandOpts + " " + amCommandOpts, amOptsConstructed);
  }

  @Test(timeout = 5000)
  public void testTaskCommandOpts() throws TezException {
    TezConfiguration tezConf = new TezConfiguration();
    String taskCommandOpts = "-Xmx 200m -Dtest.property";
    tezConf.set(TezConfiguration.TEZ_TASK_LAUNCH_CMD_OPTS, taskCommandOpts);
    String expected = null;

    // Test1: Rely on defaults for cluster default opts
    String taskOptsConstructed = TezClientUtils.addDefaultsToTaskLaunchCmdOpts("", tezConf);
    expected =
        TezConfiguration.TEZ_TASK_LAUNCH_CLUSTER_DEFAULT_CMD_OPTS_DEFAULT + " " + taskCommandOpts;
    assertTrue(
        "Did not find Expected prefix: [" + expected + "] in string [" + taskOptsConstructed +
            "]", taskOptsConstructed.startsWith(expected));

    // Test2: Setup cluster-default command opts explicitly
    String taskClusterDefaultCommandOpts =
        "-server -Djava.net.preferIPv4Stack=true -XX:+PrintGCDetails -verbose:gc ";
    tezConf.set(TezConfiguration.TEZ_TASK_LAUNCH_CLUSTER_DEFAULT_CMD_OPTS,
        taskClusterDefaultCommandOpts);
    taskOptsConstructed =
        TezClientUtils.addDefaultsToTaskLaunchCmdOpts("", tezConf);
    expected = taskClusterDefaultCommandOpts + " " + taskCommandOpts;
    assertTrue(
        "Did not find Expected prefix: [" + expected + "] in string [" + taskOptsConstructed +
            "]", taskOptsConstructed.startsWith(expected));

    // Test3: Don't setup Xmx explicitly
    taskCommandOpts = "-Dtest.property";
    tezConf.set(TezConfiguration.TEZ_TASK_LAUNCH_CMD_OPTS, taskCommandOpts);
    taskOptsConstructed =
        TezClientUtils.addDefaultsToTaskLaunchCmdOpts("", tezConf);
    expected = taskClusterDefaultCommandOpts + " " + taskCommandOpts;
    assertTrue(
        "Did not find Expected prefix: [" + expected + "] in string [" + taskOptsConstructed +
            "]", taskOptsConstructed.startsWith(expected));

    // Test4: Pass in a dag-configured value.
    String programmaticTaskOpts = "-Dset.programatically=true -Djava.net.preferIPv4Stack=false";
    taskOptsConstructed =
        TezClientUtils.addDefaultsToTaskLaunchCmdOpts(programmaticTaskOpts, tezConf);
    // Container logging is always added at the end, if it's required.
    expected = taskClusterDefaultCommandOpts + " " + taskCommandOpts + " " + programmaticTaskOpts;
    assertTrue(
        "Did not find Expected prefix: [" + expected + "] in string [" + taskOptsConstructed +
            "]", taskOptsConstructed.startsWith(expected));
  }


  @Test (timeout=5000)
  public void testDefaultMemoryJavaOpts() {
    final double factor = 0.8;
    String origJavaOpts = "-Xmx";
    String javaOpts = TezClientUtils.maybeAddDefaultMemoryJavaOpts(origJavaOpts,
        Resource.newInstance(1000, 1), factor);
    Assert.assertEquals(origJavaOpts, javaOpts);

    origJavaOpts = "";
    javaOpts = TezClientUtils.maybeAddDefaultMemoryJavaOpts(origJavaOpts,
        Resource.newInstance(1000, 1), factor);
    Assert.assertTrue(javaOpts.contains("-Xmx800m"));

    origJavaOpts = "";
    javaOpts = TezClientUtils.maybeAddDefaultMemoryJavaOpts(origJavaOpts,
        Resource.newInstance(1, 1), factor);
    Assert.assertTrue(javaOpts.contains("-Xmx1m"));

    origJavaOpts = "";
    javaOpts = TezClientUtils.maybeAddDefaultMemoryJavaOpts(origJavaOpts,
        Resource.newInstance(-1, 1), factor);
    Assert.assertEquals(origJavaOpts, javaOpts);

    origJavaOpts = "";
    javaOpts = TezClientUtils.maybeAddDefaultMemoryJavaOpts(origJavaOpts,
        Resource.newInstance(355, 1), factor);
    Assert.assertTrue(javaOpts.contains("-Xmx284m"));

    origJavaOpts = " -Xms100m ";
    javaOpts = TezClientUtils.maybeAddDefaultMemoryJavaOpts(origJavaOpts,
        Resource.newInstance(355, 1), factor);
    Assert.assertFalse(javaOpts.contains("-Xmx284m"));
    Assert.assertTrue(javaOpts.contains("-Xms100m"));

    origJavaOpts = "";
    javaOpts = TezClientUtils.maybeAddDefaultMemoryJavaOpts(origJavaOpts,
        Resource.newInstance(355, 1), 0);
    Assert.assertEquals(origJavaOpts, javaOpts);

    origJavaOpts = "";
    javaOpts = TezClientUtils.maybeAddDefaultMemoryJavaOpts(origJavaOpts,
        Resource.newInstance(355, 1), 100);
    Assert.assertEquals(origJavaOpts, javaOpts);

    origJavaOpts = "";
    javaOpts = TezClientUtils.maybeAddDefaultMemoryJavaOpts(origJavaOpts,
        Resource.newInstance(1000, 1), -1);
    Assert.assertTrue(javaOpts.contains("-Xmx700m"));

    origJavaOpts = "";
    javaOpts = TezClientUtils.maybeAddDefaultMemoryJavaOpts(origJavaOpts,
        Resource.newInstance(5000, 1), -1);
    Assert.assertTrue(javaOpts.contains("-Xmx4000m"));
  }

  @Test (timeout=5000)
  public void testDefaultLoggingJavaOpts() {
    String origJavaOpts = null;
    String javaOpts = TezClientUtils.maybeAddDefaultLoggingJavaOpts("FOOBAR", origJavaOpts);
    Assert.assertNotNull(javaOpts);
    Assert.assertTrue(javaOpts.contains("-D" + TezConstants.TEZ_ROOT_LOGGER_NAME + "=FOOBAR")
        && javaOpts.contains(TezConstants.TEZ_CONTAINER_LOG4J_PROPERTIES_FILE)
        &&
        javaOpts.contains("-Dlog4j.configuratorClass=org.apache.tez.common.TezLog4jConfigurator"));
  }

  @Test (timeout = 5000)
  public void testConfSerializationForAm() {
    Configuration conf =new Configuration(false);
    String val1 = "fixedProperty";
    String val2 = "parametrizedProperty/${user.name}";
    String expVal2 = "parametrizedProperty/" + System.getProperty("user.name");
    conf.set("property1", val1);
    conf.set("property2", val2);

    Map<String, String> expected = new HashMap<String, String>();
    expected.put("property1", val1);
    expected.put("property2", expVal2);

    ConfigurationProto confProto = TezClientUtils.createFinalConfProtoForApp(conf, null);

    for (PlanKeyValuePair kvPair : confProto.getConfKeyValuesList()) {
      String v = expected.remove(kvPair.getKey());
      assertEquals(v, kvPair.getValue());
    }
    assertTrue(expected.isEmpty());
  }

  // To run this test case see TestTezCommonUtils::testLocalResourceVisibility
  // We do not have much control over the directory structure, cannot mock as the functions are
  // static and do not want to spin up a minitez cluster just for this.
  public static void testLocalResourceVisibility(DistributedFileSystem remoteFs, Configuration conf)
      throws Exception {

    Path topLevelDir = null;
    try {
      FsPermission publicDirPerms = new FsPermission((short) 0755);   // rwxr-xr-x
      FsPermission privateDirPerms = new FsPermission((short) 0754);  // rwxr-xr--
      FsPermission publicFilePerms = new FsPermission((short) 0554);  // r-xr-xr--
      FsPermission privateFilePerms = new FsPermission((short) 0550); // r-xr-x---

      String fsURI = remoteFs.getUri().toString();

      topLevelDir = new Path(fsURI, "/testLRVisibility");
      Assert.assertTrue(remoteFs.mkdirs(topLevelDir, publicDirPerms));

      Path publicSubDir = new Path(topLevelDir, "public_sub_dir");
      Assert.assertTrue(remoteFs.mkdirs(publicSubDir, publicDirPerms));

      Path privateSubDir = new Path(topLevelDir, "private_sub_dir");
      Assert.assertTrue(remoteFs.mkdirs(privateSubDir, privateDirPerms));

      Path publicFile = new Path(publicSubDir, "public_file");
      Assert.assertTrue(remoteFs.createNewFile(publicFile));
      remoteFs.setPermission(publicFile, publicFilePerms);

      Path privateFile = new Path(publicSubDir, "private_file");
      Assert.assertTrue(remoteFs.createNewFile(privateFile));
      remoteFs.setPermission(privateFile, privateFilePerms);

      Path publicFileInPrivateSubdir = new Path(privateSubDir, "public_file_in_private_subdir");
      Assert.assertTrue(remoteFs.createNewFile(publicFileInPrivateSubdir));
      remoteFs.setPermission(publicFileInPrivateSubdir, publicFilePerms);

      TezConfiguration tezConf = new TezConfiguration(conf);
      String tmpTezLibUris = String.format("%s,%s,%s,%s", topLevelDir, publicSubDir, privateSubDir,
          conf.get(TezConfiguration.TEZ_LIB_URIS, ""));
      tezConf.set(TezConfiguration.TEZ_LIB_URIS, tmpTezLibUris);

      Map<String, LocalResource> lrMap = new HashMap<String, LocalResource>();
      TezClientUtils.setupTezJarsLocalResources(tezConf, new Credentials(), lrMap);

      Assert.assertEquals(publicFile.getName(), LocalResourceVisibility.PUBLIC,
          lrMap.get(publicFile.getName()).getVisibility());

      Assert.assertEquals(privateFile.getName(), LocalResourceVisibility.PRIVATE,
          lrMap.get(privateFile.getName()).getVisibility());

      Assert.assertEquals(publicFileInPrivateSubdir.getName(), LocalResourceVisibility.PRIVATE,
          lrMap.get(publicFileInPrivateSubdir.getName()).getVisibility());

      // test tar.gz
      tezConf = new TezConfiguration(conf);
      Path tarFile = new Path(topLevelDir, "foo.tar.gz");
      Assert.assertTrue(remoteFs.createNewFile(tarFile));

      //public
      remoteFs.setPermission(tarFile, publicFilePerms);
      tezConf.set(TezConfiguration.TEZ_LIB_URIS, tarFile.toString());
      lrMap.clear();
      Assert.assertTrue(
          TezClientUtils.setupTezJarsLocalResources(tezConf, new Credentials(), lrMap));

      Assert.assertEquals(LocalResourceVisibility.PUBLIC,
          lrMap.get(TezConstants.TEZ_TAR_LR_NAME).getVisibility());

      //private
      remoteFs.setPermission(tarFile, privateFilePerms);
      lrMap.clear();
      TezClientUtils.setupTezJarsLocalResources(tezConf, new Credentials(), lrMap);
      Assert.assertEquals(LocalResourceVisibility.PRIVATE,
          lrMap.get(TezConstants.TEZ_TAR_LR_NAME).getVisibility());

    } finally {
      if (topLevelDir != null) {
        remoteFs.delete(topLevelDir, true);
      }
    }
  }

  @Test(timeout = 5000)
  public void testConfigurationAllowAll() {
    Configuration srcConf = new Configuration(false);

    Map<String, String> confMap = new HashMap<String, String>();
    confMap.put("foo.property", "2000");
    confMap.put("tez.property", "tezProperty");
    confMap.put("yarn.property", "yarnProperty");

    for (Map.Entry<String, String> entry : confMap.entrySet()) {
      srcConf.set(entry.getKey(), entry.getValue());
    }

    ConfigurationProto confProto = TezClientUtils.createFinalConfProtoForApp(srcConf, null);

    for (PlanKeyValuePair kvPair : confProto.getConfKeyValuesList()) {
      String val = confMap.remove(kvPair.getKey());
      assertNotNull(val);
      assertEquals(val, kvPair.getValue());
    }
    assertTrue(confMap.isEmpty());
  }

  private Path createFile(FileSystem fs, Path dir, String fileName) throws IOException {
    Path f1 = new Path(dir, fileName);
    FSDataOutputStream outputStream = fs.create(f1, true);
    outputStream.write(1);
    outputStream.close();
    return fs.makeQualified(f1);
  }

  @Test (timeout=5000)
  public void validateSetTezAuxLocalResourcesWithFilesAndFolders() throws Exception {
    FileSystem localFs = FileSystem.getLocal(new Configuration());
    List<String> resources = new ArrayList<String>();
    StringBuilder auxUriStr = new StringBuilder();

    // Create 2 files
    Path topDir = new Path(TEST_ROOT_DIR, "validateauxwithfiles");
    if (localFs.exists(topDir)) {
      localFs.delete(topDir, true);
    }
    localFs.mkdirs(topDir);
    resources.add(createFile(localFs, topDir, "f1.txt").toString());
    auxUriStr.append(localFs.makeQualified(topDir).toString()).append(",");

    Path dir2 = new Path(topDir, "dir2");
    localFs.mkdirs(dir2);
    Path nestedDir = new Path(dir2, "nestedDir");
    localFs.mkdirs(nestedDir);
    createFile(localFs, nestedDir, "nested-f.txt");
    resources.add(createFile(localFs, dir2, "dir2-f.txt").toString());
    auxUriStr.append(localFs.makeQualified(dir2).toString()).append(",");

    Path dir3 = new Path(topDir, "dir3");
    localFs.mkdirs(dir3);
    auxUriStr.append(localFs.makeQualified(dir3).toString()).append(",");

    TezConfiguration conf = new TezConfiguration();
    conf.setBoolean(TezConfiguration.TEZ_IGNORE_LIB_URIS, true);
    conf.set(TezConfiguration.TEZ_AUX_URIS, auxUriStr.toString());
    Credentials credentials = new Credentials();
    Map<String, LocalResource> localizedMap = new HashMap<String, LocalResource>();
    TezClientUtils.setupTezJarsLocalResources(conf, credentials, localizedMap);
    Set<String> resourceNames = localizedMap.keySet();
    Assert.assertEquals(2, resourceNames.size());
    Assert.assertTrue(resourceNames.contains("f1.txt"));
    Assert.assertTrue(resourceNames.contains("dir2-f.txt"));
  }

  @Test(timeout = 5000)
  public void testServiceDescriptorSerializationForAM() {
    Configuration conf = new Configuration(false);
    ServicePluginsDescriptor servicePluginsDescriptor = ServicePluginsDescriptor.create(true);

    ConfigurationProto confProto = TezClientUtils.createFinalConfProtoForApp(conf,
        servicePluginsDescriptor);

    assertTrue(confProto.hasAmPluginDescriptor());
    assertTrue(confProto.getAmPluginDescriptor().getUberEnabled());
  }

  @Test(timeout = 5000)
  public void testTaskLaunchCmdOptsSetup() throws TezException {
    Configuration conf = new Configuration(false);
    String vOpts = "";
    String opts = TezClientUtils.addDefaultsToTaskLaunchCmdOpts(vOpts, conf);

    Assert.assertEquals(opts,
        TezConfiguration.TEZ_TASK_LAUNCH_CLUSTER_DEFAULT_CMD_OPTS_DEFAULT + " "
            + TezConfiguration.TEZ_TASK_LAUNCH_CMD_OPTS_DEFAULT + " " + vOpts);

    vOpts = "foo";
    opts = TezClientUtils.addDefaultsToTaskLaunchCmdOpts(vOpts, conf);

    Assert.assertEquals(opts,
        TezConfiguration.TEZ_TASK_LAUNCH_CLUSTER_DEFAULT_CMD_OPTS_DEFAULT + "  " + vOpts);

    String taskOpts = "taskOpts";
    conf.set(TezConfiguration.TEZ_TASK_LAUNCH_CMD_OPTS, taskOpts);
    opts = TezClientUtils.addDefaultsToTaskLaunchCmdOpts(vOpts, conf);

    Assert.assertEquals(opts,
        TezConfiguration.TEZ_TASK_LAUNCH_CLUSTER_DEFAULT_CMD_OPTS_DEFAULT
            + " " + taskOpts + " " + vOpts);

  }

  @Test(timeout = 5000)
  public void testClusterTaskLaunchCmdOptsSetup() throws TezException {
    Configuration conf = new Configuration(false);
    String adminOpts = "adminOpts";
    conf.set(TezConfiguration.TEZ_TASK_LAUNCH_CLUSTER_DEFAULT_CMD_OPTS, adminOpts);

    String vOpts = "";
    String opts = TezClientUtils.addDefaultsToTaskLaunchCmdOpts(vOpts, conf);

    Assert.assertEquals(opts,
        adminOpts + " "
            + TezConfiguration.TEZ_TASK_LAUNCH_CMD_OPTS_DEFAULT + " " + vOpts);

    vOpts = "foo";
    opts = TezClientUtils.addDefaultsToTaskLaunchCmdOpts(vOpts, conf);

    Assert.assertEquals(opts, adminOpts + "  " + vOpts);

    String taskOpts = "taskOpts";
    conf.set(TezConfiguration.TEZ_TASK_LAUNCH_CMD_OPTS, taskOpts);
    opts = TezClientUtils.addDefaultsToTaskLaunchCmdOpts(vOpts, conf);

    Assert.assertEquals(opts, adminOpts + " " + taskOpts + " " + vOpts);

  }


}
