Merge pull request #478 from sebastian-nagel/NUTCH-2279-linkrank-output-compression

NUTCH-2279 LinkRank fails when using Hadoop MR output compression
diff --git a/src/java/org/apache/nutch/scoring/webgraph/LinkRank.java b/src/java/org/apache/nutch/scoring/webgraph/LinkRank.java
index 6829927..b6bfa98 100644
--- a/src/java/org/apache/nutch/scoring/webgraph/LinkRank.java
+++ b/src/java/org/apache/nutch/scoring/webgraph/LinkRank.java
@@ -18,6 +18,7 @@
 
 import java.io.BufferedReader;
 import java.io.IOException;
+import java.io.InputStream;
 import java.io.InputStreamReader;
 import java.lang.invoke.MethodHandles;
 import java.text.SimpleDateFormat;
@@ -39,6 +40,7 @@
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.conf.Configured;
 import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.LongWritable;
@@ -46,6 +48,8 @@
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.io.WritableUtils;
+import org.apache.hadoop.io.compress.CompressionCodec;
+import org.apache.hadoop.io.compress.CompressionCodecFactory;
 import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
 import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
 import org.apache.hadoop.mapreduce.Job;
@@ -128,14 +132,42 @@
 
     // read the first (and only) line from the file which should be the
     // number of links in the web graph
-    LOG.info("Reading numlinks temp file");
-    FSDataInputStream readLinks = fs.open(new Path(numLinksPath, "part-r-00000"));
-    BufferedReader buffer = new BufferedReader(new InputStreamReader(readLinks));
+    FileStatus[] numLinksFiles = fs.listStatus(numLinksPath);
+    if (numLinksFiles.length == 0) {
+      throw new IOException("Failed to read numlinks temp file: "
+          + " no file found in " + numLinksPath);
+    } else if (numLinksFiles.length > 1) {
+      throw new IOException("Failed to read numlinks temp file: "
+          + " expected only one file but found " + numLinksFiles.length
+          + " files in folder " + numLinksPath);
+    }
+    Path numLinksFile = numLinksFiles[0].getPath();
+    LOG.info("Reading numlinks temp file {}", numLinksFile);
+    FSDataInputStream readLinks = fs.open(numLinksFile);
+    CompressionCodecFactory cf = new CompressionCodecFactory(conf);
+    CompressionCodec codec = cf.getCodec(numLinksFiles[0].getPath());
+    InputStream streamLinks;
+    if (codec == null) {
+      LOG.debug("No compression codec found for {}, trying uncompressed",
+          numLinksFile);
+      streamLinks = readLinks;
+    } else {
+      LOG.info("Compression codec of numlinks temp file: {}",
+          codec.getDefaultExtension());
+      readLinks.seek(0);
+      streamLinks = codec.createInputStream(readLinks);
+    }
+    BufferedReader buffer = new BufferedReader(
+        new InputStreamReader(streamLinks));
+
     String numLinksLine = buffer.readLine();
     readLinks.close();
 
     // check if there are links to process, if none, webgraph might be empty
     if (numLinksLine == null || numLinksLine.length() == 0) {
+      LOG.error(
+          "Failed to determine number of links because of empty line in input {}",
+          numLinksFile);
       fs.delete(numLinksPath, true);
       throw new IOException("No links to process, is the webgraph empty?");
     }