| /* |
| * Licensed to the Apache Software Foundation (ASF) under one or more |
| * contributor license agreements. See the NOTICE file distributed with |
| * this work for additional information regarding copyright ownership. |
| * The ASF licenses this file to You under the Apache License, Version 2.0 |
| * (the "License"); you may not use this file except in compliance with |
| * the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| package org.apache.nutch.scoring.webgraph; |
| |
| import java.io.IOException; |
| import java.lang.invoke.MethodHandles; |
| import java.text.SimpleDateFormat; |
| import java.util.ArrayList; |
| import java.util.HashSet; |
| import java.util.LinkedHashMap; |
| import java.util.List; |
| import java.util.Map; |
| import java.util.Random; |
| import java.util.Set; |
| |
| import org.apache.commons.cli.CommandLine; |
| import org.apache.commons.cli.CommandLineParser; |
| import org.apache.commons.cli.GnuParser; |
| import org.apache.commons.cli.HelpFormatter; |
| import org.apache.commons.cli.Option; |
| import org.apache.commons.cli.OptionBuilder; |
| import org.apache.commons.cli.Options; |
| import org.slf4j.Logger; |
| import org.slf4j.LoggerFactory; |
| import org.apache.hadoop.conf.Configuration; |
| import org.apache.hadoop.conf.Configured; |
| import org.apache.hadoop.fs.FileStatus; |
| import org.apache.hadoop.fs.FileSystem; |
| import org.apache.hadoop.fs.Path; |
| import org.apache.hadoop.io.Text; |
| import org.apache.hadoop.io.BooleanWritable; |
| import org.apache.hadoop.io.Writable; |
| import org.apache.hadoop.io.WritableUtils; |
| import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; |
| import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; |
| import org.apache.hadoop.mapreduce.Job; |
| import org.apache.hadoop.mapreduce.lib.output.MapFileOutputFormat; |
| import org.apache.hadoop.mapreduce.Mapper; |
| import org.apache.hadoop.mapreduce.Reducer; |
| import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat; |
| import org.apache.hadoop.util.StringUtils; |
| import org.apache.hadoop.util.Tool; |
| import org.apache.hadoop.util.ToolRunner; |
| import org.apache.nutch.crawl.NutchWritable; |
| import org.apache.nutch.crawl.CrawlDatum; |
| import org.apache.nutch.metadata.Nutch; |
| import org.apache.nutch.net.URLFilters; |
| import org.apache.nutch.net.URLNormalizers; |
| import org.apache.nutch.parse.Outlink; |
| import org.apache.nutch.parse.ParseData; |
| import org.apache.nutch.util.FSUtils; |
| import org.apache.nutch.util.HadoopFSUtil; |
| import org.apache.nutch.util.LockUtil; |
| import org.apache.nutch.util.NutchConfiguration; |
| import org.apache.nutch.util.NutchJob; |
| import org.apache.nutch.util.TimingUtil; |
| import org.apache.nutch.util.URLUtil; |
| |
| /** |
 * Creates three databases: one for inlinks, one for outlinks, and a node
 * database that holds the number of inlinks and outlinks for a url together
 * with the current score for the url.
| * |
| * The score is set by an analysis program such as LinkRank. The WebGraph is an |
 * updatable database. Outlinks are stored by their fetch time or by the
| * current system time if no fetch time is available. Only the most recent |
| * version of outlinks for a given url is stored. As more crawls are executed |
| * and the WebGraph updated, newer Outlinks will replace older Outlinks. This |
| * allows the WebGraph to adapt to changes in the link structure of the web. |
| * |
| * The Inlink database is created from the Outlink database and is regenerated |
| * when the WebGraph is updated. The Node database is created from both the |
 * Inlink and Outlink databases. Because the Node database is overwritten when
 * the WebGraph is updated, and because it holds the current scores for urls,
 * it is recommended that a crawl cycle (one or more full crawls) fully
 * complete before the WebGraph is updated and that an analysis program, such
 * as LinkRank, then be run to update the scores in the Node database in a
 * stable fashion.
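 *
 * A typical invocation via the Nutch command-line script looks like the
 * following (the paths are illustrative):
 *
 * <pre>
 * bin/nutch webgraph -webgraphdb crawl/webgraphdb \
 *   -segmentDir crawl/segments -normalize -filter
 * </pre>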
| */ |
| public class WebGraph extends Configured implements Tool { |
| |
| private static final Logger LOG = LoggerFactory |
| .getLogger(MethodHandles.lookup().lookupClass()); |
| public static final String LOCK_NAME = ".locked"; |
| public static final String INLINK_DIR = "inlinks"; |
| public static final String OUTLINK_DIR = "outlinks/current"; |
| public static final String OLD_OUTLINK_DIR = "outlinks/old"; |
| public static final String NODE_DIR = "nodes"; |
| |
| /** |
   * The OutlinkDb creates a database of all outlinks. Outlinks that are
   * internal to the same host or domain can be ignored. The number of outlinks
   * kept to a given page or domain can also be limited.
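   *
   * Ignoring internal links and link limiting are controlled by the
   * configuration properties link.ignore.internal.host,
   * link.ignore.internal.domain, link.ignore.limit.page and
   * link.ignore.limit.domain, which are read in the OutlinkDbReducer setup.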
| */ |
| public static class OutlinkDb extends Configured { |
| |
| public static final String URL_NORMALIZING = "webgraph.url.normalizers"; |
| public static final String URL_FILTERING = "webgraph.url.filters"; |
| |
| /** |
| * Returns the fetch time from the parse data or the current system time if |
| * the fetch time doesn't exist. |
| * |
| * @param data |
| * The parse data. |
| * |
| * @return The fetch time as a long. |
| */ |
| private static long getFetchTime(ParseData data) { |
| |
| // default to current system time |
| long fetchTime = System.currentTimeMillis(); |
| String fetchTimeStr = data.getContentMeta().get(Nutch.FETCH_TIME_KEY); |
| try { |
| // get the fetch time from the parse data |
| fetchTime = Long.parseLong(fetchTimeStr); |
| } catch (Exception e) { |
| fetchTime = System.currentTimeMillis(); |
| } |
| return fetchTime; |
| } |
| |
| /** |
| * Default constructor. |
| */ |
| public OutlinkDb() { |
| } |
| |
| /** |
| * Configurable constructor. |
| */ |
| public OutlinkDb(Configuration conf) { |
| setConf(conf); |
| } |
| |
| /** |
| * Passes through existing LinkDatum objects from an existing OutlinkDb and |
     * maps out new LinkDatum objects from the ParseData of new crawls.
| */ |
| public static class OutlinkDbMapper extends |
| Mapper<Text, Writable, Text, NutchWritable> { |
| |
| // using normalizers and/or filters |
| private boolean normalize = false; |
| private boolean filter = false; |
| |
| // url normalizers, filters and job configuration |
| private URLNormalizers urlNormalizers; |
| private URLFilters filters; |
| private Configuration conf; |
| |
| /** |
| * Normalizes and trims extra whitespace from the given url. |
| * |
| * @param url |
| * The url to normalize. |
| * |
| * @return The normalized url. |
| */ |
| private String normalizeUrl(String url) { |
| |
| if (!normalize) { |
| return url; |
| } |
| |
| String normalized = null; |
| if (urlNormalizers != null) { |
| try { |
| |
| // normalize and trim the url |
| normalized = urlNormalizers.normalize(url, |
| URLNormalizers.SCOPE_DEFAULT); |
| normalized = normalized.trim(); |
| } catch (Exception e) { |
| LOG.warn("Skipping " + url + ":" + e); |
| normalized = null; |
| } |
| } |
| return normalized; |
| } |
| |
| /** |
| * Filters the given url. |
| * |
| * @param url |
| * The url to filter. |
| * |
| * @return The filtered url or null. |
| */ |
| private String filterUrl(String url) { |
| |
| if (!filter) { |
| return url; |
| } |
| |
| try { |
| url = filters.filter(url); |
| } catch (Exception e) { |
| url = null; |
| } |
| |
| return url; |
| } |
| |
| /** |
       * Configures the OutlinkDb job mapper. Sets up URL normalization and
       * filtering if enabled.
| */ |
| @Override |
| public void setup(Mapper<Text, Writable, Text, NutchWritable>.Context context) { |
| Configuration config = context.getConfiguration(); |
| conf = config; |
| |
| normalize = conf.getBoolean(URL_NORMALIZING, false); |
| filter = conf.getBoolean(URL_FILTERING, false); |
| |
| if (normalize) { |
| urlNormalizers = new URLNormalizers(conf, URLNormalizers.SCOPE_DEFAULT); |
| } |
| |
| if (filter) { |
| filters = new URLFilters(conf); |
| } |
| } |
| |
| @Override |
| public void map(Text key, Writable value, |
| Context context) |
| throws IOException, InterruptedException { |
| |
| // normalize url, stop processing if null |
| String url = normalizeUrl(key.toString()); |
| if (url == null) { |
| return; |
| } |
| |
| // filter url |
| if (filterUrl(url) == null) { |
| return; |
| } |
| |
| // Overwrite the key with the normalized URL |
| key.set(url); |
| |
| if (value instanceof CrawlDatum) { |
| CrawlDatum datum = (CrawlDatum) value; |
| |
| if (datum.getStatus() == CrawlDatum.STATUS_FETCH_REDIR_TEMP |
| || datum.getStatus() == CrawlDatum.STATUS_FETCH_REDIR_PERM |
| || datum.getStatus() == CrawlDatum.STATUS_FETCH_GONE) { |
| |
| // Tell the reducer to get rid of all instances of this key |
| context.write(key, new NutchWritable(new BooleanWritable(true))); |
| } |
| } else if (value instanceof ParseData) { |
| // get the parse data and the outlinks from the parse data, along with |
| // the fetch time for those links |
| ParseData data = (ParseData) value; |
| long fetchTime = getFetchTime(data); |
| Outlink[] outlinkAr = data.getOutlinks(); |
| Map<String, String> outlinkMap = new LinkedHashMap<>(); |
| |
| // normalize urls and put into map |
| if (outlinkAr != null && outlinkAr.length > 0) { |
| for (int i = 0; i < outlinkAr.length; i++) { |
| Outlink outlink = outlinkAr[i]; |
| String toUrl = normalizeUrl(outlink.getToUrl()); |
| |
| if (filterUrl(toUrl) == null) { |
| continue; |
| } |
| |
              // only put the url into the map if it doesn't already exist, or
              // if it does exist but its stored anchor is null (in which case
              // the anchor is replaced)
| boolean existingUrl = outlinkMap.containsKey(toUrl); |
| if (toUrl != null |
| && (!existingUrl || (existingUrl && outlinkMap.get(toUrl) == null))) { |
| outlinkMap.put(toUrl, outlink.getAnchor()); |
| } |
| } |
| } |
| |
| // collect the outlinks under the fetch time |
| for (String outlinkUrl : outlinkMap.keySet()) { |
| String anchor = outlinkMap.get(outlinkUrl); |
| LinkDatum datum = new LinkDatum(outlinkUrl, anchor, fetchTime); |
| context.write(key, new NutchWritable(datum)); |
| } |
| } else if (value instanceof LinkDatum) { |
| LinkDatum datum = (LinkDatum) value; |
| String linkDatumUrl = normalizeUrl(datum.getUrl()); |
| |
| if (filterUrl(linkDatumUrl) != null) { |
| datum.setUrl(linkDatumUrl); |
| |
| // collect existing outlinks from existing OutlinkDb |
| context.write(key, new NutchWritable(datum)); |
| } |
| } |
| } |
| } |
| |
| public static class OutlinkDbReducer extends |
| Reducer<Text, NutchWritable, Text, LinkDatum> { |
| |
| // ignoring internal domains, internal hosts |
| private boolean ignoreDomain = true; |
| private boolean ignoreHost = true; |
| |
| // limiting urls out to a page or to a domain |
| private boolean limitPages = true; |
| private boolean limitDomains = true; |
| |
| // url normalizers, filters and job configuration |
| private Configuration conf; |
| |
| /** |
     * Configures the OutlinkDb job reducer. Sets up the handling of internal
     * links and link limiting.
| */ |
      @Override
      public void setup(Reducer<Text, NutchWritable, Text, LinkDatum>.Context context) {
| Configuration config = context.getConfiguration(); |
| conf = config; |
| ignoreHost = conf.getBoolean("link.ignore.internal.host", true); |
| ignoreDomain = conf.getBoolean("link.ignore.internal.domain", true); |
| limitPages = conf.getBoolean("link.ignore.limit.page", true); |
| limitDomains = conf.getBoolean("link.ignore.limit.domain", true); |
| |
| } |
| |
      @Override
      public void reduce(Text key, Iterable<NutchWritable> values,
| Context context) |
| throws IOException, InterruptedException { |
| |
| // aggregate all outlinks, get the most recent timestamp for a fetch |
| // which should be the timestamp for all of the most recent outlinks |
| long mostRecent = 0L; |
| List<LinkDatum> outlinkList = new ArrayList<>(); |
| for (NutchWritable val : values) { |
| final Writable value = val.get(); |
| |
| if (value instanceof LinkDatum) { |
| // loop through, change out most recent timestamp if needed |
| LinkDatum next = (LinkDatum) value; |
| long timestamp = next.getTimestamp(); |
| if (mostRecent == 0L || mostRecent < timestamp) { |
| mostRecent = timestamp; |
| } |
| outlinkList.add(WritableUtils.clone(next, conf)); |
| context.getCounter("WebGraph.outlinks", "added links").increment(1); |
| } else if (value instanceof BooleanWritable) { |
| BooleanWritable delete = (BooleanWritable) value; |
            // delete is always true here, otherwise the mapper would not have
            // emitted it in the first place
            if (delete.get()) {
              // This page is gone, do not emit its outlinks
| context.getCounter("WebGraph.outlinks", "removed links").increment(1); |
| return; |
| } |
| } |
| } |
| |
| // get the url, domain, and host for the url |
| String url = key.toString(); |
| String domain = URLUtil.getDomainName(url); |
| String host = URLUtil.getHost(url); |
| |
| // setup checking sets for domains and pages |
| Set<String> domains = new HashSet<>(); |
| Set<String> pages = new HashSet<>(); |
| |
| // loop through the link datums |
| for (LinkDatum datum : outlinkList) { |
| |
| // get the url, host, domain, and page for each outlink |
| String toUrl = datum.getUrl(); |
| String toDomain = URLUtil.getDomainName(toUrl); |
| String toHost = URLUtil.getHost(toUrl); |
| String toPage = URLUtil.getPage(toUrl); |
| datum.setLinkType(LinkDatum.OUTLINK); |
| |
          // an outlink must have the most recent timestamp and conform to the
          // internal url and limiting rules; if it does, collect it
          if (datum.getTimestamp() == mostRecent
              && (!limitPages || !pages.contains(toPage))
              && (!limitDomains || !domains.contains(toDomain))
              && (!ignoreHost || !toHost.equalsIgnoreCase(host))
              && (!ignoreDomain || !toDomain.equalsIgnoreCase(domain))) {
| context.write(key, datum); |
| pages.add(toPage); |
| domains.add(toDomain); |
| } |
| } |
| } |
| } |
| |
| public void close() { |
| } |
| } |
| |
| /** |
| * The InlinkDb creates a database of Inlinks. Inlinks are inverted from the |
| * OutlinkDb LinkDatum objects and are regenerated each time the WebGraph is |
| * updated. |
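   *
   * For example, an outlink entry keyed on http://a.example.org/ that points
   * to http://b.example.org/ becomes an inlink entry keyed on
   * http://b.example.org/ pointing back to http://a.example.org/ (the urls are
   * illustrative).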
| */ |
  private static class InlinkDb extends Configured {
| |
| private static long timestamp; |
| |
| /** |
| * Inverts the Outlink LinkDatum objects into new LinkDatum objects with a |
     * new system timestamp, the INLINK link type, and the to and from urls
     * swapped.
| */ |
| public static class InlinkDbMapper extends |
| Mapper<Text, LinkDatum, Text, LinkDatum> { |
| |
| /** |
| * Configures job mapper. Sets timestamp for all Inlink LinkDatum objects to the |
| * current system time. |
| */ |
      @Override
      public void setup(Mapper<Text, LinkDatum, Text, LinkDatum>.Context context) {
| timestamp = System.currentTimeMillis(); |
| } |
| |
      @Override
      public void map(Text key, LinkDatum datum,
| Context context) |
| throws IOException, InterruptedException { |
| |
| // get the to and from url and the anchor |
| String fromUrl = key.toString(); |
| String toUrl = datum.getUrl(); |
| String anchor = datum.getAnchor(); |
| |
| // flip the from and to url and set the new link type |
| LinkDatum inlink = new LinkDatum(fromUrl, anchor, timestamp); |
| inlink.setLinkType(LinkDatum.INLINK); |
| context.write(new Text(toUrl), inlink); |
| } |
| } |
| } |
| |
| /** |
   * Creates the Node database which consists of the number of inlinks and
   * outlinks for each url and a score slot for analysis programs such as
   * LinkRank.
| */ |
| private static class NodeDb extends Configured { |
| |
| /** |
| * Counts the number of inlinks and outlinks for each url and sets a default |
| * score of 0.0 for each url (node) in the webgraph. |
| */ |
| public static class NodeDbReducer extends |
| Reducer<Text, LinkDatum, Text, Node> { |
| |
| /** |
| * Configures job reducer. |
| */ |
      @Override
      public void setup(Reducer<Text, LinkDatum, Text, Node>.Context context) {
| } |
| |
      @Override
      public void reduce(Text key, Iterable<LinkDatum> values,
| Context context) throws IOException, InterruptedException { |
| |
| Node node = new Node(); |
| int numInlinks = 0; |
| int numOutlinks = 0; |
| |
| // loop through counting number of in and out links |
| for (LinkDatum next : values) { |
| if (next.getLinkType() == LinkDatum.INLINK) { |
| numInlinks++; |
| } else if (next.getLinkType() == LinkDatum.OUTLINK) { |
| numOutlinks++; |
| } |
| } |
| |
| // set the in and outlinks and a default score of 0 |
| node.setNumInlinks(numInlinks); |
| node.setNumOutlinks(numOutlinks); |
| node.setInlinkScore(0.0f); |
| context.write(key, node); |
| } |
| } |
| } |
| |
| /** |
   * Creates the three different WebGraph databases: Outlinks, Inlinks, and
   * Node. If a current WebGraph exists it is updated; if it doesn't exist, a
   * new WebGraph database is created.
| * |
| * @param webGraphDb |
| * The WebGraph to create or update. |
| * @param segments |
| * The array of segments used to update the WebGraph. Newer segments |
| * and fetch times will overwrite older segments. |
   * @param normalize
   *          whether to use URLNormalizers on URLs in the segment
   * @param filter
   *          whether to use URLFilters on URLs in the segment
| * |
| * @throws IOException |
| * If an error occurs while processing the WebGraph. |
| */ |
| public void createWebGraph(Path webGraphDb, Path[] segments, |
| boolean normalize, boolean filter) throws IOException, |
| InterruptedException, ClassNotFoundException { |
| |
| SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss"); |
| long start = System.currentTimeMillis(); |
| if (LOG.isInfoEnabled()) { |
| LOG.info("WebGraphDb: starting at " + sdf.format(start)); |
| LOG.info("WebGraphDb: webgraphdb: " + webGraphDb); |
| LOG.info("WebGraphDb: URL normalize: " + normalize); |
| LOG.info("WebGraphDb: URL filter: " + filter); |
| } |
| |
| FileSystem fs = webGraphDb.getFileSystem(getConf()); |
| |
| // lock an existing webgraphdb to prevent multiple simultaneous updates |
| Path lock = new Path(webGraphDb, LOCK_NAME); |
| if (!fs.exists(webGraphDb)) { |
| fs.mkdirs(webGraphDb); |
| } |
| |
| LockUtil.createLockFile(fs, lock, false); |
| |
| // outlink and temp outlink database paths |
| Path outlinkDb = new Path(webGraphDb, OUTLINK_DIR); |
| Path oldOutlinkDb = new Path(webGraphDb, OLD_OUTLINK_DIR); |
| |
| if (!fs.exists(outlinkDb)) { |
| fs.mkdirs(outlinkDb); |
| } |
| |
| Path tempOutlinkDb = new Path(outlinkDb + "-" |
| + Integer.toString(new Random().nextInt(Integer.MAX_VALUE))); |
| Job outlinkJob = NutchJob.getInstance(getConf()); |
| Configuration outlinkJobConf = outlinkJob.getConfiguration(); |
| outlinkJob.setJobName("Outlinkdb: " + outlinkDb); |
| |
| boolean deleteGone = outlinkJobConf.getBoolean("link.delete.gone", false); |
| boolean preserveBackup = outlinkJobConf.getBoolean("db.preserve.backup", true); |
| |
| if (deleteGone) { |
| LOG.info("OutlinkDb: deleting gone links"); |
| } |
| |
| // get the parse data and crawl fetch data for all segments |
| if (segments != null) { |
| for (int i = 0; i < segments.length; i++) { |
| FileSystem sfs = segments[i].getFileSystem(outlinkJobConf); |
| Path parseData = new Path(segments[i], ParseData.DIR_NAME); |
| if (sfs.exists(parseData)) { |
| LOG.info("OutlinkDb: adding input: " + parseData); |
| FileInputFormat.addInputPath(outlinkJob, parseData); |
| } |
| |
| if (deleteGone) { |
| Path crawlFetch = new Path(segments[i], CrawlDatum.FETCH_DIR_NAME); |
| if (sfs.exists(crawlFetch)) { |
| LOG.info("OutlinkDb: adding input: " + crawlFetch); |
| FileInputFormat.addInputPath(outlinkJob, crawlFetch); |
| } |
| } |
| } |
| } |
| |
| // add the existing webgraph |
| LOG.info("OutlinkDb: adding input: " + outlinkDb); |
| FileInputFormat.addInputPath(outlinkJob, outlinkDb); |
| |
| outlinkJobConf.setBoolean(OutlinkDb.URL_NORMALIZING, normalize); |
| outlinkJobConf.setBoolean(OutlinkDb.URL_FILTERING, filter); |
| |
| outlinkJob.setInputFormatClass(SequenceFileInputFormat.class); |
| outlinkJob.setJarByClass(OutlinkDb.class); |
| outlinkJob.setMapperClass(OutlinkDb.OutlinkDbMapper.class); |
| outlinkJob.setReducerClass(OutlinkDb.OutlinkDbReducer.class); |
| outlinkJob.setMapOutputKeyClass(Text.class); |
| outlinkJob.setMapOutputValueClass(NutchWritable.class); |
| outlinkJob.setOutputKeyClass(Text.class); |
| outlinkJob.setOutputValueClass(LinkDatum.class); |
| FileOutputFormat.setOutputPath(outlinkJob, tempOutlinkDb); |
| outlinkJob.setOutputFormatClass(MapFileOutputFormat.class); |
| outlinkJobConf.setBoolean("mapreduce.fileoutputcommitter.marksuccessfuljobs", |
| false); |
| |
| // run the outlinkdb job and replace any old outlinkdb with the new one |
| try { |
| LOG.info("OutlinkDb: running"); |
| boolean success = outlinkJob.waitForCompletion(true); |
| if (!success) { |
| String message = "OutlinkDb job did not succeed, job status:" |
| + outlinkJob.getStatus().getState() + ", reason: " |
| + outlinkJob.getStatus().getFailureInfo(); |
| LOG.error(message); |
| NutchJob.cleanupAfterFailure(tempOutlinkDb, lock, fs); |
| throw new RuntimeException(message); |
| } |
| LOG.info("OutlinkDb: installing " + outlinkDb); |
| FSUtils.replace(fs, oldOutlinkDb, outlinkDb, true); |
| FSUtils.replace(fs, outlinkDb, tempOutlinkDb, true); |
| if (!preserveBackup && fs.exists(oldOutlinkDb)) |
| fs.delete(oldOutlinkDb, true); |
| LOG.info("OutlinkDb: finished"); |
| } catch (IOException | InterruptedException | ClassNotFoundException e) { |
| LOG.error("OutlinkDb failed:", e); |
      // remove lock file and temporary directory if an error occurs
| NutchJob.cleanupAfterFailure(tempOutlinkDb, lock, fs); |
| throw e; |
| } |
| |
| // inlink and temp link database paths |
| Path inlinkDb = new Path(webGraphDb, INLINK_DIR); |
| Path tempInlinkDb = new Path(inlinkDb + "-" |
| + Integer.toString(new Random().nextInt(Integer.MAX_VALUE))); |
| |
| Job inlinkJob = NutchJob.getInstance(getConf()); |
| Configuration inlinkJobConf = inlinkJob.getConfiguration(); |
| inlinkJob.setJobName("Inlinkdb " + inlinkDb); |
| LOG.info("InlinkDb: adding input: " + outlinkDb); |
| FileInputFormat.addInputPath(inlinkJob, outlinkDb); |
| inlinkJob.setInputFormatClass(SequenceFileInputFormat.class); |
| inlinkJob.setJarByClass(InlinkDb.class); |
| inlinkJob.setMapperClass(InlinkDb.InlinkDbMapper.class); |
| inlinkJob.setMapOutputKeyClass(Text.class); |
| inlinkJob.setMapOutputValueClass(LinkDatum.class); |
| inlinkJob.setOutputKeyClass(Text.class); |
| inlinkJob.setOutputValueClass(LinkDatum.class); |
| FileOutputFormat.setOutputPath(inlinkJob, tempInlinkDb); |
| inlinkJob.setOutputFormatClass(MapFileOutputFormat.class); |
| inlinkJobConf.setBoolean("mapreduce.fileoutputcommitter.marksuccessfuljobs", |
| false); |
| |
| try { |
| |
| // run the inlink and replace any old with new |
| LOG.info("InlinkDb: running"); |
| boolean success = inlinkJob.waitForCompletion(true); |
| if (!success) { |
| String message = "InlinkDb job did not succeed, job status:" |
| + inlinkJob.getStatus().getState() + ", reason: " |
| + inlinkJob.getStatus().getFailureInfo(); |
| LOG.error(message); |
| NutchJob.cleanupAfterFailure(tempInlinkDb, lock, fs); |
| throw new RuntimeException(message); |
| } |
| LOG.info("InlinkDb: installing " + inlinkDb); |
| FSUtils.replace(fs, inlinkDb, tempInlinkDb, true); |
| LOG.info("InlinkDb: finished"); |
| } catch (IOException | InterruptedException | ClassNotFoundException e) { |
| LOG.error("InlinkDb failed:", e); |
      // remove lock file and temporary directory if an error occurs
| NutchJob.cleanupAfterFailure(tempInlinkDb, lock, fs); |
| throw e; |
| } |
| |
| // node and temp node database paths |
| Path nodeDb = new Path(webGraphDb, NODE_DIR); |
| Path tempNodeDb = new Path(nodeDb + "-" |
| + Integer.toString(new Random().nextInt(Integer.MAX_VALUE))); |
| |
| Job nodeJob = NutchJob.getInstance(getConf()); |
| Configuration nodeJobConf = nodeJob.getConfiguration(); |
| nodeJob.setJobName("NodeDb " + nodeDb); |
| LOG.info("NodeDb: adding input: " + outlinkDb); |
| LOG.info("NodeDb: adding input: " + inlinkDb); |
| FileInputFormat.addInputPath(nodeJob, outlinkDb); |
| FileInputFormat.addInputPath(nodeJob, inlinkDb); |
| nodeJob.setInputFormatClass(SequenceFileInputFormat.class); |
| nodeJob.setJarByClass(NodeDb.class); |
| nodeJob.setReducerClass(NodeDb.NodeDbReducer.class); |
| nodeJob.setMapOutputKeyClass(Text.class); |
| nodeJob.setMapOutputValueClass(LinkDatum.class); |
| nodeJob.setOutputKeyClass(Text.class); |
| nodeJob.setOutputValueClass(Node.class); |
| FileOutputFormat.setOutputPath(nodeJob, tempNodeDb); |
| nodeJob.setOutputFormatClass(MapFileOutputFormat.class); |
| nodeJobConf.setBoolean("mapreduce.fileoutputcommitter.marksuccessfuljobs", |
| false); |
| |
| try { |
| |
| // run the node job and replace old nodedb with new |
| LOG.info("NodeDb: running"); |
| boolean success = nodeJob.waitForCompletion(true); |
| if (!success) { |
| String message = "NodeDb job did not succeed, job status:" |
| + nodeJob.getStatus().getState() + ", reason: " |
| + nodeJob.getStatus().getFailureInfo(); |
| LOG.error(message); |
        // remove lock file and temporary directory if an error occurs
| NutchJob.cleanupAfterFailure(tempNodeDb, lock, fs); |
| throw new RuntimeException(message); |
| } |
| LOG.info("NodeDb: installing " + nodeDb); |
| FSUtils.replace(fs, nodeDb, tempNodeDb, true); |
| LOG.info("NodeDb: finished"); |
| } catch (IOException | InterruptedException | ClassNotFoundException e) { |
| LOG.error("NodeDb failed:", e); |
      // remove lock file and temporary directory if an error occurs
| NutchJob.cleanupAfterFailure(tempNodeDb, lock, fs); |
| throw e; |
| } |
| |
| // remove the lock file for the webgraph |
| LockUtil.removeLockFile(fs, lock); |
| |
| long end = System.currentTimeMillis(); |
| LOG.info("WebGraphDb: finished at " + sdf.format(end) + ", elapsed: " |
| + TimingUtil.elapsedTime(start, end)); |
| } |
| |
| public static void main(String[] args) throws Exception { |
| int res = ToolRunner.run(NutchConfiguration.create(), new WebGraph(), args); |
| System.exit(res); |
| } |
| |
| /** |
   * Parses command line arguments and runs the WebGraph jobs.
| */ |
| public int run(String[] args) throws Exception { |
| |
| // boolean options |
| Option helpOpt = new Option("h", "help", false, "show this help message"); |
| Option normOpt = new Option("n", "normalize", false, |
| "whether to use URLNormalizers on the URL's in the segment"); |
| Option filtOpt = new Option("f", "filter", false, |
| "whether to use URLFilters on the URL's in the segment"); |
| |
| // argument options |
| @SuppressWarnings("static-access") |
| Option graphOpt = OptionBuilder |
| .withArgName("webgraphdb") |
| .hasArg() |
| .withDescription( |
| "the web graph database to create (if none exists) or use if one does") |
| .create("webgraphdb"); |
| @SuppressWarnings("static-access") |
| Option segOpt = OptionBuilder.withArgName("segment").hasArgs() |
| .withDescription("the segment(s) to use").create("segment"); |
| @SuppressWarnings("static-access") |
| Option segDirOpt = OptionBuilder.withArgName("segmentDir").hasArgs() |
| .withDescription("the segment directory to use").create("segmentDir"); |
| |
| // create the options |
| Options options = new Options(); |
| options.addOption(helpOpt); |
| options.addOption(normOpt); |
| options.addOption(filtOpt); |
| options.addOption(graphOpt); |
| options.addOption(segOpt); |
| options.addOption(segDirOpt); |
| |
| CommandLineParser parser = new GnuParser(); |
| try { |
| CommandLine line = parser.parse(options, args); |
| if (line.hasOption("help") || !line.hasOption("webgraphdb") |
| || (!line.hasOption("segment") && !line.hasOption("segmentDir"))) { |
| HelpFormatter formatter = new HelpFormatter(); |
| formatter.printHelp("WebGraph", options, true); |
| return -1; |
| } |
| |
| String webGraphDb = line.getOptionValue("webgraphdb"); |
| |
| Path[] segPaths = null; |
| |
| // Handle segment option |
| if (line.hasOption("segment")) { |
| String[] segments = line.getOptionValues("segment"); |
| segPaths = new Path[segments.length]; |
| for (int i = 0; i < segments.length; i++) { |
| segPaths[i] = new Path(segments[i]); |
| } |
| } |
| |
| // Handle segmentDir option |
| if (line.hasOption("segmentDir")) { |
| Path dir = new Path(line.getOptionValue("segmentDir")); |
| FileSystem fs = dir.getFileSystem(getConf()); |
| FileStatus[] fstats = fs.listStatus(dir, |
| HadoopFSUtil.getPassDirectoriesFilter(fs)); |
| segPaths = HadoopFSUtil.getPaths(fstats); |
| } |
| |
| boolean normalize = false; |
| |
| if (line.hasOption("normalize")) { |
| normalize = true; |
| } |
| |
| boolean filter = false; |
| |
| if (line.hasOption("filter")) { |
| filter = true; |
| } |
| |
| createWebGraph(new Path(webGraphDb), segPaths, normalize, filter); |
| return 0; |
| } catch (Exception e) { |
| LOG.error("WebGraph: " + StringUtils.stringifyException(e)); |
| return -2; |
| } |
| } |
| |
| } |