Merge pull request #462 from sebastian-nagel/NUTCH-2729-protocol-okhttp-mark-truncated

NUTCH-2729 protocol-okhttp: fix marking of truncated content
diff --git a/src/plugin/protocol-okhttp/src/java/org/apache/nutch/protocol/okhttp/OkHttpResponse.java b/src/plugin/protocol-okhttp/src/java/org/apache/nutch/protocol/okhttp/OkHttpResponse.java
index d7d4cdf..b84fdc0 100644
--- a/src/plugin/protocol-okhttp/src/java/org/apache/nutch/protocol/okhttp/OkHttpResponse.java
+++ b/src/plugin/protocol-okhttp/src/java/org/apache/nutch/protocol/okhttp/OkHttpResponse.java
@@ -164,9 +164,14 @@
     BufferedSource source = responseBody.source();
     int bytesRequested = 0;
     int bufferGrowStepBytes = 8192;
-    while (source.buffer().size() < maxContentBytes) {
+    while (source.buffer().size() <= maxContentBytes) {
       bytesRequested += Math.min(bufferGrowStepBytes,
-          (maxContentBytes - bytesRequested));
+          /*
+           * request one byte more than the limit to reliably detect truncated
+           * content, except at Integer.MAX_VALUE where 1 + limit would overflow
+           */
+          (maxContentBytes == Integer.MAX_VALUE ? maxContentBytes
+              : (1 + maxContentBytes)) - bytesRequested);
       boolean success = false;
       try {
         success = source.request(bytesRequested);
@@ -174,6 +179,8 @@
         if (partialAsTruncated && source.buffer().size() > 0) {
           // treat already fetched content as truncated
           truncated.setReason(TruncatedContentReason.DISCONNECT);
+          LOG.info("Truncated content for {}, partial fetch caused by:", url,
+              e);
         } else {
           throw e;
         }
@@ -191,7 +198,7 @@
         truncated.setReason(TruncatedContentReason.TIME);
         break;
       }
-      if (source.buffer().size() > maxContentBytes) {
+      if (source.buffer().size() >= maxContentBytes) {
         LOG.debug("content limit reached");
       }
       // okhttp may fetch more content than requested, forward requested bytes
diff --git a/src/plugin/protocol-okhttp/src/test/org/apache/nutch/protocol/okhttp/TestBadServerResponses.java b/src/plugin/protocol-okhttp/src/test/org/apache/nutch/protocol/okhttp/TestBadServerResponses.java
index bf69893..34c5f6f 100644
--- a/src/plugin/protocol-okhttp/src/test/org/apache/nutch/protocol/okhttp/TestBadServerResponses.java
+++ b/src/plugin/protocol-okhttp/src/test/org/apache/nutch/protocol/okhttp/TestBadServerResponses.java
@@ -21,9 +21,8 @@
 import static org.junit.Assert.assertTrue;
 
 import java.io.BufferedReader;
+import java.io.ByteArrayOutputStream;
 import java.io.InputStreamReader;
-import java.io.OutputStreamWriter;
-import java.io.PrintWriter;
 import java.lang.invoke.MethodHandles;
 import java.net.InetSocketAddress;
 import java.net.MalformedURLException;
@@ -33,6 +32,7 @@
 import java.nio.charset.StandardCharsets;
 import java.util.regex.Matcher;
 import java.util.regex.Pattern;
+import java.util.zip.GZIPOutputStream;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.io.Text;
@@ -98,14 +98,14 @@
   }
 
   /**
-   * Starts the test server at a specified port and constant response.
-   * 
-   * @param portno
-   *          Port number.
-   * @param response
-   *          response sent on every request
-   */
-  private void runServer(int port, String response) throws Exception {
+   * Starts the test server at a specified port and constant response.
+   *
+   * @param port
+   *          Port number.
+   * @param response
+   *          raw response bytes written on every request
+   */
+  private void runServer(int port, byte[] response) throws Exception {
     server = new ServerSocket();
     server.bind(new InetSocketAddress("127.0.0.1", port));
     Pattern requestPattern = Pattern.compile("(?i)^GET\\s+(\\S+)");
@@ -115,9 +115,7 @@
       LOG.info("Connection received");
       try (
           BufferedReader in = new BufferedReader(new InputStreamReader(
-              socket.getInputStream(), StandardCharsets.UTF_8));
-          PrintWriter out = new PrintWriter(new OutputStreamWriter(
-              socket.getOutputStream(), StandardCharsets.UTF_8), true)) {
+              socket.getInputStream(), StandardCharsets.UTF_8))) {
 
         String line;
         while ((line = in.readLine()) != null) {
@@ -129,13 +127,11 @@
           if (m.find()) {
             LOG.info("Requested {}", m.group(1));
             if (!m.group(1).startsWith("/")) {
-              response = "HTTP/1.1 400 Bad request\r\n\r\n";
+              response = "HTTP/1.1 400 Bad request\r\n\r\n".getBytes(StandardCharsets.UTF_8);
             }
           }
         }
-        LOG.info("Response: {}",
-            response.substring(0, Math.min(1024, response.length())));
-        out.print(response);
+        socket.getOutputStream().write(response);
       } catch (Exception e) {
         LOG.warn("Exception in test server:", e);
       }
@@ -143,6 +139,10 @@
   }
 
   private void launchServer(String response) throws InterruptedException {
+    launchServer(response.getBytes(StandardCharsets.UTF_8));
+  }
+
+  private void launchServer(byte[] response) throws InterruptedException {
     Thread serverThread = new Thread(() -> {
       try {
         runServer(port, response);
@@ -171,9 +171,6 @@
     CrawlDatum crawlDatum = new CrawlDatum();
     ProtocolOutput out = http.getProtocolOutput(new Text(url.toString()),
         crawlDatum);
-    if (expectedCode == -1) {
-      System.out.println(out);
-    }
     int httpStatusCode = -1;
     if (crawlDatum.getMetaData().containsKey(Nutch.PROTOCOL_STATUS_CODE_KEY)) {
       httpStatusCode = Integer.parseInt(crawlDatum.getMetaData()
@@ -321,7 +318,8 @@
   /**
    * NUTCH-2562 protocol-http fails to read large chunked HTTP responses,
    * NUTCH-2575 protocol-http does not respect the maximum content-size for
-   * chunked responses
+   * chunked responses. Also test whether truncations of chunked content are
+   * properly marked.
    */
   @Test
   public void testChunkedContent() throws Exception {
@@ -346,6 +344,137 @@
     assertEquals(
         "Chunked content not truncated according to http.content.limit", 65536,
         fetched.getContent().getContent().length);
+    assertNotNull("Content truncation not marked",
+        fetched.getContent().getMetadata().get(Response.TRUNCATED_CONTENT));
+    assertEquals("Content truncation not marked",
+        Response.TruncatedContentReason.LENGTH.toString().toLowerCase(),
+        fetched.getContent().getMetadata().get(Response.TRUNCATED_CONTENT_REASON));
+  }
+
+  /**
+   * NUTCH-2729 Check for http.content.limit defined in nutch-site-test.xml:
+   * whether content is truncated to the configured 64 kB and whether it is
+   * properly marked as truncated.
+   */
+  @Test
+  public void testTruncationMarking() throws Exception {
+    setUp();
+    int[] kBs = { 63, 64, 65 };
+    for (int kB : kBs) {
+      StringBuilder response = new StringBuilder();
+      response.append(responseHeader);
+      response.append("Content-Type: text/plain\r\nContent-Length: "
+          + (kB * 1024) + "\r\n\r\n");
+      for (int i = 0; i < kB; i++) {
+        for (int j = 0; j < 16; j++) {
+          // 16 chunks of 64 bytes = 1 kB
+          response.append(
+              "abcdefghijklmnopqurstuvxyz0123456789-ABCDEFGHIJKLMNOPQURSTUVXYZ\n");
+        }
+      }
+      launchServer(response.toString());
+      ProtocolOutput fetched = fetchPage("/", 200);
+      assertEquals("Content not truncated according to http.content.limit",
+          Math.min(kB * 1024, 65536), fetched.getContent().getContent().length);
+      if (kB * 1024 > 65536) {
+        assertNotNull("Content truncation not marked",
+            fetched.getContent().getMetadata().get(Response.TRUNCATED_CONTENT));
+        assertEquals("Content truncation not marked",
+            Response.TruncatedContentReason.LENGTH.toString().toLowerCase(),
+            fetched.getContent().getMetadata().get(Response.TRUNCATED_CONTENT_REASON));
+      }
+      server.close(); // need to close server before next loop iteration
+    }
+  }
+
+  /**
+   * NUTCH-2729 Check for http.content.limit defined in nutch-site-test.xml:
+   * whether gzip-compressed content is truncated to the configured 64 kB
+   * (after decompression) and whether it is properly marked as truncated.
+   */
+  @Test
+  public void testTruncationMarkingGzip() throws Exception {
+    setUp();
+    int[] kBs = { 63, 64, 65 };
+    for (int kB : kBs) {
+      StringBuilder payload = new StringBuilder();
+      for (int i = 0; i < kB; i++) {
+        for (int j = 0; j < 16; j++) {
+          // 16 chunks of 64 bytes = 1 kB
+          payload.append(
+              "abcdefghijklmnopqurstuvxyz0123456789-ABCDEFGHIJKLMNOPQURSTUVXYZ\n");
+        }
+      }
+      ByteArrayOutputStream bytes = new ByteArrayOutputStream();
+      GZIPOutputStream gzip = new GZIPOutputStream(bytes);
+      gzip.write(payload.toString().getBytes(StandardCharsets.UTF_8));
+      gzip.close();
+      StringBuilder responseHead = new StringBuilder();
+      responseHead.append(responseHeader);
+      responseHead.append("Content-Type: text/plain\r\nContent-Length: "
+          + bytes.size() + "\r\nContent-Encoding: gzip\r\n\r\n");
+      ByteArrayOutputStream response = new ByteArrayOutputStream();
+      response.write(responseHead.toString().getBytes(StandardCharsets.UTF_8));
+      response.write(bytes.toByteArray());
+
+      launchServer(response.toByteArray());
+      ProtocolOutput fetched = fetchPage("/", 200);
+      assertEquals("Content not truncated according to http.content.limit",
+          Math.min(kB * 1024, 65536), fetched.getContent().getContent().length);
+      if (kB * 1024 > 65536) {
+        assertNotNull("Content truncation not marked",
+            fetched.getContent().getMetadata().get(Response.TRUNCATED_CONTENT));
+        assertEquals("Content truncation not marked",
+            Response.TruncatedContentReason.LENGTH.toString().toLowerCase(),
+            fetched.getContent().getMetadata().get(Response.TRUNCATED_CONTENT_REASON));
+      }
+      server.close(); // need to close server before next loop iteration
+    }
+  }
+
+  /**
+   * Force an exception after all content has been fetched by sending a too
+   * large {@code Content-Length} header, and check that the partial content is
+   * stored anyway and marked as truncated if http.partial.truncated == true.
+   */
+  @Test
+  public void testPartialContentTruncated() throws Exception {
+    setUp();
+    conf.setBoolean("http.partial.truncated", true);
+    http.setConf(conf);
+    String testContent = "This is a text.";
+    launchServer(
+        responseHeader + "Content-Length: 50000\r\n\r\n" + testContent);
+    ProtocolOutput fetched = fetchPage("/", 200);
+    assertEquals("Content not saved as truncated", testContent,
+        new String(fetched.getContent().getContent(), StandardCharsets.UTF_8));
+    assertNotNull("Content truncation not marked",
+        fetched.getContent().getMetadata().get(Response.TRUNCATED_CONTENT));
+  }
+
+  @Test
+  public void testNoContentLimit() throws Exception {
+    setUp();
+    conf.setInt("http.content.limit", -1);
+    http.setConf(conf);
+    StringBuilder response = new StringBuilder();
+    response.append(responseHeader);
+    // Even 128 kB content shouldn't cause any truncation because
+    // http.content.limit == -1
+    int kB = 128;
+    response.append("Content-Type: text/plain\r\nContent-Length: " + (kB * 1024)
+        + "\r\n\r\n");
+    for (int i = 0; i < kB; i++) {
+      for (int j = 0; j < 16; j++) {
+        // 16 chunks of 64 bytes = 1 kB
+        response.append(
+            "abcdefghijklmnopqurstuvxyz0123456789-ABCDEFGHIJKLMNOPQURSTUVXYZ\n");
+      }
+    }
+    launchServer(response.toString());
+    ProtocolOutput fetched = fetchPage("/", 200);
+    assertEquals("Content truncated although http.content.limit == -1",
+        (kB * 1024), fetched.getContent().getContent().length);
   }
 
 }