Repartitioning Debugging.
diff --git a/library/src/main/java/com/datatorrent/lib/io/fs/AbstractThroughputHashFSDirectoryInputOperator.java b/library/src/main/java/com/datatorrent/lib/io/fs/AbstractThroughputHashFSDirectoryInputOperator.java
index 5b79086..9c2b503 100644
--- a/library/src/main/java/com/datatorrent/lib/io/fs/AbstractThroughputHashFSDirectoryInputOperator.java
+++ b/library/src/main/java/com/datatorrent/lib/io/fs/AbstractThroughputHashFSDirectoryInputOperator.java
@@ -76,35 +76,34 @@
totalFailedFiles.addAll(oper.failedFiles);
totalPendingFiles.addAll(oper.pendingFiles);
oper.pendingFiles.size();
- if (oper.currentFile != null)
+ if (oper.currentFile != null) {
currentFiles.add(new FailedFile(oper.currentFile, oper.offset));
- oldscanners.add(oper.getScanner());
+ }
+ oldscanners.add(oper.getScanner());
}
int totalFileCount;
int newOperatorCount;
- if(!firstStart)
- {
+ if(!firstStart) {
totalFileCount = currentFiles.size() + totalFailedFiles.size() + totalPendingFiles.size();
newOperatorCount = totalFileCount / preferredMaxPendingFilesPerOperator;
- if(totalFileCount % preferredMaxPendingFilesPerOperator > 0)
- {
+ if(totalFileCount % preferredMaxPendingFilesPerOperator > 0) {
newOperatorCount++;
}
- if(newOperatorCount > partitionCount)
- {
+ if(newOperatorCount > partitionCount) {
newOperatorCount = partitionCount;
}
}
- else
- {
+ else {
newOperatorCount = partitionCount;
firstStart = false;
}
+ LOG.info("Partitioning: " + newOperatorCount);
+
//if(newOperatorCount == partitions.size())
//{
// return partitions;
@@ -120,7 +119,6 @@
operator = kryo.copy(this);
-
operator.processedFiles.addAll(totalProcessedFiles);
operator.currentFile = null;
operator.offset = 0;