Merge pull request #483 from sju/NUTCH-2750
Fix for NUTCH-2750 Improve CrawlDbReader & LinkDbReader reader handling
- re-opens readers only if CrawlDb/LinkDb has changed (do not reopen for every query/URL)
diff --git a/src/java/org/apache/nutch/crawl/CrawlDbReader.java b/src/java/org/apache/nutch/crawl/CrawlDbReader.java
index a7d2f11..f59f895 100644
--- a/src/java/org/apache/nutch/crawl/CrawlDbReader.java
+++ b/src/java/org/apache/nutch/crawl/CrawlDbReader.java
@@ -42,6 +42,7 @@
import com.tdunning.math.stats.TDigest;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.BytesWritable;
@@ -89,12 +90,29 @@
protected String crawlDb;
+ private long lastModified = 0;
+
private void openReaders(String crawlDb, Configuration config)
throws IOException {
- if (readers != null)
- return;
Path crawlDbPath = new Path(crawlDb, CrawlDb.CURRENT_NAME);
- readers = MapFileOutputFormat.getReaders(crawlDbPath, config);
+
+ FileStatus stat = crawlDbPath.getFileSystem(config).getFileStatus(crawlDbPath);
+ long lastModified = stat.getModificationTime();
+
+ synchronized (this) {
+ if (readers != null) {
+ if (this.lastModified == lastModified) {
+ // CrawlDB not modified, re-use readers
+ return;
+ } else {
+ // CrawlDB modified, close and re-open readers
+ closeReaders();
+ }
+ }
+
+ this.lastModified = lastModified;
+ readers = MapFileOutputFormat.getReaders(crawlDbPath, config);
+ }
}
private void closeReaders() {
@@ -627,8 +645,6 @@
protected int process(String line, StringBuilder output) throws Exception {
Job job = NutchJob.getInstance(getConf());
Configuration config = job.getConfiguration();
- // Close readers, so we know we're not working on stale data
- closeReaders();
readUrl(this.crawlDb, line, config, output);
return 0;
}
diff --git a/src/java/org/apache/nutch/crawl/LinkDbReader.java b/src/java/org/apache/nutch/crawl/LinkDbReader.java
index 2d2a901..6ea3c26 100644
--- a/src/java/org/apache/nutch/crawl/LinkDbReader.java
+++ b/src/java/org/apache/nutch/crawl/LinkDbReader.java
@@ -26,6 +26,7 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.MapFile;
import org.apache.hadoop.io.Text;
@@ -63,6 +64,8 @@
private Path directory;
private MapFile.Reader[] readers;
+ private long lastModified = 0;
+
public LinkDbReader() {
//default constructor
}
@@ -76,6 +79,28 @@
this.directory = directory;
}
+ public void openReaders() throws IOException {
+ Path linkDbPath = new Path(directory, LinkDb.CURRENT_NAME);
+
+ FileStatus stat = linkDbPath.getFileSystem(getConf()).getFileStatus(linkDbPath);
+ long lastModified = stat.getModificationTime();
+
+ synchronized (this) {
+ if (readers != null) {
+ if (this.lastModified == lastModified) {
+ // LinkDB not modified, re-use readers
+ return;
+ } else {
+ // LinkDB modified, close and re-open readers
+ close();
+ }
+ }
+
+ this.lastModified = lastModified;
+ readers = MapFileOutputFormat.getReaders(linkDbPath, getConf());
+ }
+ }
+
public String[] getAnchors(Text url) throws IOException {
Inlinks inlinks = getInlinks(url);
if (inlinks == null)
@@ -84,13 +109,7 @@
}
public Inlinks getInlinks(Text url) throws IOException {
-
- if (readers == null) {
- synchronized (this) {
- readers = MapFileOutputFormat.getReaders(new Path(directory,
- LinkDb.CURRENT_NAME), getConf());
- }
- }
+ openReaders();
return (Inlinks) MapFileOutputFormat.getEntry(readers, PARTITIONER, url,
new Inlinks());
@@ -188,6 +207,7 @@
Iterator<Inlink> it = links.iterator();
while (it.hasNext()) {
output.append(it.next().toString());
+ output.append("\n");
}
}
output.append("\n");