CRUNCH-551: Make the use of Configuration objects consistent in CrunchInputSplit and CrunchRecordReader
diff --git a/crunch-core/src/main/java/org/apache/crunch/impl/mr/run/CrunchInputSplit.java b/crunch-core/src/main/java/org/apache/crunch/impl/mr/run/CrunchInputSplit.java
index ee54483..5ec5015 100644
--- a/crunch-core/src/main/java/org/apache/crunch/impl/mr/run/CrunchInputSplit.java
+++ b/crunch-core/src/main/java/org/apache/crunch/impl/mr/run/CrunchInputSplit.java
@@ -66,8 +66,8 @@
@Override
public void setConf(Configuration conf) {
this.conf = new Configuration(conf);
- if (bundle != null && conf != null) {
- this.bundle.configure(conf);
+ if (bundle != null) {
+ this.bundle.configure(this.conf);
}
}
diff --git a/crunch-core/src/main/java/org/apache/crunch/impl/mr/run/CrunchRecordReader.java b/crunch-core/src/main/java/org/apache/crunch/impl/mr/run/CrunchRecordReader.java
index 2842658..d4175a6 100644
--- a/crunch-core/src/main/java/org/apache/crunch/impl/mr/run/CrunchRecordReader.java
+++ b/crunch-core/src/main/java/org/apache/crunch/impl/mr/run/CrunchRecordReader.java
@@ -44,12 +44,8 @@
if (crunchSplit.get() instanceof CombineFileSplit) {
combineFileSplit = (CombineFileSplit) crunchSplit.get();
}
- Configuration conf = crunchSplit.getConf();
- if (conf == null) {
- conf = context.getConfiguration();
- crunchSplit.setConf(conf);
- }
- this.context = new TaskAttemptContextImpl(conf, context.getTaskAttemptID());
+ crunchSplit.setConf(context.getConfiguration());
+ this.context = new TaskAttemptContextImpl(crunchSplit.getConf(), context.getTaskAttemptID());
initNextRecordReader();
}
@@ -71,10 +67,9 @@
}
idx++;
- Configuration conf = crunchSplit.getConf();
InputFormat<K, V> inputFormat = (InputFormat<K, V>) ReflectionUtils.newInstance(
crunchSplit.getInputFormatClass(),
- conf);
+ crunchSplit.getConf());
this.curReader = inputFormat.createRecordReader(getDelegateSplit(), context);
return true;
}
@@ -136,12 +131,8 @@
@Override
public void initialize(InputSplit inputSplit, TaskAttemptContext context) throws IOException, InterruptedException {
this.crunchSplit = (CrunchInputSplit) inputSplit;
- Configuration conf = crunchSplit.getConf();
- if (conf == null) {
- conf = context.getConfiguration();
- crunchSplit.setConf(conf);
- }
- this.context = new TaskAttemptContextImpl(conf, context.getTaskAttemptID());
+ crunchSplit.setConf(context.getConfiguration());
+ this.context = new TaskAttemptContextImpl(crunchSplit.getConf(), context.getTaskAttemptID());
if (crunchSplit.get() instanceof CombineFileSplit) {
combineFileSplit = (CombineFileSplit) crunchSplit.get();
}