Merge pull request #505 from sebastian-nagel/NUTCH-2776-fetcher-dedup-redirects
NUTCH-2776 Fetcher to temporarily deduplicate followed redirects
diff --git a/conf/nutch-default.xml b/conf/nutch-default.xml
index 6dfbe64..f0afd1c 100644
--- a/conf/nutch-default.xml
+++ b/conf/nutch-default.xml
@@ -1221,6 +1221,30 @@
</description>
</property>
+<property>
+ <name>fetcher.redirect.dedupcache.seconds</name>
+ <value>-1</value>
+ <description>
+ The maximum time in seconds the fetcher will cache redirect targets
+ for deduplication. If the same redirect URL is seen again within
+ this time, it is skipped. This helps to avoid pathological cases
+ where many or most of the URLs of a host are redirected to the
+ same URL, e.g. a page to log in, to accept cookies, or indicating
+ an error. A value less than or equal to zero disables redirect deduplication.
+ Caveat: This may break setting cookies via recursive redirect chains.
+ </description>
+</property>
+
+<property>
+ <name>fetcher.redirect.dedupcache.size</name>
+ <value>1000</value>
+ <description>
+ The maximum number of redirect URLs kept in the deduplication cache
+ (see `fetcher.redirect.dedupcache.seconds`); zero or less disables it.
+ </description>
+</property>
+
+
<!-- SegmentReader -->
<property>
<name>segment.reader.content.recode</name>
diff --git a/src/java/org/apache/nutch/fetcher/FetchItemQueues.java b/src/java/org/apache/nutch/fetcher/FetchItemQueues.java
index 3c1003e..ce7b2b6 100644
--- a/src/java/org/apache/nutch/fetcher/FetchItemQueues.java
+++ b/src/java/org/apache/nutch/fetcher/FetchItemQueues.java
@@ -22,6 +22,7 @@
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.hadoop.conf.Configuration;
@@ -30,9 +31,13 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import com.google.common.base.Optional;
+import com.google.common.cache.Cache;
+import com.google.common.cache.CacheBuilder;
+
/**
- * Convenience class - a collection of queues that keeps track of the total
- * number of items, and provides items eligible for fetching from any queue.
+ * A collection of queues that keeps track of the total number of items, and
+ * provides items eligible for fetching from any queue.
*/
public class FetchItemQueues {
@@ -44,6 +49,8 @@
private Set<String> queuesMaxExceptions = new HashSet<>();
Iterator<Map.Entry<String, FetchItemQueue>> lastIterator = null;
AtomicInteger totalSize = new AtomicInteger(0);
+ Cache<Text, Optional<String>> redirectDedupCache = null;
+
int maxThreads;
long crawlDelay;
long minCrawlDelay;
@@ -77,6 +84,16 @@
this.timelimit = conf.getLong("fetcher.timelimit", -1);
this.maxExceptionsPerQueue = conf.getInt(
"fetcher.max.exceptions.per.queue", -1);
+
+ int dedupRedirMaxTime = conf.getInt("fetcher.redirect.dedupcache.seconds",
+ -1);
+ int dedupRedirMaxSize = conf.getInt("fetcher.redirect.dedupcache.size",
+ 1000);
+ if (dedupRedirMaxTime > 0 && dedupRedirMaxSize > 0) {
+ redirectDedupCache = CacheBuilder.newBuilder()
+ .maximumSize(dedupRedirMaxSize)
+ .expireAfterWrite(dedupRedirMaxTime, TimeUnit.SECONDS).build();
+ }
}
/**
@@ -246,6 +263,22 @@
return 0;
}
+  /**
+   * Checks whether a redirect target was queued recently and, if not, records
+   * it. Check and insert happen atomically, so concurrent fetcher threads
+   * cannot both be told that the same redirect target is new.
+   *
+   * @param redirUrl
+   *          redirect target
+   * @return true if redirects are deduplicated and redirUrl has been queued
+   *         recently
+   */
+  public boolean redirectIsQueuedRecently(Text redirUrl) {
+    // Text is mutable: store a copy so the caller cannot alter the cache key
+    return redirectDedupCache != null && redirectDedupCache.asMap()
+        .putIfAbsent(new Text(redirUrl), Optional.absent()) != null;
+  }
+
public synchronized void dump() {
for (String id : queues.keySet()) {
FetchItemQueue fiq = queues.get(id);
diff --git a/src/java/org/apache/nutch/fetcher/FetcherThread.java b/src/java/org/apache/nutch/fetcher/FetcherThread.java
index bc0d639..6cd1772 100644
--- a/src/java/org/apache/nutch/fetcher/FetcherThread.java
+++ b/src/java/org/apache/nutch/fetcher/FetcherThread.java
@@ -457,6 +457,8 @@
if (redirecting && redirectCount > maxRedirect) {
fetchQueues.finishFetchItem(fit);
+ context.getCounter("FetcherStatus", "redirect_count_exceeded")
+ .increment(1);
if (LOG.isInfoEnabled()) {
LOG.info("{} {} - redirect count exceeded {} ({})", getName(),
Thread.currentThread().getId(), fit.url,
@@ -592,6 +594,13 @@
private FetchItem queueRedirect(Text redirUrl, FetchItem fit)
throws ScoringFilterException {
+ if (fetchQueues.redirectIsQueuedRecently(redirUrl)) {
+ redirecting = false;
+ context.getCounter("FetcherStatus", "redirect_deduplicated").increment(1);
+ LOG.debug(" - ignoring redirect from {} to {} as duplicate", fit.url,
+ redirUrl);
+ return null;
+ }
CrawlDatum newDatum = createRedirDatum(redirUrl, fit, CrawlDatum.STATUS_DB_UNFETCHED);
fit = FetchItem.create(redirUrl, newDatum, queueMode);
if (fit != null) {