CRUNCH-600: pass job credentials when building multiple outputs
Signed-off-by: Micah Whitacre <mkwhit@gmail.com>
diff --git a/crunch-core/src/it/java/org/apache/crunch/MRPipelineIT.java b/crunch-core/src/it/java/org/apache/crunch/MRPipelineIT.java
index 6af3f84..8cda55b 100644
--- a/crunch-core/src/it/java/org/apache/crunch/MRPipelineIT.java
+++ b/crunch-core/src/it/java/org/apache/crunch/MRPipelineIT.java
@@ -25,6 +25,7 @@
import java.io.IOException;
import java.io.Serializable;
import java.net.URLEncoder;
+import java.util.Map;
import com.google.common.io.Files;
import org.apache.commons.io.filefilter.SuffixFileFilter;
@@ -33,10 +34,20 @@
import org.apache.crunch.fn.IdentityFn;
import org.apache.crunch.impl.mr.MRPipeline;
import org.apache.crunch.impl.mr.plan.PlanningParameters;
+import org.apache.crunch.io.FormatBundle;
import org.apache.crunch.io.To;
+import org.apache.crunch.io.text.TextFileTarget;
import org.apache.crunch.test.TemporaryPath;
import org.apache.crunch.test.TemporaryPaths;
+import org.apache.crunch.types.Converter;
+import org.apache.crunch.types.PType;
import org.apache.crunch.types.writable.Writables;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.RecordWriter;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.junit.Rule;
import org.junit.Test;
@@ -109,4 +120,45 @@
String regex = ".*_\\d{4}-\\d{2}-\\d{2}_\\d{2}\\.\\d{2}\\.\\d{2}\\.\\d{3}_jobplan\\.dot";
assertTrue("DOT file name '" + fileName + "' did not match regex '" + regex + "'.", fileName.matches(regex));
}
+
+ /** Runs a pipeline through SecretTextFileTarget and checks that the resulting job succeeds. */
+ @Test
+ public void testJobCredentials() throws IOException {
+   Pipeline mr = new MRPipeline(MRPipelineIT.class, tmpDir.getDefaultConfiguration());
+   PCollection<String> input = mr.readTextFile(tmpDir.copyResourceFileName("set1.txt"));
+   Target secretTarget = new SecretTextFileTarget(tmpDir.getFile("output").getAbsolutePath());
+   mr.write(input, secretTarget);
+   // SecretTextOutputFormat asserts on the credentials; a failure there is expected to fail the job.
+   assertTrue(mr.done().succeeded());
+ }
+
+ /** Text target that swaps in SecretTextOutputFormat and stores a secret in the job credentials. */
+ private static class SecretTextFileTarget extends TextFileTarget {
+   // Shared by this writer and the reading output format; explicit charset avoids platform defaults.
+   static final String SECRET_ALIAS = "secret";
+   static final String SECRET_VALUE = "myPassword";
+   static final java.nio.charset.Charset SECRET_CHARSET = java.nio.charset.Charset.forName("UTF-8");
+
+   public SecretTextFileTarget(String path) {
+     super(path);
+   }
+
+   @Override
+   public void configureForMapReduce(Job job, PType<?> ptype, Path outputPath, String name) {
+     Converter converter = ptype.getConverter();
+     FormatBundle fb = FormatBundle.forOutput(SecretTextOutputFormat.class);
+     configureForMapReduce(job, converter.getKeyClass(), converter.getValueClass(), fb, outputPath, name);
+     job.getCredentials().addSecretKey(new Text(SECRET_ALIAS), SECRET_VALUE.getBytes(SECRET_CHARSET));
+   }
+ }
+ /** Asserts the secret stored by SecretTextFileTarget is present; if missing, fails with a message, not an NPE. */
+ private static class SecretTextOutputFormat extends TextOutputFormat {
+   @Override
+   public RecordWriter getRecordWriter(TaskAttemptContext job) throws IOException, InterruptedException {
+     byte[] secret = job.getCredentials().getSecretKey(new Text(SecretTextFileTarget.SECRET_ALIAS));
+     assertTrue("job credentials were not passed to the output format", secret != null);
+     String actual = new String(secret, SecretTextFileTarget.SECRET_CHARSET);
+     assertEquals("job credentials did not match", SecretTextFileTarget.SECRET_VALUE, actual);
+     return super.getRecordWriter(job);
+   }
+ }
}
diff --git a/crunch-core/src/main/java/org/apache/crunch/io/CrunchOutputs.java b/crunch-core/src/main/java/org/apache/crunch/io/CrunchOutputs.java
index 653a401..a9621ba 100644
--- a/crunch-core/src/main/java/org/apache/crunch/io/CrunchOutputs.java
+++ b/crunch-core/src/main/java/org/apache/crunch/io/CrunchOutputs.java
@@ -21,6 +21,7 @@
import org.apache.crunch.CrunchRuntimeException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.JobID;
@@ -203,7 +204,7 @@
private static Job getJob(JobID jobID, String namedOutput, Configuration baseConf)
throws IOException {
- Job job = new Job(new Configuration(baseConf));
+ Job job = new Job(new JobConf(baseConf));
job.getConfiguration().set("crunch.namedoutput", namedOutput);
setJobID(job, jobID, namedOutput);
return job;