NUTCH-2767 Fetcher to stop filling queues skipped due to repeated exception
- keep queues above exception threshold to ensure that no more items are
added to these queues
- reuse iterator over queues to avoid that those queues have to be
first skipped during each scan over queues
diff --git a/src/java/org/apache/nutch/fetcher/FetchItemQueues.java b/src/java/org/apache/nutch/fetcher/FetchItemQueues.java
index 9fce817..b090856 100644
--- a/src/java/org/apache/nutch/fetcher/FetchItemQueues.java
+++ b/src/java/org/apache/nutch/fetcher/FetchItemQueues.java
@@ -17,9 +17,9 @@
package org.apache.nutch.fetcher;
import java.lang.invoke.MethodHandles;
-import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.hadoop.conf.Configuration;
@@ -38,7 +38,8 @@
.getLogger(MethodHandles.lookup().lookupClass());
public static final String DEFAULT_ID = "default";
- Map<String, FetchItemQueue> queues = new HashMap<>();
+ Map<String, FetchItemQueue> queues = new ConcurrentHashMap<>();
+ Iterator<Map.Entry<String, FetchItemQueue>> lastIterator = null;
AtomicInteger totalSize = new AtomicInteger(0);
int maxThreads;
long crawlDelay;
@@ -144,21 +145,33 @@
}
public synchronized FetchItem getFetchItem() {
- Iterator<Map.Entry<String, FetchItemQueue>> it = queues.entrySet()
- .iterator();
+
+ Iterator<Map.Entry<String, FetchItemQueue>> it = lastIterator;
+ if (it == null) {
+ it = queues.entrySet().iterator();
+ }
+
while (it.hasNext()) {
FetchItemQueue fiq = it.next().getValue();
- // reap empty queues
- if (fiq.getQueueSize() == 0 && fiq.getInProgressSize() == 0) {
+
+ // reap empty queues,
+ // but keep queues which are above the exception threshold
+ // to ensure that no more items are added to these queues
+ if (fiq.getQueueSize() == 0 && fiq.getInProgressSize() == 0
+ && !(maxExceptionsPerQueue != -1
+ && fiq.getExceptionCounter() > maxExceptionsPerQueue)) {
it.remove();
continue;
}
FetchItem fit = fiq.getFetchItem();
if (fit != null) {
totalSize.decrementAndGet();
+ lastIterator = it;
return fit;
}
}
+
+ lastIterator = null;
return null;
}