NUTCH-2767 Fetcher to stop filling queues skipped due to repeated exceptions
- each queue checks whether the maximum number of exceptions has been
  reached when adding a new fetch item - if so, the item is not added
- for proper counting and logging the queuing status is reported
  to QueueFeeder
diff --git a/src/java/org/apache/nutch/fetcher/FetchItemQueue.java b/src/java/org/apache/nutch/fetcher/FetchItemQueue.java
index 5096b37..2802c33 100644
--- a/src/java/org/apache/nutch/fetcher/FetchItemQueue.java
+++ b/src/java/org/apache/nutch/fetcher/FetchItemQueue.java
@@ -82,6 +82,10 @@
     return inProgress.get();
   }
 
+  public int getExceptionCounter() {
+    return exceptionCounter.get();
+  }
+
   public int incrementExceptionCounter() {
     return exceptionCounter.incrementAndGet();
   }
diff --git a/src/java/org/apache/nutch/fetcher/FetchItemQueues.java b/src/java/org/apache/nutch/fetcher/FetchItemQueues.java
index 407d00f..9fce817 100644
--- a/src/java/org/apache/nutch/fetcher/FetchItemQueues.java
+++ b/src/java/org/apache/nutch/fetcher/FetchItemQueues.java
@@ -53,6 +53,13 @@
 
   String queueMode;
 
+  enum QueuingStatus {
+    SUCCESSFULLY_QUEUED,
+    ERROR_CREATE_FETCH_ITEM,
+    ABOVE_EXCEPTION_THRESHOLD,
+    HIT_BY_TIMELIMIT;
+  }
+
   public FetchItemQueues(Configuration conf) {
     this.conf = conf;
     this.maxThreads = conf.getInt("fetcher.threads.per.queue", 1);
@@ -94,16 +101,23 @@
     return queues.size();
   }
 
-  public void addFetchItem(Text url, CrawlDatum datum) {
+  public QueuingStatus addFetchItem(Text url, CrawlDatum datum) {
     FetchItem it = FetchItem.create(url, datum, queueMode);
-    if (it != null)
-      addFetchItem(it);
+    if (it != null) {
+      return addFetchItem(it);
+    }
+    return QueuingStatus.ERROR_CREATE_FETCH_ITEM;
   }
 
-  public synchronized void addFetchItem(FetchItem it) {
+  public synchronized QueuingStatus addFetchItem(FetchItem it) {
     FetchItemQueue fiq = getFetchItemQueue(it.queueID);
+    if (maxExceptionsPerQueue != -1
+        && fiq.getExceptionCounter() > maxExceptionsPerQueue) {
+      return QueuingStatus.ABOVE_EXCEPTION_THRESHOLD;
+    }
     fiq.addFetchItem(it);
     totalSize.incrementAndGet();
+    return QueuingStatus.SUCCESSFULLY_QUEUED;
   }
 
   public void finishFetchItem(FetchItem it) {
@@ -196,10 +210,10 @@
     if (fiq == null) {
       return 0;
     }
+    int excCount = fiq.incrementExceptionCounter();
     if (fiq.getQueueSize() == 0) {
       return 0;
     }
-    int excCount = fiq.incrementExceptionCounter();
     if (maxExceptionsPerQueue != -1 && excCount >= maxExceptionsPerQueue) {
       // too many exceptions for items in this queue - purge it
       int deleted = fiq.emptyQueue();
diff --git a/src/java/org/apache/nutch/fetcher/QueueFeeder.java b/src/java/org/apache/nutch/fetcher/QueueFeeder.java
index f5fa663..6c2ad39 100644
--- a/src/java/org/apache/nutch/fetcher/QueueFeeder.java
+++ b/src/java/org/apache/nutch/fetcher/QueueFeeder.java
@@ -19,10 +19,13 @@
 import java.io.IOException;
 import java.lang.invoke.MethodHandles;
 import java.net.MalformedURLException;
+import java.util.Map;
+import java.util.TreeMap;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.io.Text;
 import org.apache.nutch.crawl.CrawlDatum;
+import org.apache.nutch.fetcher.FetchItemQueues.QueuingStatus;
 import org.apache.nutch.fetcher.Fetcher.FetcherRun;
 import org.apache.nutch.net.URLFilterException;
 import org.apache.nutch.net.URLFilters;
@@ -86,14 +89,14 @@
   public void run() {
     boolean hasMore = true;
     int cnt = 0;
-    int timelimitcount = 0;
+    Map<QueuingStatus, Integer> queuingStatus = new TreeMap<>();
     while (hasMore) {
       if (System.currentTimeMillis() >= timelimit && timelimit != -1) {
         // enough ... lets' simply read all the entries from the input without
         // processing them
         try {
           hasMore = context.nextKeyValue();
-          timelimitcount++;
+          queuingStatus.compute(QueuingStatus.HIT_BY_TIMELIMIT, (k, v) -> v == null ? 1 : v + 1);
         } catch (IOException e) {
           LOG.error("QueueFeeder error reading input, record " + cnt, e);
           return;
@@ -136,7 +139,13 @@
             }
             CrawlDatum datum = new CrawlDatum();
             datum.set((CrawlDatum) context.getCurrentValue());
-            queues.addFetchItem(url, datum);
+            QueuingStatus status = queues.addFetchItem(url, datum);
+            queuingStatus.compute(status, (k, v) -> v == null ? 1 : v + 1);
+            if (status == QueuingStatus.ABOVE_EXCEPTION_THRESHOLD) {
+              context
+                  .getCounter("FetcherStatus", "AboveExceptionThresholdInQueue")
+                  .increment(1);
+            }
             cnt++;
             feed--;
           }
@@ -148,7 +157,12 @@
         }
       }
     }
-    LOG.info("QueueFeeder finished: total {} records hit by time limit : {}",
-        cnt, timelimitcount);
+    LOG.info("QueueFeeder finished: total {} records", cnt);
+    if (queuingStatus.size() > 0) {
+      LOG.info("QueueFeeder queuing status:");
+      for (Map.Entry<QueuingStatus, Integer> e : queuingStatus.entrySet()) {
+        LOG.info("\t{}\t{}", e.getValue(), e.getKey());
+      }
+    }
   }
 }