/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nutch.scoring.webgraph;
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.lang.invoke.MethodHandles;
import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Random;
import java.util.Set;
import org.apache.commons.cli.CommandLine;
import org.apache.commons.cli.CommandLineParser;
import org.apache.commons.cli.GnuParser;
import org.apache.commons.cli.HelpFormatter;
import org.apache.commons.cli.Option;
import org.apache.commons.cli.OptionBuilder;
import org.apache.commons.cli.Options;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.ObjectWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableUtils;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.io.compress.CompressionCodecFactory;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.output.MapFileOutputFormat;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.StringUtils;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
import org.apache.nutch.util.FSUtils;
import org.apache.nutch.util.NutchConfiguration;
import org.apache.nutch.util.NutchJob;
import org.apache.nutch.util.TimingUtil;
import org.apache.nutch.util.URLUtil;
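/**
* Performs an iterative, PageRank-like link analysis over the WebGraph. The
* tool first counts the nodes in the webgraph, initializes every node with a
* default score, then runs a configurable number of invert/analyze iterations
* (see {@code link.analyze.num.iterations}, 10 by default). Each iteration
* computes, per URL,
* {@code score = (1 - dampingFactor) + dampingFactor * totalInlinkScore},
* where pages without inlinks receive a rank-one score of
* {@code 1 / numberOfNodes}. The final scores replace the NodeDb of the
* WebGraph.
*
* <p>Typically run as a Hadoop {@link Tool} via {@link ToolRunner} with a
* {@code -webgraphdb} option. A minimal programmatic usage sketch (the
* webgraphdb path below is illustrative only):</p>
*
* <pre>
* Configuration conf = NutchConfiguration.create();
* LinkRank linkRank = new LinkRank(conf);
* // path to an existing webgraphdb built by the WebGraph tool (example path)
* linkRank.analyze(new Path("crawl/webgraphdb"));
* </pre>
*/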
public class LinkRank extends Configured implements Tool {
private static final Logger LOG = LoggerFactory
.getLogger(MethodHandles.lookup().lookupClass());
private static final String NUM_NODES = "_num_nodes_";
/**
* Runs the counter job. The counter job determines the number of nodes (URLs)
* in the webgraph. This count is used during analysis to compute the rank-one
* score.
*
* @param fs
* The job file system.
* @param webGraphDb
* The web graph database to use.
*
* @return The number of nodes in the web graph.
* @throws IOException
* If an error occurs while running the counter job.
* @throws ClassNotFoundException
* If a job class cannot be located.
* @throws InterruptedException
* If the job is interrupted.
*/
private int runCounter(FileSystem fs, Path webGraphDb) throws IOException,
ClassNotFoundException, InterruptedException {
// configure the counter job
Path numLinksPath = new Path(webGraphDb, NUM_NODES);
Path nodeDb = new Path(webGraphDb, WebGraph.NODE_DIR);
Job counter = NutchJob.getInstance(getConf());
Configuration conf = counter.getConfiguration();
counter.setJobName("LinkRank Counter");
FileInputFormat.addInputPath(counter, nodeDb);
FileOutputFormat.setOutputPath(counter, numLinksPath);
counter.setInputFormatClass(SequenceFileInputFormat.class);
counter.setJarByClass(Counter.class);
counter.setMapperClass(Counter.CountMapper.class);
counter.setCombinerClass(Counter.CountReducer.class);
counter.setReducerClass(Counter.CountReducer.class);
counter.setMapOutputKeyClass(Text.class);
counter.setMapOutputValueClass(LongWritable.class);
counter.setOutputKeyClass(Text.class);
counter.setOutputValueClass(LongWritable.class);
counter.setNumReduceTasks(1);
counter.setOutputFormatClass(TextOutputFormat.class);
conf.setBoolean("mapreduce.fileoutputcommitter.marksuccessfuljobs",
false);
// run the counter job, outputs to a single reduce task and file
LOG.info("Starting link counter job");
try {
boolean success = counter.waitForCompletion(true);
if (!success) {
String message = "Link counter job did not succeed, job status:"
+ counter.getStatus().getState() + ", reason: "
+ counter.getStatus().getFailureInfo();
LOG.error(message);
throw new RuntimeException(message);
}
} catch (IOException | InterruptedException | ClassNotFoundException e) {
LOG.error("Link counter job failed:", e);
throw e;
}
LOG.info("Finished link counter job");
// read the first (and only) line from the file which should be the
// number of links in the web graph
FileStatus[] numLinksFiles = fs.listStatus(numLinksPath);
if (numLinksFiles.length == 0) {
throw new IOException("Failed to read numlinks temp file: "
+ " no file found in " + numLinksPath);
} else if (numLinksFiles.length > 1) {
throw new IOException("Failed to read numlinks temp file: "
+ " expected only one file but found " + numLinksFiles.length
+ " files in folder " + numLinksPath);
}
Path numLinksFile = numLinksFiles[0].getPath();
LOG.info("Reading numlinks temp file {}", numLinksFile);
FSDataInputStream readLinks = fs.open(numLinksFile);
CompressionCodecFactory cf = new CompressionCodecFactory(conf);
CompressionCodec codec = cf.getCodec(numLinksFiles[0].getPath());
InputStream streamLinks;
if (codec == null) {
LOG.debug("No compression codec found for {}, trying uncompressed",
numLinksFile);
streamLinks = readLinks;
} else {
LOG.info("Compression codec of numlinks temp file: {}",
codec.getDefaultExtension());
readLinks.seek(0);
streamLinks = codec.createInputStream(readLinks);
}
BufferedReader buffer = new BufferedReader(
new InputStreamReader(streamLinks));
String numLinksLine = buffer.readLine();
readLinks.close();
// check if there are links to process, if none, webgraph might be empty
if (numLinksLine == null || numLinksLine.length() == 0) {
LOG.error(
"Failed to determine number of links because of empty line in input {}",
numLinksFile);
fs.delete(numLinksPath, true);
throw new IOException("No links to process, is the webgraph empty?");
}
// delete temp file and convert and return the number of links as an int
LOG.info("Deleting numlinks temp file");
fs.delete(numLinksPath, true);
String numLinks = numLinksLine.split("\\s+")[1];
return Integer.parseInt(numLinks);
}
/**
* Runs the initializer job. The initializer job sets up the nodes with a
* default starting score for link analysis.
*
* @param nodeDb
* The node database to use.
* @param output
* The job output directory.
*
* @throws IOException
* If an error occurs while running the initializer job.
* @throws InterruptedException
* If the job is interrupted.
* @throws ClassNotFoundException
* If a job class cannot be located.
*/
private void runInitializer(Path nodeDb, Path output) throws IOException,
InterruptedException, ClassNotFoundException {
// configure the initializer
Job initializer = NutchJob.getInstance(getConf());
Configuration conf = initializer.getConfiguration();
initializer.setJobName("LinkAnalysis Initializer");
FileInputFormat.addInputPath(initializer, nodeDb);
FileOutputFormat.setOutputPath(initializer, output);
initializer.setJarByClass(Initializer.class);
initializer.setInputFormatClass(SequenceFileInputFormat.class);
initializer.setMapperClass(Initializer.class);
initializer.setMapOutputKeyClass(Text.class);
initializer.setMapOutputValueClass(Node.class);
initializer.setOutputKeyClass(Text.class);
initializer.setOutputValueClass(Node.class);
initializer.setOutputFormatClass(MapFileOutputFormat.class);
conf.setBoolean("mapreduce.fileoutputcommitter.marksuccessfuljobs",
false);
// run the initializer
LOG.info("Starting initialization job");
try {
boolean success = initializer.waitForCompletion(true);
if (!success) {
String message = "Initialization job did not succeed, job status:"
+ initializer.getStatus().getState() + ", reason: "
+ initializer.getStatus().getFailureInfo();
LOG.error(message);
throw new RuntimeException(message);
}
} catch (IOException | InterruptedException | ClassNotFoundException e) {
LOG.error("Initialization job failed:", e);
throw e;
}
LOG.info("Finished initialization job.");
}
/**
* Runs the inverter job. The inverter job flips outlinks to inlinks to be
* passed into the analysis job.
*
* @param nodeDb
* The node database to use.
* @param outlinkDb
* The outlink database to use.
* @param output
* The output directory.
*
* @throws IOException
* If an error occurs while running the inverter job.
* @throws InterruptedException
* If the job is interrupted.
* @throws ClassNotFoundException
* If a job class cannot be located.
*/
private void runInverter(Path nodeDb, Path outlinkDb, Path output)
throws IOException, InterruptedException, ClassNotFoundException {
// configure the inverter
Job inverter = NutchJob.getInstance(getConf());
Configuration conf = inverter.getConfiguration();
inverter.setJobName("LinkAnalysis Inverter");
FileInputFormat.addInputPath(inverter, nodeDb);
FileInputFormat.addInputPath(inverter, outlinkDb);
FileOutputFormat.setOutputPath(inverter, output);
inverter.setInputFormatClass(SequenceFileInputFormat.class);
inverter.setJarByClass(Inverter.class);
inverter.setMapperClass(Inverter.InvertMapper.class);
inverter.setReducerClass(Inverter.InvertReducer.class);
inverter.setMapOutputKeyClass(Text.class);
inverter.setMapOutputValueClass(ObjectWritable.class);
inverter.setOutputKeyClass(Text.class);
inverter.setOutputValueClass(LinkDatum.class);
inverter.setOutputFormatClass(SequenceFileOutputFormat.class);
conf.setBoolean("mapreduce.fileoutputcommitter.marksuccessfuljobs",
false);
// run the inverter job
LOG.info("Starting inverter job");
try {
boolean success = inverter.waitForCompletion(true);
if (!success) {
String message = "Inverter job did not succeed, job status:"
+ inverter.getStatus().getState() + ", reason: "
+ inverter.getStatus().getFailureInfo();
LOG.error(message);
throw new RuntimeException(message);
}
} catch (IOException | InterruptedException | ClassNotFoundException e) {
LOG.error("Inverter job failed:", e);
throw e;
}
LOG.info("Finished inverter job.");
}
/**
* Runs the link analysis job. The link analysis job applies the link rank
* formula to create a score per url and stores that score in the NodeDb.
*
* Typically the link analysis job is run a number of times to allow the link
* rank scores to converge.
*
* @param nodeDb
* The node database from which we are getting previous link rank
* scores.
* @param inverted
* The inverted inlinks
* @param output
* The link analysis output.
* @param iteration
* The current iteration number.
* @param numIterations
* The total number of link analysis iterations.
* @param rankOne
* The rank-one score assigned to pages without inlinks.
*
* @throws IOException
* If an error occurs during link analysis.
* @throws InterruptedException
* If the job is interrupted.
* @throws ClassNotFoundException
* If a job class cannot be located.
*/
private void runAnalysis(Path nodeDb, Path inverted, Path output,
int iteration, int numIterations, float rankOne)
throws IOException, InterruptedException, ClassNotFoundException {
Job analyzer = NutchJob.getInstance(getConf());
Configuration conf = analyzer.getConfiguration();
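// pass the 1-based iteration number to the tasks through the job
// configuration (read by the reducer and used in its debug logging)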
conf.set("link.analyze.iteration", String.valueOf(iteration + 1));
analyzer.setJobName("LinkAnalysis Analyzer, iteration " + (iteration + 1)
+ " of " + numIterations);
FileInputFormat.addInputPath(analyzer, nodeDb);
FileInputFormat.addInputPath(analyzer, inverted);
FileOutputFormat.setOutputPath(analyzer, output);
conf.set("link.analyze.rank.one", String.valueOf(rankOne));
analyzer.setMapOutputKeyClass(Text.class);
analyzer.setMapOutputValueClass(ObjectWritable.class);
analyzer.setInputFormatClass(SequenceFileInputFormat.class);
analyzer.setJarByClass(Analyzer.class);
analyzer.setMapperClass(Analyzer.AnalyzerMapper.class);
analyzer.setReducerClass(Analyzer.AnalyzerReducer.class);
analyzer.setOutputKeyClass(Text.class);
analyzer.setOutputValueClass(Node.class);
analyzer.setOutputFormatClass(MapFileOutputFormat.class);
conf.setBoolean("mapreduce.fileoutputcommitter.marksuccessfuljobs",
false);
LOG.info("Starting analysis job");
try {
boolean success = analyzer.waitForCompletion(true);
if (!success) {
String message = "Analysis job did not succeed, job status:"
+ analyzer.getStatus().getState() + ", reason: "
+ analyzer.getStatus().getFailureInfo();
LOG.error(message);
throw new RuntimeException(message);
}
} catch (IOException | InterruptedException | ClassNotFoundException e) {
LOG.error("Analysis job failed:", e);
throw e;
}
LOG.info("Finished analysis job.");
}
/**
* The Counter job that determines the total number of nodes in the WebGraph.
* The count is used to derive the rank-one score assigned to pages that have
* zero inlinks but do contain outlinks.
*/
private static class Counter {
private static Text numNodes = new Text(NUM_NODES);
private static LongWritable one = new LongWritable(1L);
/**
* Outputs one for every node.
*/
public static class CountMapper extends
Mapper<Text, Node, Text, LongWritable> {
@Override
public void map(Text key, Node value,
Context context)
throws IOException, InterruptedException {
context.write(numNodes, one);
}
}
/**
* Sums the per-node counts and outputs a single total value.
*/
public static class CountReducer extends
Reducer<Text, LongWritable, Text, LongWritable> {
@Override
public void reduce(Text key, Iterable<LongWritable> values,
Context context)
throws IOException, InterruptedException {
long total = 0;
for (LongWritable val : values) {
total += val.get();
}
context.write(numNodes, new LongWritable(total));
}
}
}
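/**
* Assigns each node in the NodeDb a default starting score
* ({@code link.analyze.initial.score}, 1.0 by default) before the first
* analysis iteration.
*/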
private static class Initializer extends Mapper<Text, Node, Text, Node> {
private Configuration conf;
private float initialScore = 1.0f;
@Override
public void setup(Mapper<Text, Node, Text, Node>.Context context) {
conf = context.getConfiguration();
initialScore = conf.getFloat("link.analyze.initial.score", 1.0f);
}
@Override
public void map(Text key, Node node, Context context)
throws IOException, InterruptedException {
String url = key.toString();
Node outNode = WritableUtils.clone(node, conf);
outNode.setInlinkScore(initialScore);
context.write(new Text(url), outNode);
}
}
/**
* Inverts outlinks and attaches current score from the NodeDb of the
* WebGraph. The link analysis process consists of inverting, analyzing and
* scoring, in a loop for a given number of iterations.
*/
private static class Inverter {
/**
* Convert values to ObjectWritable
*/
public static class InvertMapper extends
Mapper<Text, Writable, Text, ObjectWritable> {
@Override
public void setup(Mapper<Text, Writable, Text, ObjectWritable>.Context context) {
}
@Override
public void map(Text key, Writable value,
Context context)
throws IOException, InterruptedException {
ObjectWritable objWrite = new ObjectWritable();
objWrite.set(value);
context.write(key, objWrite);
}
}
/**
* Inverts outlinks to inlinks, attaches current score for the outlink from
* the NodeDb of the WebGraph.
*/
public static class InvertReducer extends
Reducer<Text, ObjectWritable, Text, LinkDatum> {
private Configuration conf;
@Override
public void setup(Reducer<Text, ObjectWritable, Text, LinkDatum>.Context context) {
conf = context.getConfiguration();
}
@Override
public void reduce(Text key, Iterable<ObjectWritable> values,
Context context)
throws IOException, InterruptedException {
String fromUrl = key.toString();
List<LinkDatum> outlinks = new ArrayList<>();
Node node = null;
// aggregate outlinks, assign other values
for (ObjectWritable write : values) {
Object obj = write.get();
if (obj instanceof Node) {
node = (Node) obj;
} else if (obj instanceof LinkDatum) {
outlinks.add(WritableUtils.clone((LinkDatum) obj, conf));
}
}
// get the number of outlinks and the current inlink and outlink scores
// from the node of the url
int numOutlinks = node.getNumOutlinks();
float inlinkScore = node.getInlinkScore();
float outlinkScore = node.getOutlinkScore();
LOG.debug(fromUrl + ": num outlinks " + numOutlinks);
// can't invert if no outlinks
if (numOutlinks > 0) {
for (int i = 0; i < outlinks.size(); i++) {
LinkDatum outlink = outlinks.get(i);
String toUrl = outlink.getUrl();
outlink.setUrl(fromUrl);
outlink.setScore(outlinkScore);
// collect the inverted outlink
context.write(new Text(toUrl), outlink);
LOG.debug(toUrl + ": inverting inlink from " + fromUrl
+ " origscore: " + inlinkScore + " numOutlinks: " + numOutlinks
+ " inlinkscore: " + outlinkScore);
}
}
}
}
}
/**
* Runs a single link analysis iteration.
*/
private static class Analyzer {
/**
* Convert values to ObjectWritable
*/
public static class AnalyzerMapper extends
Mapper<Text, Writable, Text, ObjectWritable> {
private Configuration conf;
/**
* Configures the mapper; keeps a reference to the job configuration, which
* is needed to clone values before wrapping them in ObjectWritable.
*/
@Override
public void setup(Mapper<Text, Writable, Text, ObjectWritable>.Context context) {
conf = context.getConfiguration();
}
@Override
public void map(Text key, Writable value,
Context context)
throws IOException, InterruptedException {
ObjectWritable objWrite = new ObjectWritable();
objWrite.set(WritableUtils.clone(value, conf));
context.write(key, objWrite);
}
}
/**
* Performs a single iteration of link analysis. The resulting scores are
* stored in a temporary NodeDb which replaces the NodeDb of the WebGraph.
*/
public static class AnalyzerReducer extends
Reducer<Text, ObjectWritable, Text, Node> {
private Configuration conf;
private float dampingFactor = 0.85f;
private float rankOne = 0.0f;
private int itNum = 0;
private boolean limitPages = true;
private boolean limitDomains = true;
/**
* Configures the job reducer, sets the damping factor, rank one score, and other
* needed values for analysis.
*/
@Override
public void setup(
Reducer<Text, ObjectWritable, Text, Node>.Context context) {
conf = context.getConfiguration();
dampingFactor = conf.getFloat("link.analyze.damping.factor", 0.85f);
rankOne = conf.getFloat("link.analyze.rank.one", 0.0f);
itNum = conf.getInt("link.analyze.iteration", 0);
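// when true, at most one inlink per page / per domain is counted when
// aggregating inlink scores for a URL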
limitPages = conf.getBoolean("link.ignore.limit.page", true);
limitDomains = conf.getBoolean("link.ignore.limit.domain", true);
}
@Override
public void reduce(Text key, Iterable<ObjectWritable> values,
Context context)
throws IOException, InterruptedException {
String url = key.toString();
Set<String> domains = new HashSet<>();
Set<String> pages = new HashSet<>();
Node node = null;
// a page with zero inlinks has a score of rankOne
int numInlinks = 0;
float totalInlinkScore = rankOne;
for (ObjectWritable next : values) {
Object value = next.get();
if (value instanceof Node) {
node = (Node) value;
} else if (value instanceof LinkDatum) {
LinkDatum linkDatum = (LinkDatum) value;
float scoreFromInlink = linkDatum.getScore();
String inlinkUrl = linkDatum.getUrl();
String inLinkDomain = URLUtil.getDomainName(inlinkUrl);
String inLinkPage = URLUtil.getPage(inlinkUrl);
// limit counting duplicate inlinks by pages or domains
if ((limitPages && pages.contains(inLinkPage))
|| (limitDomains && domains.contains(inLinkDomain))) {
LOG.debug(url + ": ignoring " + scoreFromInlink + " from "
+ inlinkUrl + ", duplicate page or domain");
continue;
}
// aggregate total inlink score
numInlinks++;
totalInlinkScore += scoreFromInlink;
domains.add(inLinkDomain);
pages.add(inLinkPage);
LOG.debug(url + ": adding " + scoreFromInlink + " from " + inlinkUrl
+ ", total: " + totalInlinkScore);
}
}
// calculate linkRank score formula
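// e.g. with the default dampingFactor of 0.85 and a totalInlinkScore of 2.0:
// 0.15 + 0.85 * 2.0 = 1.85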
float linkRankScore = (1 - dampingFactor)
+ (dampingFactor * totalInlinkScore);
LOG.debug(url + ": score: " + linkRankScore + " num inlinks: "
+ numInlinks + " iteration: " + itNum);
// store the score in a temporary NodeDb
Node outNode = WritableUtils.clone(node, conf);
outNode.setInlinkScore(linkRankScore);
context.write(key, outNode);
}
}
}
/**
* Default constructor.
*/
public LinkRank() {
super();
}
/**
* Configurable constructor.
*
* @param conf
* The configuration to use.
*/
public LinkRank(Configuration conf) {
super(conf);
}
/**
* Runs the complete link analysis job. The complete job determines the rank-one
* score, runs through a given number of invert and analyze iterations (10 by
* default), and finally replaces the NodeDb in the WebGraph with the link rank
* output.
*
* @param webGraphDb
* The WebGraph to run link analysis on.
*
* @throws IOException
* If an error occurs during link analysis.
* @throws ClassNotFoundException
* If a job class cannot be located.
* @throws InterruptedException
* If the job is interrupted.
*/
public void analyze(Path webGraphDb) throws IOException,
ClassNotFoundException, InterruptedException {
SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
long start = System.currentTimeMillis();
LOG.info("Analysis: starting at " + sdf.format(start));
// store the link rank under the webgraphdb temporarily, final scores get
// updated into the nodedb
Path linkRank = new Path(webGraphDb, "linkrank");
Configuration conf = getConf();
FileSystem fs = linkRank.getFileSystem(conf);
// create the linkrank directory if needed
if (!fs.exists(linkRank)) {
fs.mkdirs(linkRank);
}
// the webgraph outlink and node database paths
Path wgOutlinkDb = new Path(webGraphDb, WebGraph.OUTLINK_DIR);
Path wgNodeDb = new Path(webGraphDb, WebGraph.NODE_DIR);
Path nodeDb = new Path(linkRank, WebGraph.NODE_DIR);
// get the number of total nodes in the webgraph, used for rank one, then
// initialize all urls with a default score
int numLinks = runCounter(fs, webGraphDb);
runInitializer(wgNodeDb, nodeDb);
float rankOneScore = (1f / (float) numLinks);
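// rankOneScore = 1 / numberOfNodes, e.g. 4 nodes -> 0.25; this is the base
// score used for pages without inlinks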
if (LOG.isInfoEnabled()) {
LOG.info("Analysis: Number of links: " + numLinks);
LOG.info("Analysis: Rank One: " + rankOneScore);
}
// run invert and analysis for a given number of iterations to allow the
// link rank scores to converge
int numIterations = conf.getInt("link.analyze.num.iterations", 10);
for (int i = 0; i < numIterations; i++) {
// the input to inverting is always the previous output from analysis
LOG.info("Analysis: Starting iteration " + (i + 1) + " of "
+ numIterations);
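// write this iteration's output into a randomly named temporary directory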
Path tempRank = new Path(linkRank + "-"
+ Integer.toString(new Random().nextInt(Integer.MAX_VALUE)));
fs.mkdirs(tempRank);
Path tempInverted = new Path(tempRank, "inverted");
Path tempNodeDb = new Path(tempRank, WebGraph.NODE_DIR);
// run invert and analysis
runInverter(nodeDb, wgOutlinkDb, tempInverted);
runAnalysis(nodeDb, tempInverted, tempNodeDb, i, numIterations,
rankOneScore);
// replace the temporary NodeDb with the output from analysis
LOG.info("Analysis: Installing new link scores");
FSUtils.replace(fs, linkRank, tempRank, true);
LOG.info("Analysis: finished iteration " + (i + 1) + " of "
+ numIterations);
}
// replace the NodeDb in the WebGraph with the final output of analysis
LOG.info("Analysis: Installing web graph nodes");
FSUtils.replace(fs, wgNodeDb, nodeDb, true);
// remove the temporary link rank folder
fs.delete(linkRank, true);
long end = System.currentTimeMillis();
LOG.info("Analysis: finished at " + sdf.format(end) + ", elapsed: "
+ TimingUtil.elapsedTime(start, end));
}
public static void main(String[] args) throws Exception {
int res = ToolRunner.run(NutchConfiguration.create(), new LinkRank(), args);
System.exit(res);
}
/**
* Runs the LinkRank tool.
*/
@Override
public int run(String[] args) throws Exception {
Options options = new Options();
OptionBuilder.withArgName("help");
OptionBuilder.withDescription("show this help message");
Option helpOpts = OptionBuilder.create("help");
options.addOption(helpOpts);
OptionBuilder.withArgName("webgraphdb");
OptionBuilder.hasArg();
OptionBuilder.withDescription("the web graph db to use");
Option webgraphOpts = OptionBuilder.create("webgraphdb");
options.addOption(webgraphOpts);
CommandLineParser parser = new GnuParser();
try {
CommandLine line = parser.parse(options, args);
if (line.hasOption("help") || !line.hasOption("webgraphdb")) {
HelpFormatter formatter = new HelpFormatter();
formatter.printHelp("LinkRank", options);
return -1;
}
String webGraphDb = line.getOptionValue("webgraphdb");
analyze(new Path(webGraphDb));
return 0;
} catch (Exception e) {
LOG.error("LinkAnalysis: " + StringUtils.stringifyException(e));
return -2;
}
}
}