MAPREDUCE-2351. mapred.job.tracker.history.completed.location should support an arbitrary filesystem URI.

git-svn-id: https://svn.apache.org/repos/asf/hadoop/mapreduce/trunk@1076804 13f79535-47bb-0310-9956-ffa450edef68
diff --git a/CHANGES.txt b/CHANGES.txt
index 32fa228..eec3971 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -51,6 +51,9 @@
 
     MAPREDUCE-2302. Add static factory methods in GaloisField. (schen)
 
+    MAPREDUCE-2351. mapred.job.tracker.history.completed.location should
+    support an arbitrary filesystem URI. (tomwhite)
+
   OPTIMIZATIONS
     
     MAPREDUCE-2026. Make JobTracker.getJobCounters() and
diff --git a/src/java/org/apache/hadoop/mapreduce/jobhistory/JobHistory.java b/src/java/org/apache/hadoop/mapreduce/jobhistory/JobHistory.java
index 0122e6e..1d1fb90 100644
--- a/src/java/org/apache/hadoop/mapreduce/jobhistory/JobHistory.java
+++ b/src/java/org/apache/hadoop/mapreduce/jobhistory/JobHistory.java
@@ -152,8 +152,9 @@
     String doneLocation =
       conf.get(JTConfig.JT_JOBHISTORY_COMPLETED_LOCATION);
     if (doneLocation != null) {
-      done = fs.makeQualified(new Path(doneLocation));
-      doneDirFs = fs;
+      Path donePath = new Path(doneLocation);
+      doneDirFs = donePath.getFileSystem(conf);
+      done = doneDirFs.makeQualified(donePath);
     } else {
       done = logDirFs.makeQualified(new Path(logDir, "done"));
       doneDirFs = logDirFs;
diff --git a/src/test/mapred/org/apache/hadoop/mapred/TestJobHistory.java b/src/test/mapred/org/apache/hadoop/mapred/TestJobHistory.java
index 22420a1..a8b0708 100644
--- a/src/test/mapred/org/apache/hadoop/mapred/TestJobHistory.java
+++ b/src/test/mapred/org/apache/hadoop/mapred/TestJobHistory.java
@@ -606,9 +606,18 @@
     // Validate the job queue name
     assertTrue(jobInfo.getJobQueueName().equals(conf.getQueueName()));
   }
-
+  
   public void testDoneFolderOnHDFS() throws IOException, InterruptedException {
+    runDoneFolderTest("history_done");
+  }
+    
+  public void testDoneFolderNotOnDefaultFileSystem() throws IOException, InterruptedException {
+    runDoneFolderTest("file://" + System.getProperty("test.build.data", "tmp") + "/history_done");
+  }
+    
+  private void runDoneFolderTest(String doneFolder) throws IOException, InterruptedException {
     MiniMRCluster mr = null;
+    MiniDFSCluster dfsCluster = null;
     try {
       JobConf conf = new JobConf();
       // keep for less time
@@ -616,7 +625,6 @@
       conf.setLong("mapred.jobtracker.retirejob.interval", 1000);
 
       //set the done folder location
-      String doneFolder = "history_done";
       conf.set(JTConfig.JT_JOBHISTORY_COMPLETED_LOCATION, doneFolder);
 
       String logDir =
@@ -639,7 +647,7 @@
       assertEquals("No of file in logDir not correct", 2,
           logDirFs.listStatus(logDirPath).length);
       
-      MiniDFSCluster dfsCluster = new MiniDFSCluster(conf, 2, true, null);
+      dfsCluster = new MiniDFSCluster(conf, 2, true, null);
       mr = new MiniMRCluster(2, dfsCluster.getFileSystem().getUri().toString(),
           3, null, null, conf);
 
@@ -674,7 +682,7 @@
       RunningJob job = UtilsForTests.runJobSucceed(conf, inDir, outDir);
       
       assertEquals("History DONE folder not correct", 
-          doneFolder, doneDir.getName());
+          new Path(doneFolder).getName(), doneDir.getName());
       JobID id = job.getID();
       String logFileName = getDoneFile(jobHistory, conf, id, doneDir);
 
@@ -724,6 +732,9 @@
         cleanupLocalFiles(mr);
         mr.shutdown();
       }
+      if (dfsCluster != null) {
+        dfsCluster.shutdown();
+      }
     }
   }