TEZ-490. Rename SimpleInput / SimpleInputLegacy / SimpleOutput to MRInput /
MRInputLegacy / MROutput so the names are MR-specific (part of TEZ-398). (sseth)
diff --git a/tez-dag/src/main/java/org/apache/hadoop/mapred/YarnTezDagChild.java b/tez-dag/src/main/java/org/apache/hadoop/mapred/YarnTezDagChild.java
index f32fa6b..1967462 100644
--- a/tez-dag/src/main/java/org/apache/hadoop/mapred/YarnTezDagChild.java
+++ b/tez-dag/src/main/java/org/apache/hadoop/mapred/YarnTezDagChild.java
@@ -85,8 +85,8 @@
import org.apache.tez.engine.common.security.JobTokenIdentifier;
import org.apache.tez.engine.common.security.TokenCache;
import org.apache.tez.engine.newruntime.LogicalIOProcessorRuntimeTask;
-import org.apache.tez.mapreduce.input.SimpleInputLegacy;
-import org.apache.tez.mapreduce.output.SimpleOutput;
+import org.apache.tez.mapreduce.input.MRInputLegacy;
+import org.apache.tez.mapreduce.output.MROutput;
import com.google.inject.Guice;
import com.google.inject.Injector;
@@ -497,20 +497,20 @@
// FIXME need Input/Output vertices else we have this hack
if (taskSpec.getInputs().isEmpty()) {
- InputDescriptor simpleInputDesc =
- new InputDescriptor(SimpleInputLegacy.class.getName());
- simpleInputDesc.setUserPayload(
+ InputDescriptor mrInputDesc =
+ new InputDescriptor(MRInputLegacy.class.getName());
+ mrInputDesc.setUserPayload(
taskSpec.getProcessorDescriptor().getUserPayload());
taskSpec.getInputs().add(
- new InputSpec("null", simpleInputDesc, 0));
+ new InputSpec("null", mrInputDesc, 0));
}
if (taskSpec.getOutputs().isEmpty()) {
- OutputDescriptor simpleOutputDesc =
- new OutputDescriptor(SimpleOutput.class.getName());
- simpleOutputDesc.setUserPayload(
+ OutputDescriptor mrOutputDesc =
+ new OutputDescriptor(MROutput.class.getName());
+ mrOutputDesc.setUserPayload(
taskSpec.getProcessorDescriptor().getUserPayload());
taskSpec.getOutputs().add(
- new OutputSpec("null", simpleOutputDesc, 0));
+ new OutputSpec("null", mrOutputDesc, 0));
}
String [] localDirs = StringUtils.getTrimmedStrings(System.getenv(Environment.LOCAL_DIRS.name()));
conf.setStrings(TezJobConfig.LOCAL_DIRS, localDirs);
diff --git a/tez-engine/src/main/java/org/apache/tez/engine/broadcast/input/BroadcastKVReader.java b/tez-engine/src/main/java/org/apache/tez/engine/broadcast/input/BroadcastKVReader.java
index 0b86a8e..2c53e75 100644
--- a/tez-engine/src/main/java/org/apache/tez/engine/broadcast/input/BroadcastKVReader.java
+++ b/tez-engine/src/main/java/org/apache/tez/engine/broadcast/input/BroadcastKVReader.java
@@ -187,7 +187,7 @@
- // TODO NEWTEZ Move this into a common class. Also used in SImpleInput
+ // TODO NEWTEZ Move this into a common class. Also used in MRInput
private class SimpleValueIterator implements Iterator<V> {
private V value;
diff --git a/tez-mapreduce/src/main/java/org/apache/hadoop/mapred/LocalJobRunnerTez.java b/tez-mapreduce/src/main/java/org/apache/hadoop/mapred/LocalJobRunnerTez.java
index f59e836..f2b0a38 100644
--- a/tez-mapreduce/src/main/java/org/apache/hadoop/mapred/LocalJobRunnerTez.java
+++ b/tez-mapreduce/src/main/java/org/apache/hadoop/mapred/LocalJobRunnerTez.java
@@ -83,8 +83,8 @@
//import org.apache.tez.engine.records.TezTaskDependencyCompletionEventsUpdate;
//import org.apache.tez.mapreduce.hadoop.IDConverter;
//import org.apache.tez.mapreduce.hadoop.mapred.MRCounters;
-//import org.apache.tez.mapreduce.input.SimpleInput;
-//import org.apache.tez.mapreduce.output.SimpleOutput;
+//import org.apache.tez.mapreduce.input.MRInput;
+//import org.apache.tez.mapreduce.output.MROutput;
//import org.apache.tez.mapreduce.processor.map.MapProcessor;
//import org.apache.tez.mapreduce.processor.reduce.ReduceProcessor;
//
@@ -252,7 +252,7 @@
// tezMapId, user, localConf.getJobName(), "TODO_vertexName",
// mapProcessorDesc,
// Collections.singletonList(new InputSpec("srcVertex", 0,
-// SimpleInput.class.getName())),
+// MRInput.class.getName())),
// Collections.singletonList(new OutputSpec("tgtVertex", 0,
// LocalOnFileSorterOutput.class.getName())));
//
@@ -458,7 +458,7 @@
// Collections.singletonList(new InputSpec("TODO_srcVertexName",
// mapIds.size(), LocalMergedInput.class.getName())),
// Collections.singletonList(new OutputSpec("TODO_targetVertex",
-// 0, SimpleOutput.class.getName())));
+// 0, MROutput.class.getName())));
//
// // move map output to reduce input
// for (int i = 0; i < mapIds.size(); i++) {
diff --git a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/input/SimpleInput.java b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/input/MRInput.java
similarity index 97%
rename from tez-mapreduce/src/main/java/org/apache/tez/mapreduce/input/SimpleInput.java
rename to tez-mapreduce/src/main/java/org/apache/tez/mapreduce/input/MRInput.java
index 598f801..6066d93 100644
--- a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/input/SimpleInput.java
+++ b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/input/MRInput.java
@@ -57,16 +57,16 @@
import com.google.common.base.Preconditions;
/**
- * {@link SimpleInput} is an {@link Input} which provides key/values pairs
+ * {@link MRInput} is an {@link Input} which provides key/values pairs
* for the consumer.
*
* It is compatible with all standard Apache Hadoop MapReduce
* {@link InputFormat} implementations.
*/
-public class SimpleInput implements LogicalInput {
+public class MRInput implements LogicalInput {
- private static final Log LOG = LogFactory.getLog(SimpleInput.class);
+ private static final Log LOG = LogFactory.getLog(MRInput.class);
private TezInputContext inputContext;
@@ -108,7 +108,7 @@
this.splitMetaInfo = new TaskSplitIndex(thisTaskMetaInfo.getSplitLocation(),
thisTaskMetaInfo.getStartOffset());
- // TODO NEWTEZ Rename this to be specific to SimpleInput. This Input, in
+ // TODO NEWTEZ Rename this to be specific to MRInput. This Input, in
// theory, can be used by the MapProcessor, ReduceProcessor or a custom
// processor. (The processor could provide the counter though)
this.inputRecordCounter = inputContext.getCounters().findCounter(TaskCounter.MAP_INPUT_RECORDS);
@@ -201,11 +201,11 @@
}
/**
- * SimpleInputs sets some additional parameters like split location when using
+ * {@link MRInput} sets some additional parameters like split location when using
* the new API. This methods returns the list of additional updates, and
- * should be used by Processors using the old MapReduce API with SimpleInput.
+ * should be used by Processors using the old MapReduce API with {@link MRInput}.
*
- * @return the additional fields set by SimpleInput
+ * @return the additional fields set by {@link MRInput}
*/
public Configuration getConfigUpdates() {
return new Configuration(incrementalConf);
diff --git a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/input/SimpleInputLegacy.java b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/input/MRInputLegacy.java
similarity index 95%
rename from tez-mapreduce/src/main/java/org/apache/tez/mapreduce/input/SimpleInputLegacy.java
rename to tez-mapreduce/src/main/java/org/apache/tez/mapreduce/input/MRInputLegacy.java
index 4e61aa7..5923746 100644
--- a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/input/SimpleInputLegacy.java
+++ b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/input/MRInputLegacy.java
@@ -21,7 +21,7 @@
import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.mapred.RecordReader;
-public class SimpleInputLegacy extends SimpleInput {
+public class MRInputLegacy extends MRInput {
@Private
public org.apache.hadoop.mapreduce.InputSplit getNewInputSplit() {
diff --git a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/output/SimpleOutput.java b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/output/MROutput.java
similarity index 97%
rename from tez-mapreduce/src/main/java/org/apache/tez/mapreduce/output/SimpleOutput.java
rename to tez-mapreduce/src/main/java/org/apache/tez/mapreduce/output/MROutput.java
index d82c9e2..e6bdbe6 100644
--- a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/output/SimpleOutput.java
+++ b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/output/MROutput.java
@@ -37,9 +37,9 @@
import org.apache.tez.mapreduce.hadoop.mapreduce.TaskAttemptContextImpl;
import org.apache.tez.mapreduce.processor.MRTaskReporter;
-public class SimpleOutput implements LogicalOutput {
+public class MROutput implements LogicalOutput {
- private static final Log LOG = LogFactory.getLog(SimpleOutput.class);
+ private static final Log LOG = LogFactory.getLog(MROutput.class);
private static final NumberFormat NUMBER_FORMAT = NumberFormat.getInstance();
static {
@@ -295,7 +295,7 @@
}
/**
- * SimpleOutput expects that a Processor call commit prior to the
+ * MROutput expects that a Processor call commit prior to the
* Processor's completion
* @throws IOException
*/
@@ -310,7 +310,7 @@
/**
- * SimpleOutput expects that a Processor call abort in case of any error
+ * MROutput expects that a Processor call abort in case of any error
* ( including an error during commit ) prior to the Processor's completion
* @throws IOException
*/
diff --git a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/processor/MRTask.java b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/processor/MRTask.java
index f7404d4..fac1454 100644
--- a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/processor/MRTask.java
+++ b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/processor/MRTask.java
@@ -81,7 +81,7 @@
import org.apache.tez.mapreduce.hadoop.MRJobConfig;
import org.apache.tez.mapreduce.hadoop.mapred.TaskAttemptContextImpl;
import org.apache.tez.mapreduce.hadoop.mapreduce.JobContextImpl;
-import org.apache.tez.mapreduce.output.SimpleOutput;
+import org.apache.tez.mapreduce.output.MROutput;
@SuppressWarnings("deprecation")
public abstract class MRTask {
@@ -423,8 +423,8 @@
+ " And is in the process of committing");
// TODO change this to use the new context
// TODO TEZ Interaciton between Commit and OutputReady. Merge ?
- if (output instanceof SimpleOutput) {
- SimpleOutput sOut = (SimpleOutput)output;
+ if (output instanceof MROutput) {
+ MROutput sOut = (MROutput)output;
if (sOut.isCommitRequired()) {
//wait for commit approval and commit
// TODO EVENTUALLY - Commit is not required for map tasks.
@@ -458,7 +458,7 @@
statusUpdate();
}
- private void commit(SimpleOutput output) throws IOException {
+ private void commit(MROutput output) throws IOException {
int retries = 3;
while (true) {
// This will loop till the AM asks for the task to be killed. As
@@ -495,7 +495,7 @@
}
private
- void discardOutput(SimpleOutput output) {
+ void discardOutput(MROutput output) {
try {
output.abort();
} catch (IOException ioe) {
diff --git a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/processor/MRTaskReporter.java b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/processor/MRTaskReporter.java
index 22312f7..85139ed 100644
--- a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/processor/MRTaskReporter.java
+++ b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/processor/MRTaskReporter.java
@@ -65,7 +65,7 @@
if (isProcessorContext) {
((TezProcessorContext)context).setProgress(progress);
} else {
- // TODO FIXME NEWTEZ - will simpleoutput's reporter use this api?
+ // TODO FIXME NEWTEZ - will MROutput's reporter use this api?
}
}
diff --git a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/processor/map/MapProcessor.java b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/processor/map/MapProcessor.java
index 2084146..e4b990a 100644
--- a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/processor/map/MapProcessor.java
+++ b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/processor/map/MapProcessor.java
@@ -44,9 +44,9 @@
import org.apache.tez.engine.api.TezProcessorContext;
import org.apache.tez.engine.lib.output.OnFileSortedOutput;
import org.apache.tez.mapreduce.hadoop.mapreduce.MapContextImpl;
-import org.apache.tez.mapreduce.input.SimpleInput;
-import org.apache.tez.mapreduce.input.SimpleInputLegacy;
-import org.apache.tez.mapreduce.output.SimpleOutput;
+import org.apache.tez.mapreduce.input.MRInput;
+import org.apache.tez.mapreduce.input.MRInputLegacy;
+import org.apache.tez.mapreduce.output.MROutput;
import org.apache.tez.mapreduce.processor.MRTask;
import org.apache.tez.mapreduce.processor.MRTaskReporter;
@@ -99,15 +99,15 @@
LogicalOutput out = outputs.values().iterator().next();
// Sanity check
- if (!(in instanceof SimpleInputLegacy)) {
+ if (!(in instanceof MRInputLegacy)) {
throw new IOException(new TezException(
"Only Simple Input supported. Input: " + in.getClass()));
}
- SimpleInputLegacy input = (SimpleInputLegacy)in;
+ MRInputLegacy input = (MRInputLegacy)in;
KVWriter kvWriter = null;
if (!(out instanceof OnFileSortedOutput)) {
- kvWriter = ((SimpleOutput)out).getWriter();
+ kvWriter = ((MROutput)out).getWriter();
} else {
kvWriter = ((OnFileSortedOutput)out).getWriter();
}
@@ -124,13 +124,13 @@
void runOldMapper(
final JobConf job,
final MRTaskReporter reporter,
- final SimpleInputLegacy input,
+ final MRInputLegacy input,
final KVWriter output
) throws IOException, InterruptedException {
// Initialize input in-line since it sets parameters which may be used by the processor.
- // Done only for SimpleInput.
- // TODO use new method in SimpleInput to get required info
+ // Done only for MRInput.
+ // TODO use new method in MRInput to get required info
//input.initialize(job, master);
RecordReader in = new OldRecordReader(input);
@@ -147,13 +147,13 @@
private void runNewMapper(final JobConf job,
MRTaskReporter reporter,
- final SimpleInputLegacy in,
+ final MRInputLegacy in,
KVWriter out
) throws IOException, InterruptedException {
// Initialize input in-line since it sets parameters which may be used by the processor.
- // Done only for SimpleInput.
- // TODO use new method in SimpleInput to get required info
+ // Done only for MRInput.
+ // TODO use new method in MRInput to get required info
//in.initialize(job, master);
// make a task context so we can get the classes
@@ -197,10 +197,10 @@
private static class NewRecordReader extends
org.apache.hadoop.mapreduce.RecordReader {
- private final SimpleInput in;
+ private final MRInput in;
private KVReader reader;
- private NewRecordReader(SimpleInput in) throws IOException {
+ private NewRecordReader(MRInput in) throws IOException {
this.in = in;
this.reader = in.getReader();
}
@@ -241,38 +241,38 @@
}
private static class OldRecordReader implements RecordReader {
- private final SimpleInputLegacy simpleInput;
+ private final MRInputLegacy mrInput;
- private OldRecordReader(SimpleInputLegacy simpleInput) {
- this.simpleInput = simpleInput;
+ private OldRecordReader(MRInputLegacy mrInput) {
+ this.mrInput = mrInput;
}
@Override
public boolean next(Object key, Object value) throws IOException {
// TODO broken
-// simpleInput.setKey(key);
-// simpleInput.setValue(value);
+// mrInput.setKey(key);
+// mrInput.setValue(value);
// try {
-// return simpleInput.hasNext();
+// return mrInput.hasNext();
// } catch (InterruptedException ie) {
// throw new IOException(ie);
// }
- return simpleInput.getOldRecordReader().next(key, value);
+ return mrInput.getOldRecordReader().next(key, value);
}
@Override
public Object createKey() {
- return simpleInput.getOldRecordReader().createKey();
+ return mrInput.getOldRecordReader().createKey();
}
@Override
public Object createValue() {
- return simpleInput.getOldRecordReader().createValue();
+ return mrInput.getOldRecordReader().createValue();
}
@Override
public long getPos() throws IOException {
- return simpleInput.getOldRecordReader().getPos();
+ return mrInput.getOldRecordReader().getPos();
}
@Override
@@ -282,7 +282,7 @@
@Override
public float getProgress() throws IOException {
try {
- return simpleInput.getProgress();
+ return mrInput.getProgress();
} catch (InterruptedException ie) {
throw new IOException(ie);
}
diff --git a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/processor/reduce/ReduceProcessor.java b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/processor/reduce/ReduceProcessor.java
index 9274765..19acb39 100644
--- a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/processor/reduce/ReduceProcessor.java
+++ b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/processor/reduce/ReduceProcessor.java
@@ -48,7 +48,7 @@
import org.apache.tez.engine.common.sort.impl.TezRawKeyValueIterator;
import org.apache.tez.engine.lib.input.ShuffledMergedInputLegacy;
import org.apache.tez.engine.lib.output.OnFileSortedOutput;
-import org.apache.tez.mapreduce.output.SimpleOutput;
+import org.apache.tez.mapreduce.output.MROutput;
import org.apache.tez.mapreduce.processor.MRTask;
import org.apache.tez.mapreduce.processor.MRTaskReporter;
@@ -133,8 +133,8 @@
KVReader kvReader = shuffleInput.getReader();
KVWriter kvWriter = null;
- if((out instanceof SimpleOutput)) {
- kvWriter = ((SimpleOutput) out).getWriter();
+ if((out instanceof MROutput)) {
+ kvWriter = ((MROutput) out).getWriter();
} else if ((out instanceof OnFileSortedOutput)) {
kvWriter = ((OnFileSortedOutput) out).getWriter();
} else {
diff --git a/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/processor/map/TestMapProcessor.java b/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/processor/map/TestMapProcessor.java
index 06e2f4b..89292ab 100644
--- a/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/processor/map/TestMapProcessor.java
+++ b/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/processor/map/TestMapProcessor.java
@@ -48,7 +48,7 @@
import org.apache.tez.mapreduce.hadoop.MRJobConfig;
import org.apache.tez.mapreduce.hadoop.MultiStageMRConfToTezTranslator;
import org.apache.tez.mapreduce.hadoop.MultiStageMRConfigUtil;
-import org.apache.tez.mapreduce.input.SimpleInputLegacy;
+import org.apache.tez.mapreduce.input.MRInputLegacy;
import org.apache.tez.mapreduce.partition.MRPartitioner;
import org.apache.tez.mapreduce.processor.MapUtils;
import org.junit.After;
@@ -120,7 +120,7 @@
MapUtils.generateInputSplit(localFs, workDir, job, mapInput);
- InputSpec mapInputSpec = new InputSpec("NullSrcVertex", new InputDescriptor(SimpleInputLegacy.class.getName()), 0);
+ InputSpec mapInputSpec = new InputSpec("NullSrcVertex", new InputDescriptor(MRInputLegacy.class.getName()), 0);
OutputSpec mapOutputSpec = new OutputSpec("NullDestVertex", new OutputDescriptor(LocalOnFileSorterOutput.class.getName()), 1);
LogicalIOProcessorRuntimeTask task = MapUtils.createLogicalTask(localFs, workDir, job, 0,
@@ -191,7 +191,7 @@
// localFs, workDir, job, 0, new Path(workDir, "map0"),
// new TestUmbilicalProtocol(true), vertexName,
// Collections.singletonList(new InputSpec("NullVertex", 0,
-// SimpleInput.class.getName())),
+// MRInput.class.getName())),
// Collections.singletonList(new OutputSpec("FakeVertex", 1,
// OldInMemorySortedOutput.class.getName()))
// );
diff --git a/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/processor/reduce/TestReduceProcessor.java b/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/processor/reduce/TestReduceProcessor.java
index a3abd76..274c353 100644
--- a/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/processor/reduce/TestReduceProcessor.java
+++ b/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/processor/reduce/TestReduceProcessor.java
@@ -54,8 +54,8 @@
import org.apache.tez.mapreduce.hadoop.MRJobConfig;
import org.apache.tez.mapreduce.hadoop.MultiStageMRConfToTezTranslator;
import org.apache.tez.mapreduce.hadoop.MultiStageMRConfigUtil;
-import org.apache.tez.mapreduce.input.SimpleInputLegacy;
-import org.apache.tez.mapreduce.output.SimpleOutput;
+import org.apache.tez.mapreduce.input.MRInputLegacy;
+import org.apache.tez.mapreduce.output.MROutput;
import org.apache.tez.mapreduce.partition.MRPartitioner;
import org.apache.tez.mapreduce.processor.MapUtils;
import org.junit.After;
@@ -125,7 +125,7 @@
Path mapInput = new Path(workDir, "map0");
MapUtils.generateInputSplit(localFs, workDir, mapConf, mapInput);
- InputSpec mapInputSpec = new InputSpec("NullSrcVertex", new InputDescriptor(SimpleInputLegacy.class.getName()), 0);
+ InputSpec mapInputSpec = new InputSpec("NullSrcVertex", new InputDescriptor(MRInputLegacy.class.getName()), 0);
OutputSpec mapOutputSpec = new OutputSpec("NullDestVertex", new OutputDescriptor(LocalOnFileSorterOutput.class.getName()), 1);
// Run a map
LogicalIOProcessorRuntimeTask mapTask = MapUtils.createLogicalTask(localFs, workDir, mapConf, 0,
@@ -152,7 +152,7 @@
ReduceProcessor.class.getName()).setUserPayload(TezUtils.createUserPayloadFromConf(reduceConf));
InputSpec reduceInputSpec = new InputSpec(mapVertexName, new InputDescriptor(LocalMergedInput.class.getName()), 1);
- OutputSpec reduceOutputSpec = new OutputSpec("NullDestinationVertex", new OutputDescriptor(SimpleOutput.class.getName()), 1);
+ OutputSpec reduceOutputSpec = new OutputSpec("NullDestinationVertex", new OutputDescriptor(MROutput.class.getName()), 1);
// Now run a reduce
TaskSpec taskSpec = new TaskSpec(