NUTCH-2767 Fetcher to stop filling queues skipped due to repeated exceptions
- each queue checks whether the max. number of exceptions is reached when adding a new
fetch item - if yes, the item is not added to the queue
- for proper counting and logging, the queuing status is reported
to the QueueFeeder
diff --git a/src/java/org/apache/nutch/fetcher/FetchItemQueue.java b/src/java/org/apache/nutch/fetcher/FetchItemQueue.java
index 5096b37..2802c33 100644
--- a/src/java/org/apache/nutch/fetcher/FetchItemQueue.java
+++ b/src/java/org/apache/nutch/fetcher/FetchItemQueue.java
@@ -82,6 +82,10 @@
return inProgress.get();
}
+ public int getExceptionCounter() {
+ return exceptionCounter.get();
+ }
+
public int incrementExceptionCounter() {
return exceptionCounter.incrementAndGet();
}
diff --git a/src/java/org/apache/nutch/fetcher/FetchItemQueues.java b/src/java/org/apache/nutch/fetcher/FetchItemQueues.java
index 407d00f..9fce817 100644
--- a/src/java/org/apache/nutch/fetcher/FetchItemQueues.java
+++ b/src/java/org/apache/nutch/fetcher/FetchItemQueues.java
@@ -53,6 +53,13 @@
String queueMode;
+ enum QueuingStatus {
+ SUCCESSFULLY_QUEUED,
+ ERROR_CREATE_FETCH_ITEM,
+ ABOVE_EXCEPTION_THRESHOLD,
+ HIT_BY_TIMELIMIT;
+ }
+
public FetchItemQueues(Configuration conf) {
this.conf = conf;
this.maxThreads = conf.getInt("fetcher.threads.per.queue", 1);
@@ -94,16 +101,23 @@
return queues.size();
}
- public void addFetchItem(Text url, CrawlDatum datum) {
+ public QueuingStatus addFetchItem(Text url, CrawlDatum datum) {
FetchItem it = FetchItem.create(url, datum, queueMode);
- if (it != null)
- addFetchItem(it);
+ if (it != null) {
+ return addFetchItem(it);
+ }
+ return QueuingStatus.ERROR_CREATE_FETCH_ITEM;
}
- public synchronized void addFetchItem(FetchItem it) {
+ public synchronized QueuingStatus addFetchItem(FetchItem it) {
FetchItemQueue fiq = getFetchItemQueue(it.queueID);
+ if (maxExceptionsPerQueue != -1
+ && fiq.getExceptionCounter() > maxExceptionsPerQueue) {
+ return QueuingStatus.ABOVE_EXCEPTION_THRESHOLD;
+ }
fiq.addFetchItem(it);
totalSize.incrementAndGet();
+ return QueuingStatus.SUCCESSFULLY_QUEUED;
}
public void finishFetchItem(FetchItem it) {
@@ -196,10 +210,10 @@
if (fiq == null) {
return 0;
}
+ int excCount = fiq.incrementExceptionCounter();
if (fiq.getQueueSize() == 0) {
return 0;
}
- int excCount = fiq.incrementExceptionCounter();
if (maxExceptionsPerQueue != -1 && excCount >= maxExceptionsPerQueue) {
// too many exceptions for items in this queue - purge it
int deleted = fiq.emptyQueue();
diff --git a/src/java/org/apache/nutch/fetcher/QueueFeeder.java b/src/java/org/apache/nutch/fetcher/QueueFeeder.java
index f5fa663..6c2ad39 100644
--- a/src/java/org/apache/nutch/fetcher/QueueFeeder.java
+++ b/src/java/org/apache/nutch/fetcher/QueueFeeder.java
@@ -19,10 +19,13 @@
import java.io.IOException;
import java.lang.invoke.MethodHandles;
import java.net.MalformedURLException;
+import java.util.Map;
+import java.util.TreeMap;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.Text;
import org.apache.nutch.crawl.CrawlDatum;
+import org.apache.nutch.fetcher.FetchItemQueues.QueuingStatus;
import org.apache.nutch.fetcher.Fetcher.FetcherRun;
import org.apache.nutch.net.URLFilterException;
import org.apache.nutch.net.URLFilters;
@@ -86,14 +89,14 @@
public void run() {
boolean hasMore = true;
int cnt = 0;
- int timelimitcount = 0;
+ Map<QueuingStatus, Integer> queuingStatus = new TreeMap<>();
while (hasMore) {
if (System.currentTimeMillis() >= timelimit && timelimit != -1) {
// enough ... lets' simply read all the entries from the input without
// processing them
try {
hasMore = context.nextKeyValue();
- timelimitcount++;
+ queuingStatus.compute(QueuingStatus.HIT_BY_TIMELIMIT, (k, v) -> v == null ? 1 : v + 1);
} catch (IOException e) {
LOG.error("QueueFeeder error reading input, record " + cnt, e);
return;
@@ -136,7 +139,13 @@
}
CrawlDatum datum = new CrawlDatum();
datum.set((CrawlDatum) context.getCurrentValue());
- queues.addFetchItem(url, datum);
+ QueuingStatus status = queues.addFetchItem(url, datum);
+ queuingStatus.compute(status, (k, v) -> v == null ? 1 : v + 1);
+ if (status == QueuingStatus.ABOVE_EXCEPTION_THRESHOLD) {
+ context
+ .getCounter("FetcherStatus", "AboveExceptionThresholdInQueue")
+ .increment(1);
+ }
cnt++;
feed--;
}
@@ -148,7 +157,12 @@
}
}
}
- LOG.info("QueueFeeder finished: total {} records hit by time limit : {}",
- cnt, timelimitcount);
+ LOG.info("QueueFeeder finished: total {} records", cnt);
+ if (queuingStatus.size() > 0) {
+ LOG.info("QueueFeeder queuing status:");
+ for (Map.Entry<QueuingStatus, Integer> e : queuingStatus.entrySet()) {
+ LOG.info("\t{}\t{}", e.getValue(), e.getKey());
+ }
+ }
}
}