Testing
diff --git a/library/src/main/java/com/datatorrent/lib/io/fs/AbstractFSDirectoryInputOperator.java b/library/src/main/java/com/datatorrent/lib/io/fs/AbstractFSDirectoryInputOperator.java
index 6e0863a..522de93 100644
--- a/library/src/main/java/com/datatorrent/lib/io/fs/AbstractFSDirectoryInputOperator.java
+++ b/library/src/main/java/com/datatorrent/lib/io/fs/AbstractFSDirectoryInputOperator.java
@@ -59,7 +59,7 @@
*
* @since 1.0.2
*/
-public abstract class AbstractFSDirectoryInputOperator<T> implements InputOperator, Partitioner<AbstractFSDirectoryInputOperator<T>>, StatsListener, CountersAggregator
+public abstract class AbstractFSDirectoryInputOperator<T> implements InputOperator, Partitioner<AbstractFSDirectoryInputOperator<T>>, StatsListener
{
private static final Logger LOG = LoggerFactory.getLogger(AbstractFSDirectoryInputOperator.class);
@@ -78,8 +78,13 @@
private int maxRetryCount = 5;
transient protected int skipCount = 0;
private transient OperatorContext context;
- private long numberOfFailures = 0;
- private long numberOfRetries = 0;
+
+ protected long globalNumberOfFailures = 0;
+ protected long localNumberOfFailures = 0;
+ protected long globalNumberOfRetries = 0;
+ protected long localNumberOfRetries = 0;
+ private transient int globalProcessedFileCount = 0;
+ private transient int localProcessedFileCount = 0;
/**
* Class representing failed file, When read fails on a file in middle, then the file is
@@ -122,78 +127,66 @@
}
}
- public static class FileCounters implements Serializable
+ public final static class FileCounters implements Serializable
{
- int processedFiles;
- long pendingFiles;
- long numberOfFailures;
- long numberOfRetries;
+ public int globalProcessedFiles;
+ public int localProcessedFiles;
+ public long globalNumberOfFailures;
+ public long localNumberOfFailures;
+ public long globalNumberOfRetries;
+ public long localNumberOfRetries;
+ public long pendingFiles;
public FileCounters()
{
- this.processedFiles = 0;
+ this.globalProcessedFiles = 0;
+ this.localProcessedFiles = 0;
+ this.globalNumberOfFailures = 0;
+ this.localNumberOfFailures = 0;
+ this.globalNumberOfRetries = 0;
+ this.localNumberOfRetries = 0;
this.pendingFiles = 0;
- this.numberOfFailures = 0;
- this.numberOfRetries = 0;
}
- public FileCounters(int processedFiles,
- long pendingFiles,
- long numberOfFailures,
- long numberOfRetries)
+ public FileCounters(int globalProcessedFiles,
+ int localProcessedFiles,
+ long globalNumberOfFailures,
+ long localNumberOfFailures,
+ long globalNumberOfRetries,
+ long localNumberOfRetries,
+ long pendingFiles)
{
- setProcessedFiles(processedFiles);
- setPendingFiles(pendingFiles);
- setNumberOfFailures(numberOfFailures);
- setNumberOfRetries(numberOfRetries);
+ this.globalProcessedFiles = globalProcessedFiles;
+ this.localProcessedFiles = localProcessedFiles;
+ this.globalNumberOfFailures = globalNumberOfFailures;
+ this.localNumberOfFailures = localNumberOfFailures;
+ this.globalNumberOfRetries = globalNumberOfRetries;
+ this.localNumberOfRetries = localNumberOfRetries;
+ this.pendingFiles = pendingFiles;
}
public void addL(FileCounters fileCounters)
{
- this.processedFiles += fileCounters.getProcessedFiles();
- this.pendingFiles += fileCounters.getPendingFiles();
- this.numberOfFailures += fileCounters.getNumberOfFailures();
- this.numberOfRetries += fileCounters.getNumberOfRetries();
+ this.localProcessedFiles += fileCounters.localProcessedFiles;
+ this.localNumberOfFailures += fileCounters.localNumberOfFailures;
+ this.localNumberOfRetries += fileCounters.localNumberOfRetries;
+ this.pendingFiles += fileCounters.pendingFiles;
}
-
- private void setProcessedFiles(int processedFiles)
+ }
+
+ public static class FileCountersAggregator implements CountersAggregator,
+ Serializable
+ {
+ @Override
+ public Object aggregate(Collection<?> countersList)
{
- this.processedFiles = processedFiles;
- }
+ FileCounters totalFileCounters = new FileCounters();
- public int getProcessedFiles()
- {
- return processedFiles;
- }
+ for(Object fileCounters: countersList) {
+ totalFileCounters.addL((FileCounters) fileCounters);
+ }
- private void setPendingFiles(long pendingFiles)
- {
- this.pendingFiles = pendingFiles;
- }
-
- public long getPendingFiles()
- {
- return pendingFiles;
- }
-
- private void setNumberOfFailures(long numberOfFailures)
- {
- this.numberOfFailures = numberOfFailures;
- }
-
- public long getNumberOfFailures()
- {
- return numberOfFailures;
- }
-
- private void setNumberOfRetries(long numberOfRetries)
- {
- this.numberOfRetries = numberOfRetries;
- }
-
- public long getNumberOfRetries()
- {
- return numberOfRetries;
+ return totalFileCounters;
}
}
@@ -280,6 +273,7 @@
@Override
public void setup(OperatorContext context)
{
+ globalProcessedFileCount = processedFiles.size();
this.context = context;
try {
@@ -304,12 +298,7 @@
LOG.info("Read offset={} records in setup time={}", offset, System.currentTimeMillis() - startTime);
}
catch (IOException ex) {
- numberOfFailures++;
- if(maxRetryCount <= 0) {
- throw new RuntimeException(ex);
- }
- LOG.error("FS reader error", ex);
- addToFailedList();
+ failureHandling(ex);
}
}
@@ -330,7 +319,6 @@
public void endWindow()
{
if(context != null) {
- int processedFileCount = processedFiles.size();
long pendingFileCount = ((long) pendingFiles.size()) +
((long) failedFiles.size()) +
((long) unfinishedFiles.size());
@@ -369,24 +357,10 @@
retryFailedFile(failedFiles.poll());
}
else {
- if (System.currentTimeMillis() - scanIntervalMillis >= lastScanMillis) {
- Set<Path> newPaths = scanner.scan(fs, filePath, processedFiles);
-
- for(Path newPath: newPaths) {
- String newPathString = newPath.toString();
- pendingFiles.add(newPathString);
- processedFiles.add(newPathString);
- }
- lastScanMillis = System.currentTimeMillis();
- }
+ scanDirectory();
}
} catch (IOException ex) {
- numberOfFailures++;
- if(maxRetryCount <= 0) {
- throw new RuntimeException(ex);
- }
- LOG.error("FS reader error", ex);
- addToFailedList();
+ failureHandling(ex);
}
}
@@ -413,12 +387,7 @@
skipCount--;
}
} catch (IOException e) {
- numberOfFailures++;
- if(maxRetryCount <= 0) {
- throw new RuntimeException(e);
- }
- LOG.error("FS reader error", e);
- addToFailedList();
+ failureHandling(e);
}
}
//If the operator is idempotent, do nothing on other calls to emittuples
@@ -429,6 +398,32 @@
}
}
}
+
+ protected void scanDirectory()
+ {
+ if(System.currentTimeMillis() - scanIntervalMillis >= lastScanMillis) {
+ Set<Path> newPaths = scanner.scan(fs, filePath, processedFiles);
+
+ for(Path newPath : newPaths) {
+ String newPathString = newPath.toString();
+ pendingFiles.add(newPathString);
+ processedFiles.add(newPathString);
+ localProcessedFileCount++;
+ }
+
+ lastScanMillis = System.currentTimeMillis();
+ }
+ }
+
+ private void failureHandling(Exception e)
+ {
+ localNumberOfFailures++;
+ if(maxRetryCount <= 0) {
+ throw new RuntimeException(e);
+ }
+ LOG.error("FS reader error", e);
+ addToFailedList();
+ }
protected void addToFailedList() {
@@ -439,7 +434,7 @@
if (this.inputStream != null)
this.inputStream.close();
} catch(IOException e) {
- numberOfFailures++;
+ localNumberOfFailures++;
LOG.error("Could not close input stream on: " + currentFile);
}
@@ -455,7 +450,7 @@
if (ff.retryCount > maxRetryCount)
return;
- numberOfRetries++;
+ localNumberOfRetries++;
LOG.info("adding to failed list path {} offset {} retry {}", ff.path, ff.offset, ff.retryCount);
failedFiles.add(ff);
}
@@ -492,19 +487,7 @@
currentFile = null;
inputStream = null;
}
-
- @Override
- public Object aggregate(Collection<?> countersList)
- {
- FileCounters totalFileCounters = new FileCounters();
-
- for(Object fileCounters: countersList) {
- totalFileCounters.addL((FileCounters) fileCounters);
- }
-
- return totalFileCounters;
- }
-
+
@Override
public Collection<Partition<AbstractFSDirectoryInputOperator<T>>> definePartitions(Collection<Partition<AbstractFSDirectoryInputOperator<T>>> partitions, int incrementalCapacity)
{
@@ -518,6 +501,14 @@
return partitions;
}
+ AbstractFSDirectoryInputOperator<T> oper = partitions.iterator().next().getPartitionedInstance();
+
+ long globalNumberOfRetries = oper.globalNumberOfRetries;
+ long globalNumberOfFailures = oper.globalNumberOfFailures;
+
+ long totalRetryCount = 0;
+ long totalFailureCount = 0;
+
/*
* Build collective state from all instances of the operator.
*/
@@ -532,6 +523,8 @@
totalFailedFiles.addAll(oper.failedFiles);
totalPendingFiles.addAll(oper.pendingFiles);
currentFiles.addAll(unfinishedFiles);
+ totalRetryCount += oper.localNumberOfRetries;
+ totalFailureCount += oper.localNumberOfFailures;
if (oper.currentFile != null)
currentFiles.add(new FailedFile(oper.currentFile, oper.offset));
oldscanners.add(oper.getScanner());
diff --git a/library/src/main/java/com/datatorrent/lib/io/fs/AbstractThroughputHashFSDirectoryInputOperator.java b/library/src/main/java/com/datatorrent/lib/io/fs/AbstractThroughputHashFSDirectoryInputOperator.java
index 24ef113..68670f0 100644
--- a/library/src/main/java/com/datatorrent/lib/io/fs/AbstractThroughputHashFSDirectoryInputOperator.java
+++ b/library/src/main/java/com/datatorrent/lib/io/fs/AbstractThroughputHashFSDirectoryInputOperator.java
@@ -67,16 +67,7 @@
@Override
public void emitTuples()
{
- if(System.currentTimeMillis() - scanIntervalMillis >= lastScanMillis) {
- Set<Path> newPaths = scanner.scan(fs, filePath, processedFiles);
-
- for(Path newPath : newPaths) {
- String newPathString = newPath.toString();
- pendingFiles.add(newPathString);
- processedFiles.add(newPathString);
- }
- lastScanMillis = System.currentTimeMillis();
- }
+ scanDirectory();
super.emitTuples();
}