Merge pull request #729 from sebastian-nagel/NUTCH-2947-keep-stateful-fetch-queues

NUTCH-2947 Fetcher: keep state of empty fetch queues unless queue feeder is finished
diff --git a/src/java/org/apache/nutch/fetcher/FetchItemQueues.java b/src/java/org/apache/nutch/fetcher/FetchItemQueues.java
index 4914cd2..9dfbeb2 100644
--- a/src/java/org/apache/nutch/fetcher/FetchItemQueues.java
+++ b/src/java/org/apache/nutch/fetcher/FetchItemQueues.java
@@ -57,6 +57,7 @@
   long timelimit = -1;
   int maxExceptionsPerQueue = -1;
   long exceptionsPerQueueDelay = -1;
+  volatile boolean feederAlive = true; // cleared by QueueFeeder once it stops adding items; volatile so fetcher threads see the update
   Configuration conf;
 
   public static final String QUEUE_MODE_HOST = "byHost";
@@ -181,11 +182,25 @@
     while (it.hasNext()) {
       FetchItemQueue fiq = it.next().getValue();
 
-      // reap empty queues
+      // reap empty queues which do not hold state required to ensure politeness
       if (fiq.getQueueSize() == 0 && fiq.getInProgressSize() == 0) {
-        it.remove();
+        if (!feederAlive) {
+          // no more fetch items added
+          it.remove();
+        } else if ((maxExceptionsPerQueue > -1 || exceptionsPerQueueDelay > 0)
+            && fiq.exceptionCounter.get() > 0) {
+          // keep queue because the exceptions counter is bound to it
+          // and is required to skip or delay items on this queue
+        } else if (fiq.nextFetchTime.get() > System.currentTimeMillis()) {
+          // keep queue to have it blocked in case new fetch items of this queue
+          // are added by the QueueFeeder
+        } else {
+          // empty queue without state
+          it.remove();
+        }
         continue;
       }
+
       FetchItem fit = fiq.getFetchItem();
       if (fit != null) {
         totalSize.decrementAndGet();
diff --git a/src/java/org/apache/nutch/fetcher/QueueFeeder.java b/src/java/org/apache/nutch/fetcher/QueueFeeder.java
index e57bb83..e327af1 100644
--- a/src/java/org/apache/nutch/fetcher/QueueFeeder.java
+++ b/src/java/org/apache/nutch/fetcher/QueueFeeder.java
@@ -89,7 +89,7 @@
     int cnt = 0;
     int[] queuingStatus = new int[QueuingStatus.values().length];
     while (hasMore) {
-      if (System.currentTimeMillis() >= timelimit && timelimit != -1) {
+      if (timelimit != -1 && System.currentTimeMillis() >= timelimit) {
         // enough ... lets' simply read all the entries from the input without
         // processing them
         try {
@@ -155,6 +155,8 @@
         }
       }
     }
+    // signal queues that no more new fetch items are added
+    queues.feederAlive = false;
     LOG.info("QueueFeeder finished: total {} records", cnt);
     LOG.info("QueueFeeder queuing status:");
     for (QueuingStatus status : QueuingStatus.values()) {