| /* |
| * Licensed to the Apache Software Foundation (ASF) under one or more |
| * contributor license agreements. See the NOTICE file distributed with |
| * this work for additional information regarding copyright ownership. |
| * The ASF licenses this file to You under the Apache License, Version 2.0 |
| * (the "License"); you may not use this file except in compliance with |
| * the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| package org.apache.nutch.scoring.webgraph; |
| |
| import java.io.IOException; |
| import java.lang.invoke.MethodHandles; |
| import java.text.SimpleDateFormat; |
| import java.util.Random; |
| |
| import org.apache.commons.cli.CommandLine; |
| import org.apache.commons.cli.CommandLineParser; |
| import org.apache.commons.cli.GnuParser; |
| import org.apache.commons.cli.HelpFormatter; |
| import org.apache.commons.cli.Option; |
| import org.apache.commons.cli.OptionBuilder; |
| import org.apache.commons.cli.Options; |
| import org.slf4j.Logger; |
| import org.slf4j.LoggerFactory; |
| import org.apache.hadoop.conf.Configuration; |
| import org.apache.hadoop.conf.Configured; |
| import org.apache.hadoop.fs.FileSystem; |
| import org.apache.hadoop.fs.Path; |
| import org.apache.hadoop.io.ObjectWritable; |
| import org.apache.hadoop.io.Text; |
| import org.apache.hadoop.io.Writable; |
| import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; |
| import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; |
| import org.apache.hadoop.mapreduce.Job; |
| import org.apache.hadoop.mapreduce.lib.output.MapFileOutputFormat; |
| import org.apache.hadoop.mapreduce.Mapper; |
| import org.apache.hadoop.mapreduce.Reducer; |
| import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat; |
| import org.apache.hadoop.util.StringUtils; |
| import org.apache.hadoop.util.Tool; |
| import org.apache.hadoop.util.ToolRunner; |
| import org.apache.nutch.crawl.CrawlDatum; |
| import org.apache.nutch.crawl.CrawlDb; |
| import org.apache.nutch.util.NutchConfiguration; |
| import org.apache.nutch.util.NutchJob; |
| import org.apache.nutch.util.TimingUtil; |
| |
| /** |
| * Updates the score from the WebGraph node database into the crawl database. |
| * Any score that is not in the node database is set to the clear score in the |
| * crawl database. |
| */ |
| public class ScoreUpdater extends Configured implements Tool{ |
| |
| private static final Logger LOG = LoggerFactory |
| .getLogger(MethodHandles.lookup().lookupClass()); |
| |
| /** |
| * Changes input into ObjectWritables. |
| */ |
| public static class ScoreUpdaterMapper extends |
| Mapper<Text, Writable, Text, ObjectWritable> { |
| |
| @Override |
| public void map(Text key, Writable value, |
| Context context) |
| throws IOException, InterruptedException { |
| |
| ObjectWritable objWrite = new ObjectWritable(); |
| objWrite.set(value); |
| context.write(key, objWrite); |
| } |
| } |
| |
| /** |
| * Creates new CrawlDatum objects with the updated score from the NodeDb or |
| * with a cleared score. |
| */ |
| public static class ScoreUpdaterReducer extends |
| Reducer<Text, ObjectWritable, Text, CrawlDatum> { |
| private float clearScore = 0.0f; |
| |
| @Override |
| public void setup(Reducer<Text, ObjectWritable, Text, CrawlDatum>.Context context) { |
| Configuration conf = context.getConfiguration(); |
| clearScore = conf.getFloat("link.score.updater.clear.score", 0.0f); |
| } |
| |
| @Override |
| public void reduce(Text key, Iterable<ObjectWritable> values, |
| Context context) |
| throws IOException, InterruptedException { |
| |
| String url = key.toString(); |
| Node node = null; |
| CrawlDatum datum = null; |
| |
| // set the node and the crawl datum, should be one of each unless no node |
| // for url in the crawldb |
| for (ObjectWritable next : values) { |
| Object value = next.get(); |
| if (value instanceof Node) { |
| node = (Node) value; |
| } else if (value instanceof CrawlDatum) { |
| datum = (CrawlDatum) value; |
| } |
| } |
| |
| // datum should never be null, could happen if somehow the url was |
| // normalized or changed after being pulled from the crawldb |
| if (datum != null) { |
| |
| if (node != null) { |
| |
| // set the inlink score in the nodedb |
| float inlinkScore = node.getInlinkScore(); |
| datum.setScore(inlinkScore); |
| LOG.debug(url + ": setting to score " + inlinkScore); |
| } else { |
| |
| // clear out the score in the crawldb |
| datum.setScore(clearScore); |
| LOG.debug(url + ": setting to clear score of " + clearScore); |
| } |
| |
| context.write(key, datum); |
| } else { |
| LOG.debug(url + ": no datum"); |
| } |
| } |
| } |
| |
| |
| /** |
| * Updates the inlink score in the web graph node databsae into the crawl |
| * database. |
| * |
| * @param crawlDb |
| * The crawl database to update |
| * @param webGraphDb |
| * The webgraph database to use. |
| * |
| * @throws IOException |
| * If an error occurs while updating the scores. |
| */ |
| public void update(Path crawlDb, Path webGraphDb) throws IOException, |
| ClassNotFoundException, InterruptedException { |
| |
| SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss"); |
| long start = System.currentTimeMillis(); |
| LOG.info("ScoreUpdater: starting at " + sdf.format(start)); |
| |
| Configuration conf = getConf(); |
| |
| // create a temporary crawldb with the new scores |
| LOG.info("Running crawldb update " + crawlDb); |
| Path nodeDb = new Path(webGraphDb, WebGraph.NODE_DIR); |
| Path crawlDbCurrent = new Path(crawlDb, CrawlDb.CURRENT_NAME); |
| Path newCrawlDb = new Path(crawlDb, Integer.toString(new Random() |
| .nextInt(Integer.MAX_VALUE))); |
| |
| // run the updater job outputting to the temp crawl database |
| Job updater = NutchJob.getInstance(conf); |
| updater.setJobName("Update CrawlDb from WebGraph"); |
| FileInputFormat.addInputPath(updater, crawlDbCurrent); |
| FileInputFormat.addInputPath(updater, nodeDb); |
| FileOutputFormat.setOutputPath(updater, newCrawlDb); |
| updater.setInputFormatClass(SequenceFileInputFormat.class); |
| updater.setJarByClass(ScoreUpdater.class); |
| updater.setMapperClass(ScoreUpdater.ScoreUpdaterMapper.class); |
| updater.setReducerClass(ScoreUpdater.ScoreUpdaterReducer.class); |
| updater.setMapOutputKeyClass(Text.class); |
| updater.setMapOutputValueClass(ObjectWritable.class); |
| updater.setOutputKeyClass(Text.class); |
| updater.setOutputValueClass(CrawlDatum.class); |
| updater.setOutputFormatClass(MapFileOutputFormat.class); |
| |
| try { |
| boolean success = updater.waitForCompletion(true); |
| if (!success) { |
| String message = "Update CrawlDb from WebGraph job did not succeed, job status:" |
| + updater.getStatus().getState() + ", reason: " |
| + updater.getStatus().getFailureInfo(); |
| LOG.error(message); |
| // remove the temp crawldb on error |
| FileSystem fs = newCrawlDb.getFileSystem(conf); |
| if (fs.exists(newCrawlDb)) { |
| fs.delete(newCrawlDb, true); |
| } |
| throw new RuntimeException(message); |
| } |
| } catch (IOException | ClassNotFoundException | InterruptedException e) { |
| LOG.error("Update CrawlDb from WebGraph:", e); |
| |
| // remove the temp crawldb on error |
| FileSystem fs = newCrawlDb.getFileSystem(conf); |
| if (fs.exists(newCrawlDb)) { |
| fs.delete(newCrawlDb, true); |
| } |
| throw e; |
| } |
| |
| // install the temp crawl database |
| LOG.info("ScoreUpdater: installing new crawldb " + crawlDb); |
| CrawlDb.install(updater, crawlDb); |
| |
| long end = System.currentTimeMillis(); |
| LOG.info("ScoreUpdater: finished at " + sdf.format(end) + ", elapsed: " |
| + TimingUtil.elapsedTime(start, end)); |
| } |
| |
| public static void main(String[] args) throws Exception { |
| int res = ToolRunner.run(NutchConfiguration.create(), new ScoreUpdater(), |
| args); |
| System.exit(res); |
| } |
| |
| /** |
| * Runs the ScoreUpdater tool. |
| */ |
| public int run(String[] args) throws Exception { |
| |
| Options options = new Options(); |
| OptionBuilder.withArgName("help"); |
| OptionBuilder.withDescription("show this help message"); |
| Option helpOpts = OptionBuilder.create("help"); |
| options.addOption(helpOpts); |
| |
| OptionBuilder.withArgName("crawldb"); |
| OptionBuilder.hasArg(); |
| OptionBuilder.withDescription("the crawldb to use"); |
| Option crawlDbOpts = OptionBuilder.create("crawldb"); |
| options.addOption(crawlDbOpts); |
| |
| OptionBuilder.withArgName("webgraphdb"); |
| OptionBuilder.hasArg(); |
| OptionBuilder.withDescription("the webgraphdb to use"); |
| Option webGraphOpts = OptionBuilder.create("webgraphdb"); |
| options.addOption(webGraphOpts); |
| |
| CommandLineParser parser = new GnuParser(); |
| try { |
| |
| CommandLine line = parser.parse(options, args); |
| if (line.hasOption("help") || !line.hasOption("webgraphdb") |
| || !line.hasOption("crawldb")) { |
| HelpFormatter formatter = new HelpFormatter(); |
| formatter.printHelp("ScoreUpdater", options); |
| return -1; |
| } |
| |
| String crawlDb = line.getOptionValue("crawldb"); |
| String webGraphDb = line.getOptionValue("webgraphdb"); |
| update(new Path(crawlDb), new Path(webGraphDb)); |
| return 0; |
| } catch (Exception e) { |
| LOG.error("ScoreUpdater: " + StringUtils.stringifyException(e)); |
| return -1; |
| } |
| } |
| } |