Fix for NUTCH-2750
diff --git a/src/java/org/apache/nutch/crawl/CrawlDbReader.java b/src/java/org/apache/nutch/crawl/CrawlDbReader.java
index a7d2f11..f59f895 100644
--- a/src/java/org/apache/nutch/crawl/CrawlDbReader.java
+++ b/src/java/org/apache/nutch/crawl/CrawlDbReader.java
@@ -42,6 +42,7 @@
 import com.tdunning.math.stats.TDigest;
 
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.BytesWritable;
@@ -89,12 +90,29 @@
 
   protected String crawlDb;
 
+  private long lastModified = 0;
+
   private void openReaders(String crawlDb, Configuration config)
       throws IOException {
-    if (readers != null)
-      return;
     Path crawlDbPath = new Path(crawlDb, CrawlDb.CURRENT_NAME);
-    readers = MapFileOutputFormat.getReaders(crawlDbPath, config);
+
+    FileStatus stat = crawlDbPath.getFileSystem(config).getFileStatus(crawlDbPath);
+    long lastModified = stat.getModificationTime();
+
+    synchronized (this) {
+      if (readers != null) {
+        if (this.lastModified == lastModified) {
+          // CrawlDB not modified, re-use readers
+          return;
+        } else {
+          // CrawlDB modified, close and re-open readers
+          closeReaders();
+        }
+      }
+
+      this.lastModified = lastModified;
+      readers = MapFileOutputFormat.getReaders(crawlDbPath, config);
+    }
   }
 
   private void closeReaders() {
@@ -627,8 +645,6 @@
   protected int process(String line, StringBuilder output) throws Exception {
     Job job = NutchJob.getInstance(getConf());
     Configuration config = job.getConfiguration();
-    // Close readers, so we know we're not working on stale data
-    closeReaders();
     readUrl(this.crawlDb, line, config, output);
     return 0;
   }
diff --git a/src/java/org/apache/nutch/crawl/LinkDbReader.java b/src/java/org/apache/nutch/crawl/LinkDbReader.java
index 2d2a901..6ea3c26 100644
--- a/src/java/org/apache/nutch/crawl/LinkDbReader.java
+++ b/src/java/org/apache/nutch/crawl/LinkDbReader.java
@@ -26,6 +26,7 @@
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.MapFile;
 import org.apache.hadoop.io.Text;
@@ -63,6 +64,8 @@
   private Path directory;
   private MapFile.Reader[] readers;
 
+  private long lastModified = 0;
+
   public LinkDbReader() {
     //default constructor
   }
@@ -76,6 +79,28 @@
     this.directory = directory;
   }
 
+  public void openReaders() throws IOException {
+    Path linkDbPath = new Path(directory, LinkDb.CURRENT_NAME);
+
+    FileStatus stat = linkDbPath.getFileSystem(getConf()).getFileStatus(directory);
+    long lastModified = stat.getModificationTime();
+
+    synchronized (this) {
+      if (readers != null) {
+        if (this.lastModified == lastModified) {
+          // CrawlDB not modified, re-use readers
+          return;
+        } else {
+          // CrawlDB modified, close and re-open readers
+          close();
+        }
+      }
+
+      this.lastModified = lastModified;
+      readers = MapFileOutputFormat.getReaders(linkDbPath, getConf());
+    }
+  }
+
   public String[] getAnchors(Text url) throws IOException {
     Inlinks inlinks = getInlinks(url);
     if (inlinks == null)
@@ -84,13 +109,7 @@
   }
 
   public Inlinks getInlinks(Text url) throws IOException {
-
-    if (readers == null) {
-      synchronized (this) {
-        readers = MapFileOutputFormat.getReaders(new Path(directory,
-            LinkDb.CURRENT_NAME), getConf());
-      }
-    }
+    openReaders();
 
     return (Inlinks) MapFileOutputFormat.getEntry(readers, PARTITIONER, url,
         new Inlinks());
@@ -188,6 +207,7 @@
       Iterator<Inlink> it = links.iterator();
       while (it.hasNext()) {
         output.append(it.next().toString());
+        output.append("\n");
       }
     }
     output.append("\n");