blob: cc93eb88a413c2418608817eb5b27ce36568d264 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nutch.scoring.webgraph;
import java.io.IOException;
import java.lang.invoke.MethodHandles;
import java.text.SimpleDateFormat;
import org.apache.commons.cli.CommandLine;
import org.apache.commons.cli.CommandLineParser;
import org.apache.commons.cli.GnuParser;
import org.apache.commons.cli.HelpFormatter;
import org.apache.commons.cli.Option;
import org.apache.commons.cli.OptionBuilder;
import org.apache.commons.cli.Options;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.FloatWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.WritableUtils;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.StringUtils;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
import org.apache.nutch.util.NutchConfiguration;
import org.apache.nutch.util.NutchJob;
import org.apache.nutch.util.TimingUtil;
import org.apache.nutch.util.URLUtil;
/**
* A tool that dumps out the top urls by number of inlinks, number of outlinks,
* or by score, to a text file. One of the major uses of this tool is to check
* the top scoring urls of a link analysis program such as LinkRank.
*
* For number of inlinks or number of outlinks the WebGraph program will need to
* have been run. For link analysis score a program such as LinkRank will need
* to have been run which updates the NodeDb of the WebGraph.
*/
public class NodeDumper extends Configured implements Tool {
private static final Logger LOG = LoggerFactory
.getLogger(MethodHandles.lookup().lookupClass());
// Which node metric to dump: number of inlinks, number of outlinks, or the
// link-analysis score stored in the NodeDb (e.g. by LinkRank).
private static enum DumpType {
INLINKS, OUTLINKS, SCORES
}
// How values are aggregated when grouping by host/domain: sum of all
// values, or the single largest value.
private static enum AggrType {
SUM, MAX
}
// Grouping key used by the Dumper job: by host name or by domain name.
private static enum NameType {
HOST, DOMAIN
}
/**
* Outputs the top urls sorted in descending order. Depending on the flag set
* on the command line, the top urls could be for number of inlinks, for
* number of outlinks, or for link analysis score.
*/
/**
 * Outputs the top urls sorted in descending order. Depending on the flag set
 * on the command line, the top urls could be for number of inlinks, for
 * number of outlinks, or for link analysis score.
 */
public static class Sorter extends Configured {

  /**
   * Outputs the url keyed by the negated number of inlinks, outlinks, or
   * score, so that the shuffle's ascending key order yields descending
   * values.
   */
  public static class SorterMapper extends
      Mapper<Text, Node, FloatWritable, Text> {

    private Configuration conf;
    private boolean inlinks = false;
    private boolean outlinks = false;
    // Reused output key: the framework serializes the key on write, so a
    // single mutable instance avoids one allocation per record.
    private FloatWritable sortKey = new FloatWritable();

    /**
     * Configures the mapper, sets the flags for the type of content to dump.
     */
    @Override
    public void setup(Mapper<Text, Node, FloatWritable, Text>.Context context) {
      conf = context.getConfiguration();
      inlinks = conf.getBoolean("inlinks", false);
      outlinks = conf.getBoolean("outlinks", false);
    }

    @Override
    public void map(Text key, Node node,
        Context context) throws IOException, InterruptedException {
      float number;
      if (inlinks) {
        number = node.getNumInlinks();
      } else if (outlinks) {
        number = node.getNumOutlinks();
      } else {
        number = node.getInlinkScore();
      }
      // number collected with negative to be descending
      sortKey.set(-number);
      context.write(sortKey, key);
    }
  }

  /**
   * Flips and collects the url and numeric sort value, emitting at most the
   * configured topN records per reduce group.
   */
  public static class SorterReducer extends
      Reducer<FloatWritable, Text, Text, FloatWritable> {

    private Configuration conf;
    private long topn = Long.MAX_VALUE;

    /**
     * Configures the reducer, sets the topN number if any.
     */
    @Override
    public void setup(Reducer<FloatWritable, Text, Text, FloatWritable>.Context context) {
      conf = context.getConfiguration();
      topn = conf.getLong("topn", Long.MAX_VALUE);
    }

    @Override
    public void reduce(FloatWritable key, Iterable<Text> values,
        Context context) throws IOException, InterruptedException {
      // take the negative of the negative to get the original value; map 0
      // explicitly so we never emit the confusing "-0.0"
      float val = key.get();
      FloatWritable number = new FloatWritable(val == 0 ? 0 : -val);
      long numCollected = 0;
      // collect all values, this time with the url as key; break once topn
      // is reached instead of iterating the remaining values (consistent
      // with DumperReducer)
      // NOTE(review): numCollected resets per reduce call, i.e. per distinct
      // score value — with one reducer the topn cap applies per group of
      // equal scores, not globally. Preserved as-is; confirm intent.
      for (Text value : values) {
        if (numCollected >= topn) {
          break;
        }
        Text url = WritableUtils.clone(value, conf);
        context.write(url, number);
        numCollected++;
      }
    }
  }
}
/**
* Outputs the hosts or domains with an associated value. This value consists
* of either the number of inlinks, the number of outlinks or the score. The
* computed value is then either the sum of all parts or the top value.
*/
/**
 * Outputs the hosts or domains with an associated value. This value consists
 * of either the number of inlinks, the number of outlinks or the score. The
 * computed value is then either the sum of all parts or the top value.
 */
public static class Dumper extends Configured {

  /**
   * Emits the host or domain of each url as the key, with numInlinks,
   * numOutlinks or score as the value.
   */
  public static class DumperMapper extends
      Mapper<Text, Node, Text, FloatWritable> {

    private Configuration conf;
    private boolean inlinks = false;
    private boolean outlinks = false;
    private boolean host = false;

    /** Reads the dump-type and grouping flags from the job configuration. */
    @Override
    public void setup(Mapper<Text, Node, Text, FloatWritable>.Context context) {
      conf = context.getConfiguration();
      inlinks = conf.getBoolean("inlinks", false);
      outlinks = conf.getBoolean("outlinks", false);
      host = conf.getBoolean("host", false);
    }

    @Override
    public void map(Text key, Node node,
        Context context) throws IOException, InterruptedException {
      // pick the metric requested on the command line; scores are default
      float metric;
      if (inlinks) {
        metric = node.getNumInlinks();
      } else if (outlinks) {
        metric = node.getNumOutlinks();
      } else {
        metric = node.getInlinkScore();
      }
      // rewrite the key from the full url to its host or domain part
      String url = key.toString();
      key.set(host ? URLUtil.getHost(url) : URLUtil.getDomainName(url));
      context.write(key, new FloatWritable(metric));
    }
  }

  /**
   * Emits either the sum or the maximum of the first topN values seen for
   * each host/domain key.
   */
  public static class DumperReducer extends
      Reducer<Text, FloatWritable, Text, FloatWritable> {

    private Configuration conf;
    private long topn = Long.MAX_VALUE;
    private boolean sum = false;

    /** Reads the topN limit and aggregation mode from the configuration. */
    @Override
    public void setup(Reducer<Text, FloatWritable, Text, FloatWritable>.Context context) {
      conf = context.getConfiguration();
      topn = conf.getLong("topn", Long.MAX_VALUE);
      sum = conf.getBoolean("sum", false);
    }

    @Override
    public void reduce(Text key, Iterable<FloatWritable> values,
        Context context) throws IOException, InterruptedException {
      long seen = 0;
      float aggregate = 0;
      // fold at most topn values into a running sum or running maximum
      for (FloatWritable value : values) {
        if (seen >= topn) {
          break;
        }
        float v = value.get();
        if (sum) {
          aggregate += v;
        } else if (aggregate < v) {
          aggregate = v;
        }
        seen++;
      }
      context.write(key, new FloatWritable(aggregate));
    }
  }
}
/**
* Runs the process to dump the top urls out to a text file.
*
* @param webGraphDb
* The WebGraph from which to pull values.
*
* @param topN
* @param output
*
* @throws IOException
* If an error occurs while dumping the top values.
*/
/**
 * Runs the process to dump the top urls out to a text file.
 *
 * @param webGraphDb
 *          The WebGraph from which to pull values.
 * @param type
 *          whether to dump by number of inlinks, number of outlinks, or
 *          link analysis score
 * @param topN
 *          the maximum number of records to collect per reduce group
 * @param output
 *          the output directory
 * @param asEff
 *          if true, use '=' as the key/value separator so the text output
 *          is compatible with Solr's ExternalFileField
 * @param nameType
 *          if non-null, aggregate by host or domain (Dumper job) instead of
 *          sorting individual urls (Sorter job)
 * @param aggrType
 *          when aggregating, whether to sum values or keep the maximum
 * @param asSequenceFile
 *          if true, write a SequenceFile instead of a text file
 *
 * @throws Exception
 *           If an error occurs while dumping the top values.
 */
public void dumpNodes(Path webGraphDb, DumpType type, long topN, Path output,
    boolean asEff, NameType nameType, AggrType aggrType,
    boolean asSequenceFile) throws Exception {

  SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
  long start = System.currentTimeMillis();
  LOG.info("NodeDumper: starting at {}", sdf.format(start));
  Path nodeDb = new Path(webGraphDb, WebGraph.NODE_DIR);
  Job dumper = NutchJob.getInstance(getConf());
  Configuration conf = dumper.getConfiguration();
  dumper.setJobName("NodeDumper: " + webGraphDb);
  FileInputFormat.addInputPath(dumper, nodeDb);
  dumper.setInputFormatClass(SequenceFileInputFormat.class);

  // No grouping requested: sort urls by value. Otherwise group by
  // host/domain and aggregate.
  if (nameType == null) {
    dumper.setJarByClass(Sorter.class);
    dumper.setMapperClass(Sorter.SorterMapper.class);
    dumper.setReducerClass(Sorter.SorterReducer.class);
    dumper.setMapOutputKeyClass(FloatWritable.class);
    dumper.setMapOutputValueClass(Text.class);
  } else {
    dumper.setJarByClass(Dumper.class);
    dumper.setMapperClass(Dumper.DumperMapper.class);
    dumper.setReducerClass(Dumper.DumperReducer.class);
    dumper.setMapOutputKeyClass(Text.class);
    dumper.setMapOutputValueClass(FloatWritable.class);
  }
  dumper.setOutputKeyClass(Text.class);
  dumper.setOutputValueClass(FloatWritable.class);
  FileOutputFormat.setOutputPath(dumper, output);

  if (asSequenceFile) {
    dumper.setOutputFormatClass(SequenceFileOutputFormat.class);
  } else {
    dumper.setOutputFormatClass(TextOutputFormat.class);
  }

  // Single reducer so the output is globally sorted.
  dumper.setNumReduceTasks(1);
  conf.setBoolean("inlinks", type == DumpType.INLINKS);
  conf.setBoolean("outlinks", type == DumpType.OUTLINKS);
  conf.setBoolean("scores", type == DumpType.SCORES);
  conf.setBoolean("host", nameType == NameType.HOST);
  conf.setBoolean("domain", nameType == NameType.DOMAIN);
  conf.setBoolean("sum", aggrType == AggrType.SUM);
  conf.setBoolean("max", aggrType == AggrType.MAX);
  conf.setLong("topn", topN);

  // Set equals-sign as separator for Solr's ExternalFileField
  if (asEff) {
    conf.set("mapreduce.output.textoutputformat.separator", "=");
  }

  try {
    LOG.info("NodeDumper: running");
    boolean success = dumper.waitForCompletion(true);
    if (!success) {
      String message = "NodeDumper job did not succeed, job status:"
          + dumper.getStatus().getState() + ", reason: "
          + dumper.getStatus().getFailureInfo();
      LOG.error(message);
      throw new RuntimeException(message);
    }
  } catch (IOException e) {
    LOG.error("NodeDumper job failed:", e);
    throw e;
  }
  long end = System.currentTimeMillis();
  LOG.info("NodeDumper: finished at {}, elapsed: {}", sdf.format(end),
      TimingUtil.elapsedTime(start, end));
}
/**
 * Command-line entry point; delegates to {@link #run(String[])} via the
 * Hadoop ToolRunner and exits with its return code.
 */
public static void main(String[] args) throws Exception {
  Configuration conf = NutchConfiguration.create();
  int res = ToolRunner.run(conf, new NodeDumper(), args);
  System.exit(res);
}
/**
* Runs the node dumper tool.
*/
/**
 * Runs the node dumper tool.
 *
 * @param args
 *          command-line arguments; see the printed help for details
 * @return 0 on success, -1 on bad usage (help shown), -2 on failure
 */
public int run(String[] args) throws Exception {

  // build the command-line options (OptionBuilder is stateful: configure,
  // then create)
  Options options = new Options();
  OptionBuilder.withArgName("help");
  OptionBuilder.withDescription("show this help message");
  Option helpOpts = OptionBuilder.create("help");
  options.addOption(helpOpts);

  OptionBuilder.withArgName("webgraphdb");
  OptionBuilder.hasArg();
  OptionBuilder.withDescription("the web graph database to use");
  Option webGraphDbOpts = OptionBuilder.create("webgraphdb");
  options.addOption(webGraphDbOpts);

  OptionBuilder.withArgName("inlinks");
  OptionBuilder.withDescription("show highest inlinks");
  Option inlinkOpts = OptionBuilder.create("inlinks");
  options.addOption(inlinkOpts);

  OptionBuilder.withArgName("outlinks");
  OptionBuilder.withDescription("show highest outlinks");
  Option outlinkOpts = OptionBuilder.create("outlinks");
  options.addOption(outlinkOpts);

  OptionBuilder.withArgName("scores");
  OptionBuilder.withDescription("show highest scores");
  Option scoreOpts = OptionBuilder.create("scores");
  options.addOption(scoreOpts);

  OptionBuilder.withArgName("topn");
  OptionBuilder.hasOptionalArg();
  OptionBuilder.withDescription("show topN scores");
  Option topNOpts = OptionBuilder.create("topn");
  options.addOption(topNOpts);

  OptionBuilder.withArgName("output");
  OptionBuilder.hasArg();
  OptionBuilder.withDescription("the output directory to use");
  Option outputOpts = OptionBuilder.create("output");
  options.addOption(outputOpts);

  OptionBuilder.withArgName("asEff");
  OptionBuilder
      .withDescription("Solr ExternalFileField compatible output format");
  Option effOpts = OptionBuilder.create("asEff");
  options.addOption(effOpts);

  OptionBuilder.hasArgs(2);
  OptionBuilder.withDescription("group <host|domain> <sum|max>");
  Option groupOpts = OptionBuilder.create("group");
  options.addOption(groupOpts);

  OptionBuilder.withArgName("asSequenceFile");
  OptionBuilder.withDescription("whether to output as a sequencefile");
  Option sequenceFileOpts = OptionBuilder.create("asSequenceFile");
  options.addOption(sequenceFileOpts);

  CommandLineParser parser = new GnuParser();
  try {
    CommandLine line = parser.parse(options, args);
    if (line.hasOption("help") || !line.hasOption("webgraphdb")) {
      HelpFormatter formatter = new HelpFormatter();
      formatter.printHelp("NodeDumper", options);
      return -1;
    }

    String webGraphDb = line.getOptionValue("webgraphdb");
    boolean inlinks = line.hasOption("inlinks");
    boolean outlinks = line.hasOption("outlinks");

    // -topn takes an optional argument; fall back to "no limit" when the
    // flag is given without a value instead of failing on parseLong(null)
    long topN = Long.MAX_VALUE;
    if (line.hasOption("topn") && line.getOptionValue("topn") != null) {
      topN = Long.parseLong(line.getOptionValue("topn"));
    }

    // get the correct dump type; scores is the default
    String output = line.getOptionValue("output");
    DumpType type = (inlinks ? DumpType.INLINKS
        : outlinks ? DumpType.OUTLINKS : DumpType.SCORES);

    NameType nameType = null;
    AggrType aggrType = null;
    String[] group = line.getOptionValues("group");
    if (group != null && group.length == 2) {
      nameType = (group[0].equals("host") ? NameType.HOST : group[0]
          .equals("domain") ? NameType.DOMAIN : null);
      // Fixed: the second branch previously tested "sum" again, so
      // "-group <host|domain> max" silently produced a null aggregation type.
      aggrType = (group[1].equals("sum") ? AggrType.SUM : group[1]
          .equals("max") ? AggrType.MAX : null);
    }

    // Use ExternalFileField?
    boolean asEff = line.hasOption("asEff");
    boolean asSequenceFile = line.hasOption("asSequenceFile");

    dumpNodes(new Path(webGraphDb), type, topN, new Path(output), asEff,
        nameType, aggrType, asSequenceFile);
    return 0;
  } catch (Exception e) {
    LOG.error("NodeDumper: {}", StringUtils.stringifyException(e));
    return -2;
  }
}
}