Merge pull request #478 from sebastian-nagel/NUTCH-2279-linkrank-output-compression
NUTCH-2279 LinkRank fails when using Hadoop MR output compression
diff --git a/src/java/org/apache/nutch/scoring/webgraph/LinkRank.java b/src/java/org/apache/nutch/scoring/webgraph/LinkRank.java
index 6829927..b6bfa98 100644
--- a/src/java/org/apache/nutch/scoring/webgraph/LinkRank.java
+++ b/src/java/org/apache/nutch/scoring/webgraph/LinkRank.java
@@ -18,6 +18,7 @@
import java.io.BufferedReader;
import java.io.IOException;
+import java.io.InputStream;
import java.io.InputStreamReader;
import java.lang.invoke.MethodHandles;
import java.text.SimpleDateFormat;
@@ -39,6 +40,7 @@
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
@@ -46,6 +48,8 @@
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableUtils;
+import org.apache.hadoop.io.compress.CompressionCodec;
+import org.apache.hadoop.io.compress.CompressionCodecFactory;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.Job;
@@ -128,14 +132,42 @@
// read the first (and only) line from the file which should be the
// number of links in the web graph
- LOG.info("Reading numlinks temp file");
- FSDataInputStream readLinks = fs.open(new Path(numLinksPath, "part-r-00000"));
- BufferedReader buffer = new BufferedReader(new InputStreamReader(readLinks));
+ FileStatus[] numLinksFiles = fs.listStatus(numLinksPath);
+ if (numLinksFiles.length == 0) {
+ throw new IOException("Failed to read numlinks temp file: "
+ + " no file found in " + numLinksPath);
+ } else if (numLinksFiles.length > 1) {
+ throw new IOException("Failed to read numlinks temp file: "
+ + " expected only one file but found " + numLinksFiles.length
+ + " files in folder " + numLinksPath);
+ }
+ Path numLinksFile = numLinksFiles[0].getPath();
+ LOG.info("Reading numlinks temp file {}", numLinksFile);
+ FSDataInputStream readLinks = fs.open(numLinksFile);
+ CompressionCodecFactory cf = new CompressionCodecFactory(conf);
+ CompressionCodec codec = cf.getCodec(numLinksFiles[0].getPath());
+ InputStream streamLinks;
+ if (codec == null) {
+ LOG.debug("No compression codec found for {}, trying uncompressed",
+ numLinksFile);
+ streamLinks = readLinks;
+ } else {
+ LOG.info("Compression codec of numlinks temp file: {}",
+ codec.getDefaultExtension());
+ readLinks.seek(0);
+ streamLinks = codec.createInputStream(readLinks);
+ }
+ BufferedReader buffer = new BufferedReader(
+ new InputStreamReader(streamLinks));
+
String numLinksLine = buffer.readLine();
readLinks.close();
// check if there are links to process, if none, webgraph might be empty
if (numLinksLine == null || numLinksLine.length() == 0) {
+ LOG.error(
+ "Failed to determine number of links because of empty line in input {}",
+ numLinksFile);
fs.delete(numLinksPath, true);
throw new IOException("No links to process, is the webgraph empty?");
}