Merge pull request #1221 from chandnisingh/MLHR-1637
Cleaned up setup and not ignoring currentFile and offset if that is not ...
diff --git a/library/src/main/java/com/datatorrent/lib/io/fs/AbstractFSDirectoryInputOperator.java b/library/src/main/java/com/datatorrent/lib/io/fs/AbstractFSDirectoryInputOperator.java
index 98f0054..b39ae48 100644
--- a/library/src/main/java/com/datatorrent/lib/io/fs/AbstractFSDirectoryInputOperator.java
+++ b/library/src/main/java/com/datatorrent/lib/io/fs/AbstractFSDirectoryInputOperator.java
@@ -395,22 +395,6 @@
filePath = new Path(directory);
configuration = new Configuration();
fs = getFSInstance();
- if(!unfinishedFiles.isEmpty()) {
- retryFailedFile(unfinishedFiles.poll());
- skipCount = 0;
- } else if(!failedFiles.isEmpty()) {
- retryFailedFile(failedFiles.poll());
- skipCount = 0;
- }
- long startTime = System.currentTimeMillis();
- LOG.info("Continue reading {} from index {} time={}", currentFile, offset, startTime);
- // fast forward to previous offset
- if(inputStream != null) {
- for(int index = 0; index < offset; index++) {
- readEntity();
- }
- }
- LOG.info("Read offset={} records in setup time={}", offset, System.currentTimeMillis() - startTime);
}
catch (IOException ex) {
failureHandling(ex);
@@ -511,7 +495,14 @@
{
if (inputStream == null) {
try {
- if (!unfinishedFiles.isEmpty()) {
+ if (currentFile != null && offset > 0) {
+      //open file resets offset to 0, so this is a way around it.
+ int tmpOffset = offset;
+ this.inputStream = openFile(new Path(currentFile));
+ offset = tmpOffset;
+ skipCount = tmpOffset;
+ }
+ else if (!unfinishedFiles.isEmpty()) {
retryFailedFile(unfinishedFiles.poll());
}
else if (!pendingFiles.isEmpty()) {
diff --git a/library/src/test/java/com/datatorrent/lib/io/fs/AbstractFSDirectoryInputOperatorTest.java b/library/src/test/java/com/datatorrent/lib/io/fs/AbstractFSDirectoryInputOperatorTest.java
index af9a15e..bb5949c 100644
--- a/library/src/test/java/com/datatorrent/lib/io/fs/AbstractFSDirectoryInputOperatorTest.java
+++ b/library/src/test/java/com/datatorrent/lib/io/fs/AbstractFSDirectoryInputOperatorTest.java
@@ -447,4 +447,146 @@
// No record should be read.
Assert.assertEquals("Remaining tuples read ", 6, sink.collectedTuples.size());
}
+
+ @Test
+ public void testRecoveryWithFailedFile() throws Exception
+ {
+ FileContext.getLocalFSFileContext().delete(new Path(new File(testMeta.dir).getAbsolutePath()), true);
+ List<String> allLines = Lists.newArrayList();
+ HashSet<String> lines = Sets.newHashSet();
+ for (int line = 0; line < 5; line++) {
+ lines.add("f0" + "l" + line);
+ }
+ allLines.addAll(lines);
+ File testFile = new File(testMeta.dir, "file0");
+ FileUtils.write(testFile, StringUtils.join(lines, '\n'));
+
+
+ TestFSDirectoryInputOperator oper = new TestFSDirectoryInputOperator();
+ oper.scanner = null;
+ oper.failedFiles.add(new AbstractFSDirectoryInputOperator.FailedFile(testFile.getAbsolutePath(), 1));
+
+ CollectorTestSink<String> queryResults = new CollectorTestSink<String>();
+ @SuppressWarnings({ "unchecked", "rawtypes" })
+ CollectorTestSink<Object> sink = (CollectorTestSink) queryResults;
+ oper.output.setSink(sink);
+
+ oper.setDirectory(testMeta.dir);
+
+ oper.setup(null);
+ oper.beginWindow(0);
+ oper.emitTuples();
+ oper.endWindow();
+
+ oper.teardown();
+
+ Assert.assertEquals("number tuples", 4, queryResults.collectedTuples.size());
+ Assert.assertEquals("lines", allLines.subList(1, allLines.size()), new ArrayList<String>(queryResults.collectedTuples));
+ }
+
+ @Test
+ public void testRecoveryWithUnfinishedFile() throws Exception
+ {
+ FileContext.getLocalFSFileContext().delete(new Path(new File(testMeta.dir).getAbsolutePath()), true);
+ List<String> allLines = Lists.newArrayList();
+ HashSet<String> lines = Sets.newHashSet();
+ for (int line = 0; line < 5; line++) {
+ lines.add("f0" + "l" + line);
+ }
+ allLines.addAll(lines);
+ File testFile = new File(testMeta.dir, "file0");
+ FileUtils.write(testFile, StringUtils.join(lines, '\n'));
+
+ TestFSDirectoryInputOperator oper = new TestFSDirectoryInputOperator();
+ oper.scanner = null;
+ oper.unfinishedFiles.add(new AbstractFSDirectoryInputOperator.FailedFile(testFile.getAbsolutePath(), 2));
+
+ CollectorTestSink<String> queryResults = new CollectorTestSink<String>();
+ @SuppressWarnings({"unchecked", "rawtypes"})
+ CollectorTestSink<Object> sink = (CollectorTestSink) queryResults;
+ oper.output.setSink(sink);
+
+ oper.setDirectory(testMeta.dir);
+
+ oper.setup(null);
+ oper.beginWindow(0);
+ oper.emitTuples();
+ oper.endWindow();
+
+ oper.teardown();
+
+ Assert.assertEquals("number tuples", 3, queryResults.collectedTuples.size());
+ Assert.assertEquals("lines", allLines.subList(2, allLines.size()), new ArrayList<String>(queryResults.collectedTuples));
+ }
+
+ @Test
+ public void testRecoveryWithPendingFile() throws Exception
+ {
+ FileContext.getLocalFSFileContext().delete(new Path(new File(testMeta.dir).getAbsolutePath()), true);
+ List<String> allLines = Lists.newArrayList();
+ HashSet<String> lines = Sets.newHashSet();
+ for (int line = 0; line < 5; line++) {
+ lines.add("f0" + "l" + line);
+ }
+ allLines.addAll(lines);
+ File testFile = new File(testMeta.dir, "file0");
+ FileUtils.write(testFile, StringUtils.join(lines, '\n'));
+
+ TestFSDirectoryInputOperator oper = new TestFSDirectoryInputOperator();
+ oper.scanner = null;
+ oper.pendingFiles.add(testFile.getAbsolutePath());
+
+ CollectorTestSink<String> queryResults = new CollectorTestSink<String>();
+ @SuppressWarnings({"unchecked", "rawtypes"})
+ CollectorTestSink<Object> sink = (CollectorTestSink) queryResults;
+ oper.output.setSink(sink);
+
+ oper.setDirectory(testMeta.dir);
+
+ oper.setup(null);
+ oper.beginWindow(0);
+ oper.emitTuples();
+ oper.endWindow();
+
+ oper.teardown();
+
+ Assert.assertEquals("number tuples", 5, queryResults.collectedTuples.size());
+ Assert.assertEquals("lines", allLines, new ArrayList<String>(queryResults.collectedTuples));
+ }
+
+ @Test
+ public void testRecoveryWithCurrentFile() throws Exception
+ {
+ FileContext.getLocalFSFileContext().delete(new Path(new File(testMeta.dir).getAbsolutePath()), true);
+ List<String> allLines = Lists.newArrayList();
+ HashSet<String> lines = Sets.newHashSet();
+ for (int line = 0; line < 5; line++) {
+ lines.add("f0" + "l" + line);
+ }
+ allLines.addAll(lines);
+ File testFile = new File(testMeta.dir, "file0");
+ FileUtils.write(testFile, StringUtils.join(lines, '\n'));
+
+ TestFSDirectoryInputOperator oper = new TestFSDirectoryInputOperator();
+ oper.scanner = null;
+ oper.currentFile = testFile.getAbsolutePath();
+ oper.offset = 1;
+
+ CollectorTestSink<String> queryResults = new CollectorTestSink<String>();
+ @SuppressWarnings({"unchecked", "rawtypes"})
+ CollectorTestSink<Object> sink = (CollectorTestSink) queryResults;
+ oper.output.setSink(sink);
+
+ oper.setDirectory(testMeta.dir);
+
+ oper.setup(null);
+ oper.beginWindow(0);
+ oper.emitTuples();
+ oper.endWindow();
+
+ oper.teardown();
+
+ Assert.assertEquals("number tuples", 4, queryResults.collectedTuples.size());
+ Assert.assertEquals("lines", allLines.subList(1, allLines.size()), new ArrayList<String>(queryResults.collectedTuples));
+ }
}