CRUNCH-579: Supported access to counters from original TaskContext
Signed-off-by: Micah Whitacre <mkwhit@gmail.com>
diff --git a/crunch-core/src/main/java/org/apache/crunch/io/CrunchOutputs.java b/crunch-core/src/main/java/org/apache/crunch/io/CrunchOutputs.java
index a9621ba..2e8dc8d 100644
--- a/crunch-core/src/main/java/org/apache/crunch/io/CrunchOutputs.java
+++ b/crunch-core/src/main/java/org/apache/crunch/io/CrunchOutputs.java
@@ -22,6 +22,7 @@
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapreduce.Counter;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.JobID;
@@ -30,6 +31,7 @@
import org.apache.hadoop.mapreduce.OutputFormat;
import org.apache.hadoop.mapreduce.RecordWriter;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.mapreduce.TaskAttemptID;
import org.apache.hadoop.mapreduce.TaskInputOutputContext;
import org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter;
import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl;
@@ -219,7 +221,7 @@
baseTaskId.isMap(),
baseTaskId.getTaskID().getId(),
baseTaskId.getId());
- return new TaskAttemptContextImpl(job.getConfiguration(), taskId);
+ return new TaskAttemptContextWrapper(baseContext, job.getConfiguration(), taskId);
}
private static void setJobID(Job job, JobID jobID, String namedOutput) {
@@ -361,4 +363,24 @@
}
}
}
+
+ private static class TaskAttemptContextWrapper extends TaskAttemptContextImpl {
+
+ private final TaskAttemptContext baseContext;
+
+ public TaskAttemptContextWrapper(TaskAttemptContext baseContext, Configuration config, TaskAttemptID taskId){
+ super(config, taskId);
+ this.baseContext = baseContext;
+ }
+
+ @Override
+ public Counter getCounter(Enum<?> counterName) {
+ return baseContext.getCounter(counterName);
+ }
+
+ @Override
+ public Counter getCounter(String groupName, String counterName) {
+ return baseContext.getCounter(groupName, counterName);
+ }
+ }
}