Merge pull request #477 from sebastian-nagel/NUTCH-2737-generator-log-selection
NUTCH-2737 NUTCH-2738 Generator: count and log reasons for rejections during selection; document the property generate.restrict.status
diff --git a/conf/nutch-default.xml b/conf/nutch-default.xml
index 6bbf7dd..dac167d 100644
--- a/conf/nutch-default.xml
+++ b/conf/nutch-default.xml
@@ -862,8 +862,9 @@
generate/fetch/update cycles may overlap, setting this to true ensures
that generate will create different fetchlists even without intervening
updatedb-s, at the cost of running an additional job to update CrawlDB.
- If false, running generate twice without intervening
- updatedb will generate identical fetchlists.</description>
+ If false, running generate twice without an intervening updatedb will
+ generate identical fetchlists. See also crawl.gen.delay, which defines
+ how long items that have already been generated remain blocked.</description>
</property>
<property>
@@ -904,6 +905,13 @@
See https://issues.apache.org/jira/browse/NUTCH-2368</description>
</property>
+<property>
+ <name>generate.restrict.status</name>
+ <value></value>
+ <description>Select only CrawlDb entries with this status; see
+ https://issues.apache.org/jira/browse/NUTCH-1248</description>
+</property>
+
<!-- urlpartitioner properties -->
<property>
@@ -920,8 +928,9 @@
<description>
This value, expressed in milliseconds, defines how long we should keep the lock on records
in CrawlDb that were just selected for fetching. If these records are not updated
- in the meantime, the lock is canceled, i.e. they become eligible for selecting.
- Default value of this is 7 days (604800000 ms).
+ in the meantime, the lock is canceled, i.e. they become eligible for selection again.
+ The default value is 7 days (604800000 ms). If generate.update.crawldb is false,
+ the property crawl.gen.delay has no effect.
</description>
</property>
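
For illustration, a minimal sketch of wiring the documented properties through the Hadoop Configuration API (the status name "db_unfetched" is one of the names registered in CrawlDatum; the class name RestrictStatusExample is invented for the example):

    import org.apache.hadoop.conf.Configuration;
    import org.apache.nutch.util.NutchConfiguration;

    public class RestrictStatusExample {
      public static void main(String[] args) {
        Configuration conf = NutchConfiguration.create();
        // select only CrawlDb entries that are still unfetched
        conf.set("generate.restrict.status", "db_unfetched");
        // crawl.gen.delay (see above) only takes effect together with
        // generate.update.crawldb=true
        conf.setBoolean("generate.update.crawldb", true);
        System.out.println(conf.get("generate.restrict.status"));
      }
    }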
diff --git a/src/java/org/apache/nutch/crawl/CrawlDatum.java b/src/java/org/apache/nutch/crawl/CrawlDatum.java
index 66a6fff..e05d7fd 100644
--- a/src/java/org/apache/nutch/crawl/CrawlDatum.java
+++ b/src/java/org/apache/nutch/crawl/CrawlDatum.java
@@ -185,6 +185,15 @@
return res;
}
+ public static byte getStatusByName(String name) {
+ for (Entry<Byte, String> status : statNames.entrySet()) {
+ if (name.equalsIgnoreCase(status.getValue())) {
+ return status.getKey();
+ }
+ }
+ return -1;
+ }
+
public void setStatus(int status) {
this.status = (byte) status;
}
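
A quick sketch of the new lookup's behavior: the match is case-insensitive, and -1 signals an unknown name (the status names, e.g. "db_unfetched", are the ones registered in CrawlDatum's statNames map):

    byte status = CrawlDatum.getStatusByName("DB_UNFETCHED");
    // status == CrawlDatum.STATUS_DB_UNFETCHED
    byte unknown = CrawlDatum.getStatusByName("no_such_status");
    // unknown == -1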
diff --git a/src/java/org/apache/nutch/crawl/Generator.java b/src/java/org/apache/nutch/crawl/Generator.java
index bc6a3aa..1c5e4d5 100644
--- a/src/java/org/apache/nutch/crawl/Generator.java
+++ b/src/java/org/apache/nutch/crawl/Generator.java
@@ -20,12 +20,14 @@
import java.io.DataOutput;
import java.io.IOException;
import java.lang.invoke.MethodHandles;
+import java.net.MalformedURLException;
import java.net.URL;
import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.Date;
import java.util.HashMap;
import java.util.List;
+import java.util.Locale;
import java.util.Map;
import java.util.Random;
@@ -35,6 +37,7 @@
import org.apache.commons.jexl2.Expression;
import org.apache.commons.jexl2.JexlContext;
import org.apache.commons.jexl2.MapContext;
+import org.apache.hadoop.mapreduce.Counter;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
@@ -138,16 +141,16 @@
}
/** Selects entries due for fetch. */
- public static class Selector extends
- Partitioner<FloatWritable, Writable> implements Configurable {
+ public static class Selector extends Partitioner<FloatWritable, Writable>
+ implements Configurable {
private final URLPartitioner partitioner = new URLPartitioner();
/** Partition by host / domain or IP. */
public int getPartition(FloatWritable key, Writable value,
- int numReduceTasks) {
+ int numReduceTasks) {
return partitioner.getPartition(((SelectorEntry) value).url, key,
- numReduceTasks);
+ numReduceTasks);
}
@Override
@@ -161,391 +164,410 @@
}
}
- /** Select and invert subset due for fetch. */
+ /** Select and invert subset due for fetch. */
- public static class SelectorMapper extends
- Mapper<Text, CrawlDatum, FloatWritable, SelectorEntry> {
-
- private LongWritable genTime = new LongWritable(System.currentTimeMillis());
- private long curTime;
- private Configuration conf;
- private URLFilters filters;
- private ScoringFilters scfilters;
- private SelectorEntry entry = new SelectorEntry();
- private FloatWritable sortValue = new FloatWritable();
- private boolean filter;
- private long genDelay;
- private FetchSchedule schedule;
- private float scoreThreshold = 0f;
- private int intervalThreshold = -1;
- private String restrictStatus = null;
- private Expression expr = null;
-
- @Override
- public void setup(Mapper<Text, CrawlDatum, FloatWritable, SelectorEntry>.Context context) throws IOException{
- conf = context.getConfiguration();
- curTime = conf.getLong(GENERATOR_CUR_TIME, System.currentTimeMillis());
- filters = new URLFilters(conf);
- scfilters = new ScoringFilters(conf);
- filter = conf.getBoolean(GENERATOR_FILTER, true);
- genDelay = conf.getLong(GENERATOR_DELAY, 604800000L); // 7 days as default.
- long time = conf.getLong(Nutch.GENERATE_TIME_KEY, 0L);
- if (time > 0)
- genTime.set(time);
- schedule = FetchScheduleFactory.getFetchSchedule(conf);
- scoreThreshold = conf.getFloat(GENERATOR_MIN_SCORE, Float.NaN);
- intervalThreshold = conf.getInt(GENERATOR_MIN_INTERVAL, -1);
- restrictStatus = conf.get(GENERATOR_RESTRICT_STATUS, null);
- expr = JexlUtil.parseExpression(conf.get(GENERATOR_EXPR, null));
+ public static class SelectorMapper
+ extends Mapper<Text, CrawlDatum, FloatWritable, SelectorEntry> {
+
+ private LongWritable genTime = new LongWritable(System.currentTimeMillis());
+ private long curTime;
+ private Configuration conf;
+ private URLFilters filters;
+ private ScoringFilters scfilters;
+ private SelectorEntry entry = new SelectorEntry();
+ private FloatWritable sortValue = new FloatWritable();
+ private boolean filter;
+ private long genDelay;
+ private FetchSchedule schedule;
+ private float scoreThreshold = 0f;
+ private int intervalThreshold = -1;
+ private byte restrictStatus = -1;
+ private Expression expr = null;
+
+ @Override
+ public void setup(
+ Mapper<Text, CrawlDatum, FloatWritable, SelectorEntry>.Context context)
+ throws IOException {
+ conf = context.getConfiguration();
+ curTime = conf.getLong(GENERATOR_CUR_TIME, System.currentTimeMillis());
+ filters = new URLFilters(conf);
+ scfilters = new ScoringFilters(conf);
+ filter = conf.getBoolean(GENERATOR_FILTER, true);
+ /* CrawlDb items are unblocked after 7 days by default */
+ genDelay = conf.getLong(GENERATOR_DELAY, 604800000L);
+ long time = conf.getLong(Nutch.GENERATE_TIME_KEY, 0L);
+ if (time > 0)
+ genTime.set(time);
+ schedule = FetchScheduleFactory.getFetchSchedule(conf);
+ scoreThreshold = conf.getFloat(GENERATOR_MIN_SCORE, Float.NaN);
+ intervalThreshold = conf.getInt(GENERATOR_MIN_INTERVAL, -1);
+ String restrictStatusString = conf.getTrimmed(GENERATOR_RESTRICT_STATUS,
+ "");
+ if (!restrictStatusString.isEmpty()) {
+ restrictStatus = CrawlDatum.getStatusByName(restrictStatusString);
}
+ expr = JexlUtil.parseExpression(conf.get(GENERATOR_EXPR, null));
+ }
- @Override
- public void map(Text key, CrawlDatum value,
- Context context)
- throws IOException, InterruptedException {
- Text url = key;
- if (filter) {
- // If filtering is on don't generate URLs that don't pass
- // URLFilters
- try {
- if (filters.filter(url.toString()) == null)
- return;
- } catch (URLFilterException e) {
- if (LOG.isWarnEnabled()) {
- LOG.warn("Couldn't filter url: " + url + " (" + e.getMessage()
- + ")");
- }
- }
- }
- CrawlDatum crawlDatum = value;
-
- // check fetch schedule
- if (!schedule.shouldFetch(url, crawlDatum, curTime)) {
- LOG.debug("-shouldFetch rejected '" + url + "', fetchTime="
- + crawlDatum.getFetchTime() + ", curTime=" + curTime);
- return;
- }
-
- LongWritable oldGenTime = (LongWritable) crawlDatum.getMetaData().get(
- Nutch.WRITABLE_GENERATE_TIME_KEY);
- if (oldGenTime != null) { // awaiting fetch & update
- if (oldGenTime.get() + genDelay > curTime) // still wait for
- // update
- return;
- }
- float sort = 1.0f;
+ @Override
+ public void map(Text key, CrawlDatum value, Context context)
+ throws IOException, InterruptedException {
+ Text url = key;
+ if (filter) {
+ // If filtering is on don't generate URLs that don't pass
+ // URLFilters
try {
- sort = scfilters.generatorSortValue(key, crawlDatum, sort);
- } catch (ScoringFilterException sfe) {
- if (LOG.isWarnEnabled()) {
- LOG.warn("Couldn't filter generatorSortValue for " + key + ": " + sfe);
- }
- }
-
- // check expr
- if (expr != null) {
- if (!crawlDatum.evaluate(expr, key.toString())) {
+ if (filters.filter(url.toString()) == null)
return;
- }
+ } catch (URLFilterException e) {
+ LOG.warn("Couldn't filter url: {} ({})", url, e.getMessage());
}
+ }
+ CrawlDatum crawlDatum = value;
- if (restrictStatus != null
- && !restrictStatus.equalsIgnoreCase(CrawlDatum
- .getStatusName(crawlDatum.getStatus())))
+ // check fetch schedule
+ if (!schedule.shouldFetch(url, crawlDatum, curTime)) {
+ LOG.debug("-shouldFetch rejected '{}', fetchTime={}, curTime={}", url,
+ crawlDatum.getFetchTime(), curTime);
+ context.getCounter("Generator", "SCHEDULE_REJECTED").increment(1);
+ return;
+ }
+
+ LongWritable oldGenTime = (LongWritable) crawlDatum.getMetaData()
+ .get(Nutch.WRITABLE_GENERATE_TIME_KEY);
+ if (oldGenTime != null) { // awaiting fetch & update
+        if (oldGenTime.get() + genDelay > curTime) {
+          // still waiting for update
+          context.getCounter("Generator", "WAIT_FOR_UPDATE").increment(1);
+          return;
+        }
+      }
+ float sort = 1.0f;
+ try {
+ sort = scfilters.generatorSortValue(key, crawlDatum, sort);
+ } catch (ScoringFilterException sfe) {
+ if (LOG.isWarnEnabled()) {
+ LOG.warn(
+ "Couldn't filter generatorSortValue for " + key + ": " + sfe);
+ }
+ }
+
+ // check expr
+ if (expr != null) {
+ if (!crawlDatum.evaluate(expr, key.toString())) {
+ context.getCounter("Generator", "EXPR_REJECTED").increment(1);
return;
+ }
+ }
- // consider only entries with a score superior to the threshold
- if (!Float.isNaN(scoreThreshold) && sort < scoreThreshold)
- return;
+ if (restrictStatus != -1 && restrictStatus != crawlDatum.getStatus()) {
+ context.getCounter("Generator", "STATUS_REJECTED").increment(1);
+ return;
+ }
- // consider only entries with a retry (or fetch) interval lower than
- // threshold
- if (intervalThreshold != -1
- && crawlDatum.getFetchInterval() > intervalThreshold)
- return;
+ // consider only entries with a score superior to the threshold
+ if (!Float.isNaN(scoreThreshold) && sort < scoreThreshold) {
+ context.getCounter("Generator", "SCORE_TOO_LOW").increment(1);
+ return;
+ }
- // sort by decreasing score, using DecreasingFloatComparator
- sortValue.set(sort);
- // record generation time
- crawlDatum.getMetaData().put(Nutch.WRITABLE_GENERATE_TIME_KEY, genTime);
- entry.datum = crawlDatum;
- entry.url = key;
- context.write(sortValue, entry); // invert for sort by score
+ // consider only entries with a retry (or fetch) interval lower than
+ // threshold
+ if (intervalThreshold != -1
+ && crawlDatum.getFetchInterval() > intervalThreshold) {
+ context.getCounter("Generator", "INTERVAL_REJECTED").increment(1);
+ return;
+ }
+
+ // sort by decreasing score, using DecreasingFloatComparator
+ sortValue.set(sort);
+ // record generation time
+ crawlDatum.getMetaData().put(Nutch.WRITABLE_GENERATE_TIME_KEY, genTime);
+ entry.datum = crawlDatum;
+ entry.url = key;
+ context.write(sortValue, entry); // invert for sort by score
+ }
+ }
+
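
Each rejection path in map() above increments a counter in the "Generator" group, so the totals can also be inspected programmatically after a run; a sketch using the standard Hadoop counters API (counter names taken from the mapper above):

    // after job.waitForCompletion(true) in the driver
    long scheduleRejected = job.getCounters()
        .findCounter("Generator", "SCHEDULE_REJECTED").getValue();
    long statusRejected = job.getCounters()
        .findCounter("Generator", "STATUS_REJECTED").getValue();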
+ /** Collect until limit is reached. */
+ public static class SelectorReducer extends
+ Reducer<FloatWritable, SelectorEntry, FloatWritable, SelectorEntry> {
+
+ private HashMap<String, int[]> hostCounts = new HashMap<>();
+ private long count;
+ private int currentsegmentnum = 1;
+ private MultipleOutputs<FloatWritable, SelectorEntry> mos;
+ private String outputFile;
+ private long limit;
+ private int segCounts[];
+ private int maxNumSegments = 1;
+ private int maxCount;
+ private Configuration conf;
+ private boolean byDomain = false;
+ private URLNormalizers normalizers;
+ private static boolean normalise;
+ private SequenceFile.Reader[] hostdbReaders = null;
+ private Expression maxCountExpr = null;
+ private Expression fetchDelayExpr = null;
+
+ public void open() {
+ if (conf.get(GENERATOR_HOSTDB) != null) {
+ try {
+ Path path = new Path(conf.get(GENERATOR_HOSTDB), "current");
+ hostdbReaders = SegmentReaderUtil.getReaders(path, conf);
+ } catch (IOException e) {
+ LOG.error("Error reading HostDB: {}", e.getMessage());
+ }
}
}
+ public void close() {
+ if (hostdbReaders != null) {
+ try {
+ for (int i = 0; i < hostdbReaders.length; i++) {
+ hostdbReaders[i].close();
+ }
+ } catch (IOException e) {
+ LOG.error("Error closing HostDB: {}", e.getMessage());
+ }
+ }
+ }
-
- /** Collect until limit is reached. */
- public static class SelectorReducer extends
- Reducer<FloatWritable, SelectorEntry, FloatWritable, SelectorEntry> {
-
- private HashMap<String, int[]> hostCounts = new HashMap<>();
- private long count;
- private int currentsegmentnum = 1;
- private MultipleOutputs<FloatWritable, SelectorEntry> mos;
- private String outputFile;
- private long limit;
- private int segCounts[];
- private int maxNumSegments = 1;
- private int maxCount;
- private Configuration conf;
- private boolean byDomain = false;
- private URLNormalizers normalizers;
- private static boolean normalise;
- private SequenceFile.Reader[] hostdbReaders = null;
- private Expression maxCountExpr = null;
- private Expression fetchDelayExpr = null;
+ private JexlContext createContext(HostDatum datum) {
+ JexlContext context = new MapContext();
+ context.set("dnsFailures", datum.getDnsFailures());
+ context.set("connectionFailures", datum.getConnectionFailures());
+ context.set("unfetched", datum.getUnfetched());
+ context.set("fetched", datum.getFetched());
+ context.set("notModified", datum.getNotModified());
+ context.set("redirTemp", datum.getRedirTemp());
+ context.set("redirPerm", datum.getRedirPerm());
+ context.set("gone", datum.getGone());
+ context.set("conf", conf);
- public void open() {
- if (conf.get(GENERATOR_HOSTDB) != null) {
+ // Set metadata variables
+ for (Map.Entry<Writable, Writable> entry : datum.getMetaData()
+ .entrySet()) {
+ Object value = entry.getValue();
+
+ if (value instanceof FloatWritable) {
+ FloatWritable fvalue = (FloatWritable) value;
+ Text tkey = (Text) entry.getKey();
+ context.set(tkey.toString(), fvalue.get());
+ }
+
+ if (value instanceof IntWritable) {
+ IntWritable ivalue = (IntWritable) value;
+ Text tkey = (Text) entry.getKey();
+ context.set(tkey.toString(), ivalue.get());
+ }
+
+ if (value instanceof Text) {
+ Text tvalue = (Text) value;
+ Text tkey = (Text) entry.getKey();
+ context.set(tkey.toString().replace("-", "_"), tvalue.toString());
+ }
+ }
+
+ return context;
+ }
+
+ @Override
+ public void setup(Context context) throws IOException {
+ conf = context.getConfiguration();
+ mos = new MultipleOutputs<FloatWritable, SelectorEntry>(context);
+ Job job = Job.getInstance(conf);
+ limit = conf.getLong(GENERATOR_TOP_N, Long.MAX_VALUE)
+ / job.getNumReduceTasks();
+ maxNumSegments = conf.getInt(GENERATOR_MAX_NUM_SEGMENTS, 1);
+ segCounts = new int[maxNumSegments];
+ maxCount = conf.getInt(GENERATOR_MAX_COUNT, -1);
+ if (maxCount == -1) {
+ byDomain = false;
+ }
+ if (GENERATOR_COUNT_VALUE_DOMAIN.equals(conf.get(GENERATOR_COUNT_MODE)))
+ byDomain = true;
+ normalise = conf.getBoolean(GENERATOR_NORMALISE, true);
+ if (normalise)
+ normalizers = new URLNormalizers(conf,
+ URLNormalizers.SCOPE_GENERATE_HOST_COUNT);
+
+ if (conf.get(GENERATOR_HOSTDB) != null) {
+ maxCountExpr = JexlUtil
+ .parseExpression(conf.get(GENERATOR_MAX_COUNT_EXPR, null));
+ fetchDelayExpr = JexlUtil
+ .parseExpression(conf.get(GENERATOR_FETCH_DELAY_EXPR, null));
+ }
+ }
+
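
The two expressions parsed above are JEXL and are evaluated per host against the variables bound in createContext(); since the code casts the result to double, the expressions should yield floating-point values. A hypothetical configuration (threshold and limit values are invented for the example, and it assumes the Generator constants are accessible):

    // hypothetical per-host limits derived from HostDb statistics
    conf.set(Generator.GENERATOR_MAX_COUNT_EXPR,
        "unfetched > 1000.0 ? 200.0 : 100.0");
    conf.set(Generator.GENERATOR_FETCH_DELAY_EXPR,
        "connectionFailures > 3.0 ? 10000.0 : 1000.0");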
+ @Override
+ public void cleanup(Context context)
+ throws IOException, InterruptedException {
+ mos.close();
+ }
+
+ @Override
+ public void reduce(FloatWritable key, Iterable<SelectorEntry> values,
+ Context context) throws IOException, InterruptedException {
+
+ String hostname = null;
+ HostDatum host = null;
+ LongWritable variableFetchDelayWritable = null; // in millis
+ Text variableFetchDelayKey = new Text("_variableFetchDelay_");
+ // local variable maxCount may hold host-specific count set in HostDb
+ int maxCount = this.maxCount;
+ for (SelectorEntry entry : values) {
+ Text url = entry.url;
+ String urlString = url.toString();
+ URL u = null;
+
+ // Do this only once per queue
+ if (host == null) {
try {
- Path path = new Path(conf.get(GENERATOR_HOSTDB), "current");
- hostdbReaders = SegmentReaderUtil.getReaders(path, conf);
- } catch (IOException e) {
- LOG.error("Error reading HostDB because {}", e.getMessage());
- }
- }
- }
-
- public void close() {
- if (hostdbReaders != null) {
- try {
- for (int i = 0; i < hostdbReaders.length; i++) {
- hostdbReaders[i].close();
- }
- } catch (IOException e) {
- LOG.error("Error closing HostDB because {}", e.getMessage());
- }
- }
- }
-
- private JexlContext createContext(HostDatum datum) {
- JexlContext context = new MapContext();
- context.set("dnsFailures", datum.getDnsFailures());
- context.set("connectionFailures", datum.getConnectionFailures());
- context.set("unfetched", datum.getUnfetched());
- context.set("fetched", datum.getFetched());
- context.set("notModified", datum.getNotModified());
- context.set("redirTemp", datum.getRedirTemp());
- context.set("redirPerm", datum.getRedirPerm());
- context.set("gone", datum.getGone());
- context.set("conf", conf);
-
- // Set metadata variables
- for (Map.Entry<Writable, Writable> entry : datum.getMetaData().entrySet()) {
- Object value = entry.getValue();
-
- if (value instanceof FloatWritable) {
- FloatWritable fvalue = (FloatWritable)value;
- Text tkey = (Text)entry.getKey();
- context.set(tkey.toString(), fvalue.get());
- }
-
- if (value instanceof IntWritable) {
- IntWritable ivalue = (IntWritable)value;
- Text tkey = (Text)entry.getKey();
- context.set(tkey.toString(), ivalue.get());
- }
-
- if (value instanceof Text) {
- Text tvalue = (Text)value;
- Text tkey = (Text)entry.getKey();
- context.set(tkey.toString().replace("-", "_"), tvalue.toString());
- }
- }
-
- return context;
- }
-
- @Override
- public void setup(Context context) throws IOException {
- conf = context.getConfiguration();
- mos = new MultipleOutputs<FloatWritable, SelectorEntry>(context);
- Job job = Job.getInstance(conf);
- limit = conf.getLong(GENERATOR_TOP_N, Long.MAX_VALUE)
- / job.getNumReduceTasks();
- maxNumSegments = conf.getInt(GENERATOR_MAX_NUM_SEGMENTS, 1);
- segCounts = new int[maxNumSegments];
- maxCount = conf.getInt(GENERATOR_MAX_COUNT, -1);
- if (maxCount == -1) {
- byDomain = false;
- }
- if (GENERATOR_COUNT_VALUE_DOMAIN.equals(conf.get(GENERATOR_COUNT_MODE)))
- byDomain = true;
- normalise = conf.getBoolean(GENERATOR_NORMALISE, true);
- if (normalise)
- normalizers = new URLNormalizers(conf,
- URLNormalizers.SCOPE_GENERATE_HOST_COUNT);
-
- if (conf.get(GENERATOR_HOSTDB) != null) {
- maxCountExpr = JexlUtil.parseExpression(conf.get(GENERATOR_MAX_COUNT_EXPR, null));
- fetchDelayExpr = JexlUtil.parseExpression(conf.get(GENERATOR_FETCH_DELAY_EXPR, null));
- }
- }
-
- @Override
- public void cleanup(Context context) throws IOException, InterruptedException {
- mos.close();
- }
-
- @Override
- public void reduce(FloatWritable key, Iterable<SelectorEntry> values,
- Context context)
- throws IOException, InterruptedException {
-
- String hostname = null;
- HostDatum host = null;
- LongWritable variableFetchDelayWritable = null; // in millis
- Text variableFetchDelayKey = new Text("_variableFetchDelay_");
- // local variable maxCount may hold host-specific count set in HostDb
- int maxCount = this.maxCount;
- for (SelectorEntry entry : values) {
- Text url = entry.url;
- String urlString = url.toString();
- URL u = null;
-
- // Do this only once per queue
- if (host == null) {
- try {
- hostname = URLUtil.getHost(urlString);
- host = getHostDatum(hostname);
- } catch (Exception e) {}
-
- // Got it?
- if (host == null) {
- // Didn't work, prevent future lookups
- host = new HostDatum();
- } else {
- if (maxCountExpr != null) {
- long variableMaxCount = Math.round((double)maxCountExpr.evaluate(createContext(host)));
- LOG.info("Generator: variable maxCount: {} for {}", variableMaxCount, hostname);
- maxCount = (int)variableMaxCount;
- }
-
- if (fetchDelayExpr != null) {
- long variableFetchDelay = Math.round((double)fetchDelayExpr.evaluate(createContext(host)));
- LOG.info("Generator: variable fetchDelay: {} ms for {}", variableFetchDelay, hostname);
- variableFetchDelayWritable = new LongWritable(variableFetchDelay);
- }
- }
- }
-
- // Got a non-zero variable fetch delay? Add it to the datum's metadata
- if (variableFetchDelayWritable != null) {
- entry.datum.getMetaData().put(variableFetchDelayKey, variableFetchDelayWritable);
- }
-
- if (count == limit) {
- // do we have any segments left?
- if (currentsegmentnum < maxNumSegments) {
- count = 0;
- currentsegmentnum++;
- } else
- break;
- }
-
- String hostordomain = null;
-
- try {
- if (normalise && normalizers != null) {
- urlString = normalizers.normalize(urlString,
- URLNormalizers.SCOPE_GENERATE_HOST_COUNT);
- }
- u = new URL(urlString);
- if (byDomain) {
- hostordomain = URLUtil.getDomainName(u);
- } else {
- hostordomain = new URL(urlString).getHost();
- }
+ hostname = URLUtil.getHost(urlString);
+ host = getHostDatum(hostname);
} catch (Exception e) {
- LOG.warn("Malformed URL: '" + urlString + "', skipping ("
- + StringUtils.stringifyException(e) + ")");
- context.getCounter("Generator", "MALFORMED_URL").increment(1);
- continue;
}
- hostordomain = hostordomain.toLowerCase();
-
- // only filter if we are counting hosts or domains
- if (maxCount > 0) {
- int[] hostCount = hostCounts.get(hostordomain);
- if (hostCount == null) {
- hostCount = new int[] { 1, 0 };
- hostCounts.put(hostordomain, hostCount);
- }
-
- // increment hostCount
- hostCount[1]++;
-
- // check if topN reached, select next segment if it is
- while (segCounts[hostCount[0] - 1] >= limit
- && hostCount[0] < maxNumSegments) {
- hostCount[0]++;
- hostCount[1] = 0;
- }
-
- // reached the limit of allowed URLs per host / domain
- // see if we can put it in the next segment?
- if (hostCount[1] > maxCount) {
- if (hostCount[0] < maxNumSegments) {
- hostCount[0]++;
- hostCount[1] = 1;
- } else {
- if (hostCount[1] == maxCount && LOG.isInfoEnabled()) {
- LOG.info("Host or domain "
- + hostordomain
- + " has more than "
- + maxCount
- + " URLs for all "
- + maxNumSegments
- + " segments. Additional URLs won't be included in the fetchlist.");
- }
- // skip this entry
- continue;
- }
- }
- entry.segnum = new IntWritable(hostCount[0]);
- segCounts[hostCount[0] - 1]++;
+ // Got it?
+ if (host == null) {
+ // Didn't work, prevent future lookups
+ host = new HostDatum();
} else {
- entry.segnum = new IntWritable(currentsegmentnum);
- segCounts[currentsegmentnum - 1]++;
- }
+ if (maxCountExpr != null) {
+ long variableMaxCount = Math
+ .round((double) maxCountExpr.evaluate(createContext(host)));
+ LOG.info("Generator: variable maxCount: {} for {}",
+ variableMaxCount, hostname);
+ maxCount = (int) variableMaxCount;
+ }
- outputFile = generateFileName(entry);
- mos.write("sequenceFiles", key, entry, outputFile);
-
- // Count is incremented only when we keep the URL
- // maxCount may cause us to skip it.
- count++;
- }
- }
-
- private String generateFileName(SelectorEntry entry) {
- return "fetchlist-" + entry.segnum.toString() + "/part";
- }
-
- private HostDatum getHostDatum(String host) throws Exception {
- Text key = new Text();
- HostDatum value = new HostDatum();
-
- open();
- for (int i = 0; i < hostdbReaders.length; i++) {
- while (hostdbReaders[i].next(key, value)) {
- if (host.equals(key.toString())) {
- close();
- return value;
+ if (fetchDelayExpr != null) {
+ long variableFetchDelay = Math
+ .round((double) fetchDelayExpr.evaluate(createContext(host)));
+ LOG.info("Generator: variable fetchDelay: {} ms for {}",
+ variableFetchDelay, hostname);
+ variableFetchDelayWritable = new LongWritable(variableFetchDelay);
}
}
}
- close();
- return null;
+ // Got a non-zero variable fetch delay? Add it to the datum's metadata
+ if (variableFetchDelayWritable != null) {
+ entry.datum.getMetaData().put(variableFetchDelayKey,
+ variableFetchDelayWritable);
+ }
+
+ if (count == limit) {
+ // do we have any segments left?
+ if (currentsegmentnum < maxNumSegments) {
+ count = 0;
+ currentsegmentnum++;
+ } else
+ break;
+ }
+
+ String hostordomain = null;
+
+ try {
+ if (normalise && normalizers != null) {
+ urlString = normalizers.normalize(urlString,
+ URLNormalizers.SCOPE_GENERATE_HOST_COUNT);
+ }
+ u = new URL(urlString);
+ if (byDomain) {
+ hostordomain = URLUtil.getDomainName(u);
+ } else {
+ hostordomain = u.getHost();
+ }
+ } catch (MalformedURLException e) {
+ LOG.warn("Malformed URL: '{}', skipping ({})", urlString,
+ StringUtils.stringifyException(e));
+ context.getCounter("Generator", "MALFORMED_URL").increment(1);
+ continue;
+ }
+
+ hostordomain = hostordomain.toLowerCase();
+
+ // only filter if we are counting hosts or domains
+ if (maxCount > 0) {
+ int[] hostCount = hostCounts.get(hostordomain);
+ if (hostCount == null) {
+ hostCount = new int[] { 1, 0 };
+ hostCounts.put(hostordomain, hostCount);
+ }
+
+ // increment hostCount
+ hostCount[1]++;
+
+ // check if topN reached, select next segment if it is
+ while (segCounts[hostCount[0] - 1] >= limit
+ && hostCount[0] < maxNumSegments) {
+ hostCount[0]++;
+ hostCount[1] = 0;
+ }
+
+ // reached the limit of allowed URLs per host / domain
+ // see if we can put it in the next segment?
+ if (hostCount[1] > maxCount) {
+ if (hostCount[0] < maxNumSegments) {
+ hostCount[0]++;
+ hostCount[1] = 1;
+ } else {
+ if (hostCount[1] == (maxCount + 1)) {
+ context
+ .getCounter("Generator", "HOSTS_AFFECTED_PER_HOST_OVERFLOW")
+ .increment(1);
+ LOG.info(
+ "Host or domain {} has more than {} URLs for all {} segments. Additional URLs won't be included in the fetchlist.",
+ hostordomain, maxCount, maxNumSegments);
+ }
+ // skip this entry
+ context.getCounter("Generator", "URLS_SKIPPED_PER_HOST_OVERFLOW")
+ .increment(1);
+ continue;
+ }
+ }
+ entry.segnum = new IntWritable(hostCount[0]);
+ segCounts[hostCount[0] - 1]++;
+ } else {
+ entry.segnum = new IntWritable(currentsegmentnum);
+ segCounts[currentsegmentnum - 1]++;
+ }
+
+ outputFile = generateFileName(entry);
+ mos.write("sequenceFiles", key, entry, outputFile);
+
+ // Count is incremented only when we keep the URL
+ // maxCount may cause us to skip it.
+ count++;
}
}
- public static class DecreasingFloatComparator extends
- FloatWritable.Comparator {
+ private String generateFileName(SelectorEntry entry) {
+ return "fetchlist-" + entry.segnum.toString() + "/part";
+ }
+
+ private HostDatum getHostDatum(String host) throws Exception {
+ Text key = new Text();
+ HostDatum value = new HostDatum();
+
+ open();
+ for (int i = 0; i < hostdbReaders.length; i++) {
+ while (hostdbReaders[i].next(key, value)) {
+ if (host.equals(key.toString())) {
+ close();
+ return value;
+ }
+ }
+ }
+
+ close();
+ return null;
+ }
+ }
+
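
For orientation: hostCounts maps each host or domain to a pair {assigned segment, URL count in that segment}, so with, say, generate.max.count=2 and maxNumSegments=3 (and the per-segment topN not yet reached), a host's first two URLs land in segment 1, the next two in segment 2, the next two in segment 3, and any further URLs are skipped and counted under URLS_SKIPPED_PER_HOST_OVERFLOW.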
+ public static class DecreasingFloatComparator
+ extends FloatWritable.Comparator {
/** Compares two FloatWritables decreasing. */
public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) {
@@ -553,23 +575,21 @@
}
}
- public static class SelectorInverseMapper extends
- Mapper<FloatWritable, SelectorEntry, Text, SelectorEntry> {
+ public static class SelectorInverseMapper
+ extends Mapper<FloatWritable, SelectorEntry, Text, SelectorEntry> {
- public void map(FloatWritable key, SelectorEntry value,
- Context context)
+ public void map(FloatWritable key, SelectorEntry value, Context context)
throws IOException, InterruptedException {
SelectorEntry entry = value;
context.write(entry.url, entry);
}
}
- public static class PartitionReducer extends
- Reducer<Text, SelectorEntry, Text, CrawlDatum> {
+ public static class PartitionReducer
+ extends Reducer<Text, SelectorEntry, Text, CrawlDatum> {
public void reduce(Text key, Iterable<SelectorEntry> values,
- Context context)
- throws IOException, InterruptedException {
+ Context context) throws IOException, InterruptedException {
// if using HashComparator, we get only one input key in case of
// hash collision so use only URLs from values
for (SelectorEntry entry : values) {
@@ -615,38 +635,37 @@
*/
public static class CrawlDbUpdater {
- public static class CrawlDbUpdateMapper extends
- Mapper<Text, CrawlDatum, Text, CrawlDatum> {
+ public static class CrawlDbUpdateMapper
+ extends Mapper<Text, CrawlDatum, Text, CrawlDatum> {
@Override
- public void map(Text key, CrawlDatum value,
- Context context)
- throws IOException, InterruptedException {
+ public void map(Text key, CrawlDatum value, Context context)
+ throws IOException, InterruptedException {
context.write(key, value);
}
}
- public static class CrawlDbUpdateReducer extends
- Reducer<Text, CrawlDatum, Text, CrawlDatum> {
+ public static class CrawlDbUpdateReducer
+ extends Reducer<Text, CrawlDatum, Text, CrawlDatum> {
private CrawlDatum orig = new CrawlDatum();
private LongWritable genTime = new LongWritable(0L);
private long generateTime;
@Override
- public void setup(Reducer<Text, CrawlDatum, Text, CrawlDatum>.Context context) {
+ public void setup(
+ Reducer<Text, CrawlDatum, Text, CrawlDatum>.Context context) {
Configuration conf = context.getConfiguration();
generateTime = conf.getLong(Nutch.GENERATE_TIME_KEY, 0L);
}
@Override
- public void reduce(Text key, Iterable<CrawlDatum> values,
- Context context)
- throws IOException, InterruptedException {
+ public void reduce(Text key, Iterable<CrawlDatum> values, Context context)
+ throws IOException, InterruptedException {
genTime.set(0L);
for (CrawlDatum val : values) {
if (val.getMetaData().containsKey(Nutch.WRITABLE_GENERATE_TIME_KEY)) {
- LongWritable gt = (LongWritable) val.getMetaData().get(
- Nutch.WRITABLE_GENERATE_TIME_KEY);
+ LongWritable gt = (LongWritable) val.getMetaData()
+ .get(Nutch.WRITABLE_GENERATE_TIME_KEY);
genTime.set(gt.get());
if (genTime.get() != generateTime) {
orig.set(val);
@@ -673,14 +692,15 @@
}
public Path[] generate(Path dbDir, Path segments, int numLists, long topN,
- long curTime) throws IOException, InterruptedException, ClassNotFoundException {
+ long curTime)
+ throws IOException, InterruptedException, ClassNotFoundException {
Job job = NutchJob.getInstance(getConf());
Configuration conf = job.getConfiguration();
boolean filter = conf.getBoolean(GENERATOR_FILTER, true);
boolean normalise = conf.getBoolean(GENERATOR_NORMALISE, true);
- return generate(dbDir, segments, numLists, topN, curTime, filter,
- normalise, false, 1, null);
+ return generate(dbDir, segments, numLists, topN, curTime, filter, normalise,
+ false, 1, null);
}
/**
@@ -688,14 +708,16 @@
* normalise and set the number of segments to 1
**/
public Path[] generate(Path dbDir, Path segments, int numLists, long topN,
- long curTime, boolean filter, boolean force) throws IOException, InterruptedException, ClassNotFoundException {
+ long curTime, boolean filter, boolean force)
+ throws IOException, InterruptedException, ClassNotFoundException {
return generate(dbDir, segments, numLists, topN, curTime, filter, true,
force, 1, null);
}
-
+
public Path[] generate(Path dbDir, Path segments, int numLists, long topN,
long curTime, boolean filter, boolean norm, boolean force,
- int maxNumSegments, String expr) throws IOException, InterruptedException, ClassNotFoundException {
+ int maxNumSegments, String expr)
+ throws IOException, InterruptedException, ClassNotFoundException {
return generate(dbDir, segments, numLists, topN, curTime, filter, norm,
    force, maxNumSegments, expr, null);
}
@@ -724,14 +746,15 @@
*/
public Path[] generate(Path dbDir, Path segments, int numLists, long topN,
long curTime, boolean filter, boolean norm, boolean force,
- int maxNumSegments, String expr, String hostdb) throws IOException, InterruptedException, ClassNotFoundException {
+ int maxNumSegments, String expr, String hostdb)
+ throws IOException, InterruptedException, ClassNotFoundException {
Path tempDir = new Path(getConf().get("mapreduce.cluster.temp.dir", ".")
+ "/generate-temp-" + java.util.UUID.randomUUID().toString());
FileSystem fs = tempDir.getFileSystem(getConf());
Path lock = CrawlDb.lock(getConf(), dbDir, force);
-
+
SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
long start = System.currentTimeMillis();
LOG.info("Generator: starting at " + sdf.format(start));
@@ -744,20 +767,22 @@
if (expr != null) {
LOG.info("Generator: expr: {}", expr);
}
- if (expr != null) {
+ if (hostdb != null) {
LOG.info("Generator: hostdb: {}", hostdb);
}
-
+
// map to inverted subset due for fetch, sort by score
Job job = NutchJob.getInstance(getConf());
job.setJobName("generate: select from " + dbDir);
Configuration conf = job.getConfiguration();
- if (numLists == -1) { // for politeness make
- numLists = Integer.parseInt(conf.get("mapreduce.job.maps")); // a partition per fetch task
+ if (numLists == -1) {
+ /* for politeness create exactly one partition per fetch task */
+ numLists = Integer.parseInt(conf.get("mapreduce.job.maps"));
}
if ("local".equals(conf.get("mapreduce.framework.name")) && numLists != 1) {
// override
- LOG.info("Generator: running in local mode, generating exactly one partition.");
+ LOG.info(
+ "Generator: running in local mode, generating exactly one partition.");
numLists = 1;
}
conf.setLong(GENERATOR_CUR_TIME, curTime);
@@ -786,7 +811,9 @@
job.setOutputKeyClass(FloatWritable.class);
job.setSortComparatorClass(DecreasingFloatComparator.class);
job.setOutputValueClass(SelectorEntry.class);
- MultipleOutputs.addNamedOutput(job, "sequenceFiles", SequenceFileOutputFormat.class, FloatWritable.class, SelectorEntry.class);
+ MultipleOutputs.addNamedOutput(job, "sequenceFiles",
+ SequenceFileOutputFormat.class, FloatWritable.class,
+ SelectorEntry.class);
try {
boolean success = job.waitForCompletion(true);
@@ -804,6 +831,13 @@
throw e;
}
+ LOG.info("Generator: number of items rejected during selection:");
+ for (Counter counter : job.getCounters().getGroup("Generator")) {
+ LOG.info("Generator: {} {}",
+ String.format(Locale.ROOT, "%6d", counter.getValue()),
+ counter.getName());
+ }
+
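
With this format, the end-of-job summary might read, for example (counts invented for illustration):

    Generator: number of items rejected during selection:
    Generator:    123 SCHEDULE_REJECTED
    Generator:     17 STATUS_REJECTED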
// read the subdirectories generated in the temp
// output and turn them into segments
List<Path> generatedSegments = new ArrayList<>();
@@ -820,7 +854,7 @@
}
} catch (Exception e) {
LOG.warn("Generator: exception while partitioning segments, exiting ...");
- LockUtil.removeLockFile(getConf(),lock);
+ LockUtil.removeLockFile(getConf(), lock);
fs.delete(tempDir, true);
return null;
}
@@ -928,7 +962,7 @@
throw new RuntimeException(message);
}
} catch (IOException | InterruptedException | ClassNotFoundException e) {
- LOG.error(StringUtils.stringifyException(e));
+ LOG.error(StringUtils.stringifyException(e));
throw e;
}
@@ -950,15 +984,15 @@
* Generate a fetchlist from the crawldb.
*/
public static void main(String args[]) throws Exception {
- int res = ToolRunner
- .run(NutchConfiguration.create(), new Generator(), args);
+ int res = ToolRunner.run(NutchConfiguration.create(), new Generator(),
+ args);
System.exit(res);
}
public int run(String[] args) throws Exception {
if (args.length < 2) {
- System.out
- .println("Usage: Generator <crawldb> <segments_dir> [-force] [-topN N] [-numFetchers numFetchers] [-expr <expr>] [-adddays <numDays>] [-noFilter] [-noNorm] [-maxNumSegments <num>]");
+ System.out.println(
+ "Usage: Generator <crawldb> <segments_dir> [-force] [-topN N] [-numFetchers numFetchers] [-expr <expr>] [-adddays <numDays>] [-noFilter] [-noNorm] [-maxNumSegments <num>]");
return -1;
}
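
For reference, a typical invocation through the Nutch launcher follows this usage string, e.g. bin/nutch generate crawl/crawldb crawl/segments -topN 50000 -numFetchers 4 (paths and values are placeholders).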
@@ -1014,7 +1048,8 @@
}
@Override
- public Map<String, Object> run(Map<String, Object> args, String crawlId) throws Exception {
+ public Map<String, Object> run(Map<String, Object> args, String crawlId)
+ throws Exception {
Map<String, Object> results = new HashMap<>();
@@ -1028,67 +1063,63 @@
String expr = null;
String hostdb = null;
Path crawlDb;
-
- if(args.containsKey(Nutch.ARG_CRAWLDB)) {
+
+ if (args.containsKey(Nutch.ARG_CRAWLDB)) {
Object crawldbPath = args.get(Nutch.ARG_CRAWLDB);
- if(crawldbPath instanceof Path) {
+ if (crawldbPath instanceof Path) {
crawlDb = (Path) crawldbPath;
- }
- else {
+ } else {
crawlDb = new Path(crawldbPath.toString());
}
- }
- else {
- crawlDb = new Path(crawlId+"/crawldb");
+ } else {
+ crawlDb = new Path(crawlId + "/crawldb");
}
Path segmentsDir;
- if(args.containsKey(Nutch.ARG_SEGMENTDIR)) {
+ if (args.containsKey(Nutch.ARG_SEGMENTDIR)) {
Object segDir = args.get(Nutch.ARG_SEGMENTDIR);
- if(segDir instanceof Path) {
+ if (segDir instanceof Path) {
segmentsDir = (Path) segDir;
- }
- else {
+ } else {
segmentsDir = new Path(segDir.toString());
}
- }
- else {
- segmentsDir = new Path(crawlId+"/segments");
+ } else {
+ segmentsDir = new Path(crawlId + "/segments");
}
if (args.containsKey(Nutch.ARG_HOSTDB)) {
- hostdb = (String)args.get(Nutch.ARG_HOSTDB);
+ hostdb = (String) args.get(Nutch.ARG_HOSTDB);
}
-
+
if (args.containsKey("expr")) {
- expr = (String)args.get("expr");
+ expr = (String) args.get("expr");
}
if (args.containsKey("topN")) {
- topN = Long.parseLong((String)args.get("topN"));
+ topN = Long.parseLong((String) args.get("topN"));
}
if (args.containsKey("numFetchers")) {
- numFetchers = Integer.parseInt((String)args.get("numFetchers"));
+ numFetchers = Integer.parseInt((String) args.get("numFetchers"));
}
if (args.containsKey("adddays")) {
- long numDays = Integer.parseInt((String)args.get("adddays"));
+ long numDays = Integer.parseInt((String) args.get("adddays"));
curTime += numDays * 1000L * 60 * 60 * 24;
}
if (args.containsKey("noFilter")) {
filter = false;
- }
+ }
if (args.containsKey("noNorm")) {
norm = false;
- }
+ }
if (args.containsKey("force")) {
force = true;
- }
+ }
if (args.containsKey("maxNumSegments")) {
- maxNumSegments = Integer.parseInt((String)args.get("maxNumSegments"));
+ maxNumSegments = Integer.parseInt((String) args.get("maxNumSegments"));
}
try {
Path[] segs = generate(crawlDb, segmentsDir, numFetchers, topN, curTime,
filter, norm, force, maxNumSegments, expr, hostdb);
- if (segs == null){
+ if (segs == null) {
results.put(Nutch.VAL_RESULT, Integer.toString(1));
return results;
}