Merge pull request #505 from sebastian-nagel/NUTCH-2776-fetcher-dedup-redirects

NUTCH-2776 Fetcher to temporarily deduplicate followed redirects
diff --git a/conf/nutch-default.xml b/conf/nutch-default.xml
index 6dfbe64..f0afd1c 100644
--- a/conf/nutch-default.xml
+++ b/conf/nutch-default.xml
@@ -1221,6 +1221,30 @@
   </description>
 </property>
 
+<property>
+  <name>fetcher.redirect.dedupcache.seconds</name>
+  <value>-1</value>
+  <description>
+    The maximum time in seconds fetcher will cache redirects for
+    deduplication.  If the same redirect URL is seen again within
+    this time, it is skipped. This helps to avoid pathological cases
+    where many or most of the URLs of a host are redirected to the
+    same URL, e.g. a page to log in, to accept cookies, or one indicating an
+    error.  A value less than or equal to zero disables redirect deduplication.
+    Caveat: This may break setting cookies via recursive redirect chains.
+  </description>
+</property>
+
+<property>
+  <name>fetcher.redirect.dedupcache.size</name>
+  <value>1000</value>
+  <description>
+    The maximum number of entries in the cache used to deduplicate redirects,
+    see `fetcher.redirect.dedupcache.seconds`. A value <= 0 disables the cache.
+  </description>
+</property>
+
+
 <!-- SegmentReader -->
 <property>
   <name>segment.reader.content.recode</name>
diff --git a/src/java/org/apache/nutch/fetcher/FetchItemQueues.java b/src/java/org/apache/nutch/fetcher/FetchItemQueues.java
index 3c1003e..ce7b2b6 100644
--- a/src/java/org/apache/nutch/fetcher/FetchItemQueues.java
+++ b/src/java/org/apache/nutch/fetcher/FetchItemQueues.java
@@ -22,6 +22,7 @@
 import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
 
 import org.apache.hadoop.conf.Configuration;
@@ -30,9 +31,13 @@
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import com.google.common.base.Optional;
+import com.google.common.cache.Cache;
+import com.google.common.cache.CacheBuilder;
+
 /**
- * Convenience class - a collection of queues that keeps track of the total
- * number of items, and provides items eligible for fetching from any queue.
+ * A collection of queues that keeps track of the total number of items, and
+ * provides items eligible for fetching from any queue.
  */
 public class FetchItemQueues {
 
@@ -44,6 +49,8 @@
   private Set<String> queuesMaxExceptions = new HashSet<>();
   Iterator<Map.Entry<String, FetchItemQueue>> lastIterator = null;
   AtomicInteger totalSize = new AtomicInteger(0);
+  Cache<Text, Optional<String>> redirectDedupCache = null;
+
   int maxThreads;
   long crawlDelay;
   long minCrawlDelay;
@@ -77,6 +84,16 @@
     this.timelimit = conf.getLong("fetcher.timelimit", -1);
     this.maxExceptionsPerQueue = conf.getInt(
         "fetcher.max.exceptions.per.queue", -1);
+
+    int dedupRedirMaxTime = conf.getInt("fetcher.redirect.dedupcache.seconds",
+        -1);
+    int dedupRedirMaxSize = conf.getInt("fetcher.redirect.dedupcache.size",
+        1000);
+    if (dedupRedirMaxTime > 0 && dedupRedirMaxSize > 0) {
+      redirectDedupCache = CacheBuilder.newBuilder()
+          .maximumSize(dedupRedirMaxSize)
+          .expireAfterWrite(dedupRedirMaxTime, TimeUnit.SECONDS).build();
+    }
   }
 
   /**
@@ -246,6 +263,22 @@
     return 0;
   }
 
+  /**
+   * Looks up redirUrl in the redirect deduplication cache and adds it if it
+   * is missing. Lookup and insert are two separate cache calls.
+   * @param redirUrl redirect target
+   * @return true if the cache exists and already held redirUrl, else false
+   */
+  public boolean redirectIsQueuedRecently(Text redirUrl) {
+    if (redirectDedupCache != null) {
+      if (redirectDedupCache.getIfPresent(redirUrl) != null) {
+        return true;
+      }
+      redirectDedupCache.put(redirUrl, Optional.absent());
+    }
+    return false;
+  }
+
   public synchronized void dump() {
     for (String id : queues.keySet()) {
       FetchItemQueue fiq = queues.get(id);
diff --git a/src/java/org/apache/nutch/fetcher/FetcherThread.java b/src/java/org/apache/nutch/fetcher/FetcherThread.java
index bc0d639..6cd1772 100644
--- a/src/java/org/apache/nutch/fetcher/FetcherThread.java
+++ b/src/java/org/apache/nutch/fetcher/FetcherThread.java
@@ -457,6 +457,8 @@
 
             if (redirecting && redirectCount > maxRedirect) {
               fetchQueues.finishFetchItem(fit);
+              context.getCounter("FetcherStatus", "redirect_count_exceeded")
+                  .increment(1);
               if (LOG.isInfoEnabled()) {
                 LOG.info("{} {} - redirect count exceeded {} ({})", getName(),
                     Thread.currentThread().getId(), fit.url,
@@ -592,6 +594,13 @@
 
   private FetchItem queueRedirect(Text redirUrl, FetchItem fit)
       throws ScoringFilterException {
+    if (fetchQueues.redirectIsQueuedRecently(redirUrl)) {
+      redirecting = false;
+      context.getCounter("FetcherStatus", "redirect_deduplicated").increment(1);
+      LOG.debug(" - ignoring redirect from {} to {} as duplicate", fit.url,
+          redirUrl);
+      return null;
+    }
     CrawlDatum newDatum = createRedirDatum(redirUrl, fit, CrawlDatum.STATUS_DB_UNFETCHED);
     fit = FetchItem.create(redirUrl, newDatum, queueMode);
     if (fit != null) {