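Testing: split file counters into global/local, move the aggregator into FileCountersAggregator, extract scanDirectory()/failureHandling()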
diff --git a/library/src/main/java/com/datatorrent/lib/io/fs/AbstractFSDirectoryInputOperator.java b/library/src/main/java/com/datatorrent/lib/io/fs/AbstractFSDirectoryInputOperator.java
index 6e0863a..522de93 100644
--- a/library/src/main/java/com/datatorrent/lib/io/fs/AbstractFSDirectoryInputOperator.java
+++ b/library/src/main/java/com/datatorrent/lib/io/fs/AbstractFSDirectoryInputOperator.java
@@ -59,7 +59,7 @@
  *
  * @since 1.0.2
  */
-public abstract class AbstractFSDirectoryInputOperator<T> implements InputOperator, Partitioner<AbstractFSDirectoryInputOperator<T>>, StatsListener, CountersAggregator
+public abstract class AbstractFSDirectoryInputOperator<T> implements InputOperator, Partitioner<AbstractFSDirectoryInputOperator<T>>, StatsListener
 {
   private static final Logger LOG = LoggerFactory.getLogger(AbstractFSDirectoryInputOperator.class);
 
@@ -78,8 +78,13 @@
   private int maxRetryCount = 5;
   transient protected int skipCount = 0;
   private transient OperatorContext context;
-  private long numberOfFailures = 0;
-  private long numberOfRetries = 0;
+  
+  protected long globalNumberOfFailures = 0;
+  protected long localNumberOfFailures = 0;
+  protected long globalNumberOfRetries = 0;
+  protected long localNumberOfRetries = 0;
+  private transient int globalProcessedFileCount = 0;
+  private transient int localProcessedFileCount = 0;
 
   /**
    * Class representing failed file, When read fails on a file in middle, then the file is
@@ -122,78 +127,66 @@
     }
   }
   
-  public static class FileCounters implements Serializable
+  public final static class FileCounters implements Serializable // serializable holder of file counters; all fields are public and mutable
   {
-    int processedFiles;
-    long pendingFiles;
-    long numberOfFailures;
-    long numberOfRetries;
+    public int globalProcessedFiles;
+    public int localProcessedFiles;
+    public long globalNumberOfFailures;
+    public long localNumberOfFailures;
+    public long globalNumberOfRetries;
+    public long localNumberOfRetries;
+    public long pendingFiles; // the only counter without a global/local pair
     
     public FileCounters()
     {
-      this.processedFiles = 0;
+      this.globalProcessedFiles = 0; // every counter is explicitly zeroed
+      this.localProcessedFiles = 0;
+      this.globalNumberOfFailures = 0;
+      this.localNumberOfFailures = 0;
+      this.globalNumberOfRetries = 0;
+      this.localNumberOfRetries = 0;
       this.pendingFiles = 0;
-      this.numberOfFailures = 0;
-      this.numberOfRetries = 0;
     }
     
-    public FileCounters(int processedFiles,
-                        long pendingFiles,
-                        long numberOfFailures,
-                        long numberOfRetries)
+    public FileCounters(int globalProcessedFiles,
+                        int localProcessedFiles,
+                        long globalNumberOfFailures,
+                        long localNumberOfFailures,
+                        long globalNumberOfRetries,
+                        long localNumberOfRetries,
+                        long pendingFiles)
     {
-      setProcessedFiles(processedFiles);
-      setPendingFiles(pendingFiles);
-      setNumberOfFailures(numberOfFailures);
-      setNumberOfRetries(numberOfRetries);
+      this.globalProcessedFiles = globalProcessedFiles; // straight field-by-field copy of the arguments
+      this.localProcessedFiles = localProcessedFiles;
+      this.globalNumberOfFailures = globalNumberOfFailures;
+      this.localNumberOfFailures = localNumberOfFailures;
+      this.globalNumberOfRetries = globalNumberOfRetries;
+      this.localNumberOfRetries = localNumberOfRetries;
+      this.pendingFiles = pendingFiles;
     }
     
     public void addL(FileCounters fileCounters)
     {
-      this.processedFiles += fileCounters.getProcessedFiles();
-      this.pendingFiles += fileCounters.getPendingFiles();
-      this.numberOfFailures += fileCounters.getNumberOfFailures();
-      this.numberOfRetries += fileCounters.getNumberOfRetries();
+      this.localProcessedFiles += fileCounters.localProcessedFiles; // only the local* counters and pendingFiles are summed
+      this.localNumberOfFailures += fileCounters.localNumberOfFailures;
+      this.localNumberOfRetries += fileCounters.localNumberOfRetries;
+      this.pendingFiles += fileCounters.pendingFiles; // NOTE(review): global* fields are never touched here - confirm that is intended
     }
-    
-    private void setProcessedFiles(int processedFiles)
+  }
+  
+  public static class FileCountersAggregator implements CountersAggregator,
+                                                        Serializable
+  { // Folds a collection of FileCounters into a single FileCounters using addL().
+    @Override
+    public Object aggregate(Collection<?> countersList)
     {
-      this.processedFiles = processedFiles;
-    }
+      FileCounters totalFileCounters = new FileCounters(); // starts with every counter at zero
+      // Each element is cast to FileCounters; an element of any other type fails the cast.
-    public int getProcessedFiles()
-    {
-      return processedFiles;
-    }
+      for(Object fileCounters: countersList) {
+        totalFileCounters.addL((FileCounters) fileCounters);
+      }
+    
-    private void setPendingFiles(long pendingFiles)
-    {
-      this.pendingFiles = pendingFiles;
-    }
-    
-    public long getPendingFiles()
-    {
-      return pendingFiles;
-    }
-    
-    private void setNumberOfFailures(long numberOfFailures)
-    {
-      this.numberOfFailures = numberOfFailures;
-    }
-    
-    public long getNumberOfFailures()
-    {
-      return numberOfFailures;
-    }
-    
-    private void setNumberOfRetries(long numberOfRetries)
-    {
-      this.numberOfRetries = numberOfRetries;
-    }
-    
-    public long getNumberOfRetries()
-    {
-      return numberOfRetries;
+      return totalFileCounters;
     }
   }
   
@@ -280,6 +273,7 @@
   @Override
   public void setup(OperatorContext context)
   {
+    globalProcessedFileCount = processedFiles.size();
     this.context = context;
     
     try {
@@ -304,12 +298,7 @@
       LOG.info("Read offset={} records in setup time={}", offset, System.currentTimeMillis() - startTime);
     }
     catch (IOException ex) {
-      numberOfFailures++;
-      if(maxRetryCount <= 0) {
-        throw new RuntimeException(ex);
-      }
-      LOG.error("FS reader error", ex);
-      addToFailedList();
+      failureHandling(ex);
     }
   }
 
@@ -330,7 +319,6 @@
   public void endWindow()
   {
     if(context != null) {
-      int processedFileCount = processedFiles.size();
       long pendingFileCount = ((long) pendingFiles.size()) + 
                               ((long) failedFiles.size()) + 
                               ((long) unfinishedFiles.size());
@@ -369,24 +357,10 @@
             retryFailedFile(failedFiles.poll());
           }
           else {
-            if (System.currentTimeMillis() - scanIntervalMillis >= lastScanMillis) {
-              Set<Path> newPaths = scanner.scan(fs, filePath, processedFiles);
-      
-              for(Path newPath: newPaths) {
-                String newPathString = newPath.toString();
-                pendingFiles.add(newPathString);
-                processedFiles.add(newPathString);
-              }
-              lastScanMillis = System.currentTimeMillis();
-            }
+            scanDirectory();
           }
         } catch (IOException ex) {
-          numberOfFailures++;
-          if(maxRetryCount <= 0) {
-            throw new RuntimeException(ex);
-          }
-          LOG.error("FS reader error", ex);
-          addToFailedList();
+          failureHandling(ex);
         }
       }
 
@@ -413,12 +387,7 @@
               skipCount--;
           }
         } catch (IOException e) {
-          numberOfFailures++;
-          if(maxRetryCount <= 0) {
-            throw new RuntimeException(e);
-          }
-          LOG.error("FS reader error", e);
-          addToFailedList();
+          failureHandling(e);
         }
       }
       //If the operator is idempotent, do nothing on other calls to emittuples
@@ -429,6 +398,32 @@
       }
     }
   }
+  
+  protected void scanDirectory()
+  {
+    // Nothing to do until scanIntervalMillis has elapsed since the last scan.
+    if(System.currentTimeMillis() - scanIntervalMillis < lastScanMillis) {
+      return;
+    }
+    // Each path the scanner reports is queued as pending and recorded as processed.
+    for(Path discovered : scanner.scan(fs, filePath, processedFiles)) {
+      String discoveredPath = discovered.toString();
+      pendingFiles.add(discoveredPath);
+      processedFiles.add(discoveredPath);
+      localProcessedFileCount++;
+    }
+    lastScanMillis = System.currentTimeMillis();
+  }
+  
+  // Counts the failure, then rethrows when retries are disabled; otherwise logs and calls addToFailedList().
+  private void failureHandling(Exception e)
+  {
+    ++localNumberOfFailures;
+    final boolean retriesDisabled = maxRetryCount <= 0;
+    if(retriesDisabled) { throw new RuntimeException(e); }
+    LOG.error("FS reader error", e);
+    addToFailedList();
+  }
 
   protected void addToFailedList() {
 
@@ -439,7 +434,7 @@
       if (this.inputStream != null)
         this.inputStream.close();
     } catch(IOException e) {
-      numberOfFailures++;
+      localNumberOfFailures++;
       LOG.error("Could not close input stream on: " + currentFile);
     }
 
@@ -455,7 +450,7 @@
     if (ff.retryCount > maxRetryCount)
       return;
 
-    numberOfRetries++;
+    localNumberOfRetries++;
     LOG.info("adding to failed list path {} offset {} retry {}", ff.path, ff.offset, ff.retryCount);
     failedFiles.add(ff);
   }
@@ -492,19 +487,7 @@
     currentFile = null;
     inputStream = null;
   }
- 
-  @Override
-  public Object aggregate(Collection<?> countersList)
-  {
-    FileCounters totalFileCounters = new FileCounters();
-    
-    for(Object fileCounters: countersList) {
-      totalFileCounters.addL((FileCounters) fileCounters);
-    }
-    
-    return totalFileCounters;
-  }
-  
+
   @Override
   public Collection<Partition<AbstractFSDirectoryInputOperator<T>>> definePartitions(Collection<Partition<AbstractFSDirectoryInputOperator<T>>> partitions, int incrementalCapacity)
   {
@@ -518,6 +501,14 @@
       return partitions;
     }
     
+    // Named distinctly so it cannot clash with the 'oper' that the unchanged lines of this method already use.
+    AbstractFSDirectoryInputOperator<T> firstOper = partitions.iterator().next().getPartitionedInstance();
+    long globalNumberOfRetries = firstOper.globalNumberOfRetries;
+    long globalNumberOfFailures = firstOper.globalNumberOfFailures;
+    
+    long totalRetryCount = 0;
+    long totalFailureCount = 0;
+    
     /*
      * Build collective state from all instances of the operator.
      */
@@ -532,6 +523,8 @@
       totalFailedFiles.addAll(oper.failedFiles);
       totalPendingFiles.addAll(oper.pendingFiles);
       currentFiles.addAll(unfinishedFiles);
+      totalRetryCount += oper.localNumberOfRetries;
+      totalFailureCount += oper.localNumberOfFailures;
       if (oper.currentFile != null)
         currentFiles.add(new FailedFile(oper.currentFile, oper.offset));
       oldscanners.add(oper.getScanner());
diff --git a/library/src/main/java/com/datatorrent/lib/io/fs/AbstractThroughputHashFSDirectoryInputOperator.java b/library/src/main/java/com/datatorrent/lib/io/fs/AbstractThroughputHashFSDirectoryInputOperator.java
index 24ef113..68670f0 100644
--- a/library/src/main/java/com/datatorrent/lib/io/fs/AbstractThroughputHashFSDirectoryInputOperator.java
+++ b/library/src/main/java/com/datatorrent/lib/io/fs/AbstractThroughputHashFSDirectoryInputOperator.java
@@ -67,16 +67,7 @@
   @Override
   public void emitTuples()
   {
-    if(System.currentTimeMillis() - scanIntervalMillis >= lastScanMillis) {
-      Set<Path> newPaths = scanner.scan(fs, filePath, processedFiles);
-
-      for(Path newPath : newPaths) {
-        String newPathString = newPath.toString();
-        pendingFiles.add(newPathString);
-        processedFiles.add(newPathString);
-      }
-      lastScanMillis = System.currentTimeMillis();
-    }
+    scanDirectory(); // scan for new files first, then delegate to the base implementation
 
     super.emitTuples();
   }