blob: 00eb18f66097bade5c88c6cc2ee4d63bfbfad8a9 [file] [log] [blame]
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* See the License for the specific language governing permissions and
* limitations under the License.
package org.apache.nutch.crawl;
import java.lang.invoke.MethodHandles;
import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.Date;
import java.util.HashMap;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Random;
import org.apache.hadoop.conf.Configurable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.commons.jexl3.JexlExpression;
import org.apache.commons.jexl3.JexlContext;
import org.apache.commons.jexl3.MapContext;
import org.apache.hadoop.mapreduce.Counter;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.MapFileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
import org.apache.hadoop.util.StringUtils;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;
import org.apache.hadoop.mapreduce.Partitioner;
import org.apache.hadoop.mapreduce.lib.output.MultipleOutputs;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.nutch.hostdb.HostDatum;
import org.apache.nutch.metadata.Nutch;
import org.apache.nutch.scoring.ScoringFilterException;
import org.apache.nutch.scoring.ScoringFilters;
import org.apache.nutch.util.JexlUtil;
import org.apache.nutch.util.LockUtil;
import org.apache.nutch.util.NutchConfiguration;
import org.apache.nutch.util.NutchJob;
import org.apache.nutch.util.NutchTool;
import org.apache.nutch.util.SegmentReaderUtil;
import org.apache.nutch.util.TimingUtil;
import org.apache.nutch.util.URLUtil;
/**
 * Generates a subset of a crawl db to fetch. This version allows generating
 * fetchlists for several segments in one go. Unlike in the initial version
 * (OldGenerator), the IP resolution is done ONLY on the entries which have been
 * selected for fetching. The URLs are partitioned by IP, domain or host within
 * a segment. We can choose separately how to count the URLs, i.e. by domain or
 * by host, to limit the entries.
 */
public class Generator extends NutchTool implements Tool {
private static final Random RANDOM = new Random();
protected static final Logger LOG = LoggerFactory
public static final String GENERATE_UPDATE_CRAWLDB = "generate.update.crawldb";
public static final String GENERATOR_MIN_SCORE = "generate.min.score";
public static final String GENERATOR_MIN_INTERVAL = "generate.min.interval";
public static final String GENERATOR_RESTRICT_STATUS = "generate.restrict.status";
public static final String GENERATOR_FILTER = "generate.filter";
public static final String GENERATOR_NORMALISE = "generate.normalise";
public static final String GENERATOR_MAX_COUNT = "generate.max.count";
public static final String GENERATOR_COUNT_MODE = "generate.count.mode";
public static final String GENERATOR_COUNT_VALUE_DOMAIN = "domain";
public static final String GENERATOR_COUNT_VALUE_HOST = "host";
public static final String GENERATOR_TOP_N = "generate.topN";
public static final String GENERATOR_CUR_TIME = "generate.curTime";
public static final String GENERATOR_DELAY = "crawl.gen.delay";
public static final String GENERATOR_MAX_NUM_SEGMENTS = "generate.max.num.segments";
public static final String GENERATOR_EXPR = "generate.expr";
public static final String GENERATOR_HOSTDB = "generate.hostdb";
public static final String GENERATOR_MAX_COUNT_EXPR = "generate.max.count.expr";
public static final String GENERATOR_FETCH_DELAY_EXPR = "generate.fetch.delay.expr";
public static class SelectorEntry implements Writable {
public Text url;
public CrawlDatum datum;
public IntWritable segnum;
/**
 * Creates an entry holding an empty URL, a freshly constructed datum and
 * segment number 0.
 */
public SelectorEntry() {
  this.url = new Text();
  this.datum = new CrawlDatum();
  this.segnum = new IntWritable(0);
}
public void readFields(DataInput in) throws IOException {
public void write(DataOutput out) throws IOException {
public String toString() {
return "url=" + url.toString() + ", datum=" + datum.toString()
+ ", segnum=" + segnum.toString();
/** Selects entries due for fetch. */
public static class Selector extends Partitioner<FloatWritable, Writable>
implements Configurable {
private final URLPartitioner partitioner = new URLPartitioner();
/** Partition by host / domain or IP. */
public int getPartition(FloatWritable key, Writable value,
int numReduceTasks) {
return partitioner.getPartition(((SelectorEntry) value).url, key,
public Configuration getConf() {
return partitioner.getConf();
public void setConf(Configuration conf) {
/** Select and invert subset due for fetch. */
public static class SelectorMapper
extends Mapper<Text, CrawlDatum, FloatWritable, SelectorEntry> {
private LongWritable genTime = new LongWritable(System.currentTimeMillis());
private long curTime;
private Configuration conf;
private URLFilters filters;
private ScoringFilters scfilters;
private SelectorEntry entry = new SelectorEntry();
private FloatWritable sortValue = new FloatWritable();
private boolean filter;
private long genDelay;
private FetchSchedule schedule;
private float scoreThreshold = 0f;
private int intervalThreshold = -1;
private byte restrictStatus = -1;
private JexlExpression expr = null;
// Reads the generator settings from the job configuration into the mapper's
// fields: current time, URL filters, scoring filters, filter switch,
// generate delay, fetch schedule, score / interval thresholds, status
// restriction and the optional JEXL expression.
// NOTE(review): several lines of this method are missing from this view
// (closing braces, a truncated call); comments below flag the affected spots.
public void setup(
Mapper<Text, CrawlDatum, FloatWritable, SelectorEntry>.Context context)
throws IOException {
conf = context.getConfiguration();
// falls back to "now" when no explicit generation time was configured
curTime = conf.getLong(GENERATOR_CUR_TIME, System.currentTimeMillis());
filters = new URLFilters(conf);
scfilters = new ScoringFilters(conf);
filter = conf.getBoolean(GENERATOR_FILTER, true);
/* CrawlDb items are unblocked after 7 days as default */
genDelay = conf.getLong(GENERATOR_DELAY, 604800000L);
long time = conf.getLong(Nutch.GENERATE_TIME_KEY, 0L);
// NOTE(review): the statement guarded by this `if` appears to be missing
// here; as shown it would guard the schedule assignment below, which is
// presumably unintended (schedule is dereferenced unconditionally in map()).
if (time > 0)
schedule = FetchScheduleFactory.getFetchSchedule(conf);
// NaN means "no minimum score"; map() tests it with Float.isNaN
scoreThreshold = conf.getFloat(GENERATOR_MIN_SCORE, Float.NaN);
// -1 means "no interval threshold"
intervalThreshold = conf.getInt(GENERATOR_MIN_INTERVAL, -1);
// NOTE(review): the default-value argument of this call is cut off in this view
String restrictStatusString = conf.getTrimmed(GENERATOR_RESTRICT_STATUS,
if (!restrictStatusString.isEmpty()) {
restrictStatus = CrawlDatum.getStatusByName(restrictStatusString);
expr = JexlUtil.parseExpression(conf.get(GENERATOR_EXPR, null));
public void map(Text key, CrawlDatum value, Context context)
throws IOException, InterruptedException {
Text url = key;
if (filter) {
// If filtering is on don't generate URLs that don't pass
// URLFilters
try {
if (filters.filter(url.toString()) == null)
} catch (URLFilterException e) {
LOG.warn("Couldn't filter url: {} ({})", url, e.getMessage());
CrawlDatum crawlDatum = value;
// check fetch schedule
if (!schedule.shouldFetch(url, crawlDatum, curTime)) {
LOG.debug("-shouldFetch rejected '{}', fetchTime={}, curTime={}", url,
crawlDatum.getFetchTime(), curTime);
context.getCounter("Generator", "SCHEDULE_REJECTED").increment(1);
LongWritable oldGenTime = (LongWritable) crawlDatum.getMetaData()
if (oldGenTime != null) { // awaiting fetch & update
if (oldGenTime.get() + genDelay > curTime) // still wait for
// update
context.getCounter("Generator", "WAIT_FOR_UPDATE").increment(1);
float sort = 1.0f;
try {
sort = scfilters.generatorSortValue(key, crawlDatum, sort);
} catch (ScoringFilterException sfe) {
if (LOG.isWarnEnabled()) {
"Couldn't filter generatorSortValue for " + key + ": " + sfe);
// check expr
if (expr != null) {
if (!crawlDatum.evaluate(expr, key.toString())) {
context.getCounter("Generator", "EXPR_REJECTED").increment(1);
if (restrictStatus != -1 && restrictStatus != crawlDatum.getStatus()) {
context.getCounter("Generator", "STATUS_REJECTED").increment(1);
// consider only entries with a score superior to the threshold
if (!Float.isNaN(scoreThreshold) && sort < scoreThreshold) {
context.getCounter("Generator", "SCORE_TOO_LOW").increment(1);
// consider only entries with a retry (or fetch) interval lower than
// threshold
if (intervalThreshold != -1
&& crawlDatum.getFetchInterval() > intervalThreshold) {
context.getCounter("Generator", "INTERVAL_REJECTED").increment(1);
// sort by decreasing score, using DecreasingFloatComparator
// record generation time
crawlDatum.getMetaData().put(Nutch.WRITABLE_GENERATE_TIME_KEY, genTime);
entry.datum = crawlDatum;
entry.url = key;
context.write(sortValue, entry); // invert for sort by score
/** Collect until limit is reached. */
public static class SelectorReducer extends
Reducer<FloatWritable, SelectorEntry, FloatWritable, SelectorEntry> {
private HashMap<String, int[]> hostCounts = new HashMap<>();
private long count;
private int currentsegmentnum = 1;
private MultipleOutputs<FloatWritable, SelectorEntry> mos;
private String outputFile;
private long limit;
private int segCounts[];
private int maxNumSegments = 1;
private int maxCount;
private Configuration conf;
private boolean byDomain = false;
private URLNormalizers normalizers;
private static boolean normalise;
private SequenceFile.Reader[] hostdbReaders = null;
private JexlExpression maxCountExpr = null;
private JexlExpression fetchDelayExpr = null;
public void open() {
if (conf.get(GENERATOR_HOSTDB) != null) {
try {
Path path = new Path(conf.get(GENERATOR_HOSTDB), "current");
hostdbReaders = SegmentReaderUtil.getReaders(path, conf);
} catch (IOException e) {
LOG.error("Error reading HostDB because {}", e.getMessage());
public void close() {
if (hostdbReaders != null) {
try {
for (int i = 0; i < hostdbReaders.length; i++) {
} catch (IOException e) {
LOG.error("Error closing HostDB because {}", e.getMessage());
private JexlContext createContext(HostDatum datum) {
JexlContext context = new MapContext();
context.set("dnsFailures", datum.getDnsFailures());
context.set("connectionFailures", datum.getConnectionFailures());
context.set("unfetched", datum.getUnfetched());
context.set("fetched", datum.getFetched());
context.set("notModified", datum.getNotModified());
context.set("redirTemp", datum.getRedirTemp());
context.set("redirPerm", datum.getRedirPerm());
context.set("gone", datum.getGone());
context.set("conf", conf);
// Set metadata variables
for (Map.Entry<Writable, Writable> entry : datum.getMetaData()
.entrySet()) {
Object value = entry.getValue();
if (value instanceof FloatWritable) {
FloatWritable fvalue = (FloatWritable) value;
Text tkey = (Text) entry.getKey();
context.set(tkey.toString(), fvalue.get());
if (value instanceof IntWritable) {
IntWritable ivalue = (IntWritable) value;
Text tkey = (Text) entry.getKey();
context.set(tkey.toString(), ivalue.get());
if (value instanceof Text) {
Text tvalue = (Text) value;
Text tkey = (Text) entry.getKey();
context.set(tkey.toString().replace("-", "_"), tvalue.toString());
return context;
// Initialises the reducer from the job configuration: the named-output
// writer, the per-reducer URL limit, segment counters, per-host/domain
// maximum, URL normalizers and the optional HostDB JEXL expressions.
// NOTE(review): closing braces and at least one branch/argument line are
// missing from this view; see the notes below.
public void setup(Context context) throws IOException {
conf = context.getConfiguration();
mos = new MultipleOutputs<FloatWritable, SelectorEntry>(context);
Job job = Job.getInstance(conf);
// topN is divided by the number of reduce tasks, so each reducer gets a share
limit = conf.getLong(GENERATOR_TOP_N, Long.MAX_VALUE)
/ job.getNumReduceTasks();
maxNumSegments = conf.getInt(GENERATOR_MAX_NUM_SEGMENTS, 1);
// one counter per segment
segCounts = new int[maxNumSegments];
// -1 means no per-host/domain maximum is configured
maxCount = conf.getInt(GENERATOR_MAX_COUNT, -1);
if (maxCount == -1) {
byDomain = false;
// NOTE(review): the condition selecting this assignment (presumably a check
// of the count mode) is not visible here — confirm against the full file
byDomain = true;
normalise = conf.getBoolean(GENERATOR_NORMALISE, true);
if (normalise)
// NOTE(review): the second constructor argument is cut off in this view
normalizers = new URLNormalizers(conf,
// the HostDB expressions are only parsed when a HostDB is configured
if (conf.get(GENERATOR_HOSTDB) != null) {
maxCountExpr = JexlUtil
.parseExpression(conf.get(GENERATOR_MAX_COUNT_EXPR, null));
fetchDelayExpr = JexlUtil
.parseExpression(conf.get(GENERATOR_FETCH_DELAY_EXPR, null));
public void cleanup(Context context)
throws IOException, InterruptedException {
public void reduce(FloatWritable key, Iterable<SelectorEntry> values,
Context context) throws IOException, InterruptedException {
String hostname = null;
HostDatum host = null;
LongWritable variableFetchDelayWritable = null; // in millis
Text variableFetchDelayKey = new Text("_variableFetchDelay_");
// local variable maxCount may hold host-specific count set in HostDb
int maxCount = this.maxCount;
for (SelectorEntry entry : values) {
Text url = entry.url;
String urlString = url.toString();
URL u = null;
// Do this only once per queue
if (host == null) {
try {
hostname = URLUtil.getHost(urlString);
host = getHostDatum(hostname);
} catch (Exception e) {
// Got it?
if (host == null) {
// Didn't work, prevent future lookups
host = new HostDatum();
} else {
if (maxCountExpr != null) {
long variableMaxCount = Math
.round((double) maxCountExpr.evaluate(createContext(host)));"Generator: variable maxCount: {} for {}",
variableMaxCount, hostname);
maxCount = (int) variableMaxCount;
if (fetchDelayExpr != null) {
long variableFetchDelay = Math
.round((double) fetchDelayExpr.evaluate(createContext(host)));"Generator: variable fetchDelay: {} ms for {}",
variableFetchDelay, hostname);
variableFetchDelayWritable = new LongWritable(variableFetchDelay);
// Got a non-zero variable fetch delay? Add it to the datum's metadata
if (variableFetchDelayWritable != null) {
if (count == limit) {
// do we have any segments left?
if (currentsegmentnum < maxNumSegments) {
count = 0;
} else
String hostordomain = null;
try {
if (normalise && normalizers != null) {
urlString = normalizers.normalize(urlString,
u = new URL(urlString);
if (byDomain) {
hostordomain = URLUtil.getDomainName(u);
} else {
hostordomain = u.getHost();
} catch (MalformedURLException e) {
LOG.warn("Malformed URL: '{}', skipping ({})", urlString,
context.getCounter("Generator", "MALFORMED_URL").increment(1);
hostordomain = hostordomain.toLowerCase();
// only filter if we are counting hosts or domains
if (maxCount > 0) {
int[] hostCount = hostCounts.get(hostordomain);
if (hostCount == null) {
hostCount = new int[] { 1, 0 };
hostCounts.put(hostordomain, hostCount);
// increment hostCount
// check if topN reached, select next segment if it is
while (segCounts[hostCount[0] - 1] >= limit
&& hostCount[0] < maxNumSegments) {
hostCount[1] = 0;
// reached the limit of allowed URLs per host / domain
// see if we can put it in the next segment?
if (hostCount[1] > maxCount) {
if (hostCount[0] < maxNumSegments) {
hostCount[1] = 1;
} else {
if (hostCount[1] == (maxCount+1)) {
.getCounter("Generator", "HOSTS_AFFECTED_PER_HOST_OVERFLOW")
"Host or domain {} has more than {} URLs for all {} segments. Additional URLs won't be included in the fetchlist.",
hostordomain, maxCount, maxNumSegments);
// skip this entry
context.getCounter("Generator", "URLS_SKIPPED_PER_HOST_OVERFLOW")
entry.segnum = new IntWritable(hostCount[0]);
segCounts[hostCount[0] - 1]++;
} else {
entry.segnum = new IntWritable(currentsegmentnum);
segCounts[currentsegmentnum - 1]++;
outputFile = generateFileName(entry);
mos.write("sequenceFiles", key, entry, outputFile);
// Count is incremented only when we keep the URL
// maxCount may cause us to skip it.
private String generateFileName(SelectorEntry entry) {
return "fetchlist-" + entry.segnum.toString() + "/part";
// Looks up a host in the HostDB by scanning every reader sequentially and
// comparing each key's string form with the requested host name.
// Parameter host: host name to look for.
// Returns the matching HostDatum, or null when no reader contains the host.
// Throws Exception for any failure while reading the HostDB.
// NOTE(review): hostdbReaders is dereferenced without a null check; it stays
// null when no HostDB is configured or open() failed — confirm the callers
// guard against that. Some lines of this method may be missing in this view.
private HostDatum getHostDatum(String host) throws Exception {
Text key = new Text();
HostDatum value = new HostDatum();
for (int i = 0; i < hostdbReaders.length; i++) {
// linear scan: next() fills key/value with each successive record
while (hostdbReaders[i].next(key, value)) {
if (host.equals(key.toString())) {
return value;
return null;
public static class DecreasingFloatComparator
extends FloatWritable.Comparator {
/** Compares two FloatWritables decreasing. */
public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) {
return, s2, l2, b1, s1, l1);
public static class SelectorInverseMapper
extends Mapper<FloatWritable, SelectorEntry, Text, SelectorEntry> {
public void map(FloatWritable key, SelectorEntry value, Context context)
throws IOException, InterruptedException {
SelectorEntry entry = value;
context.write(entry.url, entry);
public static class PartitionReducer
extends Reducer<Text, SelectorEntry, Text, CrawlDatum> {
public void reduce(Text key, Iterable<SelectorEntry> values,
Context context) throws IOException, InterruptedException {
// if using HashComparator, we get only one input key in case of
// hash collision so use only URLs from values
for (SelectorEntry entry : values) {
context.write(entry.url, entry.datum);
/** Sort fetch lists by hash of URL. */
public static class HashComparator extends WritableComparator {
public HashComparator() {
public int compare(WritableComparable a, WritableComparable b) {
Text url1 = (Text) a;
Text url2 = (Text) b;
int hash1 = hash(url1.getBytes(), 0, url1.getLength());
int hash2 = hash(url2.getBytes(), 0, url2.getLength());
return (hash1 < hash2 ? -1 : (hash1 == hash2 ? 0 : 1));
public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) {
int hash1 = hash(b1, s1, l1);
int hash2 = hash(b2, s2, l2);
return (hash1 < hash2 ? -1 : (hash1 == hash2 ? 0 : 1));
private static int hash(byte[] bytes, int start, int length) {
int hash = 1;
// make later bytes more significant in hash code, so that sorting
// by hashcode correlates less with by-host ordering.
for (int i = length - 1; i >= 0; i--)
hash = (31 * hash) + (int) bytes[start + i];
return hash;
* Update the CrawlDB so that the next generate won't include the same URLs.
public static class CrawlDbUpdater {
public static class CrawlDbUpdateMapper
extends Mapper<Text, CrawlDatum, Text, CrawlDatum> {
public void map(Text key, CrawlDatum value, Context context)
throws IOException, InterruptedException {
context.write(key, value);
public static class CrawlDbUpdateReducer
extends Reducer<Text, CrawlDatum, Text, CrawlDatum> {
private CrawlDatum orig = new CrawlDatum();
private LongWritable genTime = new LongWritable(0L);
private long generateTime;
public void setup(
Reducer<Text, CrawlDatum, Text, CrawlDatum>.Context context) {
Configuration conf = context.getConfiguration();
generateTime = conf.getLong(Nutch.GENERATE_TIME_KEY, 0L);
public void reduce(Text key, Iterable<CrawlDatum> values, Context context)
throws IOException, InterruptedException {
for (CrawlDatum val : values) {
if (val.getMetaData().containsKey(Nutch.WRITABLE_GENERATE_TIME_KEY)) {
LongWritable gt = (LongWritable) val.getMetaData()
if (genTime.get() != generateTime) {
} else {
if (genTime.get() != 0L) {
orig.getMetaData().put(Nutch.WRITABLE_GENERATE_TIME_KEY, genTime);
context.write(key, orig);
public Generator() {
public Generator(Configuration conf) {
public Path[] generate(Path dbDir, Path segments, int numLists, long topN,
long curTime)
throws IOException, InterruptedException, ClassNotFoundException {
Job job = NutchJob.getInstance(getConf());
Configuration conf = job.getConfiguration();
boolean filter = conf.getBoolean(GENERATOR_FILTER, true);
boolean normalise = conf.getBoolean(GENERATOR_NORMALISE, true);
return generate(dbDir, segments, numLists, topN, curTime, filter, normalise,
false, 1, null);
* This is an old signature used for compatibility - does not specify whether or not to
* normalise and set the number of segments to 1
* @param dbDir
* Crawl database directory
* @param segments
* Segments directory
* @param numLists
* Number of reduce tasks
* @param topN
* Number of top URLs to be selected
* @param curTime
* Current time in milliseconds
* @param filter whether to apply filtering operation
* @param force if true, and the target lockfile exists, consider it valid. If false
* and the target file exists, throw an IOException.
* @deprecated since 1.19 use
* {@link #generate(Path, Path, int, long, long, boolean, boolean, boolean, int, String, String)}
* or {@link #generate(Path, Path, int, long, long, boolean, boolean, boolean, int, String)}
* in the instance that no hostdb is available
* @throws IOException if an I/O exception occurs.
* @see LockUtil#createLockFile(Configuration, Path, boolean)
* @throws InterruptedException if a thread is waiting, sleeping, or
* otherwise occupied, and the thread is interrupted, either before or
* during the activity.
* @throws ClassNotFoundException if runtime class(es) are not available
* @return Path to generated segment or null if no entries were selected
public Path[] generate(Path dbDir, Path segments, int numLists, long topN,
long curTime, boolean filter, boolean force)
throws IOException, InterruptedException, ClassNotFoundException {
return generate(dbDir, segments, numLists, topN, curTime, filter, true,
force, 1, null);
* This signature should be used in the instance that no hostdb is available.
* Generate fetchlists in one or more segments. Whether to filter URLs or not
* is read from the &quot;generate.filter&quot; property set for the job from
* command-line. If the property is not found, the URLs are filtered. Same for
* the normalisation.
* @param dbDir
* Crawl database directory
* @param segments
* Segments directory
* @param numLists
* Number of reduce tasks
* @param topN
* Number of top URLs to be selected
* @param curTime
* Current time in milliseconds
* @param filter whether to apply filtering operation
* @param norm whether to apply normilization operation
* @param force if true, and the target lockfile exists, consider it valid. If false
* and the target file exists, throw an IOException.
* @param maxNumSegments maximum number of segments to generate
* @param expr a Jexl expression to use in the Generator job.
* @see JexlUtil#parseExpression(String)
* @throws IOException if an I/O exception occurs.
* @see LockUtil#createLockFile(Configuration, Path, boolean)
* @throws InterruptedException if a thread is waiting, sleeping, or
* otherwise occupied, and the thread is interrupted, either before or
* during the activity.
* @throws ClassNotFoundException if runtime class(es) are not available
* @return Path to generated segment or null if no entries were selected
public Path[] generate(Path dbDir, Path segments, int numLists, long topN,
long curTime, boolean filter, boolean norm, boolean force,
int maxNumSegments, String expr)
throws IOException, InterruptedException, ClassNotFoundException {
return generate(dbDir, segments, numLists, topN, curTime, filter, true,
force, 1, expr, null);
* Generate fetchlists in one or more segments. Whether to filter URLs or not
* is read from the &quot;generate.filter&quot; property set for the job from
* command-line. If the property is not found, the URLs are filtered. Same for
* the normalisation.
* @param dbDir
* Crawl database directory
* @param segments
* Segments directory
* @param numLists
* Number of reduce tasks
* @param topN
* Number of top URLs to be selected
* @param curTime
* Current time in milliseconds
* @param filter whether to apply filtering operation
* @param norm whether to apply normalization operation
* @param force if true, and the target lockfile exists, consider it valid. If false
* and the target file exists, throw an IOException.
* @param maxNumSegments maximum number of segments to generate
* @param expr a Jexl expression to use in the Generator job.
* @param hostdb name of a hostdb from which to execute Jexl expressions in a bid
* to determine the maximum URL count and/or fetch delay per host.
* @see JexlUtil#parseExpression(String)
* @throws IOException if an I/O exception occurs.
* @see LockUtil#createLockFile(Configuration, Path, boolean)
* @throws InterruptedException if a thread is waiting, sleeping, or
* otherwise occupied, and the thread is interrupted, either before or
* during the activity.
* @throws ClassNotFoundException if runtime class(es) are not available
* @return Path to generated segment or null if no entries were selected
public Path[] generate(Path dbDir, Path segments, int numLists, long topN,
long curTime, boolean filter, boolean norm, boolean force,
int maxNumSegments, String expr, String hostdb)
throws IOException, InterruptedException, ClassNotFoundException {
Path tempDir = new Path(getConf().get("mapreduce.cluster.temp.dir", ".")
+ "/generate-temp-" + java.util.UUID.randomUUID().toString());
FileSystem fs = tempDir.getFileSystem(getConf());
Path lock = CrawlDb.lock(getConf(), dbDir, force);
SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
long start = System.currentTimeMillis();"Generator: starting at " + sdf.format(start));"Generator: Selecting best-scoring urls due for fetch.");"Generator: filtering: " + filter);"Generator: normalizing: " + norm);
if (topN != Long.MAX_VALUE) {"Generator: topN: {}", topN);
if (expr != null) {"Generator: expr: {}", expr);
if (hostdb != null) {"Generator: hostdb: {}", hostdb);
// map to inverted subset due for fetch, sort by score
Job job = NutchJob.getInstance(getConf());
job.setJobName("generate: select from " + dbDir);
Configuration conf = job.getConfiguration();
if (numLists == -1) {
/* for politeness create exactly one partition per fetch task */
numLists = Integer.parseInt(conf.get("mapreduce.job.maps"));
if ("local".equals(conf.get("")) && numLists != 1) {
// override
"Generator: running in local mode, generating exactly one partition.");
numLists = 1;
conf.setLong(GENERATOR_CUR_TIME, curTime);
// record real generation time
long generateTime = System.currentTimeMillis();
conf.setLong(Nutch.GENERATE_TIME_KEY, generateTime);
conf.setLong(GENERATOR_TOP_N, topN);
conf.setBoolean(GENERATOR_FILTER, filter);
conf.setBoolean(GENERATOR_NORMALISE, norm);
conf.setInt(GENERATOR_MAX_NUM_SEGMENTS, maxNumSegments);
if (expr != null) {
conf.set(GENERATOR_EXPR, expr);
if (hostdb != null) {
conf.set(GENERATOR_HOSTDB, hostdb);
FileInputFormat.addInputPath(job, new Path(dbDir, CrawlDb.CURRENT_NAME));
FileOutputFormat.setOutputPath(job, tempDir);
MultipleOutputs.addNamedOutput(job, "sequenceFiles",
SequenceFileOutputFormat.class, FloatWritable.class,
try {
boolean success = job.waitForCompletion(true);
if (!success) {
String message = "Generator job did not succeed, job status:"
+ job.getStatus().getState() + ", reason: "
+ job.getStatus().getFailureInfo();
NutchJob.cleanupAfterFailure(tempDir, lock, fs);
throw new RuntimeException(message);
} catch (IOException | InterruptedException | ClassNotFoundException e) {
LOG.error("Generator job failed: {}", e.getMessage());
NutchJob.cleanupAfterFailure(tempDir, lock, fs);
throw e;
}"Generator: number of items rejected during selection:");
for (Counter counter : job.getCounters().getGroup("Generator")) {"Generator: {} {}",
String.format(Locale.ROOT, "%6d", counter.getValue()),
if (!getConf().getBoolean(GENERATE_UPDATE_CRAWLDB, false)) {
/*
 * generated items are not marked in CrawlDb, and CrawlDb will not be
 * accessed anymore: we can already release the lock
 */
LockUtil.removeLockFile(getConf(), lock);
lock = null;
// read the subdirectories generated in the temp
// output and turn them into segments
List<Path> generatedSegments = new ArrayList<>();
FileStatus[] status = fs.listStatus(tempDir);
try {
for (FileStatus stat : status) {
Path subfetchlist = stat.getPath();
if (!subfetchlist.getName().startsWith("fetchlist-"))
// start a new partition job for this segment
Path newSeg = partitionSegment(segments, subfetchlist, numLists);
} catch (Exception e) {
LOG.warn("Generator: exception while partitioning segments, exiting ...");
NutchJob.cleanupAfterFailure(tempDir, lock, fs);
return null;
if (generatedSegments.size() == 0) {
LOG.warn("Generator: 0 records selected for fetching, exiting ...");
NutchJob.cleanupAfterFailure(tempDir, lock, fs);
return null;
if (getConf().getBoolean(GENERATE_UPDATE_CRAWLDB, false)) {
// update the db from tempDir
Path tempDir2 = new Path(dbDir,
"generate-temp-" + java.util.UUID.randomUUID().toString());
job = NutchJob.getInstance(getConf());
job.setJobName("generate: updatedb " + dbDir);
job.getConfiguration().setLong(Nutch.GENERATE_TIME_KEY, generateTime);
for (Path segmpaths : generatedSegments) {
Path subGenDir = new Path(segmpaths, CrawlDatum.GENERATE_DIR_NAME);
FileInputFormat.addInputPath(job, subGenDir);
FileInputFormat.addInputPath(job, new Path(dbDir, CrawlDb.CURRENT_NAME));
FileOutputFormat.setOutputPath(job, tempDir2);
try {
boolean success = job.waitForCompletion(true);
if (!success) {
String message = "Generator job did not succeed, job status:"
+ job.getStatus().getState() + ", reason: "
+ job.getStatus().getFailureInfo();
NutchJob.cleanupAfterFailure(tempDir, lock, fs);
NutchJob.cleanupAfterFailure(tempDir2, lock, fs);
throw new RuntimeException(message);
CrawlDb.install(job, dbDir);
} catch (IOException | InterruptedException | ClassNotFoundException e) {
LOG.error("Generator job failed: {}", e.getMessage());
NutchJob.cleanupAfterFailure(tempDir, lock, fs);
NutchJob.cleanupAfterFailure(tempDir2, lock, fs);
throw e;
fs.delete(tempDir2, true);
if (lock != null) {
LockUtil.removeLockFile(getConf(), lock);
fs.delete(tempDir, true);
long end = System.currentTimeMillis();"Generator: finished at " + sdf.format(end) + ", elapsed: "
+ TimingUtil.elapsedTime(start, end));
Path[] patharray = new Path[generatedSegments.size()];
return generatedSegments.toArray(patharray);
private Path partitionSegment(Path segmentsDir, Path inputDir, int numLists)
throws IOException, ClassNotFoundException, InterruptedException {
// invert again, partition by host/domain/IP, sort by url hash"Generator: Partitioning selected urls for politeness.");
Path segment = new Path(segmentsDir, generateSegmentName());
Path output = new Path(segment, CrawlDatum.GENERATE_DIR_NAME);"Generator: segment: " + segment);
Job job = NutchJob.getInstance(getConf());
job.setJobName("generate: partition " + segment);
Configuration conf = job.getConfiguration();
conf.setInt("partition.url.seed", RANDOM.nextInt());
FileInputFormat.addInputPath(job, inputDir);
FileOutputFormat.setOutputPath(job, output);
try {
boolean success = job.waitForCompletion(true);
if (!success) {
String message = "Generator job did not succeed, job status:"
+ job.getStatus().getState() + ", reason: "
+ job.getStatus().getFailureInfo();
throw new RuntimeException(message);
} catch (IOException | InterruptedException | ClassNotFoundException e) {
throw e;
return segment;
private static SimpleDateFormat sdf = new SimpleDateFormat("yyyyMMddHHmmss");
// Returns a segment name built by formatting the current system time with
// the class-level `sdf` formatter.
// The method is synchronized; presumably this guards the shared
// SimpleDateFormat instance, which is not thread-safe — confirm.
// NOTE(review): the try body is empty in this view, so a statement appears to
// be missing. If `sdf` has one-second resolution, two calls within the same
// second would yield the same name unless something delays here — confirm
// against the full file.
public static synchronized String generateSegmentName() {
try {
} catch (Throwable t) {
return sdf.format(new Date(System.currentTimeMillis()));
* Generate a fetchlist from the crawldb.
* @param args array of arguments for this job
* @throws Exception if there is an error running the job
public static void main(String args[]) throws Exception {
int res =, new Generator(),
public int run(String[] args) throws Exception {
if (args.length < 2) {
"Usage: Generator <crawldb> <segments_dir> [-force] [-topN N] [-numFetchers numFetchers] [-expr <expr>] [-adddays <numDays>] [-noFilter] [-noNorm] [-maxNumSegments <num>]");
return -1;
Path dbDir = new Path(args[0]);
Path segmentsDir = new Path(args[1]);
String hostdb = null;
long curTime = System.currentTimeMillis();
long topN = Long.MAX_VALUE;
int numFetchers = -1;
boolean filter = true;
boolean norm = true;
boolean force = false;
String expr = null;
int maxNumSegments = 1;
for (int i = 2; i < args.length; i++) {
if ("-topN".equals(args[i])) {
topN = Long.parseLong(args[i + 1]);
} else if ("-numFetchers".equals(args[i])) {
numFetchers = Integer.parseInt(args[i + 1]);
} else if ("-hostdb".equals(args[i])) {
hostdb = args[i + 1];
} else if ("-adddays".equals(args[i])) {
long numDays = Integer.parseInt(args[i + 1]);
curTime += numDays * 1000L * 60 * 60 * 24;
} else if ("-noFilter".equals(args[i])) {
filter = false;
} else if ("-noNorm".equals(args[i])) {
norm = false;
} else if ("-force".equals(args[i])) {
force = true;
} else if ("-maxNumSegments".equals(args[i])) {
maxNumSegments = Integer.parseInt(args[i + 1]);
} else if ("-expr".equals(args[i])) {
expr = args[i + 1];
try {
Path[] segs = generate(dbDir, segmentsDir, numFetchers, topN, curTime,
filter, norm, force, maxNumSegments, expr, hostdb);
if (segs == null)
return 1;
} catch (Exception e) {
LOG.error("Generator: " + StringUtils.stringifyException(e));
return -1;
return 0;
public Map<String, Object> run(Map<String, Object> args, String crawlId)
throws Exception {
Map<String, Object> results = new HashMap<>();
long curTime = System.currentTimeMillis();
long topN = Long.MAX_VALUE;
int numFetchers = -1;
boolean filter = true;
boolean norm = true;
boolean force = false;
int maxNumSegments = 1;
String expr = null;
String hostdb = null;
Path crawlDb;
if (args.containsKey(Nutch.ARG_CRAWLDB)) {
Object crawldbPath = args.get(Nutch.ARG_CRAWLDB);
if (crawldbPath instanceof Path) {
crawlDb = (Path) crawldbPath;
} else {
crawlDb = new Path(crawldbPath.toString());
} else {
crawlDb = new Path(crawlId + "/crawldb");
Path segmentsDir;
if (args.containsKey(Nutch.ARG_SEGMENTDIR)) {
Object segDir = args.get(Nutch.ARG_SEGMENTDIR);
if (segDir instanceof Path) {
segmentsDir = (Path) segDir;
} else {
segmentsDir = new Path(segDir.toString());
} else {
segmentsDir = new Path(crawlId + "/segments");
if (args.containsKey(Nutch.ARG_HOSTDB)) {
hostdb = (String) args.get(Nutch.ARG_HOSTDB);
if (args.containsKey("expr")) {
expr = (String) args.get("expr");
if (args.containsKey("topN")) {
topN = Long.parseLong((String) args.get("topN"));
if (args.containsKey("numFetchers")) {
numFetchers = Integer.parseInt((String) args.get("numFetchers"));
if (args.containsKey("adddays")) {
long numDays = Integer.parseInt((String) args.get("adddays"));
curTime += numDays * 1000L * 60 * 60 * 24;
if (args.containsKey("noFilter")) {
filter = false;
if (args.containsKey("noNorm")) {
norm = false;
if (args.containsKey("force")) {
force = true;
if (args.containsKey("maxNumSegments")) {
maxNumSegments = Integer.parseInt((String) args.get("maxNumSegments"));
try {
Path[] segs = generate(crawlDb, segmentsDir, numFetchers, topN, curTime,
filter, norm, force, maxNumSegments, expr, hostdb);
if (segs == null) {
results.put(Nutch.VAL_RESULT, Integer.toString(1));
return results;
} catch (Exception e) {
LOG.error("Generator: " + StringUtils.stringifyException(e));
results.put(Nutch.VAL_RESULT, Integer.toString(-1));
return results;
results.put(Nutch.VAL_RESULT, Integer.toString(0));
return results;