blob: f558c469d5b93bb2074fdab393f61f6c88532e36 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nutch.util;
import java.io.IOException;
import java.net.URL;
import java.text.SimpleDateFormat;
import java.util.Collection;
import java.util.List;
import java.util.Random;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.KeyValueTextInputFormat;
import org.apache.hadoop.mapreduce.lib.input.MultipleInputs;
import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;
import org.apache.hadoop.mapreduce.lib.map.MultithreadedMapper;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.MapFileOutputFormat;
import org.apache.hadoop.util.StringUtils;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
import org.apache.nutch.crawl.CrawlDatum;
import org.apache.nutch.hostdb.HostDatum;
import org.apache.nutch.net.URLFilters;
import org.apache.nutch.net.URLNormalizers;
import org.apache.nutch.protocol.Content;
import org.apache.nutch.protocol.Protocol;
import org.apache.nutch.protocol.ProtocolFactory;
import org.apache.nutch.protocol.ProtocolOutput;
import org.apache.nutch.protocol.ProtocolStatus;
import org.apache.nutch.util.NutchJob;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import crawlercommons.robots.BaseRobotRules;
import crawlercommons.sitemaps.AbstractSiteMap;
import crawlercommons.sitemaps.SiteMap;
import crawlercommons.sitemaps.SiteMapIndex;
import crawlercommons.sitemaps.SiteMapParser;
import crawlercommons.sitemaps.SiteMapURL;
/**
* <p>Performs Sitemap processing by fetching sitemap links, parsing the content and merging
* the urls from Sitemap (with the metadata) with the existing crawldb.</p>
*
* <p>There are two use cases supported in Nutch's Sitemap processing:</p>
* <ol>
* <li>Sitemaps are considered as "remote seed lists". Crawl administrators can prepare a
* list of sitemap links and get only those sitemap pages. This suits well for targeted
* crawl of specific hosts.</li>
* <li>For open web crawl, it is not possible to track each host and get the sitemap links
* manually. Nutch would automatically get the sitemaps for all the hosts seen in the
* crawls and inject the urls from sitemap to the crawldb.</li>
* </ol>
*
* <p>For more details see:
* https://wiki.apache.org/nutch/SitemapFeature </p>
*/
public class SitemapProcessor extends Configured implements Tool {
/** Logger shared by the tool and its nested mapper/reducer classes. */
public static final Logger LOG = LoggerFactory.getLogger(SitemapProcessor.class);
/**
 * Formatter for the start/finish timestamps written to the log.
 * NOTE(review): {@link SimpleDateFormat} is not synchronized; in this file it is only
 * used from {@code sitemap(...)} - confirm before using it from concurrent code.
 */
public static final SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
/** Name of the sub-directory that holds the live data inside a hostdb directory. */
public static final String CURRENT_NAME = "current";
/** Name of the lock file created inside the crawldb directory while the tool runs. */
public static final String LOCK_NAME = ".locked";
/** Boolean property: when true the sitemap parser is strict and only valid URLs are emitted. */
public static final String SITEMAP_STRICT_PARSING = "sitemap.strict.parsing";
/** Boolean property: when true URLs are passed through the configured URL filters. */
public static final String SITEMAP_URL_FILTERING = "sitemap.url.filter";
/** Boolean property: when true URLs are passed through the configured URL normalizers. */
public static final String SITEMAP_URL_NORMALIZING = "sitemap.url.normalize";
/** Boolean property: when true and robots.txt lists no sitemap, "sitemap.xml" on the host root is tried. */
public static final String SITEMAP_ALWAYS_TRY_SITEMAPXML_ON_ROOT = "sitemap.url.default.sitemap.xml";
/** Boolean property: when true the reducer copies sitemap metadata onto entries already in the crawldb. */
public static final String SITEMAP_OVERWRITE_EXISTING = "sitemap.url.overwrite.existing";
/** Integer property: maximum number of redirects followed when fetching a sitemap (default 3). */
public static final String SITEMAP_REDIR_MAX = "sitemap.redir.max";
/** Integer property: maximum sitemap size; applied to the http/file content limits in the mapper. */
public static final String SITEMAP_SIZE_MAX = "sitemap.size.max";
private static class SitemapMapper extends Mapper<Text, Writable, Text, CrawlDatum> {
// Factory used to look up the Protocol implementation for a URL; created in setup().
private ProtocolFactory protocolFactory = null;
// Mirrors SITEMAP_STRICT_PARSING: strict parser and only valid sitemap URLs are emitted.
private boolean strict = true;
// Mirrors SITEMAP_URL_FILTERING: whether a URLFilters instance is created in setup().
private boolean filter = true;
// Mirrors SITEMAP_URL_NORMALIZING: whether a URLNormalizers instance is created in setup().
private boolean normalize = true;
// Mirrors SITEMAP_ALWAYS_TRY_SITEMAPXML_ON_ROOT: fall back to <root>/sitemap.xml when robots.txt lists none.
private boolean tryDefaultSitemapXml = true;
// Mirrors SITEMAP_REDIR_MAX: redirect budget for a single sitemap fetch.
private int maxRedir = 3;
// Stays null when filtering is disabled; filterNormalize() checks for null.
private URLFilters filters = null;
// Stays null when normalizing is disabled; filterNormalize() checks for null.
private URLNormalizers normalizers = null;
// Datum instance handed to the protocol calls (getRobotRules / getProtocolOutput).
private CrawlDatum datum = new CrawlDatum();
// Sitemap parser, created in setup() with the configured strictness.
private SiteMapParser parser = null;
/**
 * Reads the sitemap-related job settings and creates the helpers used while mapping:
 * the protocol factory, the sitemap parser and, when enabled, the URL filters and
 * URL normalizers.
 *
 * @param context the mapper context providing the job configuration
 */
public void setup(Context context) {
  final Configuration jobConf = context.getConfiguration();

  // The sitemap size cap is written to the content-limit properties before the protocol
  // factory is built (presumably so the protocol implementations pick it up).
  final int sizeCap = jobConf.getInt(SITEMAP_SIZE_MAX, SiteMapParser.MAX_BYTES_ALLOWED);
  jobConf.setInt("http.content.limit", sizeCap);
  jobConf.setInt("file.content.limit", sizeCap);
  protocolFactory = new ProtocolFactory(jobConf);

  // Plain switches and limits.
  maxRedir = jobConf.getInt(SITEMAP_REDIR_MAX, 3);
  tryDefaultSitemapXml = jobConf.getBoolean(SITEMAP_ALWAYS_TRY_SITEMAPXML_ON_ROOT, true);
  strict = jobConf.getBoolean(SITEMAP_STRICT_PARSING, true);
  normalize = jobConf.getBoolean(SITEMAP_URL_NORMALIZING, true);
  filter = jobConf.getBoolean(SITEMAP_URL_FILTERING, true);

  parser = new SiteMapParser(strict);

  // Filters and normalizers are only instantiated when switched on; otherwise the
  // fields keep their current value (null by default).
  if (filter) {
    filters = new URLFilters(jobConf);
  }
  if (normalize) {
    normalizers = new URLNormalizers(jobConf, URLNormalizers.SCOPE_DEFAULT);
  }
}
/**
 * Dispatches one input record according to the type of its value:
 * <ul>
 * <li>{@link CrawlDatum}: an existing CrawlDb entry, emitted unchanged so the reducer can merge it;</li>
 * <li>{@link HostDatum}: a HostDb entry, the key is treated as a host name;</li>
 * <li>{@link Text}: a line of the sitemap URL directory, the key is either a sitemap URL
 * (http, https, ftp or file scheme) or a host name.</li>
 * </ul>
 * Failures while discovering, fetching or parsing sitemaps are logged and the record is
 * skipped. Failures while passing through a CrawlDb entry are NOT swallowed: the job
 * output later replaces the CrawlDb, so silently losing such a record would delete it.
 *
 * @param key URL or host name of the record
 * @param value CrawlDatum, HostDatum or Text depending on the input
 * @param context mapper context used for output and counters
 * @throws IOException if an existing CrawlDb entry cannot be written
 * @throws InterruptedException if writing an existing CrawlDb entry is interrupted
 */
public void map(Text key, Writable value, Context context) throws IOException, InterruptedException {
  if (value instanceof CrawlDatum) {
    // Entry from the CrawlDb: emit it as is, it will be merged in the reducer.
    // Deliberately outside the catch-all below so that a write failure fails the task.
    context.write(key, (CrawlDatum) value);
    return;
  }

  try {
    if (value instanceof HostDatum) {
      generateSitemapsFromHostname(key.toString(), context);
    } else if (value instanceof Text) {
      // Input can be a sitemap URL or a host name
      String url = key.toString();
      boolean hasKnownScheme = url.startsWith("http://")
          || url.startsWith("https://")
          || url.startsWith("ftp://")
          || url.startsWith("file:/");

      if (hasKnownScheme) {
        // Entry from the sitemap urls file: fetch the sitemap, extract urls and emit those
        url = filterNormalize(url);
        if (url == null) {
          context.getCounter("Sitemap", "filtered_records").increment(1);
          return;
        }
        context.getCounter("Sitemap", "sitemap_seeds").increment(1);
        generateSitemapUrlDatum(protocolFactory.getProtocol(url), url, context);
      } else {
        LOG.info("generateSitemapsFromHostname: {}", key);
        generateSitemapsFromHostname(key.toString(), context);
      }
    }
  } catch (Exception e) {
    // Best effort: one broken sitemap or host must not fail the whole task.
    LOG.warn("Exception for record {} : {}", key.toString(), StringUtils.stringifyException(e));
  }
}
/**
 * Normalizes and/or filters the given URL, depending on which of the two helpers were
 * created in {@code setup}.
 *
 * @param url the URL to process, may be null
 * @return the normalized URL, or null if the input was null, the URL was rejected, or
 *         normalizing/filtering threw an exception
 */
private String filterNormalize(String url) {
  if (url == null) {
    return null;
  }
  final String input = url;
  try {
    if (normalizers != null) {
      url = normalizers.normalize(url, URLNormalizers.SCOPE_DEFAULT);
    }
    // Skip the filters once the normalizer has already rejected the URL.
    if (url != null && filters != null) {
      url = filters.filter(url);
    }
  } catch (Exception e) {
    // A URL that cannot be normalized or filtered is treated as rejected; leave a trace
    // instead of dropping the cause silently.
    LOG.debug("Rejecting URL {} after filter/normalize failure: {}", input, e.toString());
    return null;
  }
  return url;
}
/**
 * Discovers the sitemaps of a host and emits the URLs they contain.
 *
 * <p>The host root URL is built by trying the http, https, ftp and file schemes in that
 * order until one survives filtering/normalization. The sitemap locations are taken from
 * the robots rules of that root URL; if none are listed and the default-sitemap option is
 * enabled, {@code <root>sitemap.xml} is tried instead.</p>
 *
 * <p>The list returned by the robots rules is only read, never modified: it belongs to
 * the rules object, which may be shared/cached or unmodifiable depending on the
 * implementation.</p>
 *
 * <p>Any exception is logged and swallowed so that a single host cannot fail the task.</p>
 *
 * @param host host name taken from the HostDb or the sitemap URL directory
 * @param context mapper context used for output and counters
 */
private void generateSitemapsFromHostname(String host, Context context) {
  try {
    // For entry from hostdb, get sitemap url(s) from robots.txt, fetch the sitemap,
    // extract urls and emit those

    // try different combinations of schemes one by one till we get rejection in all cases
    String url;
    if ((url = filterNormalize("http://" + host + "/")) == null &&
        (url = filterNormalize("https://" + host + "/")) == null &&
        (url = filterNormalize("ftp://" + host + "/")) == null &&
        (url = filterNormalize("file:/" + host + "/")) == null) {
      context.getCounter("Sitemap", "filtered_records").increment(1);
      return;
    }

    // We may wish to use the robots.txt content as the third parameter for .getRobotRules
    BaseRobotRules rules = protocolFactory.getProtocol(url).getRobotRules(new Text(url), datum, null);
    List<String> sitemaps = rules.getSitemaps();

    if (sitemaps.isEmpty()) {
      // Nothing announced in robots.txt: optionally probe the conventional location.
      if (tryDefaultSitemapXml) {
        processSitemapFromHostname(url + "sitemap.xml", context);
      }
      return;
    }

    for (String sitemap : sitemaps) {
      processSitemapFromHostname(sitemap, context);
    }
  } catch (Exception e) {
    LOG.warn("Exception for record {} : {}", host, StringUtils.stringifyException(e));
  }
}

/** Counts, filters/normalizes and then fetches one sitemap discovered for a host. */
private void processSitemapFromHostname(String sitemap, Context context) throws Exception {
  context.getCounter("Sitemap", "sitemaps_from_hostname").increment(1);
  String accepted = filterNormalize(sitemap);
  if (accepted == null) {
    context.getCounter("Sitemap", "filtered_sitemaps_from_hostname").increment(1);
    return;
  }
  generateSitemapUrlDatum(protocolFactory.getProtocol(accepted), accepted, context);
}
/**
 * Maximum number of nested sitemap index levels that are followed. Guards against
 * unbounded recursion when an index (directly or indirectly) references itself.
 */
private static final int MAX_SITEMAP_INDEX_DEPTH = 3;

/**
 * Fetches the sitemap at the given URL, parses it and emits one injected
 * {@link CrawlDatum} per accepted URL. Sitemap indexes are followed recursively up to
 * {@link #MAX_SITEMAP_INDEX_DEPTH} levels.
 *
 * @param protocol protocol implementation used for fetching
 * @param url URL of the sitemap or sitemap index
 * @param context mapper context used for output and counters
 * @throws Exception if fetching, parsing or writing fails
 */
private void generateSitemapUrlDatum(Protocol protocol, String url, Context context) throws Exception {
  generateSitemapUrlDatum(protocol, url, context, 0);
}

/**
 * Implementation of {@link #generateSitemapUrlDatum(Protocol, String, Context)} that
 * tracks how many sitemap index levels have been descended so far.
 *
 * @param depth number of sitemap indexes already followed to reach {@code url}
 */
private void generateSitemapUrlDatum(Protocol protocol, String url, Context context, int depth)
    throws Exception {
  ProtocolOutput output = protocol.getProtocolOutput(new Text(url), datum);
  ProtocolStatus status = output.getStatus();
  Content content = output.getContent();

  // Following redirects http > https and what else
  int redirectsLeft = this.maxRedir;
  while (!status.isSuccess() && status.isRedirect() && redirectsLeft > 0) {
    String[] redirectArgs = status.getArgs();
    if (redirectArgs == null || redirectArgs.length == 0) {
      // Redirect without a target: nothing to follow, report it as a failed fetch below.
      break;
    }

    String target = filterNormalize(redirectArgs[0]);
    if (target == null) {
      // Target rejected by filters/normalizers. Keep 'url' pointing at the last URL that
      // was actually requested so the error message below stays meaningful.
      LOG.info("Redirect target {} of sitemap {} rejected by URL filters", redirectArgs[0], url);
      break;
    }

    url = target;
    output = protocol.getProtocolOutput(new Text(url), datum);
    status = output.getStatus();
    content = output.getContent();
    redirectsLeft--;
  }

  if (status.getCode() != ProtocolStatus.SUCCESS) {
    // If there were any problems fetching the sitemap (including an exhausted redirect
    // budget), log the error and let it go.
    context.getCounter("Sitemap", "failed_fetches").increment(1);
    LOG.error("Error while fetching the sitemap. Status code: {} for {}", status.getCode(), url);
    return;
  }

  AbstractSiteMap asm = parser.parseSiteMap(content.getContentType(), content.getContent(), new URL(url));

  if (asm instanceof SiteMap) {
    LOG.info("Parsing sitemap file: {}", asm.getUrl().toString());
    SiteMap sm = (SiteMap) asm;
    Collection<SiteMapURL> sitemapUrls = sm.getSiteMapUrls();
    for (SiteMapURL sitemapUrl : sitemapUrls) {
      // If 'strict' is ON, only allow valid urls. Else allow all urls
      if (strict && !sitemapUrl.isValid()) {
        continue;
      }
      String key = filterNormalize(sitemapUrl.getUrl().toString());
      if (key == null) {
        continue;
      }

      // STATUS_INJECTED marks the datum as coming from a sitemap; the reducer relies on
      // this status to tell it apart from existing CrawlDb entries.
      CrawlDatum sitemapUrlDatum = new CrawlDatum();
      sitemapUrlDatum.setStatus(CrawlDatum.STATUS_INJECTED);
      sitemapUrlDatum.setScore((float) sitemapUrl.getPriority());

      if (sitemapUrl.getChangeFrequency() != null) {
        // Fetch interval in seconds derived from the <changefreq> value.
        int fetchInterval = -1;
        switch (sitemapUrl.getChangeFrequency()) {
          case ALWAYS: fetchInterval = 1; break;
          case HOURLY: fetchInterval = 3600; break; // 60*60
          case DAILY: fetchInterval = 86400; break; // 60*60*24
          case WEEKLY: fetchInterval = 604800; break; // 60*60*24*7
          case MONTHLY: fetchInterval = 2592000; break; // 60*60*24*30
          case YEARLY: fetchInterval = 31536000; break; // 60*60*24*365
          case NEVER: fetchInterval = Integer.MAX_VALUE; break; // Loose "NEVER" contract
        }
        sitemapUrlDatum.setFetchInterval(fetchInterval);
      }

      if (sitemapUrl.getLastModified() != null) {
        sitemapUrlDatum.setModifiedTime(sitemapUrl.getLastModified().getTime());
      }

      context.write(new Text(key), sitemapUrlDatum);
    }
  } else if (asm instanceof SiteMapIndex) {
    SiteMapIndex index = (SiteMapIndex) asm;
    Collection<AbstractSiteMap> sitemapUrls = index.getSitemaps();

    if (sitemapUrls.isEmpty()) {
      return;
    }

    if (depth >= MAX_SITEMAP_INDEX_DEPTH) {
      // Stop here instead of recursing forever on cyclic or excessively nested indexes.
      LOG.warn("Not following sitemap index {}: nesting limit of {} levels reached",
          index.getUrl(), MAX_SITEMAP_INDEX_DEPTH);
      return;
    }

    LOG.info("Parsing sitemap index file: {}", index.getUrl().toString());
    for (AbstractSiteMap sitemap : sitemapUrls) {
      String sitemapUrl = filterNormalize(sitemap.getUrl().toString());
      if (sitemapUrl != null) {
        generateSitemapUrlDatum(protocol, sitemapUrl, context, depth + 1);
      }
    }
  }
}
}
/**
 * Merges, per URL, the entry already present in the CrawlDb (if any) with the entry
 * discovered via a sitemap (if any). Sitemap entries are recognized by their
 * {@code STATUS_INJECTED} status.
 */
private static class SitemapReducer extends Reducer<Text, CrawlDatum, Text, CrawlDatum> {
  CrawlDatum sitemapDatum = null;
  CrawlDatum originalDatum = null;

  private boolean overwriteExisting = false; // DO NOT ENABLE!!

  /** Reads the overwrite switch ({@code sitemap.url.overwrite.existing}) from the job configuration. */
  public void setup(Context context) {
    Configuration conf = context.getConfiguration();
    this.overwriteExisting = conf.getBoolean(SITEMAP_OVERWRITE_EXISTING, false);
  }

  /**
   * Emits exactly one datum per key when at least one value is present:
   * the existing CrawlDb entry if there is one (optionally updated with sitemap
   * metadata), otherwise the sitemap entry with its status set to unfetched.
   *
   * @param key the URL
   * @param values all data seen for this URL (CrawlDb entry and/or sitemap entries)
   * @param context reducer context used for output and counters
   * @throws IOException if the output cannot be written
   * @throws InterruptedException if writing is interrupted
   */
  public void reduce(Text key, Iterable<CrawlDatum> values, Context context)
      throws IOException, InterruptedException {
    sitemapDatum = null;
    originalDatum = null;

    // Values are copied because the objects handed out by the iterable are not kept
    // beyond the current iteration here; the last value of each kind wins.
    for (CrawlDatum curr : values) {
      if (curr.getStatus() == CrawlDatum.STATUS_INJECTED) {
        sitemapDatum = new CrawlDatum();
        sitemapDatum.set(curr);
      } else {
        originalDatum = new CrawlDatum();
        originalDatum.set(curr);
      }
    }

    if (originalDatum != null) {
      // The url was already present in crawldb. If we got the same url from sitemap too, save
      // the information from sitemap to the original datum. Emit the original crawl datum
      if (sitemapDatum != null && overwriteExisting) {
        originalDatum.setScore(sitemapDatum.getScore());
        // The mapper only sets fetch interval / modified time when the sitemap provides
        // <changefreq> / <lastmod>. Only copy values that were actually provided so an
        // existing schedule is not replaced by an unset value (presumably 0, the
        // CrawlDatum default - confirm against CrawlDatum).
        if (sitemapDatum.getFetchInterval() > 0) {
          originalDatum.setFetchInterval(sitemapDatum.getFetchInterval());
        }
        if (sitemapDatum.getModifiedTime() > 0) {
          originalDatum.setModifiedTime(sitemapDatum.getModifiedTime());
        }
      }

      context.getCounter("Sitemap", "existing_sitemap_entries").increment(1);
      context.write(key, originalDatum);
    } else if (sitemapDatum != null) {
      // For the newly discovered links via sitemap, set the status as unfetched and emit
      context.getCounter("Sitemap", "new_sitemap_entries").increment(1);
      sitemapDatum.setStatus(CrawlDatum.STATUS_DB_UNFETCHED);
      context.write(key, sitemapDatum);
    }
  }
}
/**
 * Runs the sitemap job: reads the current CrawlDb plus the optional sitemap URL directory
 * and HostDb, fetches and parses the sitemaps, merges the result and installs it as the
 * new current CrawlDb.
 *
 * <p>The CrawlDb is locked for the duration of the run. The lock and the temporary output
 * are cleaned up if the job cannot be set up, fails, or the installation of the new
 * CrawlDb fails with a checked exception.</p>
 *
 * @param crawldb path to the CrawlDb to update
 * @param hostdb path to a HostDb whose hosts are scanned for sitemaps, or null
 * @param sitemapUrlDir directory with sitemap URLs or host names, or null
 * @param strict whether the sitemap parser is strict about URL validity
 * @param filter whether URLs are filtered
 * @param normalize whether URLs are normalized
 * @param threads number of fetch threads per map task
 * @throws Exception if the job cannot be run or does not succeed
 */
public void sitemap(Path crawldb, Path hostdb, Path sitemapUrlDir, boolean strict, boolean filter,
    boolean normalize, int threads) throws Exception {
  long start = System.currentTimeMillis();
  if (LOG.isInfoEnabled()) {
    LOG.info("SitemapProcessor: Starting at {}", sdf.format(start));
  }

  FileSystem fs = crawldb.getFileSystem(getConf());
  Path old = new Path(crawldb, "old");
  Path current = new Path(crawldb, CURRENT_NAME);
  Path tempCrawlDb = new Path(crawldb, "crawldb-" + Integer.toString(new Random().nextInt(Integer.MAX_VALUE)));

  // lock an existing crawldb to prevent multiple simultaneous updates
  Path lock = new Path(crawldb, LOCK_NAME);
  if (!fs.exists(current)) {
    fs.mkdirs(current);
  }
  LockUtil.createLockFile(fs, lock, false);

  // Everything after the lock has been taken runs inside the try block so that a failure
  // while setting up the job (e.g. an IOException from Job.getInstance) releases the lock.
  try {
    Configuration conf = getConf();
    conf.setBoolean(SITEMAP_STRICT_PARSING, strict);
    conf.setBoolean(SITEMAP_URL_FILTERING, filter);
    conf.setBoolean(SITEMAP_URL_NORMALIZING, normalize);
    conf.setBoolean("mapreduce.fileoutputcommitter.marksuccessfuljobs", false);

    Job job = Job.getInstance(conf, "SitemapProcessor_" + crawldb.toString());
    job.setJarByClass(SitemapProcessor.class);

    // add crawlDb, sitemap url directory and hostDb to input paths
    MultipleInputs.addInputPath(job, current, SequenceFileInputFormat.class);
    if (sitemapUrlDir != null) {
      MultipleInputs.addInputPath(job, sitemapUrlDir, KeyValueTextInputFormat.class);
    }
    if (hostdb != null) {
      MultipleInputs.addInputPath(job, new Path(hostdb, CURRENT_NAME), SequenceFileInputFormat.class);
    }

    FileOutputFormat.setOutputPath(job, tempCrawlDb);
    job.setOutputFormatClass(MapFileOutputFormat.class);
    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(CrawlDatum.class);

    // The real mapper runs inside MultithreadedMapper so sitemaps are fetched in parallel.
    job.setMapperClass(MultithreadedMapper.class);
    MultithreadedMapper.setMapperClass(job, SitemapMapper.class);
    MultithreadedMapper.setNumberOfThreads(job, threads);
    job.setReducerClass(SitemapReducer.class);

    boolean success = job.waitForCompletion(true);
    if (!success) {
      String message = "SitemapProcessor_" + crawldb.toString()
          + " job did not succeed, job status: " + job.getStatus().getState()
          + ", reason: " + job.getStatus().getFailureInfo();
      LOG.error(message);
      NutchJob.cleanupAfterFailure(tempCrawlDb, lock, fs);
      // throw exception so that calling routine can exit with error
      throw new RuntimeException(message);
    }

    // Install the new CrawlDb: current -> old, temp -> current. The backup is removed
    // afterwards when it should not be preserved, so no "old" directory is left behind
    // regardless of whether one existed before the run.
    boolean preserveBackup = conf.getBoolean("db.preserve.backup", true);
    FSUtils.replace(fs, old, current, true);
    FSUtils.replace(fs, current, tempCrawlDb, true);
    if (!preserveBackup && fs.exists(old)) {
      fs.delete(old, true);
    }
    LockUtil.removeLockFile(fs, lock);

    if (LOG.isInfoEnabled()) {
      long filteredRecords = job.getCounters().findCounter("Sitemap", "filtered_records").getValue();
      long fromHostname = job.getCounters().findCounter("Sitemap", "sitemaps_from_hostname").getValue();
      long fromSeeds = job.getCounters().findCounter("Sitemap", "sitemap_seeds").getValue();
      long failedFetches = job.getCounters().findCounter("Sitemap", "failed_fetches").getValue();
      long newSitemapEntries = job.getCounters().findCounter("Sitemap", "new_sitemap_entries").getValue();

      LOG.info("SitemapProcessor: Total records rejected by filters: {}", filteredRecords);
      LOG.info("SitemapProcessor: Total sitemaps from host name: {}", fromHostname);
      LOG.info("SitemapProcessor: Total sitemaps from seed urls: {}", fromSeeds);
      LOG.info("SitemapProcessor: Total failed sitemap fetches: {}", failedFetches);
      LOG.info("SitemapProcessor: Total new sitemap entries added: {}", newSitemapEntries);

      long end = System.currentTimeMillis();
      LOG.info("SitemapProcessor: Finished at {}, elapsed: {}", sdf.format(end), TimingUtil.elapsedTime(start, end));
    }
  } catch (IOException | InterruptedException | ClassNotFoundException e) {
    LOG.error("SitemapProcessor_" + crawldb.toString(), e);
    NutchJob.cleanupAfterFailure(tempCrawlDb, lock, fs);
    throw e;
  }
}
/**
 * Command line entry point: runs the tool through {@link ToolRunner} with a Nutch
 * configuration and exits the JVM with the tool's return code.
 *
 * @param args command line arguments, see {@link #usage()}
 * @throws Exception if the tool runner fails
 */
public static void main(String[] args) throws Exception {
  final Configuration nutchConf = NutchConfiguration.create();
  final SitemapProcessor tool = new SitemapProcessor();
  final int exitCode = ToolRunner.run(nutchConf, tool, args);
  System.exit(exitCode);
}
/**
 * Prints the command line synopsis to standard error.
 *
 * <p>Only options that {@code run(String[])} actually parses are listed. The previously
 * advertised {@code -force} option is not parsed there (it is rejected as an invalid
 * argument), so it is no longer shown.</p>
 */
public static void usage() {
  System.err.println("Usage:\n SitemapProcessor <crawldb> [-hostdb <hostdb>] [-sitemapUrls <url_dir>] " +
      "[-threads <threads>] [-noStrict] [-noFilter] [-noNormalize]\n");
  System.err.println("\t<crawldb>\t\tpath to crawldb where the sitemap urls would be injected");
  System.err.println("\t-hostdb <hostdb>\tpath of a hostdb. Sitemap(s) from these hosts would be downloaded");
  System.err.println("\t-sitemapUrls <url_dir>\tpath to directory with sitemap urls or hostnames");
  System.err.println("\t-threads <threads>\tNumber of threads created per mapper to fetch sitemap urls (default: 8)");
  System.err.println("\t-noStrict\t\tBy default Sitemap parser rejects invalid urls. '-noStrict' disables that.");
  System.err.println("\t-noFilter\t\tturn off URLFilters on urls (optional)");
  System.err.println("\t-noNormalize\t\tturn off URLNormalizer on urls (optional)");
}
/**
 * Parses the command line and runs the sitemap job.
 *
 * <p>Malformed command lines (unknown option, option without its value, non-numeric or
 * non-positive thread count) print the usage and return -1 instead of throwing.</p>
 *
 * @param args command line arguments, see {@link #usage()}
 * @return 0 on success, -1 on invalid arguments or if the job fails
 * @throws Exception declared by the Tool contract; job failures are caught and reported as -1
 */
public int run(String[] args) throws Exception {
  if (args.length < 3) {
    usage();
    return -1;
  }

  Path crawlDb = new Path(args[0]);
  Path hostDb = null;
  Path urlDir = null;
  boolean strict = true;
  boolean filter = true;
  boolean normalize = true;
  int threads = 8;

  for (int i = 1; i < args.length; i++) {
    String option = args[i];

    // Options that consume the following argument must not be the last one.
    boolean needsValue = option.equals("-hostdb")
        || option.equals("-sitemapUrls")
        || option.equals("-threads");
    if (needsValue && i + 1 >= args.length) {
      LOG.error("SitemapProcessor: Missing value for argument \"{}\"", option);
      usage();
      return -1;
    }

    if (option.equals("-hostdb")) {
      hostDb = new Path(args[++i]);
      LOG.info("SitemapProcessor: hostdb: {}", hostDb);
    } else if (option.equals("-sitemapUrls")) {
      urlDir = new Path(args[++i]);
      LOG.info("SitemapProcessor: sitemap urls dir: {}", urlDir);
    } else if (option.equals("-threads")) {
      String rawThreads = args[++i];
      try {
        threads = Integer.parseInt(rawThreads);
      } catch (NumberFormatException e) {
        LOG.error("SitemapProcessor: Invalid number of threads \"{}\"", rawThreads);
        usage();
        return -1;
      }
      // At least one fetch thread is required for the mapper to do any work.
      if (threads < 1) {
        LOG.error("SitemapProcessor: Number of threads must be positive, got {}", threads);
        usage();
        return -1;
      }
      LOG.info("SitemapProcessor: threads: {}", threads);
    } else if (option.equals("-noStrict")) {
      LOG.info("SitemapProcessor: 'strict' parsing disabled");
      strict = false;
    } else if (option.equals("-noFilter")) {
      LOG.info("SitemapProcessor: filtering disabled");
      filter = false;
    } else if (option.equals("-noNormalize")) {
      LOG.info("SitemapProcessor: normalizing disabled");
      normalize = false;
    } else {
      LOG.info("SitemapProcessor: Found invalid argument \"{}\"\n", option);
      usage();
      return -1;
    }
  }

  try {
    sitemap(crawlDb, hostDb, urlDir, strict, filter, normalize, threads);
    return 0;
  } catch (Exception e) {
    LOG.error("SitemapProcessor: {}", StringUtils.stringifyException(e));
    return -1;
  }
}
}