Merge pull request #1221 from chandnisingh/MLHR-1637

Cleaned up setup and not ignoring currentFile and offset if that is not ...
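
The old setup() fast-forwarded the stream by re-reading offset records on every restart, and a queued unfinished or failed file could cause the checkpointed currentFile and offset to be ignored. The replay is now deferred to the point where the stream is opened, and a non-null currentFile with a positive offset takes precedence over the retry queues. A minimal, self-contained sketch of that precedence (the RecoveryOrderSketch class and its string results are illustrative stand-ins, not operator code; the failedFiles fallback is assumed from the queue order the removed setup() code used):

    import java.util.ArrayDeque;
    import java.util.Queue;

    // Illustrative sketch only: models the order in which the patched
    // operator decides what to read when no input stream is open.
    public class RecoveryOrderSketch
    {
      String currentFile;  // checkpointed file being read, may be null
      int offset;          // checkpointed record offset into currentFile
      final Queue<String> unfinishedFiles = new ArrayDeque<String>();
      final Queue<String> pendingFiles = new ArrayDeque<String>();
      final Queue<String> failedFiles = new ArrayDeque<String>();

      String nextSource()
      {
        if (currentFile != null && offset > 0) {
          // a checkpointed read-in-progress wins: reopen the file and skip
          // the records that were already emitted (offset / skipCount)
          return "resume " + currentFile + " at offset " + offset;
        } else if (!unfinishedFiles.isEmpty()) {
          return "retry unfinished " + unfinishedFiles.poll();
        } else if (!pendingFiles.isEmpty()) {
          return "open pending " + pendingFiles.poll();
        } else if (!failedFiles.isEmpty()) {
          // assumed fallback, matching the order the removed setup() code used
          return "retry failed " + failedFiles.poll();
        }
        return null;  // nothing to read this cycle
      }

      public static void main(String[] args)
      {
        RecoveryOrderSketch sketch = new RecoveryOrderSketch();
        sketch.currentFile = "file0";
        sketch.offset = 1;
        sketch.unfinishedFiles.add("file1");
        System.out.println(sketch.nextSource());  // resume file0 at offset 1
      }
    }
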
diff --git a/library/src/main/java/com/datatorrent/lib/io/fs/AbstractFSDirectoryInputOperator.java b/library/src/main/java/com/datatorrent/lib/io/fs/AbstractFSDirectoryInputOperator.java
index 98f0054..b39ae48 100644
--- a/library/src/main/java/com/datatorrent/lib/io/fs/AbstractFSDirectoryInputOperator.java
+++ b/library/src/main/java/com/datatorrent/lib/io/fs/AbstractFSDirectoryInputOperator.java
@@ -395,22 +395,6 @@
       filePath = new Path(directory);
       configuration = new Configuration();
       fs = getFSInstance();
-      if(!unfinishedFiles.isEmpty()) {
-        retryFailedFile(unfinishedFiles.poll());
-        skipCount = 0;
-      } else if(!failedFiles.isEmpty()) {
-        retryFailedFile(failedFiles.poll());
-        skipCount = 0;
-      }
-      long startTime = System.currentTimeMillis();
-      LOG.info("Continue reading {} from index {} time={}", currentFile, offset, startTime);
-      // fast forward to previous offset
-      if(inputStream != null) {
-        for(int index = 0; index < offset; index++) {
-          readEntity();
-        }
-      }
-      LOG.info("Read offset={} records in setup time={}", offset, System.currentTimeMillis() - startTime);
     }
     catch (IOException ex) {
       failureHandling(ex);
@@ -511,7 +495,14 @@
   {
     if (inputStream == null) {
       try {
-        if (!unfinishedFiles.isEmpty()) {
+        if (currentFile != null && offset > 0) {
+          // openFile() resets offset to 0, so restore it afterwards and let skipCount skip the already-emitted records.
+          int tmpOffset = offset;
+          this.inputStream = openFile(new Path(currentFile));
+          offset = tmpOffset;
+          skipCount = tmpOffset;
+        }
+        else if (!unfinishedFiles.isEmpty()) {
           retryFailedFile(unfinishedFiles.poll());
         }
         else if (!pendingFiles.isEmpty()) {
diff --git a/library/src/test/java/com/datatorrent/lib/io/fs/AbstractFSDirectoryInputOperatorTest.java b/library/src/test/java/com/datatorrent/lib/io/fs/AbstractFSDirectoryInputOperatorTest.java
index af9a15e..bb5949c 100644
--- a/library/src/test/java/com/datatorrent/lib/io/fs/AbstractFSDirectoryInputOperatorTest.java
+++ b/library/src/test/java/com/datatorrent/lib/io/fs/AbstractFSDirectoryInputOperatorTest.java
@@ -447,4 +447,150 @@
     // No record should be read.
     Assert.assertEquals("Remaining tuples read ", 6, sink.collectedTuples.size());
   }
+
+  @Test
+  public void testRecoveryWithFailedFile() throws Exception
+  {
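+    // file0 is checkpointed as failed at offset 1; recovery should skip that record and emit the remaining 4 lines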
+    FileContext.getLocalFSFileContext().delete(new Path(new File(testMeta.dir).getAbsolutePath()), true);
+    List<String> allLines = Lists.newArrayList();
+    HashSet<String> lines = Sets.newHashSet();
+    for (int line = 0; line < 5; line++) {
+      lines.add("f0" + "l" + line);
+    }
+    allLines.addAll(lines);
+    File testFile = new File(testMeta.dir, "file0");
+    FileUtils.write(testFile, StringUtils.join(lines, '\n'));
+
+
+    TestFSDirectoryInputOperator oper = new TestFSDirectoryInputOperator();
+    oper.scanner = null;
+    oper.failedFiles.add(new AbstractFSDirectoryInputOperator.FailedFile(testFile.getAbsolutePath(), 1));
+
+    CollectorTestSink<String> queryResults = new CollectorTestSink<String>();
+    @SuppressWarnings({ "unchecked", "rawtypes" })
+    CollectorTestSink<Object> sink = (CollectorTestSink) queryResults;
+    oper.output.setSink(sink);
+
+    oper.setDirectory(testMeta.dir);
+
+    oper.setup(null);
+    oper.beginWindow(0);
+    oper.emitTuples();
+    oper.endWindow();
+
+    oper.teardown();
+
+    Assert.assertEquals("number tuples", 4, queryResults.collectedTuples.size());
+    Assert.assertEquals("lines", allLines.subList(1, allLines.size()), new ArrayList<String>(queryResults.collectedTuples));
+  }
+
+  @Test
+  public void testRecoveryWithUnfinishedFile() throws Exception
+  {
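+    // file0 is checkpointed as unfinished at offset 2; recovery should emit the remaining 3 lines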
+    FileContext.getLocalFSFileContext().delete(new Path(new File(testMeta.dir).getAbsolutePath()), true);
+    List<String> allLines = Lists.newArrayList();
+    HashSet<String> lines = Sets.newHashSet();
+    for (int line = 0; line < 5; line++) {
+      lines.add("f0" + "l" + line);
+    }
+    allLines.addAll(lines);
+    File testFile = new File(testMeta.dir, "file0");
+    FileUtils.write(testFile, StringUtils.join(lines, '\n'));
+
+    TestFSDirectoryInputOperator oper = new TestFSDirectoryInputOperator();
+    oper.scanner = null;
+    oper.unfinishedFiles.add(new AbstractFSDirectoryInputOperator.FailedFile(testFile.getAbsolutePath(), 2));
+
+    CollectorTestSink<String> queryResults = new CollectorTestSink<String>();
+    @SuppressWarnings({"unchecked", "rawtypes"})
+    CollectorTestSink<Object> sink = (CollectorTestSink) queryResults;
+    oper.output.setSink(sink);
+
+    oper.setDirectory(testMeta.dir);
+
+    oper.setup(null);
+    oper.beginWindow(0);
+    oper.emitTuples();
+    oper.endWindow();
+
+    oper.teardown();
+
+    Assert.assertEquals("number tuples", 3, queryResults.collectedTuples.size());
+    Assert.assertEquals("lines", allLines.subList(2, allLines.size()), new ArrayList<String>(queryResults.collectedTuples));
+  }
+
+  @Test
+  public void testRecoveryWithPendingFile() throws Exception
+  {
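+    // file0 is only pending (it was never opened), so recovery should emit all 5 lines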
+    FileContext.getLocalFSFileContext().delete(new Path(new File(testMeta.dir).getAbsolutePath()), true);
+    List<String> allLines = Lists.newArrayList();
+    HashSet<String> lines = Sets.newHashSet();
+    for (int line = 0; line < 5; line++) {
+      lines.add("f0" + "l" + line);
+    }
+    allLines.addAll(lines);
+    File testFile = new File(testMeta.dir, "file0");
+    FileUtils.write(testFile, StringUtils.join(lines, '\n'));
+
+    TestFSDirectoryInputOperator oper = new TestFSDirectoryInputOperator();
+    oper.scanner = null;
+    oper.pendingFiles.add(testFile.getAbsolutePath());
+
+    CollectorTestSink<String> queryResults = new CollectorTestSink<String>();
+    @SuppressWarnings({"unchecked", "rawtypes"})
+    CollectorTestSink<Object> sink = (CollectorTestSink) queryResults;
+    oper.output.setSink(sink);
+
+    oper.setDirectory(testMeta.dir);
+
+    oper.setup(null);
+    oper.beginWindow(0);
+    oper.emitTuples();
+    oper.endWindow();
+
+    oper.teardown();
+
+    Assert.assertEquals("number tuples", 5, queryResults.collectedTuples.size());
+    Assert.assertEquals("lines", allLines, new ArrayList<String>(queryResults.collectedTuples));
+  }
+
+  @Test
+  public void testRecoveryWithCurrentFile() throws Exception
+  {
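+    // file0 is the checkpointed currentFile at offset 1; recovery should resume there and emit the remaining 4 lines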
+    FileContext.getLocalFSFileContext().delete(new Path(new File(testMeta.dir).getAbsolutePath()), true);
+    List<String> allLines = Lists.newArrayList();
+    HashSet<String> lines = Sets.newHashSet();
+    for (int line = 0; line < 5; line++) {
+      lines.add("f0" + "l" + line);
+    }
+    allLines.addAll(lines);
+    File testFile = new File(testMeta.dir, "file0");
+    FileUtils.write(testFile, StringUtils.join(lines, '\n'));
+
+    TestFSDirectoryInputOperator oper = new TestFSDirectoryInputOperator();
+    oper.scanner = null;
+    oper.currentFile = testFile.getAbsolutePath();
+    oper.offset = 1;
+
+    CollectorTestSink<String> queryResults = new CollectorTestSink<String>();
+    @SuppressWarnings({"unchecked", "rawtypes"})
+    CollectorTestSink<Object> sink = (CollectorTestSink) queryResults;
+    oper.output.setSink(sink);
+
+    oper.setDirectory(testMeta.dir);
+
+    oper.setup(null);
+    oper.beginWindow(0);
+    oper.emitTuples();
+    oper.endWindow();
+
+    oper.teardown();
+
+    Assert.assertEquals("number tuples", 4, queryResults.collectedTuples.size());
+    Assert.assertEquals("lines", allLines.subList(1, allLines.size()), new ArrayList<String>(queryResults.collectedTuples));
+  }
 }