Generator: apply code formatting (indentation, spacing, line wrapping)
diff --git a/src/java/org/apache/nutch/crawl/Generator.java b/src/java/org/apache/nutch/crawl/Generator.java
index 7b5397c..1db3338 100644
--- a/src/java/org/apache/nutch/crawl/Generator.java
+++ b/src/java/org/apache/nutch/crawl/Generator.java
@@ -141,16 +141,16 @@
}
/** Selects entries due for fetch. */
- public static class Selector extends
- Partitioner<FloatWritable, Writable> implements Configurable {
+ public static class Selector extends Partitioner<FloatWritable, Writable>
+ implements Configurable {
private final URLPartitioner partitioner = new URLPartitioner();
/** Partition by host / domain or IP. */
public int getPartition(FloatWritable key, Writable value,
- int numReduceTasks) {
+ int numReduceTasks) {
return partitioner.getPartition(((SelectorEntry) value).url, key,
- numReduceTasks);
+ numReduceTasks);
}
@Override
@@ -164,399 +164,410 @@
}
}
- /** Select and invert subset due for fetch. */
+ /** Select and invert subset due for fetch. */
- public static class SelectorMapper extends
- Mapper<Text, CrawlDatum, FloatWritable, SelectorEntry> {
-
- private LongWritable genTime = new LongWritable(System.currentTimeMillis());
- private long curTime;
- private Configuration conf;
- private URLFilters filters;
- private ScoringFilters scfilters;
- private SelectorEntry entry = new SelectorEntry();
- private FloatWritable sortValue = new FloatWritable();
- private boolean filter;
- private long genDelay;
- private FetchSchedule schedule;
- private float scoreThreshold = 0f;
- private int intervalThreshold = -1;
- private byte restrictStatus = -1;
- private Expression expr = null;
-
- @Override
- public void setup(Mapper<Text, CrawlDatum, FloatWritable, SelectorEntry>.Context context) throws IOException{
- conf = context.getConfiguration();
- curTime = conf.getLong(GENERATOR_CUR_TIME, System.currentTimeMillis());
- filters = new URLFilters(conf);
- scfilters = new ScoringFilters(conf);
- filter = conf.getBoolean(GENERATOR_FILTER, true);
- genDelay = conf.getLong(GENERATOR_DELAY, 604800000L); // 7 days as default.
- long time = conf.getLong(Nutch.GENERATE_TIME_KEY, 0L);
- if (time > 0)
- genTime.set(time);
- schedule = FetchScheduleFactory.getFetchSchedule(conf);
- scoreThreshold = conf.getFloat(GENERATOR_MIN_SCORE, Float.NaN);
- intervalThreshold = conf.getInt(GENERATOR_MIN_INTERVAL, -1);
- String restrictStatusString = conf.getTrimmed(GENERATOR_RESTRICT_STATUS, "");
- if (!restrictStatusString.isEmpty()) {
- restrictStatus = CrawlDatum.getStatusByName(restrictStatusString);
+ public static class SelectorMapper
+ extends Mapper<Text, CrawlDatum, FloatWritable, SelectorEntry> {
+
+ private LongWritable genTime = new LongWritable(System.currentTimeMillis());
+ private long curTime;
+ private Configuration conf;
+ private URLFilters filters;
+ private ScoringFilters scfilters;
+ private SelectorEntry entry = new SelectorEntry();
+ private FloatWritable sortValue = new FloatWritable();
+ private boolean filter;
+ private long genDelay;
+ private FetchSchedule schedule;
+ private float scoreThreshold = 0f;
+ private int intervalThreshold = -1;
+ private byte restrictStatus = -1;
+ private Expression expr = null;
+
+ @Override
+ public void setup(
+ Mapper<Text, CrawlDatum, FloatWritable, SelectorEntry>.Context context)
+ throws IOException {
+ conf = context.getConfiguration();
+ curTime = conf.getLong(GENERATOR_CUR_TIME, System.currentTimeMillis());
+ filters = new URLFilters(conf);
+ scfilters = new ScoringFilters(conf);
+ filter = conf.getBoolean(GENERATOR_FILTER, true);
+ genDelay = conf.getLong(GENERATOR_DELAY, 604800000L); // 7 days as
+ // default.
+ long time = conf.getLong(Nutch.GENERATE_TIME_KEY, 0L);
+ if (time > 0)
+ genTime.set(time);
+ schedule = FetchScheduleFactory.getFetchSchedule(conf);
+ scoreThreshold = conf.getFloat(GENERATOR_MIN_SCORE, Float.NaN);
+ intervalThreshold = conf.getInt(GENERATOR_MIN_INTERVAL, -1);
+ String restrictStatusString = conf.getTrimmed(GENERATOR_RESTRICT_STATUS,
+ "");
+ if (!restrictStatusString.isEmpty()) {
+ restrictStatus = CrawlDatum.getStatusByName(restrictStatusString);
+ }
+ expr = JexlUtil.parseExpression(conf.get(GENERATOR_EXPR, null));
+ }
+
+ @Override
+ public void map(Text key, CrawlDatum value, Context context)
+ throws IOException, InterruptedException {
+ Text url = key;
+ if (filter) {
+ // If filtering is on don't generate URLs that don't pass
+ // URLFilters
+ try {
+ if (filters.filter(url.toString()) == null)
+ return;
+ } catch (URLFilterException e) {
+ LOG.warn("Couldn't filter url: {} ({})", url, e.getMessage());
}
- expr = JexlUtil.parseExpression(conf.get(GENERATOR_EXPR, null));
+ }
+ CrawlDatum crawlDatum = value;
+
+ // check fetch schedule
+ if (!schedule.shouldFetch(url, crawlDatum, curTime)) {
+ LOG.debug("-shouldFetch rejected '{}', fetchTime={}, curTime={}", url,
+ crawlDatum.getFetchTime(), curTime);
+ context.getCounter("Generator", "SCHEDULE_REJECTED").increment(1);
+ return;
}
- @Override
- public void map(Text key, CrawlDatum value,
- Context context)
- throws IOException, InterruptedException {
- Text url = key;
- if (filter) {
- // If filtering is on don't generate URLs that don't pass
- // URLFilters
- try {
- if (filters.filter(url.toString()) == null)
- return;
- } catch (URLFilterException e) {
- LOG.warn("Couldn't filter url: {} ({})", url, e.getMessage());
- }
+ LongWritable oldGenTime = (LongWritable) crawlDatum.getMetaData()
+ .get(Nutch.WRITABLE_GENERATE_TIME_KEY);
+ if (oldGenTime != null) { // awaiting fetch & update
+ if (oldGenTime.get() + genDelay > curTime) // still wait for
+ // update
+ context.getCounter("Generator", "WAIT_FOR_UPDATE").increment(1);
+ return;
+ }
+ float sort = 1.0f;
+ try {
+ sort = scfilters.generatorSortValue(key, crawlDatum, sort);
+ } catch (ScoringFilterException sfe) {
+ if (LOG.isWarnEnabled()) {
+ LOG.warn(
+ "Couldn't filter generatorSortValue for " + key + ": " + sfe);
}
- CrawlDatum crawlDatum = value;
+ }
- // check fetch schedule
- if (!schedule.shouldFetch(url, crawlDatum, curTime)) {
- LOG.debug("-shouldFetch rejected '{}', fetchTime={}, curTime={}", url,
- crawlDatum.getFetchTime(), curTime);
- context.getCounter("Generator", "SCHEDULE_REJECTED").increment(1);
+ // check expr
+ if (expr != null) {
+ if (!crawlDatum.evaluate(expr, key.toString())) {
+ context.getCounter("Generator", "EXPR_REJECTED").increment(1);
return;
}
+ }
- LongWritable oldGenTime = (LongWritable) crawlDatum.getMetaData().get(
- Nutch.WRITABLE_GENERATE_TIME_KEY);
- if (oldGenTime != null) { // awaiting fetch & update
- if (oldGenTime.get() + genDelay > curTime) // still wait for
- // update
- context.getCounter("Generator", "WAIT_FOR_UPDATE").increment(1);
- return;
- }
- float sort = 1.0f;
+ if (restrictStatus != -1 && restrictStatus != crawlDatum.getStatus()) {
+ context.getCounter("Generator", "STATUS_REJECTED").increment(1);
+ return;
+ }
+
+ // consider only entries with a score superior to the threshold
+ if (!Float.isNaN(scoreThreshold) && sort < scoreThreshold) {
+ context.getCounter("Generator", "SCORE_TOO_LOW").increment(1);
+ return;
+ }
+
+ // consider only entries with a retry (or fetch) interval lower than
+ // threshold
+ if (intervalThreshold != -1
+ && crawlDatum.getFetchInterval() > intervalThreshold) {
+ context.getCounter("Generator", "INTERVAL_REJECTED").increment(1);
+ return;
+ }
+
+ // sort by decreasing score, using DecreasingFloatComparator
+ sortValue.set(sort);
+ // record generation time
+ crawlDatum.getMetaData().put(Nutch.WRITABLE_GENERATE_TIME_KEY, genTime);
+ entry.datum = crawlDatum;
+ entry.url = key;
+ context.write(sortValue, entry); // invert for sort by score
+ }
+ }
+
+ /** Collect until limit is reached. */
+ public static class SelectorReducer extends
+ Reducer<FloatWritable, SelectorEntry, FloatWritable, SelectorEntry> {
+
+ private HashMap<String, int[]> hostCounts = new HashMap<>();
+ private long count;
+ private int currentsegmentnum = 1;
+ private MultipleOutputs<FloatWritable, SelectorEntry> mos;
+ private String outputFile;
+ private long limit;
+ private int segCounts[];
+ private int maxNumSegments = 1;
+ private int maxCount;
+ private Configuration conf;
+ private boolean byDomain = false;
+ private URLNormalizers normalizers;
+ private static boolean normalise;
+ private SequenceFile.Reader[] hostdbReaders = null;
+ private Expression maxCountExpr = null;
+ private Expression fetchDelayExpr = null;
+
+ public void open() {
+ if (conf.get(GENERATOR_HOSTDB) != null) {
try {
- sort = scfilters.generatorSortValue(key, crawlDatum, sort);
- } catch (ScoringFilterException sfe) {
- if (LOG.isWarnEnabled()) {
- LOG.warn("Couldn't filter generatorSortValue for " + key + ": " + sfe);
- }
+ Path path = new Path(conf.get(GENERATOR_HOSTDB), "current");
+ hostdbReaders = SegmentReaderUtil.getReaders(path, conf);
+ } catch (IOException e) {
+ LOG.error("Error reading HostDB because {}", e.getMessage());
}
-
- // check expr
- if (expr != null) {
- if (!crawlDatum.evaluate(expr, key.toString())) {
- context.getCounter("Generator", "EXPR_REJECTED").increment(1);
- return;
- }
- }
-
- if (restrictStatus != -1 && restrictStatus != crawlDatum.getStatus()) {
- context.getCounter("Generator", "STATUS_REJECTED").increment(1);
- return;
- }
-
- // consider only entries with a score superior to the threshold
- if (!Float.isNaN(scoreThreshold) && sort < scoreThreshold) {
- context.getCounter("Generator", "SCORE_TOO_LOW").increment(1);
- return;
- }
-
- // consider only entries with a retry (or fetch) interval lower than
- // threshold
- if (intervalThreshold != -1
- && crawlDatum.getFetchInterval() > intervalThreshold) {
- context.getCounter("Generator", "INTERVAL_REJECTED").increment(1);
- return;
- }
-
- // sort by decreasing score, using DecreasingFloatComparator
- sortValue.set(sort);
- // record generation time
- crawlDatum.getMetaData().put(Nutch.WRITABLE_GENERATE_TIME_KEY, genTime);
- entry.datum = crawlDatum;
- entry.url = key;
- context.write(sortValue, entry); // invert for sort by score
}
}
+ public void close() {
+ if (hostdbReaders != null) {
+ try {
+ for (int i = 0; i < hostdbReaders.length; i++) {
+ hostdbReaders[i].close();
+ }
+ } catch (IOException e) {
+ LOG.error("Error closing HostDB because {}", e.getMessage());
+ }
+ }
+ }
-
- /** Collect until limit is reached. */
- public static class SelectorReducer extends
- Reducer<FloatWritable, SelectorEntry, FloatWritable, SelectorEntry> {
-
- private HashMap<String, int[]> hostCounts = new HashMap<>();
- private long count;
- private int currentsegmentnum = 1;
- private MultipleOutputs<FloatWritable, SelectorEntry> mos;
- private String outputFile;
- private long limit;
- private int segCounts[];
- private int maxNumSegments = 1;
- private int maxCount;
- private Configuration conf;
- private boolean byDomain = false;
- private URLNormalizers normalizers;
- private static boolean normalise;
- private SequenceFile.Reader[] hostdbReaders = null;
- private Expression maxCountExpr = null;
- private Expression fetchDelayExpr = null;
+ private JexlContext createContext(HostDatum datum) {
+ JexlContext context = new MapContext();
+ context.set("dnsFailures", datum.getDnsFailures());
+ context.set("connectionFailures", datum.getConnectionFailures());
+ context.set("unfetched", datum.getUnfetched());
+ context.set("fetched", datum.getFetched());
+ context.set("notModified", datum.getNotModified());
+ context.set("redirTemp", datum.getRedirTemp());
+ context.set("redirPerm", datum.getRedirPerm());
+ context.set("gone", datum.getGone());
+ context.set("conf", conf);
- public void open() {
- if (conf.get(GENERATOR_HOSTDB) != null) {
+ // Set metadata variables
+ for (Map.Entry<Writable, Writable> entry : datum.getMetaData()
+ .entrySet()) {
+ Object value = entry.getValue();
+
+ if (value instanceof FloatWritable) {
+ FloatWritable fvalue = (FloatWritable) value;
+ Text tkey = (Text) entry.getKey();
+ context.set(tkey.toString(), fvalue.get());
+ }
+
+ if (value instanceof IntWritable) {
+ IntWritable ivalue = (IntWritable) value;
+ Text tkey = (Text) entry.getKey();
+ context.set(tkey.toString(), ivalue.get());
+ }
+
+ if (value instanceof Text) {
+ Text tvalue = (Text) value;
+ Text tkey = (Text) entry.getKey();
+ context.set(tkey.toString().replace("-", "_"), tvalue.toString());
+ }
+ }
+
+ return context;
+ }
+
+ @Override
+ public void setup(Context context) throws IOException {
+ conf = context.getConfiguration();
+ mos = new MultipleOutputs<FloatWritable, SelectorEntry>(context);
+ Job job = Job.getInstance(conf);
+ limit = conf.getLong(GENERATOR_TOP_N, Long.MAX_VALUE)
+ / job.getNumReduceTasks();
+ maxNumSegments = conf.getInt(GENERATOR_MAX_NUM_SEGMENTS, 1);
+ segCounts = new int[maxNumSegments];
+ maxCount = conf.getInt(GENERATOR_MAX_COUNT, -1);
+ if (maxCount == -1) {
+ byDomain = false;
+ }
+ if (GENERATOR_COUNT_VALUE_DOMAIN.equals(conf.get(GENERATOR_COUNT_MODE)))
+ byDomain = true;
+ normalise = conf.getBoolean(GENERATOR_NORMALISE, true);
+ if (normalise)
+ normalizers = new URLNormalizers(conf,
+ URLNormalizers.SCOPE_GENERATE_HOST_COUNT);
+
+ if (conf.get(GENERATOR_HOSTDB) != null) {
+ maxCountExpr = JexlUtil
+ .parseExpression(conf.get(GENERATOR_MAX_COUNT_EXPR, null));
+ fetchDelayExpr = JexlUtil
+ .parseExpression(conf.get(GENERATOR_FETCH_DELAY_EXPR, null));
+ }
+ }
+
+ @Override
+ public void cleanup(Context context)
+ throws IOException, InterruptedException {
+ mos.close();
+ }
+
+ @Override
+ public void reduce(FloatWritable key, Iterable<SelectorEntry> values,
+ Context context) throws IOException, InterruptedException {
+
+ String hostname = null;
+ HostDatum host = null;
+ LongWritable variableFetchDelayWritable = null; // in millis
+ Text variableFetchDelayKey = new Text("_variableFetchDelay_");
+ // local variable maxCount may hold host-specific count set in HostDb
+ int maxCount = this.maxCount;
+ for (SelectorEntry entry : values) {
+ Text url = entry.url;
+ String urlString = url.toString();
+ URL u = null;
+
+ // Do this only once per queue
+ if (host == null) {
try {
- Path path = new Path(conf.get(GENERATOR_HOSTDB), "current");
- hostdbReaders = SegmentReaderUtil.getReaders(path, conf);
- } catch (IOException e) {
- LOG.error("Error reading HostDB because {}", e.getMessage());
- }
- }
- }
-
- public void close() {
- if (hostdbReaders != null) {
- try {
- for (int i = 0; i < hostdbReaders.length; i++) {
- hostdbReaders[i].close();
- }
- } catch (IOException e) {
- LOG.error("Error closing HostDB because {}", e.getMessage());
- }
- }
- }
-
- private JexlContext createContext(HostDatum datum) {
- JexlContext context = new MapContext();
- context.set("dnsFailures", datum.getDnsFailures());
- context.set("connectionFailures", datum.getConnectionFailures());
- context.set("unfetched", datum.getUnfetched());
- context.set("fetched", datum.getFetched());
- context.set("notModified", datum.getNotModified());
- context.set("redirTemp", datum.getRedirTemp());
- context.set("redirPerm", datum.getRedirPerm());
- context.set("gone", datum.getGone());
- context.set("conf", conf);
-
- // Set metadata variables
- for (Map.Entry<Writable, Writable> entry : datum.getMetaData().entrySet()) {
- Object value = entry.getValue();
-
- if (value instanceof FloatWritable) {
- FloatWritable fvalue = (FloatWritable)value;
- Text tkey = (Text)entry.getKey();
- context.set(tkey.toString(), fvalue.get());
- }
-
- if (value instanceof IntWritable) {
- IntWritable ivalue = (IntWritable)value;
- Text tkey = (Text)entry.getKey();
- context.set(tkey.toString(), ivalue.get());
+ hostname = URLUtil.getHost(urlString);
+ host = getHostDatum(hostname);
+ } catch (Exception e) {
}
- if (value instanceof Text) {
- Text tvalue = (Text)value;
- Text tkey = (Text)entry.getKey();
- context.set(tkey.toString().replace("-", "_"), tvalue.toString());
- }
- }
-
- return context;
- }
-
- @Override
- public void setup(Context context) throws IOException {
- conf = context.getConfiguration();
- mos = new MultipleOutputs<FloatWritable, SelectorEntry>(context);
- Job job = Job.getInstance(conf);
- limit = conf.getLong(GENERATOR_TOP_N, Long.MAX_VALUE)
- / job.getNumReduceTasks();
- maxNumSegments = conf.getInt(GENERATOR_MAX_NUM_SEGMENTS, 1);
- segCounts = new int[maxNumSegments];
- maxCount = conf.getInt(GENERATOR_MAX_COUNT, -1);
- if (maxCount == -1) {
- byDomain = false;
- }
- if (GENERATOR_COUNT_VALUE_DOMAIN.equals(conf.get(GENERATOR_COUNT_MODE)))
- byDomain = true;
- normalise = conf.getBoolean(GENERATOR_NORMALISE, true);
- if (normalise)
- normalizers = new URLNormalizers(conf,
- URLNormalizers.SCOPE_GENERATE_HOST_COUNT);
-
- if (conf.get(GENERATOR_HOSTDB) != null) {
- maxCountExpr = JexlUtil.parseExpression(conf.get(GENERATOR_MAX_COUNT_EXPR, null));
- fetchDelayExpr = JexlUtil.parseExpression(conf.get(GENERATOR_FETCH_DELAY_EXPR, null));
- }
- }
-
- @Override
- public void cleanup(Context context) throws IOException, InterruptedException {
- mos.close();
- }
-
- @Override
- public void reduce(FloatWritable key, Iterable<SelectorEntry> values,
- Context context)
- throws IOException, InterruptedException {
-
- String hostname = null;
- HostDatum host = null;
- LongWritable variableFetchDelayWritable = null; // in millis
- Text variableFetchDelayKey = new Text("_variableFetchDelay_");
- // local variable maxCount may hold host-specific count set in HostDb
- int maxCount = this.maxCount;
- for (SelectorEntry entry : values) {
- Text url = entry.url;
- String urlString = url.toString();
- URL u = null;
-
- // Do this only once per queue
+ // Got it?
if (host == null) {
- try {
- hostname = URLUtil.getHost(urlString);
- host = getHostDatum(hostname);
- } catch (Exception e) {}
-
- // Got it?
- if (host == null) {
- // Didn't work, prevent future lookups
- host = new HostDatum();
- } else {
- if (maxCountExpr != null) {
- long variableMaxCount = Math.round((double)maxCountExpr.evaluate(createContext(host)));
- LOG.info("Generator: variable maxCount: {} for {}", variableMaxCount, hostname);
- maxCount = (int)variableMaxCount;
- }
-
- if (fetchDelayExpr != null) {
- long variableFetchDelay = Math.round((double)fetchDelayExpr.evaluate(createContext(host)));
- LOG.info("Generator: variable fetchDelay: {} ms for {}", variableFetchDelay, hostname);
- variableFetchDelayWritable = new LongWritable(variableFetchDelay);
- }
- }
- }
-
- // Got a non-zero variable fetch delay? Add it to the datum's metadata
- if (variableFetchDelayWritable != null) {
- entry.datum.getMetaData().put(variableFetchDelayKey, variableFetchDelayWritable);
- }
-
- if (count == limit) {
- // do we have any segments left?
- if (currentsegmentnum < maxNumSegments) {
- count = 0;
- currentsegmentnum++;
- } else
- break;
- }
-
- String hostordomain = null;
-
- try {
- if (normalise && normalizers != null) {
- urlString = normalizers.normalize(urlString,
- URLNormalizers.SCOPE_GENERATE_HOST_COUNT);
- }
- u = new URL(urlString);
- if (byDomain) {
- hostordomain = URLUtil.getDomainName(u);
- } else {
- hostordomain = u.getHost();
- }
- } catch (MalformedURLException e) {
- LOG.warn("Malformed URL: '{}', skipping ({})", urlString,
- StringUtils.stringifyException(e));
- context.getCounter("Generator", "MALFORMED_URL").increment(1);
- continue;
- }
-
- hostordomain = hostordomain.toLowerCase();
-
- // only filter if we are counting hosts or domains
- if (maxCount > 0) {
- int[] hostCount = hostCounts.get(hostordomain);
- if (hostCount == null) {
- hostCount = new int[] { 1, 0 };
- hostCounts.put(hostordomain, hostCount);
+ // Didn't work, prevent future lookups
+ host = new HostDatum();
+ } else {
+ if (maxCountExpr != null) {
+ long variableMaxCount = Math
+ .round((double) maxCountExpr.evaluate(createContext(host)));
+ LOG.info("Generator: variable maxCount: {} for {}",
+ variableMaxCount, hostname);
+ maxCount = (int) variableMaxCount;
}
- // increment hostCount
- hostCount[1]++;
+ if (fetchDelayExpr != null) {
+ long variableFetchDelay = Math
+ .round((double) fetchDelayExpr.evaluate(createContext(host)));
+ LOG.info("Generator: variable fetchDelay: {} ms for {}",
+ variableFetchDelay, hostname);
+ variableFetchDelayWritable = new LongWritable(variableFetchDelay);
+ }
+ }
+ }
- // check if topN reached, select next segment if it is
- while (segCounts[hostCount[0] - 1] >= limit
- && hostCount[0] < maxNumSegments) {
+ // Got a non-zero variable fetch delay? Add it to the datum's metadata
+ if (variableFetchDelayWritable != null) {
+ entry.datum.getMetaData().put(variableFetchDelayKey,
+ variableFetchDelayWritable);
+ }
+
+ if (count == limit) {
+ // do we have any segments left?
+ if (currentsegmentnum < maxNumSegments) {
+ count = 0;
+ currentsegmentnum++;
+ } else
+ break;
+ }
+
+ String hostordomain = null;
+
+ try {
+ if (normalise && normalizers != null) {
+ urlString = normalizers.normalize(urlString,
+ URLNormalizers.SCOPE_GENERATE_HOST_COUNT);
+ }
+ u = new URL(urlString);
+ if (byDomain) {
+ hostordomain = URLUtil.getDomainName(u);
+ } else {
+ hostordomain = u.getHost();
+ }
+ } catch (MalformedURLException e) {
+ LOG.warn("Malformed URL: '{}', skipping ({})", urlString,
+ StringUtils.stringifyException(e));
+ context.getCounter("Generator", "MALFORMED_URL").increment(1);
+ continue;
+ }
+
+ hostordomain = hostordomain.toLowerCase();
+
+ // only filter if we are counting hosts or domains
+ if (maxCount > 0) {
+ int[] hostCount = hostCounts.get(hostordomain);
+ if (hostCount == null) {
+ hostCount = new int[] { 1, 0 };
+ hostCounts.put(hostordomain, hostCount);
+ }
+
+ // increment hostCount
+ hostCount[1]++;
+
+ // check if topN reached, select next segment if it is
+ while (segCounts[hostCount[0] - 1] >= limit
+ && hostCount[0] < maxNumSegments) {
+ hostCount[0]++;
+ hostCount[1] = 0;
+ }
+
+ // reached the limit of allowed URLs per host / domain
+ // see if we can put it in the next segment?
+ if (hostCount[1] > maxCount) {
+ if (hostCount[0] < maxNumSegments) {
hostCount[0]++;
- hostCount[1] = 0;
- }
-
- // reached the limit of allowed URLs per host / domain
- // see if we can put it in the next segment?
- if (hostCount[1] > maxCount) {
- if (hostCount[0] < maxNumSegments) {
- hostCount[0]++;
- hostCount[1] = 1;
- } else {
- if (hostCount[1] == maxCount) {
- context
+ hostCount[1] = 1;
+ } else {
+ if (hostCount[1] == maxCount) {
+ context
.getCounter("Generator", "HOSTS_AFFECTED_PER_HOST_OVERFLOW")
.increment(1);
- LOG.info(
- "Host or domain {} has more than {} URLs for all {} segments. Additional URLs won't be included in the fetchlist.",
- hostordomain, maxCount, maxNumSegments);
- }
- // skip this entry
- context.getCounter("Generator", "URLS_SKIPPED_PER_HOST_OVERFLOW")
- .increment(1);
- continue;
+ LOG.info(
+ "Host or domain {} has more than {} URLs for all {} segments. Additional URLs won't be included in the fetchlist.",
+ hostordomain, maxCount, maxNumSegments);
}
- }
- entry.segnum = new IntWritable(hostCount[0]);
- segCounts[hostCount[0] - 1]++;
- } else {
- entry.segnum = new IntWritable(currentsegmentnum);
- segCounts[currentsegmentnum - 1]++;
- }
-
- outputFile = generateFileName(entry);
- mos.write("sequenceFiles", key, entry, outputFile);
-
- // Count is incremented only when we keep the URL
- // maxCount may cause us to skip it.
- count++;
- }
- }
-
- private String generateFileName(SelectorEntry entry) {
- return "fetchlist-" + entry.segnum.toString() + "/part";
- }
-
- private HostDatum getHostDatum(String host) throws Exception {
- Text key = new Text();
- HostDatum value = new HostDatum();
-
- open();
- for (int i = 0; i < hostdbReaders.length; i++) {
- while (hostdbReaders[i].next(key, value)) {
- if (host.equals(key.toString())) {
- close();
- return value;
+ // skip this entry
+ context.getCounter("Generator", "URLS_SKIPPED_PER_HOST_OVERFLOW")
+ .increment(1);
+ continue;
}
}
+ entry.segnum = new IntWritable(hostCount[0]);
+ segCounts[hostCount[0] - 1]++;
+ } else {
+ entry.segnum = new IntWritable(currentsegmentnum);
+ segCounts[currentsegmentnum - 1]++;
}
- close();
- return null;
+ outputFile = generateFileName(entry);
+ mos.write("sequenceFiles", key, entry, outputFile);
+
+ // Count is incremented only when we keep the URL
+ // maxCount may cause us to skip it.
+ count++;
}
}
- public static class DecreasingFloatComparator extends
- FloatWritable.Comparator {
+ private String generateFileName(SelectorEntry entry) {
+ return "fetchlist-" + entry.segnum.toString() + "/part";
+ }
+
+ private HostDatum getHostDatum(String host) throws Exception {
+ Text key = new Text();
+ HostDatum value = new HostDatum();
+
+ open();
+ for (int i = 0; i < hostdbReaders.length; i++) {
+ while (hostdbReaders[i].next(key, value)) {
+ if (host.equals(key.toString())) {
+ close();
+ return value;
+ }
+ }
+ }
+
+ close();
+ return null;
+ }
+ }
+
+ public static class DecreasingFloatComparator
+ extends FloatWritable.Comparator {
/** Compares two FloatWritables decreasing. */
public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) {
@@ -564,23 +575,21 @@
}
}
- public static class SelectorInverseMapper extends
- Mapper<FloatWritable, SelectorEntry, Text, SelectorEntry> {
+ public static class SelectorInverseMapper
+ extends Mapper<FloatWritable, SelectorEntry, Text, SelectorEntry> {
- public void map(FloatWritable key, SelectorEntry value,
- Context context)
+ public void map(FloatWritable key, SelectorEntry value, Context context)
throws IOException, InterruptedException {
SelectorEntry entry = value;
context.write(entry.url, entry);
}
}
- public static class PartitionReducer extends
- Reducer<Text, SelectorEntry, Text, CrawlDatum> {
+ public static class PartitionReducer
+ extends Reducer<Text, SelectorEntry, Text, CrawlDatum> {
public void reduce(Text key, Iterable<SelectorEntry> values,
- Context context)
- throws IOException, InterruptedException {
+ Context context) throws IOException, InterruptedException {
// if using HashComparator, we get only one input key in case of
// hash collision so use only URLs from values
for (SelectorEntry entry : values) {
@@ -626,38 +635,37 @@
*/
public static class CrawlDbUpdater {
- public static class CrawlDbUpdateMapper extends
- Mapper<Text, CrawlDatum, Text, CrawlDatum> {
+ public static class CrawlDbUpdateMapper
+ extends Mapper<Text, CrawlDatum, Text, CrawlDatum> {
@Override
- public void map(Text key, CrawlDatum value,
- Context context)
- throws IOException, InterruptedException {
+ public void map(Text key, CrawlDatum value, Context context)
+ throws IOException, InterruptedException {
context.write(key, value);
}
}
- public static class CrawlDbUpdateReducer extends
- Reducer<Text, CrawlDatum, Text, CrawlDatum> {
+ public static class CrawlDbUpdateReducer
+ extends Reducer<Text, CrawlDatum, Text, CrawlDatum> {
private CrawlDatum orig = new CrawlDatum();
private LongWritable genTime = new LongWritable(0L);
private long generateTime;
@Override
- public void setup(Reducer<Text, CrawlDatum, Text, CrawlDatum>.Context context) {
+ public void setup(
+ Reducer<Text, CrawlDatum, Text, CrawlDatum>.Context context) {
Configuration conf = context.getConfiguration();
generateTime = conf.getLong(Nutch.GENERATE_TIME_KEY, 0L);
}
@Override
- public void reduce(Text key, Iterable<CrawlDatum> values,
- Context context)
- throws IOException, InterruptedException {
+ public void reduce(Text key, Iterable<CrawlDatum> values, Context context)
+ throws IOException, InterruptedException {
genTime.set(0L);
for (CrawlDatum val : values) {
if (val.getMetaData().containsKey(Nutch.WRITABLE_GENERATE_TIME_KEY)) {
- LongWritable gt = (LongWritable) val.getMetaData().get(
- Nutch.WRITABLE_GENERATE_TIME_KEY);
+ LongWritable gt = (LongWritable) val.getMetaData()
+ .get(Nutch.WRITABLE_GENERATE_TIME_KEY);
genTime.set(gt.get());
if (genTime.get() != generateTime) {
orig.set(val);
@@ -684,14 +692,15 @@
}
public Path[] generate(Path dbDir, Path segments, int numLists, long topN,
- long curTime) throws IOException, InterruptedException, ClassNotFoundException {
+ long curTime)
+ throws IOException, InterruptedException, ClassNotFoundException {
Job job = NutchJob.getInstance(getConf());
Configuration conf = job.getConfiguration();
boolean filter = conf.getBoolean(GENERATOR_FILTER, true);
boolean normalise = conf.getBoolean(GENERATOR_NORMALISE, true);
- return generate(dbDir, segments, numLists, topN, curTime, filter,
- normalise, false, 1, null);
+ return generate(dbDir, segments, numLists, topN, curTime, filter, normalise,
+ false, 1, null);
}
/**
@@ -699,14 +708,16 @@
* normalise and set the number of segments to 1
**/
public Path[] generate(Path dbDir, Path segments, int numLists, long topN,
- long curTime, boolean filter, boolean force) throws IOException, InterruptedException, ClassNotFoundException {
+ long curTime, boolean filter, boolean force)
+ throws IOException, InterruptedException, ClassNotFoundException {
return generate(dbDir, segments, numLists, topN, curTime, filter, true,
force, 1, null);
}
-
+
public Path[] generate(Path dbDir, Path segments, int numLists, long topN,
long curTime, boolean filter, boolean norm, boolean force,
- int maxNumSegments, String expr) throws IOException, InterruptedException, ClassNotFoundException {
+ int maxNumSegments, String expr)
+ throws IOException, InterruptedException, ClassNotFoundException {
return generate(dbDir, segments, numLists, topN, curTime, filter, true,
force, 1, expr, null);
}
@@ -735,14 +746,15 @@
*/
public Path[] generate(Path dbDir, Path segments, int numLists, long topN,
long curTime, boolean filter, boolean norm, boolean force,
- int maxNumSegments, String expr, String hostdb) throws IOException, InterruptedException, ClassNotFoundException {
+ int maxNumSegments, String expr, String hostdb)
+ throws IOException, InterruptedException, ClassNotFoundException {
Path tempDir = new Path(getConf().get("mapreduce.cluster.temp.dir", ".")
+ "/generate-temp-" + java.util.UUID.randomUUID().toString());
FileSystem fs = tempDir.getFileSystem(getConf());
Path lock = CrawlDb.lock(getConf(), dbDir, force);
-
+
SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
long start = System.currentTimeMillis();
LOG.info("Generator: starting at " + sdf.format(start));
@@ -758,17 +770,21 @@
if (hostdb != null) {
LOG.info("Generator: hostdb: {}", hostdb);
}
-
+
// map to inverted subset due for fetch, sort by score
Job job = NutchJob.getInstance(getConf());
job.setJobName("generate: select from " + dbDir);
Configuration conf = job.getConfiguration();
if (numLists == -1) { // for politeness make
- numLists = Integer.parseInt(conf.get("mapreduce.job.maps")); // a partition per fetch task
+ numLists = Integer.parseInt(conf.get("mapreduce.job.maps")); // a
+ // partition
+ // per fetch
+ // task
}
if ("local".equals(conf.get("mapreduce.framework.name")) && numLists != 1) {
// override
- LOG.info("Generator: running in local mode, generating exactly one partition.");
+ LOG.info(
+ "Generator: running in local mode, generating exactly one partition.");
numLists = 1;
}
conf.setLong(GENERATOR_CUR_TIME, curTime);
@@ -797,7 +813,9 @@
job.setOutputKeyClass(FloatWritable.class);
job.setSortComparatorClass(DecreasingFloatComparator.class);
job.setOutputValueClass(SelectorEntry.class);
- MultipleOutputs.addNamedOutput(job, "sequenceFiles", SequenceFileOutputFormat.class, FloatWritable.class, SelectorEntry.class);
+ MultipleOutputs.addNamedOutput(job, "sequenceFiles",
+ SequenceFileOutputFormat.class, FloatWritable.class,
+ SelectorEntry.class);
try {
boolean success = job.waitForCompletion(true);
@@ -838,7 +856,7 @@
}
} catch (Exception e) {
LOG.warn("Generator: exception while partitioning segments, exiting ...");
- LockUtil.removeLockFile(getConf(),lock);
+ LockUtil.removeLockFile(getConf(), lock);
fs.delete(tempDir, true);
return null;
}
@@ -946,7 +964,7 @@
throw new RuntimeException(message);
}
} catch (IOException | InterruptedException | ClassNotFoundException e) {
- LOG.error(StringUtils.stringifyException(e));
+ LOG.error(StringUtils.stringifyException(e));
throw e;
}
@@ -968,15 +986,15 @@
* Generate a fetchlist from the crawldb.
*/
public static void main(String args[]) throws Exception {
- int res = ToolRunner
- .run(NutchConfiguration.create(), new Generator(), args);
+ int res = ToolRunner.run(NutchConfiguration.create(), new Generator(),
+ args);
System.exit(res);
}
public int run(String[] args) throws Exception {
if (args.length < 2) {
- System.out
- .println("Usage: Generator <crawldb> <segments_dir> [-force] [-topN N] [-numFetchers numFetchers] [-expr <expr>] [-adddays <numDays>] [-noFilter] [-noNorm] [-maxNumSegments <num>]");
+ System.out.println(
+ "Usage: Generator <crawldb> <segments_dir> [-force] [-topN N] [-numFetchers numFetchers] [-expr <expr>] [-adddays <numDays>] [-noFilter] [-noNorm] [-maxNumSegments <num>]");
return -1;
}
@@ -1032,7 +1050,8 @@
}
@Override
- public Map<String, Object> run(Map<String, Object> args, String crawlId) throws Exception {
+ public Map<String, Object> run(Map<String, Object> args, String crawlId)
+ throws Exception {
Map<String, Object> results = new HashMap<>();
@@ -1046,67 +1065,63 @@
String expr = null;
String hostdb = null;
Path crawlDb;
-
- if(args.containsKey(Nutch.ARG_CRAWLDB)) {
+
+ if (args.containsKey(Nutch.ARG_CRAWLDB)) {
Object crawldbPath = args.get(Nutch.ARG_CRAWLDB);
- if(crawldbPath instanceof Path) {
+ if (crawldbPath instanceof Path) {
crawlDb = (Path) crawldbPath;
- }
- else {
+ } else {
crawlDb = new Path(crawldbPath.toString());
}
- }
- else {
- crawlDb = new Path(crawlId+"/crawldb");
+ } else {
+ crawlDb = new Path(crawlId + "/crawldb");
}
Path segmentsDir;
- if(args.containsKey(Nutch.ARG_SEGMENTDIR)) {
+ if (args.containsKey(Nutch.ARG_SEGMENTDIR)) {
Object segDir = args.get(Nutch.ARG_SEGMENTDIR);
- if(segDir instanceof Path) {
+ if (segDir instanceof Path) {
segmentsDir = (Path) segDir;
- }
- else {
+ } else {
segmentsDir = new Path(segDir.toString());
}
- }
- else {
- segmentsDir = new Path(crawlId+"/segments");
+ } else {
+ segmentsDir = new Path(crawlId + "/segments");
}
if (args.containsKey(Nutch.ARG_HOSTDB)) {
- hostdb = (String)args.get(Nutch.ARG_HOSTDB);
+ hostdb = (String) args.get(Nutch.ARG_HOSTDB);
}
-
+
if (args.containsKey("expr")) {
- expr = (String)args.get("expr");
+ expr = (String) args.get("expr");
}
if (args.containsKey("topN")) {
- topN = Long.parseLong((String)args.get("topN"));
+ topN = Long.parseLong((String) args.get("topN"));
}
if (args.containsKey("numFetchers")) {
- numFetchers = Integer.parseInt((String)args.get("numFetchers"));
+ numFetchers = Integer.parseInt((String) args.get("numFetchers"));
}
if (args.containsKey("adddays")) {
- long numDays = Integer.parseInt((String)args.get("adddays"));
+ long numDays = Integer.parseInt((String) args.get("adddays"));
curTime += numDays * 1000L * 60 * 60 * 24;
}
if (args.containsKey("noFilter")) {
filter = false;
- }
+ }
if (args.containsKey("noNorm")) {
norm = false;
- }
+ }
if (args.containsKey("force")) {
force = true;
- }
+ }
if (args.containsKey("maxNumSegments")) {
- maxNumSegments = Integer.parseInt((String)args.get("maxNumSegments"));
+ maxNumSegments = Integer.parseInt((String) args.get("maxNumSegments"));
}
try {
Path[] segs = generate(crawlDb, segmentsDir, numFetchers, topN, curTime,
filter, norm, force, maxNumSegments, expr, hostdb);
- if (segs == null){
+ if (segs == null) {
results.put(Nutch.VAL_RESULT, Integer.toString(1));
return results;
}