Removed the dependency on AbstractHDFSInputOperator

MapOperator now implements InputOperator (and Partitioner) directly instead of extending AbstractHDFSInputOperator: it provides its own teardown(), emitTuples() no longer receives an FSDataInputStream, and the activate()/deactivate() overrides and the super calls in setDirName() and beginWindow() are dropped along with the base class.
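
Since MapOperator no longer inherits from AbstractHDFSInputOperator, it has to satisfy the com.datatorrent.api.InputOperator contract on its own. Below is a minimal sketch of that contract under the standard Apex operator lifecycle (setup -> beginWindow -> emitTuples -> endWindow -> teardown); the class and port names are illustrative, not taken from this patch:

    import com.datatorrent.api.Context.OperatorContext;
    import com.datatorrent.api.DefaultOutputPort;
    import com.datatorrent.api.InputOperator;

    public class SketchInputOperator implements InputOperator
    {
      // output port through which emitTuples() pushes data downstream
      public final transient DefaultOutputPort<String> output = new DefaultOutputPort<String>();

      @Override
      public void setup(OperatorContext context)
      {
        // open input sources (e.g. a RecordReader) here; no base class does it anymore
      }

      @Override
      public void beginWindow(long windowId)
      {
        // no super.beginWindow(windowId): the operator has no superclass
      }

      @Override
      public void emitTuples()
      {
        // called repeatedly within a window; the operator pulls its own
        // input instead of being handed an FSDataInputStream
        output.emit("tuple");
      }

      @Override
      public void endWindow()
      {
      }

      @Override
      public void teardown()
      {
        // close whatever setup() opened
      }
    }

This is why the patch adds an explicit (empty) teardown() and drops the activate()/deactivate() overrides: activate/deactivate are not declared by InputOperator itself (they belong to Operator.ActivationListener, which the old base class presumably implemented).
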
diff --git a/demos/mroperator/src/main/java/com/datatorrent/demos/mroperator/MapOperator.java b/demos/mroperator/src/main/java/com/datatorrent/demos/mroperator/MapOperator.java
index 4eb01e3..aade688 100644
--- a/demos/mroperator/src/main/java/com/datatorrent/demos/mroperator/MapOperator.java
+++ b/demos/mroperator/src/main/java/com/datatorrent/demos/mroperator/MapOperator.java
@@ -19,46 +19,27 @@
 import java.io.ByteArrayOutputStream;
 import java.io.IOException;
 import java.io.InputStream;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.HashMap;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
+import java.util.*;
 
 import javax.validation.constraints.Min;
 
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.serializer.Deserializer;
 import org.apache.hadoop.io.serializer.SerializationFactory;
 import org.apache.hadoop.io.serializer.Serializer;
-import org.apache.hadoop.mapred.Counters;
-import org.apache.hadoop.mapred.FileInputFormat;
-import org.apache.hadoop.mapred.InputFormat;
-import org.apache.hadoop.mapred.InputSplit;
-import org.apache.hadoop.mapred.JobConf;
-import org.apache.hadoop.mapred.KeyValueTextInputFormat;
-import org.apache.hadoop.mapred.Mapper;
-import org.apache.hadoop.mapred.OutputCollector;
-import org.apache.hadoop.mapred.RecordReader;
-import org.apache.hadoop.mapred.Reducer;
-import org.apache.hadoop.mapred.Reporter;
-import org.apache.hadoop.mapred.TextInputFormat;
-
-import com.datatorrent.lib.io.fs.AbstractHDFSInputOperator;
-import com.datatorrent.lib.util.KeyHashValPair;
+import org.apache.hadoop.mapred.*;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import com.datatorrent.api.Context.OperatorContext;
 import com.datatorrent.api.DefaultOutputPort;
 import com.datatorrent.api.DefaultPartition;
+import com.datatorrent.api.InputOperator;
 import com.datatorrent.api.Partitioner;
 
 import com.datatorrent.demos.mroperator.ReporterImpl.ReporterType;
+import com.datatorrent.lib.util.KeyHashValPair;
 
 /**
  * <p>
@@ -68,13 +49,11 @@
  * @since 0.9.0
  */
 @SuppressWarnings({ "unchecked"})
-public class MapOperator<K1, V1, K2, V2> extends AbstractHDFSInputOperator implements Partitioner<MapOperator<K1, V1, K2, V2>>
+public class MapOperator<K1, V1, K2, V2>  implements InputOperator, Partitioner<MapOperator<K1, V1, K2, V2>>
 {
 
   private static final Logger logger = LoggerFactory.getLogger(MapOperator.class);
-
   private String dirName;
-
   private boolean emitPartitioningCountOnce = false;
   private boolean emitLastCountOnce = false;
   private int operatorId;
@@ -119,7 +98,6 @@
   public void setDirName(String dirName)
   {
     this.dirName = dirName;
-    super.setFilePath(dirName);
   }
 
   public int getPartitionCount()
@@ -147,7 +125,12 @@
         logger.info("error getting record reader {}", e.getMessage());
       }
     }
-    super.beginWindow(windowId);
+  }
+
+  @Override
+  public void teardown()
+  {
+
   }
 
   @Override
@@ -205,17 +188,7 @@
   }
 
   @Override
-  public void activate(OperatorContext context)
-  {
-  }
-
-  @Override
-  public void deactivate()
-  {
-  }
-
-  @Override
-  public void emitTuples(FSDataInputStream stream)
+  public void emitTuples()
   {
     if (!emittedAll) {
       try {
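
The hunk is truncated above. For orientation only, a read loop of this shape is what the org.apache.hadoop.mapred API supports inside a no-arg emitTuples(); apart from emittedAll, every name below (reader, mapObject, outputCollector, reporter) is a hypothetical placeholder, not code from the patch:

    import java.io.IOException;

    import org.apache.hadoop.mapred.Mapper;
    import org.apache.hadoop.mapred.OutputCollector;
    import org.apache.hadoop.mapred.RecordReader;
    import org.apache.hadoop.mapred.Reporter;

    public class EmitLoopSketch<K1, V1, K2, V2>
    {
      private boolean emittedAll;
      private RecordReader<K1, V1> reader;        // assumed opened during setup/beginWindow
      private Mapper<K1, V1, K2, V2> mapObject;   // the wrapped mapred Mapper
      private OutputCollector<K2, V2> outputCollector;
      private Reporter reporter;

      public void emitTuples()
      {
        if (!emittedAll) {
          try {
            K1 key = reader.createKey();
            V1 value = reader.createValue();
            if (reader.next(key, value)) {
              // hand one record to the Mapper, which emits via the collector
              mapObject.map(key, value, outputCollector, reporter);
            } else {
              // split exhausted: stop pulling and release the reader
              emittedAll = true;
              reader.close();
            }
          } catch (IOException e) {
            throw new RuntimeException(e);
          }
        }
      }
    }

Structuring the loop this way keeps each emitTuples() call cheap (at most one record per invocation shown here), which matters because the Apex engine invokes it repeatedly within a streaming window.
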