NUTCH-2694 HostDB to aggregate by long instead of integer
diff --git a/CHANGES.txt b/CHANGES.txt
index 96bd05a..12f5aad 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -6,9 +6,12 @@
Breaking Changes
-The value of crawl.gen.delay is now read in milliseconds as stated in the description
-in nutch-default.xml. Previously, the value has been read in days, see NUTCH-1842 for
-further information.
+ - The value of crawl.gen.delay is now read in milliseconds as stated in the description
+ in nutch-default.xml. Previously, the value has been read in days, see NUTCH-1842 for
+ further information.
+
+ - HostDB entries have been moved from Integer to Long in order to accomodate very large
+ hosts. Remove your existing HostDB and recreate it with bin/nutch updatehostdb.
Nutch 1.15 Release (25/07/2018)
diff --git a/src/java/org/apache/nutch/hostdb/HostDatum.java b/src/java/org/apache/nutch/hostdb/HostDatum.java
index fe3b73e..2bc9244 100644
--- a/src/java/org/apache/nutch/hostdb/HostDatum.java
+++ b/src/java/org/apache/nutch/hostdb/HostDatum.java
@@ -30,7 +30,7 @@
/**
*/
public class HostDatum implements Writable, Cloneable {
- protected int failures = 0;
+ protected long failures = 0;
protected float score = 0;
protected Date lastCheck = new Date(0);
protected String homepageUrl = new String();
@@ -38,17 +38,17 @@
protected MapWritable metaData = new MapWritable();
// Records the number of times DNS look-up failed, may indicate host no longer exists
- protected int dnsFailures = 0;
+ protected long dnsFailures = 0;
// Records the number of connection failures, may indicate our netwerk being blocked by firewall
- protected int connectionFailures = 0;
+ protected long connectionFailures = 0;
- protected int unfetched = 0;
- protected int fetched = 0;
- protected int notModified = 0;
- protected int redirTemp = 0;
- protected int redirPerm = 0;
- protected int gone = 0;
+ protected long unfetched = 0;
+ protected long fetched = 0;
+ protected long notModified = 0;
+ protected long redirTemp = 0;
+ protected long redirPerm = 0;
+ protected long gone = 0;
public HostDatum() {
}
@@ -68,15 +68,15 @@
}
public void resetFailures() {
- setDnsFailures(0);
- setConnectionFailures(0);
+ setDnsFailures(0l);
+ setConnectionFailures(0l);
}
- public void setDnsFailures(Integer dnsFailures) {
+ public void setDnsFailures(Long dnsFailures) {
this.dnsFailures = dnsFailures;
}
- public void setConnectionFailures(Integer connectionFailures) {
+ public void setConnectionFailures(Long connectionFailures) {
this.connectionFailures = connectionFailures;
}
@@ -88,15 +88,15 @@
this.connectionFailures++;
}
- public Integer numFailures() {
+ public Long numFailures() {
return getDnsFailures() + getConnectionFailures();
}
- public Integer getDnsFailures() {
+ public Long getDnsFailures() {
return dnsFailures;
}
- public Integer getConnectionFailures() {
+ public Long getConnectionFailures() {
return connectionFailures;
}
@@ -120,7 +120,7 @@
return score;
}
- public Integer numRecords() {
+ public Long numRecords() {
return unfetched + fetched + gone + redirPerm + redirTemp + notModified;
}
@@ -140,51 +140,51 @@
this.homepageUrl = homepageUrl;
}
- public void setUnfetched(int val) {
+ public void setUnfetched(long val) {
unfetched = val;
}
- public int getUnfetched() {
+ public long getUnfetched() {
return unfetched;
}
- public void setFetched(int val) {
+ public void setFetched(long val) {
fetched = val;
}
- public int getFetched() {
+ public long getFetched() {
return fetched;
}
- public void setNotModified(int val) {
+ public void setNotModified(long val) {
notModified = val;
}
- public int getNotModified() {
+ public long getNotModified() {
return notModified;
}
- public void setRedirTemp(int val) {
+ public void setRedirTemp(long val) {
redirTemp = val;
}
- public int getRedirTemp() {
+ public long getRedirTemp() {
return redirTemp;
}
- public void setRedirPerm(int val) {
+ public void setRedirPerm(long val) {
redirPerm = val;
}
- public int getRedirPerm() {
+ public long getRedirPerm() {
return redirPerm;
}
- public void setGone(int val) {
+ public void setGone(long val) {
gone = val;
}
- public int getGone() {
+ public long getGone() {
return gone;
}
@@ -249,15 +249,15 @@
lastCheck = new Date(in.readLong());
homepageUrl = Text.readString(in);
- dnsFailures = in.readInt();
- connectionFailures = in.readInt();
+ dnsFailures = in.readLong();
+ connectionFailures = in.readLong();
- unfetched= in.readInt();
- fetched= in.readInt();
- notModified= in.readInt();
- redirTemp= in.readInt();
- redirPerm = in.readInt();
- gone = in.readInt();
+ unfetched= in.readLong();
+ fetched= in.readLong();
+ notModified= in.readLong();
+ redirTemp= in.readLong();
+ redirPerm = in.readLong();
+ gone = in.readLong();
metaData = new org.apache.hadoop.io.MapWritable();
metaData.readFields(in);
@@ -269,15 +269,15 @@
out.writeLong(lastCheck.getTime());
Text.writeString(out, homepageUrl);
- out.writeInt(dnsFailures);
- out.writeInt(connectionFailures);
+ out.writeLong(dnsFailures);
+ out.writeLong(connectionFailures);
- out.writeInt(unfetched);
- out.writeInt(fetched);
- out.writeInt(notModified);
- out.writeInt(redirTemp);
- out.writeInt(redirPerm);
- out.writeInt(gone);
+ out.writeLong(unfetched);
+ out.writeLong(fetched);
+ out.writeLong(notModified);
+ out.writeLong(redirTemp);
+ out.writeLong(redirPerm);
+ out.writeLong(gone);
metaData.write(out);
}
@@ -285,25 +285,25 @@
@Override
public String toString() {
StringBuilder buf = new StringBuilder();
- buf.append(Integer.toString(getUnfetched()));
+ buf.append(Long.toString(getUnfetched()));
buf.append("\t");
- buf.append(Integer.toString(getFetched()));
+ buf.append(Long.toString(getFetched()));
buf.append("\t");
- buf.append(Integer.toString(getGone()));
+ buf.append(Long.toString(getGone()));
buf.append("\t");
- buf.append(Integer.toString(getRedirTemp()));
+ buf.append(Long.toString(getRedirTemp()));
buf.append("\t");
- buf.append(Integer.toString(getRedirPerm()));
+ buf.append(Long.toString(getRedirPerm()));
buf.append("\t");
- buf.append(Integer.toString(getNotModified()));
+ buf.append(Long.toString(getNotModified()));
buf.append("\t");
- buf.append(Integer.toString(numRecords()));
+ buf.append(Long.toString(numRecords()));
buf.append("\t");
- buf.append(Integer.toString(getDnsFailures()));
+ buf.append(Long.toString(getDnsFailures()));
buf.append("\t");
- buf.append(Integer.toString(getConnectionFailures()));
+ buf.append(Long.toString(getConnectionFailures()));
buf.append("\t");
- buf.append(Integer.toString(numFailures()));
+ buf.append(Long.toString(numFailures()));
buf.append("\t");
buf.append(Float.toString(score));
buf.append("\t");
diff --git a/src/java/org/apache/nutch/hostdb/ResolverThread.java b/src/java/org/apache/nutch/hostdb/ResolverThread.java
index fe66217..564e5da 100644
--- a/src/java/org/apache/nutch/hostdb/ResolverThread.java
+++ b/src/java/org/apache/nutch/hostdb/ResolverThread.java
@@ -71,7 +71,7 @@
} else if (datum.getDnsFailures() > 0) {
context.getCounter("UpdateHostDb", "rediscovered_host").increment(1);
datum.setLastCheck();
- datum.setDnsFailures(0);
+ datum.setDnsFailures(0l);
LOG.info(host + ": rediscovered_host " + datum);
} else {
context.getCounter("UpdateHostDb", "existing_known_host").increment(1);
@@ -86,7 +86,7 @@
// If the counter is empty we'll initialize with date = today and 1 failure
if (datum.isEmpty()) {
datum.setLastCheck();
- datum.setDnsFailures(1);
+ datum.setDnsFailures(1l);
context.write(hostText, datum);
context.getCounter("UpdateHostDb", "new_unknown_host").increment(1);
LOG.info(host + ": new_unknown_host " + datum);
@@ -108,7 +108,7 @@
}
context.getCounter("UpdateHostDb",
- Integer.toString(datum.numFailures()) + "_times_failed").increment(1);
+ Long.toString(datum.numFailures()) + "_times_failed").increment(1);
} catch (Exception ioe) {
LOG.warn(StringUtils.stringifyException(ioe));
}
diff --git a/src/java/org/apache/nutch/hostdb/UpdateHostDbReducer.java b/src/java/org/apache/nutch/hostdb/UpdateHostDbReducer.java
index 70ce3eb..a0030f4d 100644
--- a/src/java/org/apache/nutch/hostdb/UpdateHostDbReducer.java
+++ b/src/java/org/apache/nutch/hostdb/UpdateHostDbReducer.java
@@ -28,7 +28,7 @@
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.FloatWritable;
-import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.mapreduce.Reducer;
@@ -118,10 +118,10 @@
public void reduce(Text key, Iterable<NutchWritable> values,
Context context) throws IOException, InterruptedException {
- Map<String,Map<String,Integer>> stringCounts = new HashMap<>();
+ Map<String,Map<String,Long>> stringCounts = new HashMap<>();
Map<String,Float> maximums = new HashMap<>();
Map<String,Float> sums = new HashMap<>(); // used to calc averages
- Map<String,Integer> counts = new HashMap<>(); // used to calc averages
+ Map<String,Long> counts = new HashMap<>(); // used to calc averages
Map<String,Float> minimums = new HashMap<>();
Map<String,TDigest> tdigests = new HashMap<String,TDigest>();
@@ -146,27 +146,27 @@
// Set the correct status field
switch (buffer.getStatus()) {
case CrawlDatum.STATUS_DB_UNFETCHED:
- hostDatum.setUnfetched(hostDatum.getUnfetched() + 1);
+ hostDatum.setUnfetched(hostDatum.getUnfetched() + 1l);
break;
case CrawlDatum.STATUS_DB_FETCHED:
- hostDatum.setFetched(hostDatum.getFetched() + 1);
+ hostDatum.setFetched(hostDatum.getFetched() + 1l);
break;
case CrawlDatum.STATUS_DB_GONE:
- hostDatum.setGone(hostDatum.getGone() + 1);
+ hostDatum.setGone(hostDatum.getGone() + 1l);
break;
case CrawlDatum.STATUS_DB_REDIR_TEMP:
- hostDatum.setRedirTemp(hostDatum.getRedirTemp() + 1);
+ hostDatum.setRedirTemp(hostDatum.getRedirTemp() + 1l);
break;
case CrawlDatum.STATUS_DB_REDIR_PERM:
- hostDatum.setRedirPerm(hostDatum.getRedirPerm() + 1);
+ hostDatum.setRedirPerm(hostDatum.getRedirPerm() + 1l);
break;
case CrawlDatum.STATUS_DB_NOTMODIFIED:
- hostDatum.setNotModified(hostDatum.getNotModified() + 1);
+ hostDatum.setNotModified(hostDatum.getNotModified() + 1l);
break;
}
@@ -193,10 +193,10 @@
// Does the value exist?
if (stringCounts.get(stringFields[i]).containsKey(metadataValue)) {
// Yes, increment it
- stringCounts.get(stringFields[i]).put(metadataValue, stringCounts.get(stringFields[i]).get(metadataValue) + 1);
+ stringCounts.get(stringFields[i]).put(metadataValue, stringCounts.get(stringFields[i]).get(metadataValue) + 1l);
} else {
// Create it!
- stringCounts.get(stringFields[i]).put(metadataValue, 1);
+ stringCounts.get(stringFields[i]).put(metadataValue, 1l);
}
}
}
@@ -247,11 +247,11 @@
if (sums.containsKey(numericFields[i])) {
// Increment
sums.put(numericFields[i], sums.get(numericFields[i]) + metadataValue);
- counts.put(numericFields[i], counts.get(numericFields[i]) + 1);
+ counts.put(numericFields[i], counts.get(numericFields[i]) + 1l);
} else {
// Create it!
sums.put(numericFields[i], metadataValue);
- counts.put(numericFields[i], 1);
+ counts.put(numericFields[i], 1l);
}
} catch (Exception e) {
LOG.error(e.getMessage() + " when processing values for " + key.toString());
@@ -312,9 +312,9 @@
}
// Set metadata
- for (Map.Entry<String, Map<String,Integer>> entry : stringCounts.entrySet()) {
- for (Map.Entry<String,Integer> subEntry : entry.getValue().entrySet()) {
- hostDatum.getMetaData().put(new Text(entry.getKey() + "." + subEntry.getKey()), new IntWritable(subEntry.getValue()));
+ for (Map.Entry<String, Map<String,Long>> entry : stringCounts.entrySet()) {
+ for (Map.Entry<String,Long> subEntry : entry.getValue().entrySet()) {
+ hostDatum.getMetaData().put(new Text(entry.getKey() + "." + subEntry.getKey()), new LongWritable(subEntry.getValue()));
}
}
for (Map.Entry<String, Float> entry : maximums.entrySet()) {
@@ -326,7 +326,7 @@
for (Map.Entry<String, TDigest> entry : tdigests.entrySet()) {
// Emit all percentiles
for (int i = 0; i < percentiles.length; i++) {
- hostDatum.getMetaData().put(new Text("pct" + Integer.toString(percentiles[i]) + "." + entry.getKey()), new FloatWritable((float)entry.getValue().quantile(0.5)));
+ hostDatum.getMetaData().put(new Text("pct" + Long.toString(percentiles[i]) + "." + entry.getKey()), new FloatWritable((float)entry.getValue().quantile(0.5)));
}
}
for (Map.Entry<String, Float> entry : minimums.entrySet()) {