CRUNCH-600: pass job credentials when building multiple outputs

Signed-off-by: Micah Whitacre <mkwhit@gmail.com>
diff --git a/crunch-core/src/it/java/org/apache/crunch/MRPipelineIT.java b/crunch-core/src/it/java/org/apache/crunch/MRPipelineIT.java
index 6af3f84..8cda55b 100644
--- a/crunch-core/src/it/java/org/apache/crunch/MRPipelineIT.java
+++ b/crunch-core/src/it/java/org/apache/crunch/MRPipelineIT.java
@@ -25,6 +25,7 @@
 import java.io.IOException;
 import java.io.Serializable;
 import java.net.URLEncoder;
+import java.util.Map;
 
 import com.google.common.io.Files;
 import org.apache.commons.io.filefilter.SuffixFileFilter;
@@ -33,10 +34,20 @@
 import org.apache.crunch.fn.IdentityFn;
 import org.apache.crunch.impl.mr.MRPipeline;
 import org.apache.crunch.impl.mr.plan.PlanningParameters;
+import org.apache.crunch.io.FormatBundle;
 import org.apache.crunch.io.To;
+import org.apache.crunch.io.text.TextFileTarget;
 import org.apache.crunch.test.TemporaryPath;
 import org.apache.crunch.test.TemporaryPaths;
+import org.apache.crunch.types.Converter;
+import org.apache.crunch.types.PType;
 import org.apache.crunch.types.writable.Writables;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.RecordWriter;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
 import org.junit.Rule;
 import org.junit.Test;
 
@@ -109,4 +120,45 @@
     String regex = ".*_\\d{4}-\\d{2}-\\d{2}_\\d{2}\\.\\d{2}\\.\\d{2}\\.\\d{3}_jobplan\\.dot";
     assertTrue("DOT file name '" + fileName + "' did not match regex '" + regex + "'.", fileName.matches(regex));
   }
+
+  @Test
+  public void testJobCredentials() throws IOException {
+    Pipeline pipeline = new MRPipeline(MRPipelineIT.class, tmpDir.getDefaultConfiguration());
+    PCollection<String> lines = pipeline.readTextFile(tmpDir.copyResourceFileName("set1.txt"));
+
+    pipeline.write(lines, new SecretTextFileTarget(tmpDir.getFile("output").getAbsolutePath()));
+
+    PipelineResult pipelineResult = pipeline.done();
+    assertTrue(pipelineResult.succeeded());
+  }
+
+  private static class SecretTextFileTarget extends TextFileTarget {
+    public SecretTextFileTarget(String path) {
+      super(path);
+    }
+
+    @Override
+    public Target outputConf(String key, String value) {
+      return super.outputConf(key, value);
+    }
+
+    @Override
+    public void configureForMapReduce(Job job, PType<?> ptype, Path outputPath, String name) {
+      Converter converter = ptype.getConverter();
+      Class keyClass = converter.getKeyClass();
+      Class valueClass = converter.getValueClass();
+      FormatBundle fb = FormatBundle.forOutput(SecretTextOutputFormat.class);
+      configureForMapReduce(job, keyClass, valueClass, fb, outputPath, name);
+
+      job.getCredentials().addSecretKey(new Text("secret"), "myPassword".getBytes());
+    }
+  }
+  private static class SecretTextOutputFormat extends TextOutputFormat {
+    @Override
+    public RecordWriter getRecordWriter(TaskAttemptContext job) throws IOException, InterruptedException {
+      byte[] secret = job.getCredentials().getSecretKey(new Text("secret"));
+      assertEquals("job credentials did not match", "myPassword", new String(secret));
+      return super.getRecordWriter(job);
+    }
+  }
 }
diff --git a/crunch-core/src/main/java/org/apache/crunch/io/CrunchOutputs.java b/crunch-core/src/main/java/org/apache/crunch/io/CrunchOutputs.java
index 653a401..a9621ba 100644
--- a/crunch-core/src/main/java/org/apache/crunch/io/CrunchOutputs.java
+++ b/crunch-core/src/main/java/org/apache/crunch/io/CrunchOutputs.java
@@ -21,6 +21,7 @@
 import org.apache.crunch.CrunchRuntimeException;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.mapreduce.Job;
 import org.apache.hadoop.mapreduce.JobContext;
 import org.apache.hadoop.mapreduce.JobID;
@@ -203,7 +204,7 @@
 
   private static Job getJob(JobID jobID, String namedOutput, Configuration baseConf)
       throws IOException {
-    Job job = new Job(new Configuration(baseConf));
+    Job job = new Job(new JobConf(baseConf));
     job.getConfiguration().set("crunch.namedoutput", namedOutput);
     setJobID(job, jobID, namedOutput);
     return job;