MAPREDUCE-2407. Make GridMix emulate usage of distributed cache files in simulated jobs.

git-svn-id: https://svn.apache.org/repos/asf/hadoop/mapreduce/trunk@1126499 13f79535-47bb-0310-9956-ffa450edef68
diff --git a/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/DistributedCacheEmulator.java b/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/DistributedCacheEmulator.java
new file mode 100644
index 0000000..0cd41ca
--- /dev/null
+++ b/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/DistributedCacheEmulator.java
@@ -0,0 +1,548 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapred.gridmix;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.permission.FsAction;
+import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.io.BytesWritable;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.MD5Hash;
+import org.apache.hadoop.io.SequenceFile;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapreduce.MRJobConfig;
+import org.apache.hadoop.tools.rumen.JobStory;
+import org.apache.hadoop.tools.rumen.JobStoryProducer;
+import org.apache.hadoop.tools.rumen.Pre21JobHistoryConstants;
+
+import java.io.IOException;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Emulation of Distributed Cache Usage in gridmix.
+ * <br> Emulation of distributed cache load in gridmix puts load on
+ * TaskTrackers and affects the execution time of tasks, because TaskTrackers
+ * localize the distributed cache files of the simulated jobs.
+ * <br> Gridmix creates the distributed cache files for simulated jobs by
+ * launching a MapReduce job {@link GenerateDistCacheData} in advance, i.e.
+ * before launching the simulated jobs.
+ * <br> The distributed cache file paths used in the original cluster are mapped
+ * to unique file names in the simulated cluster.
+ * <br> All HDFS-based distributed cache files generated by gridmix are
+ * public distributed cache files. But Gridmix makes sure that load incurred due
+ * to localization of private distributed cache files on the original cluster
+ * is also faithfully simulated. Gridmix emulates the load due to private
+ * distributed cache files by mapping private distributed cache files of
+ * different users in the original cluster to different public distributed cache
+ * files in the simulated cluster.
+ *
+ * <br> The configuration properties like
+ * {@link MRJobConfig#CACHE_FILES}, {@link MRJobConfig#CACHE_FILE_VISIBILITIES},
+ * {@link MRJobConfig#CACHE_FILES_SIZES} and
+ * {@link MRJobConfig#CACHE_FILE_TIMESTAMPS} obtained from the trace are used
+ * to decide
+ * <li> the file size of each distributed cache file to be generated,
+ * <li> whether a distributed cache file has already been seen in this trace
+ * file, and
+ * <li> whether a distributed cache file was public or private on the original
+ * cluster.
+ * <br>
+ * <br> Gridmix configures these generated files as distributed cache files for
+ * the simulated jobs.
+ */
+@InterfaceAudience.Private
+@InterfaceStability.Evolving
+class DistributedCacheEmulator {
+  private static final Log LOG =
+      LogFactory.getLog(DistributedCacheEmulator.class);
+
+  static final long AVG_BYTES_PER_MAP = 128 * 1024 * 1024L;// 128MB
+
+  // If at least one distributed cache file is missing in the expected
+  // distributed cache directory, Gridmix cannot proceed with emulation of
+  // distributed cache load.
+  static final int MISSING_DIST_CACHE_FILES_ERROR = 1;
+
+  private Path distCachePath;
+
+  /**
+   * Map between the simulated cluster's distributed cache file paths and
+   * their file sizes. Only unique distributed cache files are entered into
+   * this map. Two distributed cache files are considered the same if and only
+   * if their file paths, visibilities and timestamps are the same.
+   */
+  private Map<String, Long> distCacheFiles = new HashMap<String, Long>();
+
+  /**
+   * Configuration property that controls whether gridmix should emulate
+   * distributed cache usage. Defaults to true.
+   */
+  static final String GRIDMIX_EMULATE_DISTRIBUTEDCACHE =
+      "gridmix.distributed-cache-emulation.enable";
+
+  // Whether to emulate distributed cache usage or not
+  boolean emulateDistributedCache = true;
+
+  // Whether to generate distributed cache data or not
+  boolean generateDistCacheData = false;
+
+  Configuration conf; // gridmix configuration
+
+  // Pseudo local file system where local FS based distributed cache files are
+  // created by gridmix.
+  FileSystem pseudoLocalFs = null;
+
+  static {
+    // Need to handle deprecation of these MapReduce-internal configuration
+    // properties as MapReduce doesn't handle their deprecation.
+    Configuration.addDeprecation("mapred.cache.files.filesizes",
+        new String[] {MRJobConfig.CACHE_FILES_SIZES});
+    Configuration.addDeprecation("mapred.cache.files.visibilities",
+        new String[] {MRJobConfig.CACHE_FILE_VISIBILITIES});
+  }
+
+  /**
+   * @param conf gridmix configuration
+   * @param ioPath &lt;ioPath&gt;/distributedCache/ is the gridmix Distributed
+   *               Cache directory
+   */
+  public DistributedCacheEmulator(Configuration conf, Path ioPath) {
+    this.conf = conf;
+    distCachePath = new Path(ioPath, "distributedCache");
+    this.conf.setClass("fs.pseudo.impl", PseudoLocalFs.class, FileSystem.class);
+  }
+
+  /**
+   * This is to be called before any other method of DistributedCacheEmulator.
+   * <br> Checks whether emulation of distributed cache load is needed and
+   * feasible, and sets the flags generateDistCacheData and
+   * emulateDistributedCache to the appropriate values.
+   * <br> Gridmix does not emulate distributed cache load if
+   * <ol><li> the specific gridmix job type doesn't need emulation of
+   * distributed cache load, OR
+   * <li> the trace comes from a stream instead of a file, OR
+   * <li> the distributed cache directory where distributed cache data is to
+   * be generated by gridmix is on the local file system, OR
+   * <li> any ancestor directory of &lt;ioPath&gt;, up to the root, lacks
+   * execute permission for others. This is because, for emulation of
+   * distributed cache load, the distributed cache files created under
+   * &lt;ioPath/distributedCache/public/&gt; must be considered by hadoop
+   * as public distributed cache files, OR
+   * <li> creation of the pseudo local file system fails.</ol>
+   * <br> For cases (2), (3), (4) and (5), generation of distributed cache
+   * data is also disabled.
+   * 
+   * @param traceIn trace file path. If this is '-', then trace comes from the
+   *                stream stdin.
+   * @param jobCreator job creator of gridmix jobs of a specific type
+   * @param generate  true if -generate option was specified
+   * @throws IOException
+   */
+  void init(String traceIn, JobCreator jobCreator, boolean generate)
+      throws IOException {
+    emulateDistributedCache = jobCreator.canEmulateDistCacheLoad()
+        && conf.getBoolean(GRIDMIX_EMULATE_DISTRIBUTEDCACHE, true);
+    generateDistCacheData = generate;
+
+    if (generateDistCacheData || emulateDistributedCache) {
+      if ("-".equals(traceIn)) {// trace is from stdin
+        LOG.warn("Gridmix will not emulate Distributed Cache load because "
+            + "the input trace source is a stream instead of file.");
+        emulateDistributedCache = generateDistCacheData = false;
+      } else if (FileSystem.getLocal(conf).getUri().getScheme().equals(
+          distCachePath.toUri().getScheme())) {// local FS
+        LOG.warn("Gridmix will not emulate Distributed Cache load because "
+            + "<iopath> provided is on local file system.");
+        emulateDistributedCache = generateDistCacheData = false;
+      } else {
+        // Check that all the ancestor directories of distCachePath, up to
+        // the root, have execute permission for others.
+        FileSystem fs = FileSystem.get(conf);
+        Path cur = distCachePath.getParent();
+        while (cur != null) {
+          if (cur.toString().length() > 0) {
+            FsPermission perm = fs.getFileStatus(cur).getPermission();
+            if (!perm.getOtherAction().and(FsAction.EXECUTE).equals(
+                FsAction.EXECUTE)) {
+              LOG.warn("Gridmix will not emulate Distributed Cache load "
+                  + "because the ascendant directory (of distributed cache "
+                  + "directory) " + cur + " doesn't have execute permission "
+                  + "for others.");
+              emulateDistributedCache = generateDistCacheData = false;
+              break;
+            }
+          }
+          cur = cur.getParent();
+        }
+      }
+    }
+
+    // Check if the pseudo local file system can be created
+    try {
+      pseudoLocalFs = FileSystem.get(new URI("pseudo:///"), conf);
+    } catch (URISyntaxException e) {
+      LOG.warn("Gridmix will not emulate Distributed Cache load because "
+          + "creation of the pseudo local file system failed.", e);
+      emulateDistributedCache = generateDistCacheData = false;
+    }
+  }
+
+  /**
+   * @return true if gridmix should emulate distributed cache load
+   */
+  boolean shouldEmulateDistCacheLoad() {
+    return emulateDistributedCache;
+  }
+
+  /**
+   * @return true if gridmix should generate distributed cache data
+   */
+  boolean shouldGenerateDistCacheData() {
+    return generateDistCacheData;
+  }
+
+  /**
+   * @return the distributed cache directory path
+   */
+  Path getDistributedCacheDir() {
+    return distCachePath;
+  }
+
+  /**
+   * Create distributed cache directories.
+   * Also create a file that contains the list of distributed cache files
+   * that will be used as distributed cache files for all the simulated jobs.
+   * @param jsp job story producer for the trace
+   * @return exit code
+   * @throws IOException
+   */
+  int setupGenerateDistCacheData(JobStoryProducer jsp)
+      throws IOException {
+
+    createDistCacheDirectory();
+    return buildDistCacheFilesList(jsp);
+  }
+
+  /**
+   * Create distributed cache directory where distributed cache files will be
+   * created by the MapReduce job {@link GenerateDistCacheData#JOB_NAME}.
+   * @throws IOException
+   */
+  private void createDistCacheDirectory() throws IOException {
+    FileSystem fs = FileSystem.get(conf);
+    FileSystem.mkdirs(fs, distCachePath, new FsPermission((short) 0777));
+  }
+
+  /**
+   * Create the list of unique distributed cache files needed for all the
+   * simulated jobs and write the list to a special file.
+   * @param jsp job story producer for the trace
+   * @return exit code
+   * @throws IOException
+   */
+  private int buildDistCacheFilesList(JobStoryProducer jsp) throws IOException {
+    // Read all the jobs from the trace file and build the list of unique
+    // distributed cache files.
+    JobStory jobStory;
+    while ((jobStory = jsp.getNextJob()) != null) {
+      if (jobStory.getOutcome() == Pre21JobHistoryConstants.Values.SUCCESS && 
+         jobStory.getSubmissionTime() >= 0) {
+        updateHDFSDistCacheFilesList(jobStory);
+      }
+    }
+    jsp.close();
+
+    return writeDistCacheFilesList();
+  }
+
+  /**
+   * For the job to be simulated, identify the needed distributed cache files
+   * by mapping the original cluster's distributed cache file paths to the
+   * simulated cluster's paths, and add these mapped paths to the map
+   * {@code distCacheFiles}.
+   *<br>
+   * JobStory should contain distributed cache related properties like
+   * <li> {@link MRJobConfig#CACHE_FILES}
+   * <li> {@link MRJobConfig#CACHE_FILE_VISIBILITIES}
+   * <li> {@link MRJobConfig#CACHE_FILES_SIZES}
+   * <li> {@link MRJobConfig#CACHE_FILE_TIMESTAMPS}
+   * <li> {@link MRJobConfig#CLASSPATH_FILES}
+   *
+   * <li> {@link MRJobConfig#CACHE_ARCHIVES}
+   * <li> {@link MRJobConfig#CACHE_ARCHIVES_VISIBILITIES}
+   * <li> {@link MRJobConfig#CACHE_ARCHIVES_SIZES}
+   * <li> {@link MRJobConfig#CACHE_ARCHIVES_TIMESTAMPS}
+   * <li> {@link MRJobConfig#CLASSPATH_ARCHIVES}
+   *
+   * <li> {@link MRJobConfig#CACHE_SYMLINK}
+   *
+   * @param jobdesc JobStory of original job obtained from trace
+   * @throws IOException
+   */
+  void updateHDFSDistCacheFilesList(JobStory jobdesc) throws IOException {
+
+    // Map original job's distributed cache file paths to simulated cluster's
+    // paths, to be used by this simulated job.
+    JobConf jobConf = jobdesc.getJobConf();
+
+    String[] files = jobConf.getStrings(MRJobConfig.CACHE_FILES);
+    if (files != null) {
+
+      String[] fileSizes = jobConf.getStrings(MRJobConfig.CACHE_FILES_SIZES);
+      String[] visibilities =
+        jobConf.getStrings(MRJobConfig.CACHE_FILE_VISIBILITIES);
+      String[] timeStamps =
+        jobConf.getStrings(MRJobConfig.CACHE_FILE_TIMESTAMPS);
+
+      FileSystem fs = FileSystem.get(conf);
+      String user = jobConf.getUser();
+      for (int i = 0; i < files.length; i++) {
+        // Check if visibilities are available, because older hadoop versions
+        // didn't distinguish between public and private distributed cache
+        // files.
+        boolean visibility =
+            (visibilities == null) ? true : Boolean.valueOf(visibilities[i]);
+        if (isLocalDistCacheFile(files[i], user, visibility)) {
+          // local FS based distributed cache file.
+          // Create this file on the pseudo local FS on the fly (i.e. when the
+          // simulated job is submitted).
+          continue;
+        }
+        // distributed cache file on hdfs
+        String mappedPath = mapDistCacheFilePath(files[i], timeStamps[i],
+                                                 visibility, user);
+
+        // No need to add a distributed cache file path to the list if
+        // (1) the mapped path is already in the list, OR
+        // (2) a file with the mapped path already exists.
+        // In either case, the file paths, timestamps, file sizes and
+        // visibilities match: file sizes must match when file paths and
+        // timestamps match, because a single file path with a single
+        // timestamp corresponds to a single file size.
+        if (distCacheFiles.containsKey(mappedPath) ||
+            fs.exists(new Path(mappedPath))) {
+          continue;
+        }
+        distCacheFiles.put(mappedPath, Long.valueOf(fileSizes[i]));
+      }
+    }
+  }
+
+  /**
+   * Check if the file path provided was constructed by MapReduce for a
+   * distributed cache file on the local file system.
+   * @param filePath path of the distributed cache file
+   * @param user submitter of the job for which &lt;filePath&gt; is a
+   *             distributed cache file
+   * @param visibility <code>true</code> for a public distributed cache file
+   * @return true if the path provided is of a local file system based
+   *              distributed cache file
+   */
+  private boolean isLocalDistCacheFile(String filePath, String user,
+                                       boolean visibility) {
+    return (!visibility && filePath.contains(user + "/.staging"));
+  }
+
+  /**
+   * Map the HDFS based distributed cache file path from original cluster to
+   * a unique file name on the simulated cluster.
+   * <br> Unique distributed cache file names on the simulated cluster are
+   * generated using the original cluster's <li> file path, <li> timestamp,
+   * and <li> for private distributed cache files, the job submitter.
+   * <br> This implies that if, on the original cluster, a single HDFS file is
+   * used as a private distributed cache file by two jobs of different users,
+   * then the corresponding simulated jobs will have two different files of
+   * the same size in the public distributed cache, one for each user. These
+   * two simulated jobs will not share the distributed cache file, thus
+   * leading to the same load as seen on the original cluster.
+   * @param file distributed cache file path
+   * @param timeStamp timestamp of the distributed cache file
+   * @param isPublic true if this distributed cache file is a public
+   *                 distributed cache file
+   * @param user job submitter on original cluster
+   * @return the mapped path on simulated cluster
+   */
+  private String mapDistCacheFilePath(String file, String timeStamp,
+      boolean isPublic, String user) {
+    String id = file + timeStamp;
+    if (!isPublic) {
+      // consider job-submitter for private distributed cache file
+      id = id.concat(user);
+    }
+    return new Path(distCachePath, MD5Hash.digest(id).toString()).toUri()
+               .getPath();
+  }
+
+  /**
+   * Write the list of distributed cache files, in decreasing order of file
+   * size, to a sequence file. This file will be the input to the job
+   * {@link GenerateDistCacheData}.
+   * Also reports an error if distributed cache files are missing but the
+   * -generate option was not specified.
+   * @return exit code
+   * @throws IOException
+   */
+  private int writeDistCacheFilesList()
+      throws IOException {
+    // Sort the distributed cache files in decreasing order of file size.
+    List<Map.Entry<String, Long>> dcFiles =
+        new ArrayList<Map.Entry<String, Long>>(distCacheFiles.entrySet());
+    Collections.sort(dcFiles, new Comparator<Map.Entry<String, Long>>() {
+      public int compare(Map.Entry<String, Long> dc1,
+                         Map.Entry<String, Long> dc2) {
+        return dc2.getValue().compareTo(dc1.getValue());
+      }
+    });
+
+    // write the sorted distributed cache files to the sequence file
+    FileSystem fs = FileSystem.get(conf);
+    Path distCacheFilesList = new Path(distCachePath, "_distCacheFiles.txt");
+    conf.set(GenerateDistCacheData.GRIDMIX_DISTCACHE_FILE_LIST,
+        distCacheFilesList.toString());
+    SequenceFile.Writer src_writer = SequenceFile.createWriter(fs, conf,
+        distCacheFilesList, LongWritable.class, BytesWritable.class,
+        SequenceFile.CompressionType.NONE);
+
+    // Total number of unique distributed cache files
+    int fileCount = dcFiles.size();
+    long byteCount = 0;// Total size of all distributed cache files
+    long bytesSync = 0;// Bytes since the previous sync; used to add sync markers
+
+    for (Map.Entry<String, Long> entry : dcFiles) {
+      LongWritable fileSize = new LongWritable(entry.getValue());
+      BytesWritable filePath = new BytesWritable(entry.getKey().getBytes());
+
+      byteCount += fileSize.get();
+      bytesSync += fileSize.get();
+      if (bytesSync > AVG_BYTES_PER_MAP) {
+        src_writer.sync();
+        bytesSync = fileSize.get();
+      }
+      src_writer.append(fileSize, filePath);
+    }
+    src_writer.close();
+    // Set delete on exit for 'dist cache files list' as it is not needed later.
+    fs.deleteOnExit(distCacheFilesList);
+
+    conf.setInt(GenerateDistCacheData.GRIDMIX_DISTCACHE_FILE_COUNT, fileCount);
+    conf.setLong(GenerateDistCacheData.GRIDMIX_DISTCACHE_BYTE_COUNT, byteCount);
+    LOG.info("Number of HDFS based distributed cache files to be generated is"
+        + fileCount + ". Total size of HDFS based distributed cache files "
+        + "to be generated is " + byteCount);
+
+    if (!shouldGenerateDistCacheData() && fileCount > 0) {
+      LOG.error("Missing " + fileCount + " distributed cache files under the "
+          + " directory\n" + distCachePath + "\nthat are needed for gridmix"
+          + " to emulate distributed cache load. Either use -generate\noption"
+          + " to generate distributed cache data along with input data OR "
+          + "disable\ndistributed cache emulation by configuring '"
+          + DistributedCacheEmulator.GRIDMIX_EMULATE_DISTRIBUTEDCACHE
+          + "' to false.");
+      return MISSING_DIST_CACHE_FILES_ERROR;
+    }
+    return 0;
+  }
+
+  /**
+   * If gridmix needs to emulate distributed cache load, then configure
+   * distributed cache files of a simulated job by mapping the original
+   * cluster's distributed cache file paths to the simulated cluster's paths and
+   * setting these mapped paths in the job configuration of the simulated job.
+   * <br>
+   * Local FS based distributed cache files are configured through the
+   * property "tmpfiles", and HDFS-based distributed cache files through the
+   * property {@link MRJobConfig#CACHE_FILES}.
+   * @param conf configuration for the simulated job to be run
+   * @param jobConf job configuration of original cluster's job, obtained from
+   *                trace
+   * @throws IOException
+   */
+  void configureDistCacheFiles(Configuration conf, JobConf jobConf)
+      throws IOException {
+    if (shouldEmulateDistCacheLoad()) {
+
+      String[] files = jobConf.getStrings(MRJobConfig.CACHE_FILES);
+      if (files != null) {
+        // hdfs based distributed cache files to be configured for simulated job
+        List<String> cacheFiles = new ArrayList<String>();
+        // local FS based distributed cache files to be configured for
+        // simulated job
+        List<String> localCacheFiles = new ArrayList<String>();
+
+        String[] visibilities =
+          jobConf.getStrings(MRJobConfig.CACHE_FILE_VISIBILITIES);
+        String[] timeStamps =
+          jobConf.getStrings(MRJobConfig.CACHE_FILE_TIMESTAMPS);
+        String[] fileSizes = jobConf.getStrings(MRJobConfig.CACHE_FILES_SIZES);
+
+        String user = jobConf.getUser();
+        for (int i = 0; i < files.length; i++) {
+          // Check if visibilities are available, because older hadoop
+          // versions didn't distinguish between public and private
+          // distributed cache files.
+          boolean visibility =
+            (visibilities == null) ? true : Boolean.valueOf(visibilities[i]);
+          if (isLocalDistCacheFile(files[i], user, visibility)) {
+            // local FS based distributed cache file.
+            // Create this file on the pseudo local FS.
+            String fileId = MD5Hash.digest(files[i] + timeStamps[i]).toString();
+            long fileSize = Long.valueOf(fileSizes[i]);
+            Path mappedLocalFilePath =
+                PseudoLocalFs.generateFilePath(fileId, fileSize)
+                    .makeQualified(pseudoLocalFs.getUri(),
+                                   pseudoLocalFs.getWorkingDirectory());
+            pseudoLocalFs.create(mappedLocalFilePath);
+            localCacheFiles.add(mappedLocalFilePath.toUri().toString());
+          } else {
+            // hdfs based distributed cache file.
+            // Get the mapped HDFS path on simulated cluster
+            String mappedPath = mapDistCacheFilePath(files[i], timeStamps[i],
+                                                     visibility, user);
+            cacheFiles.add(mappedPath);
+          }
+        }
+        if (cacheFiles.size() > 0) {
+          // configure hdfs based distributed cache files for simulated job
+          conf.setStrings(MRJobConfig.CACHE_FILES,
+                          cacheFiles.toArray(new String[cacheFiles.size()]));
+        }
+        if (localCacheFiles.size() > 0) {
+          // configure local FS based distributed cache files for simulated job
+          conf.setStrings("tmpfiles", localCacheFiles.toArray(
+                                        new String[localCacheFiles.size()]));
+        }
+      }
+    }
+  }
+}
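
To make the mapping concrete: below is a minimal standalone sketch of the scheme implemented by mapDistCacheFilePath() above. Identical (path, timestamp) pairs collapse to a single simulated file, while a private file shared by two users maps to two distinct public files. The class name, paths, users and timestamp are illustrative only; MD5Hash is the same org.apache.hadoop.io.MD5Hash the patch uses.

    package org.apache.hadoop.mapred.gridmix.examples;

    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.MD5Hash;

    public class DistCachePathMappingSketch {

      // Same logic as DistributedCacheEmulator.mapDistCacheFilePath(): the
      // name is an MD5 over (path + timestamp), plus the submitter for
      // private files.
      static String map(Path distCacheDir, String file, String timeStamp,
                        boolean isPublic, String user) {
        String id = file + timeStamp;
        if (!isPublic) {
          id = id.concat(user); // private files differ per job submitter
        }
        return new Path(distCacheDir, MD5Hash.digest(id).toString())
                   .toUri().getPath();
      }

      public static void main(String[] args) {
        Path dir = new Path("/gridmix/distributedCache"); // illustrative
        String f = "hdfs://nn:8020/data/lookup.dat";      // illustrative
        String ts = "1300000000000";                      // illustrative

        // A public file used by two jobs maps to one shared simulated file.
        System.out.println(map(dir, f, ts, true, "alice")
            .equals(map(dir, f, ts, true, "bob")));   // true

        // A private file used by two users maps to two distinct files.
        System.out.println(map(dir, f, ts, false, "alice")
            .equals(map(dir, f, ts, false, "bob")));  // false
      }
    }
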
diff --git a/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/GenerateData.java b/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/GenerateData.java
index a1bdc44..066cdf1 100644
--- a/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/GenerateData.java
+++ b/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/GenerateData.java
@@ -85,10 +85,11 @@
    * Replication of generated data.
    */
   public static final String GRIDMIX_GEN_REPLICATION = "gridmix.gen.replicas";
+  static final String JOB_NAME = "GRIDMIX_GENERATE_INPUT_DATA";
 
   public GenerateData(Configuration conf, Path outdir, long genbytes)
       throws IOException {
-    super(conf, 0L, "GRIDMIX_GENDATA");
+    super(conf, 0L, JOB_NAME);
     job.getConfiguration().setLong(GRIDMIX_GEN_BYTES, genbytes);
     FileOutputFormat.setOutputPath(job, outdir);
   }
diff --git a/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/GenerateDistCacheData.java b/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/GenerateDistCacheData.java
new file mode 100644
index 0000000..f5acc9a
--- /dev/null
+++ b/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/GenerateDistCacheData.java
@@ -0,0 +1,254 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.mapred.gridmix;
+
+import java.io.IOException;
+import java.security.PrivilegedExceptionAction;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Random;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.io.BytesWritable;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.io.SequenceFile;
+import org.apache.hadoop.mapred.ClusterStatus;
+import org.apache.hadoop.mapreduce.lib.input.FileSplit;
+import org.apache.hadoop.mapred.JobClient;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapreduce.lib.input.SequenceFileRecordReader;
+import org.apache.hadoop.mapreduce.InputFormat;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.Mapper;
+import org.apache.hadoop.mapreduce.RecordReader;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
+import org.apache.hadoop.mapreduce.lib.output.NullOutputFormat;
+import org.apache.hadoop.mapreduce.server.tasktracker.TTConfig;
+import org.apache.hadoop.security.UserGroupInformation;
+
+/**
+ * GridmixJob that generates distributed cache files.
+ * {@link GenerateDistCacheData} expects a list of distributed cache files to be
+ * generated as input. This list is expected to be stored as a sequence file
+ * and the filename is expected to be configured using
+ * {@code gridmix.distcache.file.list}.
+ * This input file contains the list of distributed cache files and their
+ * sizes. For each record (i.e. file size and file path) in this input file,
+ * a file of the specified size is created at the specified path.
+ */
+@InterfaceAudience.Private
+@InterfaceStability.Evolving
+class GenerateDistCacheData extends GridmixJob {
+
+  /**
+   * Number of distributed cache files to be created by gridmix.
+   */
+  static final String GRIDMIX_DISTCACHE_FILE_COUNT =
+      "gridmix.distcache.file.count";
+  /**
+   * Total number of bytes to be written to the distributed cache files by
+   * gridmix, i.e. the sum of the sizes of all unique distributed cache files
+   * to be created by gridmix.
+   */
+  static final String GRIDMIX_DISTCACHE_BYTE_COUNT =
+      "gridmix.distcache.byte.count";
+  /**
+   * The special file created (and used) by gridmix that contains the list of
+   * unique distributed cache files to be created, along with their sizes.
+   */
+  static final String GRIDMIX_DISTCACHE_FILE_LIST =
+      "gridmix.distcache.file.list";
+  static final String JOB_NAME = "GRIDMIX_GENERATE_DISTCACHE_DATA";
+
+  public GenerateDistCacheData(Configuration conf) throws IOException {
+    super(conf, 0L, JOB_NAME);
+  }
+
+  @Override
+  public Job call() throws IOException, InterruptedException,
+                           ClassNotFoundException {
+    UserGroupInformation ugi = UserGroupInformation.getLoginUser();
+    ugi.doAs(new PrivilegedExceptionAction<Job>() {
+      public Job run() throws IOException, ClassNotFoundException,
+                              InterruptedException {
+        job.setMapperClass(GenDCDataMapper.class);
+        job.setNumReduceTasks(0);
+        job.setMapOutputKeyClass(NullWritable.class);
+        job.setMapOutputValueClass(BytesWritable.class);
+        job.setInputFormatClass(GenDCDataFormat.class);
+        job.setOutputFormatClass(NullOutputFormat.class);
+        job.setJarByClass(GenerateDistCacheData.class);
+        try {
+          FileInputFormat.addInputPath(job, new Path("ignored"));
+        } catch (IOException e) {
+          LOG.error("Error while adding input path ", e);
+        }
+        job.submit();
+        return job;
+      }
+    });
+    return job;
+  }
+
+  public static class GenDCDataMapper
+      extends Mapper<LongWritable, BytesWritable, NullWritable, BytesWritable> {
+
+    private BytesWritable val;
+    private final Random r = new Random();
+    private FileSystem fs;
+
+    @Override
+    protected void setup(Context context)
+        throws IOException, InterruptedException {
+      val = new BytesWritable(new byte[context.getConfiguration().getInt(
+              GenerateData.GRIDMIX_VAL_BYTES, 1024 * 1024)]);
+      fs = FileSystem.get(context.getConfiguration());
+    }
+
+    // Create one distributed cache file with the needed file size.
+    // key is distributed cache file size and
+    // value is distributed cache file path.
+    @Override
+    public void map(LongWritable key, BytesWritable value, Context context)
+        throws IOException, InterruptedException {
+
+      String fileName = new String(value.getBytes(), 0, value.getLength());
+      Path path = new Path(fileName);
+
+      /*
+       * Create the distributed cache file with permissions 0755.
+       * Since the private distributed cache directory doesn't have execute
+       * permission for others, it is OK to set read permission for others on
+       * the files under that directory: they will still become 'private'
+       * distributed cache files on the simulated cluster.
+       */
+      FSDataOutputStream dos =
+          FileSystem.create(fs, path, new FsPermission((short)0755));
+
+      for (long bytes = key.get(); bytes > 0; bytes -= val.getLength()) {
+        r.nextBytes(val.getBytes());
+        val.setSize((int)Math.min(val.getLength(), bytes));
+        dos.write(val.getBytes(), 0, val.getLength());// Write to distCache file
+      }
+      dos.close();
+    }
+  }
+
+  /**
+   * InputFormat for GenerateDistCacheData.
+   * Input to GenerateDistCacheData is the special file (in SequenceFile
+   * format) that contains the list of distributed cache files to be generated
+   * along with their file sizes.
+   */
+  static class GenDCDataFormat
+      extends InputFormat<LongWritable, BytesWritable> {
+
+    // Split the special file that contains the list of distributed cache file
+    // paths and their file sizes such that each split corresponds to
+    // approximately the same amount of distributed cache data to be generated.
+    // Consider numTaskTrackers * numMapSlotsPerTracker as the number of maps
+    // for this job, if there is a lot of data to be generated.
+    @Override
+    public List<InputSplit> getSplits(JobContext jobCtxt) throws IOException {
+      final JobConf jobConf = new JobConf(jobCtxt.getConfiguration());
+      final JobClient client = new JobClient(jobConf);
+      ClusterStatus stat = client.getClusterStatus(true);
+      int numTrackers = stat.getTaskTrackers();
+      final int fileCount = jobConf.getInt(GRIDMIX_DISTCACHE_FILE_COUNT, -1);
+
+      // Total size of distributed cache files to be generated
+      final long totalSize = jobConf.getLong(GRIDMIX_DISTCACHE_BYTE_COUNT, -1);
+      // Get the path of the special file
+      String distCacheFileList = jobConf.get(GRIDMIX_DISTCACHE_FILE_LIST);
+      if (fileCount < 0 || totalSize < 0 || distCacheFileList == null) {
+        throw new RuntimeException("Invalid metadata: #files (" + fileCount
+            + "), total_size (" + totalSize + "), filelisturi ("
+            + distCacheFileList + ")");
+      }
+
+      Path sequenceFile = new Path(distCacheFileList);
+      FileSystem fs = sequenceFile.getFileSystem(jobConf);
+      FileStatus srcst = fs.getFileStatus(sequenceFile);
+      // Consider the number of TTs * mapSlotsPerTracker as number of mappers.
+      int numMapSlotsPerTracker = jobConf.getInt(TTConfig.TT_MAP_SLOTS, 2);
+      int numSplits = numTrackers * numMapSlotsPerTracker;
+
+      List<InputSplit> splits = new ArrayList<InputSplit>(numSplits);
+      LongWritable key = new LongWritable();
+      BytesWritable value = new BytesWritable();
+
+      // Average size of data to be generated by each map task
+      final long targetSize = Math.max(totalSize / numSplits,
+                                DistributedCacheEmulator.AVG_BYTES_PER_MAP);
+      long splitStartPosition = 0L;
+      long splitEndPosition = 0L;
+      long acc = 0L;
+      long bytesRemaining = srcst.getLen();
+      SequenceFile.Reader reader = null;
+      try {
+        reader = new SequenceFile.Reader(fs, sequenceFile, jobConf);
+        while (reader.next(key, value)) {
+
+          // If adding this file would put this split past the target size,
+          // cut the last split and put this file in the next split.
+          if (acc + key.get() > targetSize && acc != 0) {
+            long splitSize = splitEndPosition - splitStartPosition;
+            splits.add(new FileSplit(
+                sequenceFile, splitStartPosition, splitSize, (String[])null));
+            bytesRemaining -= splitSize;
+            splitStartPosition = splitEndPosition;
+            acc = 0L;
+          }
+          acc += key.get();
+          splitEndPosition = reader.getPosition();
+        }
+      } finally {
+        if (reader != null) {
+          reader.close();
+        }
+      }
+      if (bytesRemaining != 0) {
+        splits.add(new FileSplit(
+            sequenceFile, splitStartPosition, bytesRemaining, (String[])null));
+      }
+
+      return splits;
+    }
+
+    /**
+     * Returns a reader for this split of the distributed cache file list.
+     */
+    @Override
+    public RecordReader<LongWritable, BytesWritable> createRecordReader(
+        InputSplit split, final TaskAttemptContext taskContext)
+        throws IOException, InterruptedException {
+      return new SequenceFileRecordReader<LongWritable, BytesWritable>();
+    }
+  }
+}
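
For reference, the file list consumed above is a plain SequenceFile of (LongWritable file size, BytesWritable file path) records, written by DistributedCacheEmulator.writeDistCacheFilesList(). A small reader like the hypothetical helper below, which uses the same SequenceFile.Reader constructor as getSplits() above, can dump its contents for debugging.

    package org.apache.hadoop.mapred.gridmix.examples;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.BytesWritable;
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.SequenceFile;

    public class DumpDistCacheFileList {
      // Usage: DumpDistCacheFileList \
      //     <ioPath>/distributedCache/_distCacheFiles.txt
      public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        Path list = new Path(args[0]);
        FileSystem fs = list.getFileSystem(conf);
        SequenceFile.Reader reader = new SequenceFile.Reader(fs, list, conf);
        LongWritable size = new LongWritable();
        BytesWritable path = new BytesWritable();
        try {
          while (reader.next(size, path)) {
            // The BytesWritable buffer may be padded; honor getLength().
            String p = new String(path.getBytes(), 0, path.getLength());
            System.out.println(size.get() + "\t" + p);
          }
        } finally {
          reader.close();
        }
      }
    }

Note that gridmix marks this list file deleteOnExit, so it is only available while a run is in progress.
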
diff --git a/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/Gridmix.java b/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/Gridmix.java
index 51f10f4..48ae52c 100644
--- a/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/Gridmix.java
+++ b/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/Gridmix.java
@@ -92,6 +92,8 @@
    */
   public static final String GRIDMIX_USR_RSV = "gridmix.user.resolve.class";
 
+  private DistributedCacheEmulator distCacheEmulator;
+
   // Submit data structures
   private JobFactory factory;
   private JobSubmitter submitter;
@@ -102,36 +104,61 @@
   private final Shutdown sdh = new Shutdown();
 
   /**
-   * Write random bytes at the path provided.
+   * Write random bytes at the path &lt;ioPath&gt;/input/
    * @see org.apache.hadoop.mapred.gridmix.GenerateData
    */
   protected void writeInputData(long genbytes, Path ioPath)
       throws IOException, InterruptedException {
+    Path inputDir = new Path(ioPath, "input");
     final Configuration conf = getConf();
-    final GridmixJob genData = new GenerateData(conf, ioPath, genbytes);
-    submitter.add(genData);
+    final GridmixJob genData = new GenerateData(conf, inputDir, genbytes);
     LOG.info("Generating " + StringUtils.humanReadableInt(genbytes) +
         " of test data...");
-    // TODO add listeners, use for job dependencies
-    TimeUnit.SECONDS.sleep(10);
-    try {
-      genData.getJob().waitForCompletion(false);
-    } catch (ClassNotFoundException e) {
-      throw new IOException("Internal error", e);
-    }
-    if (!genData.getJob().isSuccessful()) {
-      throw new IOException("Data generation failed!");
-    }
-
+    launchGridmixJob(genData);
+    
     FsShell shell = new FsShell(conf);
     try {
-      LOG.info("Changing the permissions for inputPath " + ioPath.toString());
-      shell.run(new String[] {"-chmod","-R","777", ioPath.toString()});
+      LOG.info("Changing the permissions for inputPath " + inputDir.toString());
+      shell.run(new String[] {"-chmod","-R","777", inputDir.toString()});
     } catch (Exception e) {
       LOG.error("Couldnt change the file permissions " , e);
       throw new IOException(e);
     }
-    LOG.info("Done.");
+  }
+
+  /**
+   * Write random bytes into the distributed cache files that will be used by
+   * all simulated jobs of the current gridmix run, if files are to be
+   * generated. This is done by the MapReduce job
+   * {@link GenerateDistCacheData#JOB_NAME}.
+   * @see org.apache.hadoop.mapred.gridmix.GenerateDistCacheData
+   */
+  protected void writeDistCacheData(Configuration conf)
+      throws IOException, InterruptedException {
+    int fileCount =
+        conf.getInt(GenerateDistCacheData.GRIDMIX_DISTCACHE_FILE_COUNT, -1);
+    if (fileCount > 0) {// generate distributed cache files
+      final GridmixJob genDistCacheData = new GenerateDistCacheData(conf);
+      LOG.info("Generating distributed cache data of size " + conf.getLong(
+          GenerateDistCacheData.GRIDMIX_DISTCACHE_BYTE_COUNT, -1));
+      launchGridmixJob(genDistCacheData);
+    }
+  }
+
+  // Launch Input/DistCache Data Generation job and wait for completion
+  void launchGridmixJob(GridmixJob job)
+      throws IOException, InterruptedException {
+    submitter.add(job);
+
+    // TODO add listeners, use for job dependencies
+    TimeUnit.SECONDS.sleep(10);
+    try {
+      job.getJob().waitForCompletion(false);
+    } catch (ClassNotFoundException e) {
+      throw new IOException("Internal error", e);
+    }
+    if (!job.getJob().isSuccessful()) {
+      throw new IOException(job.getJob().getJobName() + " job failed!");
+    }
   }
 
   /**
@@ -158,7 +185,9 @@
    * @param conf Configuration data, no keys specific to this context
    * @param traceIn Either a Path to the trace data or &quot;-&quot; for
    *                stdin
-   * @param ioPath Path from which input data is read
+   * @param ioPath &lt;ioPath&gt;/input/ is the directory from which input data
+   *               is read and &lt;ioPath&gt;/distributedCache/ is the gridmix
+   *               distributed cache directory.
    * @param scratchDir Path into which job output is written
    * @param startFlag Semaphore for starting job trace pipeline
    */
@@ -166,6 +195,7 @@
       Path scratchDir, CountDownLatch startFlag, UserResolver userResolver)
       throws IOException {
     try {
+      Path inputDir = new Path(ioPath, "input");
       GridmixJobSubmissionPolicy policy = GridmixJobSubmissionPolicy.getPolicy(
         conf, GridmixJobSubmissionPolicy.STRESS);
       LOG.info(" Submission policy is " + policy.name());
@@ -179,11 +209,14 @@
       int numThreads = conf.getInt(GRIDMIX_SUB_THR, noOfSubmitterThreads);
       int queueDep = conf.getInt(GRIDMIX_QUE_DEP, 5);
       submitter = createJobSubmitter(monitor, numThreads, queueDep,
-                                     new FilePool(conf, ioPath), userResolver, 
+                                     new FilePool(conf, inputDir), userResolver, 
                                      statistics);
-      
+      distCacheEmulator = new DistributedCacheEmulator(conf, ioPath);
+
       factory = createJobFactory(submitter, traceIn, scratchDir, conf, 
                                  startFlag, userResolver);
+      factory.jobCreator.setDistCacheEmulator(distCacheEmulator);
+
       if (policy == GridmixJobSubmissionPolicy.SERIAL) {
         statistics.addJobStatsListeners(factory);
       } else {
@@ -244,6 +277,9 @@
       printUsage(System.err);
       return 1;
     }
+    
+    // Should gridmix generate distributed cache data?
+    boolean generate = false;
     long genbytes = -1L;
     String traceIn = null;
     Path ioPath = null;
@@ -257,6 +293,7 @@
       for (int i = 0; i < argv.length - 2; ++i) {
         if ("-generate".equals(argv[i])) {
           genbytes = StringUtils.TraditionalBinaryPrefix.string2long(argv[++i]);
+          generate = true;
         } else if ("-users".equals(argv[i])) {
           userRsrc = new URI(argv[++i]);
         } else {
@@ -287,12 +324,33 @@
       printUsage(System.err);
       return 1;
     }
-    return start(conf, traceIn, ioPath, genbytes, userResolver);
+    return start(conf, traceIn, ioPath, genbytes, userResolver, generate);
   }
 
+  /**
+   * @param conf gridmix configuration
+   * @param traceIn trace file path (if it is '-', the trace comes from
+   *                stdin)
+   * @param ioPath working directory for gridmix. The GenerateData job
+   *               generates data in the directory &lt;ioPath&gt;/input/, and
+   *               distributed cache data is generated in the directory
+   *               &lt;ioPath&gt;/distributedCache/, if the -generate option is
+   *               specified.
+   * @param genbytes size of input data to be generated under the directory
+   *                 &lt;ioPath&gt;/input/
+   * @param userResolver gridmix user resolver
+   * @param generate true if -generate option was specified
+   * @return exit code
+   * @throws IOException
+   * @throws InterruptedException
+   */
   int start(Configuration conf, String traceIn, Path ioPath, long genbytes,
-      UserResolver userResolver) throws IOException, InterruptedException {
+      UserResolver userResolver, boolean generate)
+      throws IOException, InterruptedException {
     InputStream trace = null;
+    ioPath = ioPath.makeQualified(ioPath.getFileSystem(conf));
+
     try {
       Path scratchDir = new Path(ioPath, conf.get(GRIDMIX_OUT_DIR, "gridmix"));
 
@@ -310,10 +368,12 @@
         // scan input dir contents
         submitter.refreshFilePool();
 
-        // create scratch directory(output path of gridmix)
-        final FileSystem scratchFs = scratchDir.getFileSystem(conf);
-        FileSystem.mkdirs(scratchFs, scratchDir,
-            new FsPermission((short) 0777));
+        // set up the needed things for emulation of various loads
+        int exitCode = setupEmulation(conf, traceIn, scratchDir, ioPath,
+                                      generate);
+        if (exitCode != 0) {
+          return exitCode;
+        }
 
         factory.start();
         statistics.start();
@@ -350,6 +410,64 @@
   }
 
   /**
+   * Create gridmix output directory. Setup things for emulation of
+   * various loads, if needed.
+   * @param conf gridmix configuration
+   * @param traceIn trace file path (if it is '-', the trace comes from
+   *                stdin)
+   * @param scratchDir gridmix output directory
+   * @param ioPath Working directory for gridmix.
+   * @param generate true if -generate option was specified
+   * @return exit code
+   * @throws IOException
+   * @throws InterruptedException 
+   */
+  private int setupEmulation(Configuration conf, String traceIn,
+      Path scratchDir, Path ioPath, boolean generate)
+      throws IOException, InterruptedException {
+    // create the scratch directory (output directory of gridmix)
+    final FileSystem scratchFs = scratchDir.getFileSystem(conf);
+    FileSystem.mkdirs(scratchFs, scratchDir, new FsPermission((short) 0777));
+
+    // Setup things needed for emulation of distributed cache load. Emulation
+    // of other loads like CPU load and memory load could be set up here later.
+    return setupDistCacheEmulation(conf, traceIn, ioPath, generate);
+  }
+
+  /**
+   * Setup gridmix for emulation of distributed cache load. This includes
+   * generation of distributed cache files, if needed.
+   * @param conf gridmix configuration
+   * @param traceIn trace file path (if it is '-', the trace comes from
+   *                stdin)
+   * @param ioPath &lt;ioPath&gt;/input/ is the directory where input data (a)
+   *               exists or (b) is generated. &lt;ioPath&gt;/distributedCache/
+   *               is the directory where distributed cache data (a) exists or
+   *               (b) is to be generated by gridmix.
+   * @param generate true if -generate option was specified
+   * @return exit code
+   * @throws IOException
+   * @throws InterruptedException
+   */
+  private int setupDistCacheEmulation(Configuration conf, String traceIn,
+      Path ioPath, boolean generate) throws IOException, InterruptedException {
+    distCacheEmulator.init(traceIn, factory.jobCreator, generate);
+    int exitCode = 0;
+    if (distCacheEmulator.shouldGenerateDistCacheData() ||
+        distCacheEmulator.shouldEmulateDistCacheLoad()) {
+
+      JobStoryProducer jsp = createJobStoryProducer(traceIn, conf);
+      exitCode = distCacheEmulator.setupGenerateDistCacheData(jsp);
+      if (exitCode == 0) {
+        // If there are files to be generated, run a MapReduce job to generate
+        // these distributed cache files of all the simulated jobs of this trace.
+        writeDistCacheData(conf);
+      }
+    }
+    return exitCode;
+  }
+
+  /**
    * Handles orderly shutdown by requesting that each component in the
    * pipeline abort its progress, waiting for each to exit and killing
    * any jobs still running on the cluster.
@@ -446,6 +564,11 @@
     ToolRunner.printGenericCommandUsage(out);
     out.println("Usage: gridmix [-generate <MiB>] [-users URI] [-Dname=value ...] <iopath> <trace>");
     out.println("  e.g. gridmix -generate 100m foo -");
+    out.println("Options:");
+    out.println("   -generate <MiB> : Generate input data of size MiB under "
+        + "<iopath>/input/ and generate\n\t\t     distributed cache data under "
+        + "<iopath>/distributedCache/.");
+    out.println("   -users <usersResourceURI> : URI that contains the users list.");
     out.println("Configuration parameters:");
     out.println("   General parameters:");
     out.printf("       %-48s : Output directory\n", GRIDMIX_OUT_DIR);
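
The -generate flow can also be exercised programmatically. The sketch below is a hypothetical driver; the paths and the 100m size are made up, and it assumes Gridmix is runnable through ToolRunner, consistent with the ToolRunner.printGenericCommandUsage call in printUsage above.

    package org.apache.hadoop.mapred.gridmix.examples;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.mapred.gridmix.Gridmix;
    import org.apache.hadoop.util.ToolRunner;

    public class RunGridmixWithDistCache {
      public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        // To generate input data but skip distributed cache emulation:
        // conf.setBoolean("gridmix.distributed-cache-emulation.enable",
        //                 false);
        int exitCode = ToolRunner.run(conf, new Gridmix(), new String[] {
            "-generate", "100m",        // input data under <iopath>/input/
                                        // plus dist cache files under
                                        // <iopath>/distributedCache/
            "/user/foo/gridmix-io",     // <iopath>
            "/user/foo/job-trace.json"  // <trace>; '-' would read from
                                        // stdin, which disables dist cache
                                        // emulation
        });
        System.exit(exitCode);
      }
    }
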
diff --git a/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/JobCreator.java b/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/JobCreator.java
index 52d95d9..f31e854 100644
--- a/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/JobCreator.java
+++ b/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/JobCreator.java
@@ -18,31 +18,42 @@
 
 package org.apache.hadoop.mapred.gridmix;
 
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.mapred.ClusterStatus;
 import org.apache.hadoop.mapred.JobClient;
 import org.apache.hadoop.mapred.JobConf;
-import org.apache.hadoop.mapred.gridmix.GenerateData.GenSplit;
-import org.apache.hadoop.mapreduce.InputSplit;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.tools.rumen.JobStory;
 
 import java.io.IOException;
 import java.util.ArrayList;
-import java.util.Random;
 import java.util.regex.Matcher;
 import java.util.regex.Pattern;
 
+@InterfaceAudience.Private
+@InterfaceStability.Evolving
 public enum JobCreator {
 
   LOADJOB {
     @Override
     public GridmixJob createGridmixJob(
-      Configuration conf, long submissionMillis, JobStory jobdesc, Path outRoot,
-      UserGroupInformation ugi, int seq) throws IOException {
+      Configuration gridmixConf, long submissionMillis, JobStory jobdesc,
+      Path outRoot, UserGroupInformation ugi, int seq) throws IOException {
+
+      // Build configuration for this simulated job
+      Configuration conf = new Configuration(gridmixConf);
+      dce.configureDistCacheFiles(conf, jobdesc.getJobConf());
       return new LoadJob(conf, submissionMillis, jobdesc, outRoot, ugi, seq);
-    }},
+    }
+
+    @Override
+    public boolean canEmulateDistCacheLoad() {
+      return true;
+    }
+  },
 
   SLEEPJOB {
     private String[] hosts;
@@ -72,12 +83,30 @@
       }
       return new SleepJob(conf, submissionMillis, jobdesc, outRoot, ugi, seq,
           numLocations, hosts);
-    }};
+    }
+
+    @Override
+    public boolean canEmulateDistCacheLoad() {
+      return false;
+    }
+  };
 
   public static final String GRIDMIX_JOB_TYPE = "gridmix.job.type";
   public static final String SLEEPJOB_RANDOM_LOCATIONS = 
     "gridmix.sleep.fake-locations";
 
+  /**
+   * Create a Gridmix simulated job.
+   * @param conf configuration of the simulated job
+   * @param submissionMillis the time at which this simulated job is to be
+   *                         submitted
+   * @param jobdesc JobStory obtained from the trace
+   * @param outRoot gridmix output directory
+   * @param ugi UGI of the submitter of this simulated job
+   * @param seq job sequence number
+   * @return the created simulated job
+   * @throws IOException
+   */
   public abstract GridmixJob createGridmixJob(
     final Configuration conf, long submissionMillis, final JobStory jobdesc,
     Path outRoot, UserGroupInformation ugi, final int seq) throws IOException;
@@ -86,4 +115,21 @@
     Configuration conf, JobCreator defaultPolicy) {
     return conf.getEnum(GRIDMIX_JOB_TYPE, defaultPolicy);
   }
+
+  /**
+   * @return true if gridmix simulated jobs of this job type can emulate
+   *         distributed cache load
+   */
+  abstract boolean canEmulateDistCacheLoad();
+
+  DistributedCacheEmulator dce;
+
+  /**
+   * This method is to be called before calling any other method of JobCreator
+   * except canEmulateDistCacheLoad(), especially if canEmulateDistCacheLoad()
+   * returns true for the job type.
+   * @param e the distributed cache emulator
+   */
+  void setDistCacheEmulator(DistributedCacheEmulator e) {
+    this.dce = e;
+  }
 }
diff --git a/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/PseudoLocalFs.java b/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/PseudoLocalFs.java
new file mode 100644
index 0000000..497108a
--- /dev/null
+++ b/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/PseudoLocalFs.java
@@ -0,0 +1,332 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapred.gridmix;
+
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.Random;
+import java.net.URI;
+
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.PositionedReadable;
+import org.apache.hadoop.fs.Seekable;
+import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.io.BytesWritable;
+import org.apache.hadoop.util.Progressable;
+
+/**
+ * Pseudo local file system that generates random data for any file on the fly
+ * instead of storing files on disk. So opening the same file multiple times
+ * will not yield the same content. There are no directories in this file
+ * system other than the root, and all files live directly under the root,
+ * i.e. "/". All file URIs
+ * on pseudo local file system should be of the format <code>
+ * pseudo:///&lt;name&gt;.&lt;fileSize&gt;</code> where name is a unique name
+ * and &lt;fileSize&gt; is a number representing the size of the file in bytes.
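+ * <br> For example, <code>pseudo:///myfile.1048576</code> (an illustrative
+ * name) denotes a file whose content is 1048576 bytes (1 MB) of random data
+ * generated when the file is read.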
+ */
+class PseudoLocalFs extends FileSystem {
+  Path home;
+  /**
+   * The creation time and modification time of all files in
+ * {@link PseudoLocalFs} are the same.
+   */
+  private static final long TIME = System.currentTimeMillis();
+  private static final String HOME_DIR = "/";
+  private static final long BLOCK_SIZE  = 4 * 1024 * 1024L; // 4 MB
+  private static final int DEFAULT_BUFFER_SIZE = 1024 * 1024; // 1MB
+
+  static final URI NAME = URI.create("pseudo:///");
+
+  PseudoLocalFs() {
+    this(new Path(HOME_DIR));
+  }
+
+  PseudoLocalFs(Path home) {
+    super();
+    this.home = home;
+  }
+
+  @Override
+  public URI getUri() {
+    return NAME;
+  }
+
+  @Override
+  public Path getHomeDirectory() {
+    return home;
+  }
+
+  @Override
+  public Path getWorkingDirectory() {
+    return getHomeDirectory();
+  }
+
+  /**
+   * Generates a valid pseudo local file path from the given <code>fileId</code>
+   * and <code>fileSize</code>.
+   * @param fileId unique file id string
+   * @param fileSize file size
+   * @return the generated relative path
+   */
+  static Path generateFilePath(String fileId, long fileSize) {
+    return new Path(fileId + "." + fileSize);
+  }
+
+  /**
+   * Creating a pseudo local file is nothing but validating the file path.
+   * The actual data of the file is generated on the fly when a client opens
+   * the file for reading.
+   * @param path file path to be created
+   */
+  @Override
+  public FSDataOutputStream create(Path path) throws IOException {
+    try {
+      validateFileNameFormat(path);
+    } catch (FileNotFoundException e) {
+      throw new IOException("File creation failed for " + path);
+    }
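+    // No output stream is returned: file content is generated on the fly
+    // when the file is opened for reading.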
+    return null;
+  }
+
+  /**
+   * Validate if the path provided is of expected format of Pseudo Local File
+   * System based files.
+   * @param path file path
+   * @return the file size
+   * @throws FileNotFoundException
+   */
+  long validateFileNameFormat(Path path) throws FileNotFoundException {
+    path = path.makeQualified(this);
+    boolean valid = true;
+    long fileSize = 0;
+    if (!path.toUri().getScheme().equals(getUri().getScheme())) {
+      valid = false;
+    } else {
+      String[] parts = path.toUri().getPath().split("\\.");
+      try {
+        fileSize = Long.valueOf(parts[parts.length - 1]);
+        valid = (fileSize >= 0);
+      } catch (NumberFormatException e) {
+        valid = false;
+      }
+    }
+    if (!valid) {
+      throw new FileNotFoundException("File " + path
+          + " does not exist in pseudo local file system");
+    }
+    return fileSize;
+  }
+
+  /**
+   * Opens a pseudo-file for reading; its content is generated on the fly.
+   * @see #create(Path)
+   */
+  @Override
+  public FSDataInputStream open(Path path, int bufferSize) throws IOException {
+    long fileSize = validateFileNameFormat(path);
+    InputStream in = new RandomInputStream(fileSize, bufferSize);
+    return new FSDataInputStream(in);
+  }
+
+  /**
+   * Opens a pseudo-file for reading with the default buffer size.
+   * @see #create(Path)
+   */
+  @Override
+  public FSDataInputStream open(Path path) throws IOException {
+    return open(path, DEFAULT_BUFFER_SIZE);
+  }
+
+  @Override
+  public FileStatus getFileStatus(Path path) throws IOException {
+    long fileSize = validateFileNameFormat(path);
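+    // Every pseudo-file is reported as a plain file with a single block
+    // replica and the file system's common creation/modification time.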
+    return new FileStatus(fileSize, false, 1, BLOCK_SIZE, TIME, path);
+  }
+
+  @Override
+  public boolean exists(Path path) {
+    try {
+      validateFileNameFormat(path);
+    } catch (FileNotFoundException e) {
+      return false;
+    }
+    return true;
+  }
+
+  @Override
+  public FSDataOutputStream create(Path path, FsPermission permission,
+      boolean overwrite, int bufferSize, short replication, long blockSize,
+      Progressable progress) throws IOException {
+    return create(path);
+  }
+
+  @Override
+  public FileStatus[] listStatus(Path path) throws FileNotFoundException,
+      IOException {
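+    // There is no real directory tree, so listing a path simply returns the
+    // status of the path itself.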
+    return new FileStatus[] {getFileStatus(path)};
+  }
+
+  /**
+   * Input Stream that generates specified number of random bytes.
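+   * <br> Data is served from an internal buffer that is refilled with fresh
+   * random bytes whenever it is exhausted, so files of arbitrary size can be
+   * read without holding their whole content in memory.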
+   */
+  static class RandomInputStream extends InputStream
+      implements Seekable, PositionedReadable {
+
+    private final Random r = new Random();
+    private BytesWritable val = null;
+    private int positionInVal = 0;// current position in the buffer 'val'
+
+    private long totalSize = 0;// total number of random bytes to be generated
+    private long curPos = 0;// current position in this stream
+
+    /**
+     * @param size total number of random bytes to be generated in this stream
+     * @param bufferSize the buffer size. An internal buffer array of length
+     * <code>bufferSize</code> is created. If <code>bufferSize</code> is not a
+     * positive number, then a default value of 1MB is used.
+     */
+    RandomInputStream(long size, int bufferSize) {
+      totalSize = size;
+      if (bufferSize <= 0) {
+        bufferSize = DEFAULT_BUFFER_SIZE;
+      }
+      val = new BytesWritable(new byte[bufferSize]);
+      // Fill the buffer up front so that even the very first reads return
+      // random data rather than the zero-initialized array contents.
+      r.nextBytes(val.getBytes());
+    }
+
+    @Override
+    public int read() throws IOException {
+      byte[] b = new byte[1];
+      if (curPos < totalSize) {
+        if (positionInVal < val.getLength()) {// use buffered byte
+          b[0] = val.getBytes()[positionInVal++];
+          ++curPos;
+        } else {// generate data
+          int num = read(b);
+          if (num < 0) {
+            return num;
+          }
+        }
+      } else {
+        return -1;
+      }
+      // Mask to [0, 255] so random negative bytes are not mistaken for EOF,
+      // as required by the InputStream.read() contract.
+      return b[0] & 0xff;
+    }
+
+    @Override
+    public int read(byte[] bytes) throws IOException {
+      return read(bytes, 0, bytes.length);
+    }
+
+    @Override
+    public int read(byte[] bytes, int off, int len) throws IOException {
+      if (curPos == totalSize) {
+        return -1;// EOF
+      }
+      int numBytes = len;
+      if (numBytes > (totalSize - curPos)) {// position in file is close to EOF
+        numBytes = (int)(totalSize - curPos);
+      }
+      if (numBytes > (val.getLength() - positionInVal)) {
+        // Buffered bytes are not enough; refill the buffer with fresh random
+        // data and restart from its beginning.
+        r.nextBytes(val.getBytes());
+        positionInVal = 0;
+        if (numBytes > val.getLength()) {
+          // Serve at most one buffer's worth per call; returning fewer bytes
+          // than requested is permitted by the read() contract and avoids
+          // copying past the end of the buffer.
+          numBytes = val.getLength();
+        }
+      }
+
+      System.arraycopy(val.getBytes(), positionInVal, bytes, off, numBytes);
+      curPos += numBytes;
+      positionInVal += numBytes;
+      return numBytes;
+    }
+
+    @Override
+    public int available() {
+      // Bytes that can be served from the current buffer, bounded by the
+      // number of bytes remaining in the pseudo-file.
+      return (int) Math.min(val.getLength() - positionInVal,
+                            totalSize - curPos);
+    }
+
+    @Override
+    public int read(long position, byte[] buffer, int offset, int length)
+        throws IOException {
+      throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public void readFully(long position, byte[] buffer) throws IOException {
+      throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public void readFully(long position, byte[] buffer, int offset, int length)
+        throws IOException {
+      throw new UnsupportedOperationException();
+    }
+
+    /**
+     * Get the current position in this stream/pseudo-file
+     * @return the position in this stream/pseudo-file
+     * @throws IOException
+     */
+    @Override
+    public long getPos() throws IOException {
+      return curPos;
+    }
+
+    @Override
+    public void seek(long pos) throws IOException {
+      throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public boolean seekToNewSource(long targetPos) throws IOException {
+      throw new UnsupportedOperationException();
+    }
+  }
+
+  @Override
+  public FSDataOutputStream append(Path path, int bufferSize,
+      Progressable progress) throws IOException {
+    throw new UnsupportedOperationException("Append is not supported"
+        + " in pseudo local file system.");
+  }
+
+  @Override
+  public boolean mkdirs(Path f, FsPermission permission) throws IOException {
+    throw new UnsupportedOperationException("Mkdirs is not supported"
+        + " in pseudo local file system.");
+  }
+
+  @Override
+  public boolean rename(Path src, Path dst) throws IOException {
+    throw new UnsupportedOperationException("Rename is not supported"
+        + " in pseudo local file system.");
+  }
+
+  @Override
+  public boolean delete(Path path, boolean recursive) {
+    throw new UnsupportedOperationException("File deletion is not supported "
+        + "in pseudo local file system.");
+  }
+
+  @Override
+  public void setWorkingDirectory(Path newDir) {
+    throw new UnsupportedOperationException("SetWorkingDirectory "
+        + "is not supported in pseudo local file system.");
+  }
+}
diff --git a/src/contrib/gridmix/src/test/org/apache/hadoop/mapred/gridmix/DebugJobProducer.java b/src/contrib/gridmix/src/test/org/apache/hadoop/mapred/gridmix/DebugJobProducer.java
index 7971270..c1e433c 100644
--- a/src/contrib/gridmix/src/test/org/apache/hadoop/mapred/gridmix/DebugJobProducer.java
+++ b/src/contrib/gridmix/src/test/org/apache/hadoop/mapred/gridmix/DebugJobProducer.java
@@ -28,6 +28,7 @@
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.mapreduce.JobID;
+import org.apache.hadoop.mapreduce.MRJobConfig;
 import org.apache.hadoop.mapreduce.TaskType;
 import org.apache.hadoop.mapreduce.InputSplit;
 
@@ -209,9 +210,14 @@
 
    @Override
    public String getUser() {
-     String s = String.format("foobar%d", id);
-     GridmixTestUtils.createHomeAndStagingDirectory(s, (JobConf)conf);
-     return s;
+     // Obtain user name from job configuration, if available.
+     // Otherwise use dummy user names.
+     String user = conf.get(MRJobConfig.USER_NAME);
+     if (user == null) {
+       user = String.format("foobar%d", id);
+     }
+     GridmixTestUtils.createHomeAndStagingDirectory(user, (JobConf)conf);
+     return user;
    }
 
    @Override
@@ -289,7 +295,7 @@
 
     @Override
     public org.apache.hadoop.mapred.JobConf getJobConf() {
-      throw new UnsupportedOperationException();
+      return new JobConf(conf);
     }
 
     @Override
diff --git a/src/contrib/gridmix/src/test/org/apache/hadoop/mapred/gridmix/TestDistCacheEmulation.java b/src/contrib/gridmix/src/test/org/apache/hadoop/mapred/gridmix/TestDistCacheEmulation.java
new file mode 100644
index 0000000..163cc80
--- /dev/null
+++ b/src/contrib/gridmix/src/test/org/apache/hadoop/mapred/gridmix/TestDistCacheEmulation.java
@@ -0,0 +1,513 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.mapred.gridmix;
+
+import static org.junit.Assert.*;
+
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.LocatedFileStatus;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.RemoteIterator;
+import org.apache.hadoop.fs.permission.FsAction;
+import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.io.BytesWritable;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.MRJobConfig;
+import org.apache.hadoop.mapreduce.MapContext;
+import org.apache.hadoop.mapreduce.MapReduceTestUtil;
+import org.apache.hadoop.mapreduce.RecordReader;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.mapreduce.lib.input.FileSplit;
+import org.apache.hadoop.mapreduce.task.MapContextImpl;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+/**
+ * Validate emulation of distributed cache load in gridmix simulated jobs.
+ */
+public class TestDistCacheEmulation {
+
+  private DistributedCacheEmulator dce = null;
+
+  @BeforeClass
+  public static void init() throws IOException {
+    GridmixTestUtils.initCluster();
+  }
+
+  @AfterClass
+  public static void shutDown() throws IOException {
+    GridmixTestUtils.shutdownCluster();
+  }
+
+  /**
+   * Validate the dist cache files generated by the GenerateDistCacheData job.
+   * @param jobConf configuration of GenerateDistCacheData job.
+   * @param sortedFileSizes array of sorted distributed cache file sizes 
+   * @throws IOException 
+   * @throws FileNotFoundException 
+   */
+  private void validateDistCacheData(JobConf jobConf, long[] sortedFileSizes)
+      throws FileNotFoundException, IOException {
+    Path distCachePath = dce.getDistributedCacheDir();
+    String filesListFile =
+        jobConf.get(GenerateDistCacheData.GRIDMIX_DISTCACHE_FILE_LIST);
+    FileSystem fs = FileSystem.get(jobConf);
+
+    // Validate the existence of Distributed Cache files list file directly
+    // under distributed cache directory
+    Path listFile = new Path(filesListFile);
+    assertTrue("Path of Distributed Cache files list file is wrong.",
+        distCachePath.equals(listFile.getParent().makeQualified(fs)));
+
+    // Delete the dist cache files list file
+    assertTrue("Failed to delete distributed Cache files list file " + listFile,
+               fs.delete(listFile));
+
+    List<Long> fileSizes = new ArrayList<Long>();
+    for (long size : sortedFileSizes) {
+      fileSizes.add(size);
+    }
+    // validate dist cache files after deleting the 'files list file'
+    validateDistCacheFiles(fileSizes, distCachePath);
+  }
+
+  /**
+   * Validate private/public distributed cache files.
+   * @param filesSizesExpected list of sizes of expected dist cache files
+   * @param distCacheDir the distributed cache dir to be validated
+   * @throws IOException 
+   * @throws FileNotFoundException 
+   */
+  private void validateDistCacheFiles(List<Long> filesSizesExpected,
+      Path distCacheDir) throws FileNotFoundException, IOException {
+    RemoteIterator<LocatedFileStatus> iter =
+        GridmixTestUtils.dfs.listFiles(distCacheDir, false);
+    int numFiles = filesSizesExpected.size();
+    for (int i = 0; i < numFiles; i++) {
+      assertTrue("Missing distributed cache files.", iter.hasNext());
+      LocatedFileStatus stat = iter.next();
+      assertTrue("File size of distributed cache file "
+          + stat.getPath().toUri().getPath() + " is wrong.",
+          filesSizesExpected.remove(stat.getLen()));
+
+      FsPermission perm = stat.getPermission();
+      assertEquals("Wrong permissions for distributed cache file "
+          + stat.getPath().toUri().getPath(),
+          new FsPermission((short)0644), perm);
+    }
+    assertFalse("Number of files under distributed cache dir is wrong.",
+        iter.hasNext());
+  }
+
+  /**
+   * Configures 5 HDFS-based dist cache files and 1 local-FS-based dist cache
+   * file in the given Configuration object <code>conf</code>.
+   * @param conf configuration where dist cache config properties are to be set
+   * @param useOldProperties <code>true</code> if old config properties are to
+   *                         be set
+   * @return array of sorted HDFS-based distributed cache file sizes
+   * @throws IOException
+   */
+  private long[] configureDummyDistCacheFiles(Configuration conf,
+      boolean useOldProperties) throws IOException {
+    String user = UserGroupInformation.getCurrentUser().getShortUserName();
+    conf.set(MRJobConfig.USER_NAME, user);
+    // Set some dummy dist cache files in gridmix configuration so that they go
+    // into the configuration of JobStory objects.
+    String[] distCacheFiles = {"hdfs:///tmp/file1.txt",
+                               "/tmp/" + user + "/.staging/job_1/file2.txt",
+                               "hdfs:///user/user1/file3.txt",
+                               "/home/user2/file4.txt",
+                               "subdir1/file5.txt",
+                               "subdir2/file6.gz"};
+    String[] fileSizes = {"400", "2500", "700", "1200", "1500", "500"};
+
+    String[] visibilities = {"true", "false", "false", "true", "true", "false"};
+    String[] timeStamps = {"1234", "2345", "34567", "5434", "125", "134"};
+    if (useOldProperties) {
+      conf.setStrings("mapred.cache.files", distCacheFiles);
+      conf.setStrings("mapred.cache.files.filesizes", fileSizes);
+      conf.setStrings("mapred.cache.files.visibilities", visibilities);
+      conf.setStrings("mapred.cache.files.timestamps", timeStamps);
+    } else {
+      conf.setStrings(MRJobConfig.CACHE_FILES, distCacheFiles);
+      conf.setStrings(MRJobConfig.CACHE_FILES_SIZES, fileSizes);
+      conf.setStrings(MRJobConfig.CACHE_FILE_VISIBILITIES, visibilities);
+      conf.setStrings(MRJobConfig.CACHE_FILE_TIMESTAMPS, timeStamps);
+    }
+    // local FS based dist cache file whose path contains <user>/.staging is
+    // not created on HDFS. So file size 2500 is not added to sortedFileSizes.
+    long[] sortedFileSizes = new long[] {1500, 1200, 700, 500, 400};
+    return sortedFileSizes;
+  }
+
+  /**
+   * Runs setupGenerateDistCacheData() on a new DistributedCacheEmulator and
+   * returns the jobConf. Fills the array <code>sortedFileSizes</code> that
+   * can be used for validation, and validates the exit code returned by
+   * setupGenerateDistCacheData().
+   * @param generate true if -generate option is specified
+   * @param sortedFileSizes sorted HDFS-based distributed cache file sizes
+   * @throws IOException
+   * @throws InterruptedException
+   */
+  private JobConf runSetupGenerateDistCacheData(boolean generate,
+      long[] sortedFileSizes) throws IOException, InterruptedException {
+    Configuration conf = new Configuration();
+    long[] fileSizes = configureDummyDistCacheFiles(conf, false);
+    System.arraycopy(fileSizes, 0, sortedFileSizes, 0, fileSizes.length);
+
+    // Job stories of all 3 jobs will have same dist cache files in their
+    // configurations
+    final int numJobs = 3;
+    DebugJobProducer jobProducer = new DebugJobProducer(numJobs, conf);
+
+    JobConf jobConf =
+        GridmixTestUtils.mrCluster.createJobConf(new JobConf(conf));
+    Path ioPath = new Path("testSetupGenerateDistCacheData")
+                    .makeQualified(GridmixTestUtils.dfs);
+    FileSystem fs = FileSystem.get(jobConf);
+    if (fs.exists(ioPath)) {
+      fs.delete(ioPath, true);
+    }
+    FileSystem.mkdirs(fs, ioPath, new FsPermission((short)0777));
+
+    dce = createDistributedCacheEmulator(jobConf, ioPath, generate);
+    int exitCode = dce.setupGenerateDistCacheData(jobProducer);
+    int expectedExitCode =
+        generate ? 0 : DistributedCacheEmulator.MISSING_DIST_CACHE_FILES_ERROR;
+    assertEquals("setupGenerateDistCacheData failed.",
+                 expectedExitCode, exitCode);
+
+    // reset back
+    resetDistCacheConfigProperties(jobConf);
+    return jobConf;
+  }
+
+  /**
+   * Reset the config properties related to Distributed Cache in the given
+   * job configuration <code>jobConf</code>.
+   * @param jobConf job configuration
+   */
+  private void resetDistCacheConfigProperties(JobConf jobConf) {
+    // reset current/latest property names
+    jobConf.setStrings(MRJobConfig.CACHE_FILES, "");
+    jobConf.setStrings(MRJobConfig.CACHE_FILES_SIZES, "");
+    jobConf.setStrings(MRJobConfig.CACHE_FILE_TIMESTAMPS, "");
+    jobConf.setStrings(MRJobConfig.CACHE_FILE_VISIBILITIES, "");
+    // reset old property names
+    jobConf.setStrings("mapred.cache.files", "");
+    jobConf.setStrings("mapred.cache.files.filesizes", "");
+    jobConf.setStrings("mapred.cache.files.visibilities", "");
+    jobConf.setStrings("mapred.cache.files.timestamps", "");
+  }
+
+  /**
+   * Validate that the GenerateDistCacheData job creates dist cache files
+   * properly.
+   * @throws Exception
+   */
+  @Test
+  public void testGenerateDistCacheData() throws Exception {
+    long[] sortedFileSizes = new long[5];
+    JobConf jobConf =
+        runSetupGenerateDistCacheData(true, sortedFileSizes);
+    GridmixJob gridmixJob = new GenerateDistCacheData(jobConf);
+    Job job = gridmixJob.call();
+    assertEquals("Number of reduce tasks in GenerateDistCacheData is not 0.",
+        0, job.getNumReduceTasks());
+    assertTrue("GenerateDistCacheData job failed.",
+        job.waitForCompletion(false));
+    validateDistCacheData(jobConf, sortedFileSizes);
+  }
+
+  /**
+   *  Validate setupGenerateDistCacheData by checking
+   *  <li> permissions of the distributed cache directories and
+   *  <li> content of the generated sequence file. This includes validation of
+   *       dist cache file paths and their file sizes.
+   */
+  private void validateSetupGenDC(JobConf jobConf, long[] sortedFileSizes)
+      throws IOException, InterruptedException {
+    // build things needed for validation
+    long sumOfFileSizes = 0;
+    for (int i = 0; i < sortedFileSizes.length; i++) {
+      sumOfFileSizes += sortedFileSizes[i];
+    }
+
+    FileSystem fs = FileSystem.get(jobConf);
+    assertEquals("Number of distributed cache files to be generated is wrong.",
+        sortedFileSizes.length,
+        jobConf.getInt(GenerateDistCacheData.GRIDMIX_DISTCACHE_FILE_COUNT, -1));
+    assertEquals("Total size of dist cache files to be generated is wrong.",
+        sumOfFileSizes, jobConf.getLong(
+        GenerateDistCacheData.GRIDMIX_DISTCACHE_BYTE_COUNT, -1));
+    Path filesListFile = new Path(jobConf.get(
+        GenerateDistCacheData.GRIDMIX_DISTCACHE_FILE_LIST));
+    FileStatus stat = fs.getFileStatus(filesListFile);
+    assertEquals("Wrong permissions of dist Cache files list file "
+        + filesListFile, new FsPermission((short)0644), stat.getPermission());
+
+    InputSplit split =
+        new FileSplit(filesListFile, 0, stat.getLen(), (String[])null);
+    TaskAttemptContext taskContext =
+        MapReduceTestUtil.createDummyMapTaskAttemptContext(jobConf);
+    RecordReader<LongWritable, BytesWritable> reader =
+      new GenerateDistCacheData.GenDCDataFormat().createRecordReader(
+      split, taskContext);
+    MapContext<LongWritable, BytesWritable, NullWritable, BytesWritable>
+        mapContext = new MapContextImpl<LongWritable, BytesWritable,
+        NullWritable, BytesWritable>(jobConf, taskContext.getTaskAttemptID(),
+        reader, null, null, MapReduceTestUtil.createDummyReporter(), split);
+    reader.initialize(split, mapContext);
+
+    // start validating setupGenerateDistCacheData
+    doValidateSetupGenDC(reader, fs, sortedFileSizes);
+  }
+
+  /**
+   *  Validate setupGenerateDistCacheData by checking
+   *  <li> permissions of the distributed cache directory and
+   *  <li> content of the generated sequence file. This includes validation of
+   *       dist cache file paths and their file sizes.
+   */
+  private void doValidateSetupGenDC(RecordReader<LongWritable, BytesWritable>
+      reader, FileSystem fs, long[] sortedFileSizes)
+      throws IOException, InterruptedException {
+
+    // Validate permissions of dist cache directory
+    Path distCacheDir = dce.getDistributedCacheDir();
+    assertEquals("Wrong permissions for distributed cache dir " + distCacheDir,
+        fs.getFileStatus(distCacheDir).getPermission()
+        .getOtherAction().and(FsAction.EXECUTE), FsAction.EXECUTE);
+
+    // Validate the content of the sequence file generated by
+    // dce.setupGenerateDistCacheData().
+    LongWritable key = new LongWritable();
+    BytesWritable val = new BytesWritable();
+    for (int i = 0; i < sortedFileSizes.length; i++) {
+      assertTrue("Number of files written to the sequence file by "
+          + "setupGenerateDistCacheData is less than the expected.",
+          reader.nextKeyValue());
+      key = reader.getCurrentKey();
+      val = reader.getCurrentValue();
+      long fileSize = key.get();
+      String file = new String(val.getBytes(), 0, val.getLength());
+
+      // Dist Cache files should be sorted based on file size.
+      assertEquals("Dist cache file size is wrong.",
+          sortedFileSizes[i], fileSize);
+
+      // Validate dist cache file path.
+
+      // parent dir of dist cache file
+      Path parent = new Path(file).getParent().makeQualified(fs);
+      // should exist in dist cache dir
+      assertTrue("Public dist cache file path is wrong.",
+          distCacheDir.equals(parent));
+    }
+  }
+
+  /**
+   *  Test if DistributedCacheEmulator's setup of GenerateDistCacheData is
+   *  working as expected.
+   * @throws IOException
+   * @throws InterruptedException
+   */
+  @Test
+  public void testSetupGenerateDistCacheData()
+      throws IOException, InterruptedException {
+    long[] sortedFileSizes = new long[5];
+    JobConf jobConf = runSetupGenerateDistCacheData(true, sortedFileSizes);
+    validateSetupGenDC(jobConf, sortedFileSizes);
+
+    // Verify if correct exit code is seen when -generate option is missing and
+    // distributed cache files are missing in the expected path.
+    runSetupGenerateDistCacheData(false, sortedFileSizes);
+  }
+
+  /**
+   *  Create DistributedCacheEmulator object and do the initialization by
+   *  calling init() on it with dummy trace. Also configure the pseudo local FS.
+   */
+  private DistributedCacheEmulator createDistributedCacheEmulator(
+      Configuration conf, Path ioPath, boolean generate) throws IOException {
+    DistributedCacheEmulator dce =
+        new DistributedCacheEmulator(conf, ioPath);
+    JobCreator jobCreator = JobCreator.getPolicy(conf, JobCreator.LOADJOB);
+    jobCreator.setDistCacheEmulator(dce);
+    dce.init("dummytrace", jobCreator, generate);
+    return dce;
+  }
+
+  /**
+   *  Test the configuration property for disabling/enabling emulation of
+   *  distributed cache load.
+   */
+  @Test
+  public void testDistCacheEmulationConfigurability() throws IOException {
+    Configuration conf = new Configuration();
+    JobConf jobConf = GridmixTestUtils.mrCluster.createJobConf(
+        new JobConf(conf));
+    Path ioPath = new Path("testDistCacheEmulationConfigurability")
+        .makeQualified(GridmixTestUtils.dfs);
+    FileSystem fs = FileSystem.get(jobConf);
+    FileSystem.mkdirs(fs, ioPath, new FsPermission((short)0777));
+
+    // default config
+    dce = createDistributedCacheEmulator(jobConf, ioPath, false);
+    assertTrue("Default configuration of "
+        + DistributedCacheEmulator.GRIDMIX_EMULATE_DISTRIBUTEDCACHE
+        + " is wrong.", dce.shouldEmulateDistCacheLoad());
+
+    // config property set to false
+    jobConf.setBoolean(
+        DistributedCacheEmulator.GRIDMIX_EMULATE_DISTRIBUTEDCACHE, false);
+    dce = createDistributedCacheEmulator(jobConf, ioPath, false);
+    assertFalse("Disabling of emulation of distributed cache load by setting "
+        + DistributedCacheEmulator.GRIDMIX_EMULATE_DISTRIBUTEDCACHE
+        + " to false is not working.", dce.shouldEmulateDistCacheLoad());
+  }
+
+  /**
+   * Verify if DistributedCacheEmulator can configure distributed cache files
+   * for a simulated job when the job conf from the trace has no dist cache
+   * files.
+   * @param conf configuration for the simulated job to be run
+   * @param jobConf job configuration of original cluster's job, obtained from
+   *                trace
+   * @throws IOException
+   */
+  private void validateJobConfWithOutDCFiles(Configuration conf,
+      JobConf jobConf) throws IOException {
+    // Validate if Gridmix can configure dist cache files properly if there are
+    // no HDFS-based dist cache files and localFS-based dist cache files in
+    // trace for a job.
+    dce.configureDistCacheFiles(conf, jobConf);
+    assertNull("Distributed cache files configured by GridMix is wrong.",
+               conf.get(MRJobConfig.CACHE_FILES));
+    assertNull("Distributed cache files configured by Gridmix through -files "
+               + "option is wrong.", conf.get("tmpfiles"));
+  }
+
+  /**
+   * Verify if DistributedCacheEmulator can configure distributed cache files
+   * for a simulated job when the job conf from the trace has HDFS-based dist
+   * cache files and local-FS-based dist cache files.
+   * <br>Also validate if Gridmix can handle/read deprecated config properties
+   * like mapred.cache.files.filesizes and mapred.cache.files.visibilities from
+   * trace file.
+   * @param conf configuration for the simulated job to be run
+   * @param jobConf job configuration of original cluster's job, obtained from
+   *                trace
+   * @throws IOException
+   */
+  private void validateJobConfWithDCFiles(Configuration conf,
+      JobConf jobConf) throws IOException {
+    long[] sortedFileSizes = configureDummyDistCacheFiles(jobConf, true);
+
+    // Validate if Gridmix can handle deprecated config properties like
+    // mapred.cache.files.filesizes and mapred.cache.files.visibilities.
+    // 1 local FS based dist cache file and 5 HDFS based dist cache files. So
+    // total expected dist cache files count is 6.
+    assertEquals("Gridmix is not able to extract dist cache file sizes.",
+                 6, jobConf.getStrings(MRJobConfig.CACHE_FILES_SIZES).length);
+    assertEquals("Gridmix is not able to extract dist cache file visibilities.",
+                 6, jobConf.getStrings(MRJobConfig.CACHE_FILE_VISIBILITIES).length);
+
+    dce.configureDistCacheFiles(conf, jobConf);
+
+    assertEquals("Configuring of HDFS-based dist cache files by gridmix is "
+                 + "wrong.", sortedFileSizes.length,
+                 conf.getStrings(MRJobConfig.CACHE_FILES).length);
+    assertEquals("Configuring of local-FS-based dist cache files by gridmix is "
+                 + "wrong.", 1, conf.getStrings("tmpfiles").length);
+  }
+
+  /**
+   * Verify if configureDistCacheFiles() works fine when distributed cache
+   * files are set but visibilities are not set. This handles history traces
+   * of older hadoop versions, which had no notion of private/public
+   * Distributed Cache files.
+   * @throws IOException
+   */
+  private void validateWithOutVisibilities() throws IOException {
+    Configuration conf = new Configuration();// configuration for simulated job
+    JobConf jobConf = new JobConf();
+    String user = "user1";
+    jobConf.setUser(user);
+    String[] files = {"/tmp/hdfs1.txt", "/tmp/"+ user + "/.staging/file1"};
+    jobConf.setStrings(MRJobConfig.CACHE_FILES, files);
+    jobConf.setStrings(MRJobConfig.CACHE_FILES_SIZES, "12,200");
+    jobConf.setStrings(MRJobConfig.CACHE_FILE_TIMESTAMPS, "56789,98345");
+    dce.configureDistCacheFiles(conf, jobConf);
+    assertEquals("Configuring of HDFS-based dist cache files by gridmix is "
+                 + "wrong.", files.length,
+                 conf.getStrings(MRJobConfig.CACHE_FILES).length);
+    assertNull("Configuring of local-FS-based dist cache files by gridmix is "
+               + "wrong.", conf.get("tmpfiles"));
+  }
+
+  /**
+   * Test if Gridmix can configure config properties related to Distributed
+   * Cache properly. Also verify if Gridmix can handle deprecated config
+   * properties related to Distributed Cache.
+   * @throws IOException
+   */
+  @Test
+  public void testDistCacheFilesConfiguration() throws IOException {
+    Configuration conf = new Configuration();
+    JobConf jobConf = GridmixTestUtils.mrCluster.createJobConf(
+                        new JobConf(conf));
+    Path ioPath = new Path("testDistCacheEmulationConfigurability")
+                    .makeQualified(GridmixTestUtils.dfs);
+    FileSystem fs = FileSystem.get(jobConf);
+    FileSystem.mkdirs(fs, ioPath, new FsPermission((short)0777));
+
+    // default config
+    dce = createDistributedCacheEmulator(jobConf, ioPath, false);
+    assertTrue("Default configuration of "
+               + DistributedCacheEmulator.GRIDMIX_EMULATE_DISTRIBUTEDCACHE
+               + " is wrong.", dce.shouldEmulateDistCacheLoad());
+
+    // Validate if DistributedCacheEmulator can properly handle a JobStory
+    // without Distributed Cache files.
+    validateJobConfWithOutDCFiles(conf, jobConf);
+
+    // Validate if Gridmix can configure dist cache files properly if there are
+    // HDFS-based dist cache files and localFS-based dist cache files in trace
+    // for a job. Set old config properties and validate.
+    validateJobConfWithDCFiles(conf, jobConf);
+    
+    // Use new JobConf as JobStory conf and check if configureDistCacheFiles()
+    // doesn't throw NPE when there are dist cache files set but visibilities
+    // are not set.
+    validateWithOutVisibilities();
+  }
+}
diff --git a/src/contrib/gridmix/src/test/org/apache/hadoop/mapred/gridmix/TestGridmixSubmission.java b/src/contrib/gridmix/src/test/org/apache/hadoop/mapred/gridmix/TestGridmixSubmission.java
index 65abe4b..17c6738 100644
--- a/src/contrib/gridmix/src/test/org/apache/hadoop/mapred/gridmix/TestGridmixSubmission.java
+++ b/src/contrib/gridmix/src/test/org/apache/hadoop/mapred/gridmix/TestGridmixSubmission.java
@@ -111,7 +111,7 @@
         GridmixTestUtils.mrCluster.createJobConf());
       for (Job job : succeeded) {
         final String jobname = job.getJobName();
-        if ("GRIDMIX_GENDATA".equals(jobname)) {
+        if (GenerateData.JOB_NAME.equals(jobname)) {
           if (!job.getConfiguration().getBoolean(
             GridmixJob.GRIDMIX_USE_QUEUE_IN_TRACE, true)) {
             assertEquals(" Improper queue for " + job.getJobName(),
diff --git a/src/contrib/gridmix/src/test/org/apache/hadoop/mapred/gridmix/TestPseudoLocalFs.java b/src/contrib/gridmix/src/test/org/apache/hadoop/mapred/gridmix/TestPseudoLocalFs.java
new file mode 100644
index 0000000..b9c2728
--- /dev/null
+++ b/src/contrib/gridmix/src/test/org/apache/hadoop/mapred/gridmix/TestPseudoLocalFs.java
@@ -0,0 +1,233 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.mapred.gridmix;
+
+import static org.junit.Assert.*;
+
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.io.InputStream;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.junit.Test;
+
+/**
+ * Test the basic functionality of PseudoLocalFs
+ */
+public class TestPseudoLocalFs {
+
+  /**
+   * Test if a file on PseudoLocalFs of a specific size can be opened and read.
+   * Validate the size of the data read.
+   * Test the read methods of {@link PseudoLocalFs.RandomInputStream}.
+   * @throws Exception
+   */
+  @Test
+  public void testPseudoLocalFsFileSize() throws Exception {
+    long fileSize = 10000;
+    Path path = PseudoLocalFs.generateFilePath("myPseudoFile", fileSize);
+    PseudoLocalFs pfs = new PseudoLocalFs();
+    pfs.create(path);
+
+    // Read 1 byte at a time and validate file size.
+    InputStream in = pfs.open(path, 0);
+    long totalSize = 0;
+
+    while (in.read() >= 0) {
+      ++totalSize;
+    }
+    in.close();
+    assertEquals("File size mismatch with read().", fileSize, totalSize);
+
+    // Read data from PseudoLocalFs-based file into buffer to
+    // validate read(byte[]) and file size.
+    in = pfs.open(path, 0);
+    totalSize = 0;
+    byte[] b = new byte[1024];
+    int bytesRead = in.read(b);
+    while (bytesRead >= 0) {
+      totalSize += bytesRead;
+      bytesRead = in.read(b);
+    }
+    assertEquals("File size mismatch with read(byte[]).", fileSize, totalSize);
+  }
+
+  /**
+   * Validate if file status is obtained for correctly formed file paths on
+   * PseudoLocalFs and also verify if appropriate exception is thrown for
+   * invalid file paths.
+   * @param pfs Pseudo Local File System
+   * @param path file path for which getFileStatus() is to be called
+   * @param shouldSucceed <code>true</code> if getFileStatus() should succeed
+   * @throws IOException
+   */
+  private void validateGetFileStatus(FileSystem pfs, Path path,
+      boolean shouldSucceed) throws IOException {
+    boolean expectedExceptionSeen = false;
+    FileStatus stat = null;
+    try {
+      stat = pfs.getFileStatus(path);
+    } catch(FileNotFoundException e) {
+      expectedExceptionSeen = true;
+    }
+    if (shouldSucceed) {
+      assertFalse("getFileStatus() has thrown Exception for valid file name "
+                  + path, expectedExceptionSeen);
+      assertNotNull("Missing file status for a valid file.", stat);
+
+      // validate fileSize
+      String[] parts = path.toUri().getPath().split("\\.");
+      long expectedFileSize = Long.valueOf(parts[parts.length - 1]);
+      assertEquals("Invalid file size.", expectedFileSize, stat.getLen());
+    } else {
+      assertTrue("getFileStatus() did not throw Exception for invalid file "
+                 + " name " + path, expectedExceptionSeen);
+    }
+  }
+
+  /**
+   * Validate if file creation succeeds for correctly formed file paths on
+   * PseudoLocalFs and also verify if appropriate exception is thrown for
+   * invalid file paths.
+   * @param pfs Pseudo Local File System
+   * @param path file path for which create() is to be called
+   * @param shouldSucceed <code>true</code> if create() should succeed
+   * @throws IOException
+   */
+  private void validateCreate(FileSystem pfs, Path path,
+      boolean shouldSucceed) throws IOException {
+    boolean expectedExceptionSeen = false;
+    try {
+      pfs.create(path);
+    } catch(IOException e) {
+      expectedExceptionSeen = true;
+    }
+    if (shouldSucceed) {
+      assertFalse("create() has thrown Exception for valid file name "
+                  + path, expectedExceptionSeen);
+    } else {
+      assertTrue("create() did not throw Exception for invalid file name "
+                 + path, expectedExceptionSeen);
+    }
+  }
+
+  /**
+   * Validate if opening of file succeeds for correctly formed file paths on
+   * PseudoLocalFs and also verify if appropriate exception is thrown for
+   * invalid file paths.
+   * @param pfs Pseudo Local File System
+   * @param path file path for which open() is to be called
+   * @param shouldSucceed <code>true</code> if open() should succeed
+   * @throws IOException
+   */
+  private void validateOpen(FileSystem pfs, Path path,
+      boolean shouldSucceed) throws IOException {
+    boolean expectedExceptionSeen = false;
+    try {
+      pfs.open(path);
+    } catch(IOException e) {
+      expectedExceptionSeen = true;
+    }
+    if (shouldSucceed) {
+      assertFalse("open() has thrown Exception for valid file name "
+                  + path, expectedExceptionSeen);
+    } else {
+      assertTrue("open() did not throw Exception for invalid file name "
+                 + path, expectedExceptionSeen);
+    }
+  }
+
+  /**
+   * Validate if exists() returns <code>true</code> for correctly formed file
+   * paths on PseudoLocalFs and returns <code>false</code> for improperly
+   * formed file paths.
+   * @param pfs Pseudo Local File System
+   * @param path file path for which exists() is to be called
+   * @param shouldSucceed expected return value of exists(&lt;path&gt;)
+   * @throws IOException
+   */
+  private void validateExists(FileSystem pfs, Path path,
+      boolean shouldSucceed) throws IOException {
+    boolean ret = pfs.exists(path);
+    if (shouldSucceed) {
+      assertTrue("exists() returned false for valid file name " + path, ret);
+    } else {
+      assertFalse("exists() returned true for invalid file name " + path, ret);
+    }
+  }
+
+  /**
+   *  Test Pseudo Local File System methods like getFileStatus(), create(),
+   *  open(), exists() for <li> valid file paths and <li> invalid file paths.
+   * @throws IOException
+   */
+  @Test
+  public void testPseudoLocalFsFileNames() throws IOException {
+    PseudoLocalFs pfs = new PseudoLocalFs();
+    Configuration conf = new Configuration();
+    conf.setClass("fs.pseudo.impl", PseudoLocalFs.class, FileSystem.class);
+
+    Path path = new Path("pseudo:///myPsedoFile.1234");
+    FileSystem testFs = path.getFileSystem(conf);
+    assertEquals("Failed to obtain a pseudo local file system object from path",
+                 pfs.getUri().getScheme(), testFs.getUri().getScheme());
+
+    // Validate PseudoLocalFS operations on URI of some other file system
+    path = new Path("file:///myPsedoFile.12345");
+    validateGetFileStatus(pfs, path, false);
+    validateCreate(pfs, path, false);
+    validateOpen(pfs, path, false);
+    validateExists(pfs, path, false);
+
+    path = new Path("pseudo:///myPsedoFile");//.<fileSize> missing
+    validateGetFileStatus(pfs, path, false);
+    validateCreate(pfs, path, false);
+    validateOpen(pfs, path, false);
+    validateExists(pfs, path, false);
+
+    // portion after the final '.' is not a number
+    path = new Path("pseudo:///myPseudoFile.txt");
+    validateGetFileStatus(pfs, path, false);
+    validateCreate(pfs, path, false);
+    validateOpen(pfs, path, false);
+    validateExists(pfs, path, false);
+
+    // Generate a valid file name (relative path) and validate operations on it
+    long fileSize = 231456;
+    path = PseudoLocalFs.generateFilePath("my.Pseudo.File", fileSize);
+    // Validate the above generateFilePath()
+    assertEquals("generateFilePath() failed.", fileSize,
+                 pfs.validateFileNameFormat(path));
+
+    validateGetFileStatus(pfs, path, true);
+    validateCreate(pfs, path, true);
+    validateOpen(pfs, path, true);
+    validateExists(pfs, path, true);
+
+    // Validate operations on a valid qualified path
+    path = new Path("myPseudoFile.1237");
+    path = path.makeQualified(pfs);
+    validateGetFileStatus(pfs, path, true);
+    validateCreate(pfs, path, true);
+    validateOpen(pfs, path, true);
+    validateExists(pfs, path, true);
+  }
+}
diff --git a/src/docs/src/documentation/content/xdocs/gridmix.xml b/src/docs/src/documentation/content/xdocs/gridmix.xml
index 187cc3b..2ce5b1e 100644
--- a/src/docs/src/documentation/content/xdocs/gridmix.xml
+++ b/src/docs/src/documentation/content/xdocs/gridmix.xml
@@ -78,17 +78,25 @@
 	<code>-Dgridmix.output.directory=foo</code> as given above should
 	be used <em>before</em> other GridMix parameters.
       </note>
-      <p>The <code>-generate</code> option is used to generate input data
-      for the synthetic jobs. It accepts size suffixes, e.g.
-      <code>100g</code> will generate 100 * 2<sup>30</sup> bytes.</p>
-      <p>The <code>-users</code> option is used to point to a users-list
-      file (see <a href="#usersqueues">Emulating Users and Queues</a>).</p>
-      <p>The <code>&lt;iopath&gt;</code> parameter is the destination
-      directory for generated output and/or the directory from which input
-      data will be read. Note that this can either be on the local file-system
+      <p>The <code>&lt;iopath&gt;</code> parameter is the working directory for
+      GridMix. Note that this can either be on the local file-system
       or on HDFS, but it is highly recommended that it be the same as that for
       the original job mix so that GridMix puts the same load on the local
       file-system and HDFS respectively.</p>
+      <p>The <code>-generate</code> option is used to generate input data and
+      Distributed Cache files for the synthetic jobs. It accepts standard size
+      suffixes, e.g. <code>100g</code> will generate
+      100 * 2<sup>30</sup> bytes as input data.
+      <code>&lt;iopath&gt;/input</code> is the destination directory for
+      generated input data and/or the directory from which input data will be
+      read. HDFS-based Distributed Cache files are generated under the
+      distributed cache directory <code>&lt;iopath&gt;/distributedCache</code>.
+      If some of the needed Distributed Cache files already exist in the
+      distributed cache directory, then only the remaining missing
+      Distributed Cache files are generated when the <code>-generate</code>
+      option is specified.</p>
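+      <p>For example, <code>-generate 100m</code> would create
+      100 * 2<sup>20</sup> bytes of input data under
+      <code>&lt;iopath&gt;/input</code> and the Distributed Cache files needed
+      by the simulated jobs under
+      <code>&lt;iopath&gt;/distributedCache</code>.</p>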
+      <p>The <code>-users</code> option is used to point to a users-list
+      file (see <a href="#usersqueues">Emulating Users and Queues</a>).</p>
       <p>The <code>&lt;trace&gt;</code> parameter is a path to a job trace
       generated by Rumen. This trace can be compressed (it must be readable
       using one of the compression codecs supported by the cluster) or
@@ -508,6 +516,30 @@
       The groupnames will be ignored by Gridmix.
       </p>
     </section>
+
+  <section id="distributedcacheload">
+  <title>Emulation of Distributed Cache Load</title>
+    <p>Gridmix emulates Distributed Cache load by default for jobs of type
+    LOADJOB. It does this by precreating the Distributed Cache files needed by
+    all the simulated jobs as part of a separate MapReduce job.</p>
+    <p>Emulation of Distributed Cache load in gridmix simulated jobs can be
+    disabled by setting the property
+    <code>gridmix.distributed-cache-emulation.enable</code> to
+    <code>false</code>.
+    However, generation of Distributed Cache data by gridmix is driven by the
+    <code>-generate</code> option and is independent of this configuration
+    property.</p>
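+    <p>For example, passing
+    <code>-Dgridmix.distributed-cache-emulation.enable=false</code> on the
+    GridMix command line (before other GridMix parameters, as with other
+    <code>-D</code> properties) turns off emulation of Distributed Cache load,
+    while generation of Distributed Cache data via <code>-generate</code>
+    remains unaffected.</p>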
+    <p>Both generation of Distributed Cache files and emulation of
+    Distributed Cache load are disabled if:</p>
+    <ul>
+    <li>the input trace comes from the standard input-stream instead of a
+    file, or</li>
+    <li>the <code>&lt;iopath&gt;</code> specified is on the local file-system,
+    or</li>
+    <li>any of the ancestor directories of the distributed cache directory
+    i.e. <code>&lt;iopath&gt;/distributedCache</code> (including the
+    distributed cache directory itself) doesn't have execute permission for
+    others.</li>
+    </ul>
+  </section>
+
     <section id="assumptions">
       <title>Simplifying Assumptions</title>
       <p>GridMix will be developed in stages, incorporating feedback and