Removed the dependency on AbstractHDFSInputOperator
diff --git a/demos/mroperator/src/main/java/com/datatorrent/demos/mroperator/MapOperator.java b/demos/mroperator/src/main/java/com/datatorrent/demos/mroperator/MapOperator.java
index 4eb01e3..aade688 100644
--- a/demos/mroperator/src/main/java/com/datatorrent/demos/mroperator/MapOperator.java
+++ b/demos/mroperator/src/main/java/com/datatorrent/demos/mroperator/MapOperator.java
@@ -19,46 +19,27 @@
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.HashMap;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
+import java.util.*;
import javax.validation.constraints.Min;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.serializer.Deserializer;
import org.apache.hadoop.io.serializer.SerializationFactory;
import org.apache.hadoop.io.serializer.Serializer;
-import org.apache.hadoop.mapred.Counters;
-import org.apache.hadoop.mapred.FileInputFormat;
-import org.apache.hadoop.mapred.InputFormat;
-import org.apache.hadoop.mapred.InputSplit;
-import org.apache.hadoop.mapred.JobConf;
-import org.apache.hadoop.mapred.KeyValueTextInputFormat;
-import org.apache.hadoop.mapred.Mapper;
-import org.apache.hadoop.mapred.OutputCollector;
-import org.apache.hadoop.mapred.RecordReader;
-import org.apache.hadoop.mapred.Reducer;
-import org.apache.hadoop.mapred.Reporter;
-import org.apache.hadoop.mapred.TextInputFormat;
-
-import com.datatorrent.lib.io.fs.AbstractHDFSInputOperator;
-import com.datatorrent.lib.util.KeyHashValPair;
+import org.apache.hadoop.mapred.*;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import com.datatorrent.api.Context.OperatorContext;
import com.datatorrent.api.DefaultOutputPort;
import com.datatorrent.api.DefaultPartition;
+import com.datatorrent.api.InputOperator;
import com.datatorrent.api.Partitioner;
import com.datatorrent.demos.mroperator.ReporterImpl.ReporterType;
+import com.datatorrent.lib.util.KeyHashValPair;
/**
* <p>
@@ -68,13 +49,11 @@
* @since 0.9.0
*/
@SuppressWarnings({ "unchecked"})
-public class MapOperator<K1, V1, K2, V2> extends AbstractHDFSInputOperator implements Partitioner<MapOperator<K1, V1, K2, V2>>
+public class MapOperator<K1, V1, K2, V2> implements InputOperator, Partitioner<MapOperator<K1, V1, K2, V2>>
{
private static final Logger logger = LoggerFactory.getLogger(MapOperator.class);
-
private String dirName;
-
private boolean emitPartitioningCountOnce = false;
private boolean emitLastCountOnce = false;
private int operatorId;
@@ -119,7 +98,6 @@
public void setDirName(String dirName)
{
this.dirName = dirName;
- super.setFilePath(dirName);
}
public int getPartitionCount()
@@ -147,7 +125,12 @@
logger.info("error getting record reader {}", e.getMessage());
}
}
- super.beginWindow(windowId);
+ }
+
+ @Override
+ public void teardown()
+ {
+
}
@Override
@@ -205,17 +188,7 @@
}
@Override
- public void activate(OperatorContext context)
- {
- }
-
- @Override
- public void deactivate()
- {
- }
-
- @Override
- public void emitTuples(FSDataInputStream stream)
+ public void emitTuples()
{
if (!emittedAll) {
try {