Merge pull request #729 from sebastian-nagel/NUTCH-2947-keep-stateful-fetch-queues
NUTCH-2947 Fetcher: keep state of empty fetch queues unless queue feeder is finished
diff --git a/src/java/org/apache/nutch/fetcher/FetchItemQueues.java b/src/java/org/apache/nutch/fetcher/FetchItemQueues.java
index 4914cd2..9dfbeb2 100644
--- a/src/java/org/apache/nutch/fetcher/FetchItemQueues.java
+++ b/src/java/org/apache/nutch/fetcher/FetchItemQueues.java
@@ -57,6 +57,7 @@
long timelimit = -1;
int maxExceptionsPerQueue = -1;
long exceptionsPerQueueDelay = -1;
+ volatile boolean feederAlive = true; // set to false by QueueFeeder once it stops adding items; volatile so the unsynchronized write is visible to readers on other threads
Configuration conf;
public static final String QUEUE_MODE_HOST = "byHost";
@@ -181,11 +182,25 @@
while (it.hasNext()) {
FetchItemQueue fiq = it.next().getValue();
- // reap empty queues
+ // reap empty queues which do not hold state required to ensure politeness
if (fiq.getQueueSize() == 0 && fiq.getInProgressSize() == 0) {
- it.remove();
+ if (!feederAlive) {
+ // no more fetch items added
+ it.remove();
+ } else if ((maxExceptionsPerQueue > -1 || exceptionsPerQueueDelay > 0)
+ && fiq.exceptionCounter.get() > 0) {
+ // keep queue because the exceptions counter is bound to it
+ // and is required to skip or delay items on this queue
+ } else if (fiq.nextFetchTime.get() > System.currentTimeMillis()) {
+ // keep queue to have it blocked in case new fetch items of this queue
+ // are added by the QueueFeeder
+ } else {
+ // empty queue without state
+ it.remove();
+ }
continue;
}
+
FetchItem fit = fiq.getFetchItem();
if (fit != null) {
totalSize.decrementAndGet();
diff --git a/src/java/org/apache/nutch/fetcher/QueueFeeder.java b/src/java/org/apache/nutch/fetcher/QueueFeeder.java
index e57bb83..e327af1 100644
--- a/src/java/org/apache/nutch/fetcher/QueueFeeder.java
+++ b/src/java/org/apache/nutch/fetcher/QueueFeeder.java
@@ -89,7 +89,7 @@
int cnt = 0;
int[] queuingStatus = new int[QueuingStatus.values().length];
while (hasMore) {
- if (System.currentTimeMillis() >= timelimit && timelimit != -1) {
+ if (timelimit != -1 && System.currentTimeMillis() >= timelimit) {
// enough ... lets' simply read all the entries from the input without
// processing them
try {
@@ -155,6 +155,8 @@
}
}
}
+ // signal queues that no more new fetch items are added
+ queues.feederAlive = false;
LOG.info("QueueFeeder finished: total {} records", cnt);
LOG.info("QueueFeeder queuing status:");
for (QueuingStatus status : QueuingStatus.values()) {