NUTCH-2767 Fetcher to stop filling queues skipped due to repeated exception
- keep queues above exception threshold to ensure that no more items are
  added to these queues
- reuse iterator over queues to avoid that those queues have to be
  first skipped during each scan over queues
diff --git a/src/java/org/apache/nutch/fetcher/FetchItemQueues.java b/src/java/org/apache/nutch/fetcher/FetchItemQueues.java
index 9fce817..b090856 100644
--- a/src/java/org/apache/nutch/fetcher/FetchItemQueues.java
+++ b/src/java/org/apache/nutch/fetcher/FetchItemQueues.java
@@ -17,9 +17,9 @@
 package org.apache.nutch.fetcher;
 
 import java.lang.invoke.MethodHandles;
-import java.util.HashMap;
 import java.util.Iterator;
 import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.atomic.AtomicInteger;
 
 import org.apache.hadoop.conf.Configuration;
@@ -38,7 +38,8 @@
       .getLogger(MethodHandles.lookup().lookupClass());
 
   public static final String DEFAULT_ID = "default";
-  Map<String, FetchItemQueue> queues = new HashMap<>();
+  Map<String, FetchItemQueue> queues = new ConcurrentHashMap<>();
+  Iterator<Map.Entry<String, FetchItemQueue>> lastIterator = null;
   AtomicInteger totalSize = new AtomicInteger(0);
   int maxThreads;
   long crawlDelay;
@@ -144,21 +145,33 @@
   }
 
   public synchronized FetchItem getFetchItem() {
-    Iterator<Map.Entry<String, FetchItemQueue>> it = queues.entrySet()
-        .iterator();
+
+    Iterator<Map.Entry<String, FetchItemQueue>> it = lastIterator;
+    if (it == null) {
+      it = queues.entrySet().iterator();
+    }
+
     while (it.hasNext()) {
       FetchItemQueue fiq = it.next().getValue();
-      // reap empty queues
-      if (fiq.getQueueSize() == 0 && fiq.getInProgressSize() == 0) {
+
+      // reap empty queues,
+      // but keep queues which are above the exception threshold
+      // to ensure that no more items are added to these queues
+      if (fiq.getQueueSize() == 0 && fiq.getInProgressSize() == 0
+          && !(maxExceptionsPerQueue != -1
+              && fiq.getExceptionCounter() > maxExceptionsPerQueue)) {
         it.remove();
         continue;
       }
       FetchItem fit = fiq.getFetchItem();
       if (fit != null) {
         totalSize.decrementAndGet();
+        lastIterator = it;
         return fit;
       }
     }
+
+    lastIterator = null;
     return null;
   }