Merge pull request #462 from sebastian-nagel/NUTCH-2729-protocol-okhttp-mark-truncated
NUTCH-2729 protocol-okhttp: fix marking of truncated content
diff --git a/src/plugin/protocol-okhttp/src/java/org/apache/nutch/protocol/okhttp/OkHttpResponse.java b/src/plugin/protocol-okhttp/src/java/org/apache/nutch/protocol/okhttp/OkHttpResponse.java
index d7d4cdf..b84fdc0 100644
--- a/src/plugin/protocol-okhttp/src/java/org/apache/nutch/protocol/okhttp/OkHttpResponse.java
+++ b/src/plugin/protocol-okhttp/src/java/org/apache/nutch/protocol/okhttp/OkHttpResponse.java
@@ -164,9 +164,14 @@
BufferedSource source = responseBody.source();
int bytesRequested = 0;
int bufferGrowStepBytes = 8192;
- while (source.buffer().size() < maxContentBytes) {
+ while (source.buffer().size() <= maxContentBytes) {
bytesRequested += Math.min(bufferGrowStepBytes,
- (maxContentBytes - bytesRequested));
+ /*
+ * request one byte more than required to reliably detect truncated
+ * content, but beware of integer overflows
+ */
+ (maxContentBytes == Integer.MAX_VALUE ? maxContentBytes
+ : (1 + maxContentBytes)) - bytesRequested);
boolean success = false;
try {
success = source.request(bytesRequested);
@@ -174,6 +179,8 @@
if (partialAsTruncated && source.buffer().size() > 0) {
// treat already fetched content as truncated
truncated.setReason(TruncatedContentReason.DISCONNECT);
+ LOG.info("Truncated content for {}, partial fetch caused by:", url,
+ e);
} else {
throw e;
}
@@ -191,7 +198,7 @@
truncated.setReason(TruncatedContentReason.TIME);
break;
}
- if (source.buffer().size() > maxContentBytes) {
+ if (source.buffer().size() >= maxContentBytes) {
LOG.debug("content limit reached");
}
// okhttp may fetch more content than requested, forward requested bytes
diff --git a/src/plugin/protocol-okhttp/src/test/org/apache/nutch/protocol/okhttp/TestBadServerResponses.java b/src/plugin/protocol-okhttp/src/test/org/apache/nutch/protocol/okhttp/TestBadServerResponses.java
index bf69893..34c5f6f 100644
--- a/src/plugin/protocol-okhttp/src/test/org/apache/nutch/protocol/okhttp/TestBadServerResponses.java
+++ b/src/plugin/protocol-okhttp/src/test/org/apache/nutch/protocol/okhttp/TestBadServerResponses.java
@@ -21,9 +21,8 @@
import static org.junit.Assert.assertTrue;
import java.io.BufferedReader;
+import java.io.ByteArrayOutputStream;
import java.io.InputStreamReader;
-import java.io.OutputStreamWriter;
-import java.io.PrintWriter;
import java.lang.invoke.MethodHandles;
import java.net.InetSocketAddress;
import java.net.MalformedURLException;
@@ -33,6 +32,7 @@
import java.nio.charset.StandardCharsets;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
+import java.util.zip.GZIPOutputStream;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.Text;
@@ -98,14 +98,14 @@
}
/**
- * Starts the test server at a specified port and constant response.
- *
- * @param portno
- * Port number.
- * @param response
- * response sent on every request
- */
- private void runServer(int port, String response) throws Exception {
+ * Starts the test server on the specified port with a constant response.
+ *
+ * @param port
+ * Port number.
+ * @param response
+ * response sent on every request
+ */
+ private void runServer(int port, byte[] response) throws Exception {
server = new ServerSocket();
server.bind(new InetSocketAddress("127.0.0.1", port));
Pattern requestPattern = Pattern.compile("(?i)^GET\\s+(\\S+)");
@@ -115,9 +115,7 @@
LOG.info("Connection received");
try (
BufferedReader in = new BufferedReader(new InputStreamReader(
- socket.getInputStream(), StandardCharsets.UTF_8));
- PrintWriter out = new PrintWriter(new OutputStreamWriter(
- socket.getOutputStream(), StandardCharsets.UTF_8), true)) {
+ socket.getInputStream(), StandardCharsets.UTF_8))) {
String line;
while ((line = in.readLine()) != null) {
@@ -129,13 +127,11 @@
if (m.find()) {
LOG.info("Requested {}", m.group(1));
if (!m.group(1).startsWith("/")) {
- response = "HTTP/1.1 400 Bad request\r\n\r\n";
+ response = "HTTP/1.1 400 Bad request\r\n\r\n".getBytes(StandardCharsets.UTF_8);
}
}
}
- LOG.info("Response: {}",
- response.substring(0, Math.min(1024, response.length())));
- out.print(response);
+ socket.getOutputStream().write(response);
} catch (Exception e) {
LOG.warn("Exception in test server:", e);
}
@@ -143,6 +139,10 @@
}
private void launchServer(String response) throws InterruptedException {
+ launchServer(response.getBytes(StandardCharsets.UTF_8));
+ }
+
+ private void launchServer(byte[] response) throws InterruptedException {
Thread serverThread = new Thread(() -> {
try {
runServer(port, response);
@@ -171,9 +171,6 @@
CrawlDatum crawlDatum = new CrawlDatum();
ProtocolOutput out = http.getProtocolOutput(new Text(url.toString()),
crawlDatum);
- if (expectedCode == -1) {
- System.out.println(out);
- }
int httpStatusCode = -1;
if (crawlDatum.getMetaData().containsKey(Nutch.PROTOCOL_STATUS_CODE_KEY)) {
httpStatusCode = Integer.parseInt(crawlDatum.getMetaData()
@@ -321,7 +318,8 @@
/**
* NUTCH-2562 protocol-http fails to read large chunked HTTP responses,
* NUTCH-2575 protocol-http does not respect the maximum content-size for
- * chunked responses
+ * chunked responses. Also test whether truncations of chunked content are
+ * properly marked.
*/
@Test
public void testChunkedContent() throws Exception {
@@ -346,6 +344,137 @@
assertEquals(
"Chunked content not truncated according to http.content.limit", 65536,
fetched.getContent().getContent().length);
+ assertNotNull("Content truncation not marked",
+ fetched.getContent().getMetadata().get(Response.TRUNCATED_CONTENT));
+ assertEquals("Content truncation not marked",
+ Response.TruncatedContentReason.LENGTH.toString().toLowerCase(),
+ fetched.getContent().getMetadata().get(Response.TRUNCATED_CONTENT_REASON));
+ }
+
+ /**
+ * NUTCH-2729 Check for http.content.limit defined in nutch-site-test.xml:
+ * whether content is truncated to the configured 64 kB and whether it is
+ * properly marked as truncated.
+ */
+ @Test
+ public void testTruncationMarking() throws Exception {
+ setUp();
+ int[] kBs = { 63, 64, 65 };
+ for (int kB : kBs) {
+ StringBuilder response = new StringBuilder();
+ response.append(responseHeader);
+ response.append("Content-Type: text/plain\r\nContent-Length: "
+ + (kB * 1024) + "\r\n\r\n");
+ for (int i = 0; i < kB; i++) {
+ for (int j = 0; j < 16; j++) {
+ // 16 lines of 64 bytes each = 1 kB
+ response.append(
+ "abcdefghijklmnopqurstuvxyz0123456789-ABCDEFGHIJKLMNOPQURSTUVXYZ\n");
+ }
+ }
+ launchServer(response.toString());
+ ProtocolOutput fetched = fetchPage("/", 200);
+ assertEquals("Content not truncated according to http.content.limit",
+ Math.min(kB * 1024, 65536), fetched.getContent().getContent().length);
+ if (kB * 1024 > 65536) {
+ assertNotNull("Content truncation not marked",
+ fetched.getContent().getMetadata().get(Response.TRUNCATED_CONTENT));
+ assertEquals("Content truncation not marked",
+ Response.TruncatedContentReason.LENGTH.toString().toLowerCase(),
+ fetched.getContent().getMetadata().get(Response.TRUNCATED_CONTENT_REASON));
+ }
+ server.close(); // need to close server before next loop iteration
+ }
+ }
+
+ /**
+ * NUTCH-2729 Check for http.content.limit defined in nutch-site-test.xml,
+ * using a gzip-encoded response: whether the decoded content is truncated
+ * to the configured 64 kB and whether it is properly marked as truncated.
+ */
+ @Test
+ public void testTruncationMarkingGzip() throws Exception {
+ setUp();
+ int[] kBs = { 63, 64, 65 };
+ for (int kB : kBs) {
+ StringBuilder payload = new StringBuilder();
+ for (int i = 0; i < kB; i++) {
+ for (int j = 0; j < 16; j++) {
+ // 16 lines of 64 bytes each = 1 kB
+ payload.append(
+ "abcdefghijklmnopqurstuvxyz0123456789-ABCDEFGHIJKLMNOPQURSTUVXYZ\n");
+ }
+ }
+ ByteArrayOutputStream bytes = new ByteArrayOutputStream();
+ GZIPOutputStream gzip = new GZIPOutputStream(bytes);
+ gzip.write(payload.toString().getBytes(StandardCharsets.UTF_8));
+ gzip.close();
+ StringBuilder responseHead = new StringBuilder();
+ responseHead.append(responseHeader);
+ responseHead.append("Content-Type: text/plain\r\nContent-Length: "
+ + bytes.size() + "\r\nContent-Encoding: gzip\r\n\r\n");
+ ByteArrayOutputStream response = new ByteArrayOutputStream();
+ response.write(responseHead.toString().getBytes(StandardCharsets.UTF_8));
+ response.write(bytes.toByteArray());
+
+ launchServer(response.toByteArray());
+ ProtocolOutput fetched = fetchPage("/", 200);
+ assertEquals("Content not truncated according to http.content.limit",
+ Math.min(kB * 1024, 65536), fetched.getContent().getContent().length);
+ if (kB * 1024 > 65536) {
+ assertNotNull("Content truncation not marked",
+ fetched.getContent().getMetadata().get(Response.TRUNCATED_CONTENT));
+ assertEquals("Content truncation not marked",
+ Response.TruncatedContentReason.LENGTH.toString().toLowerCase(),
+ fetched.getContent().getMetadata().get(Response.TRUNCATED_CONTENT_REASON));
+ }
+ server.close(); // need to close server before next loop iteration
+ }
+ }
+
+ /**
+ * Force an exception after all content has been fetched by sending a
+ * {@code Content-Length} header larger than the actual body, and check whether
+ * the content is stored anyway if http.partial.truncated == true.
+ */
+ @Test
+ public void testPartialContentTruncated() throws Exception {
+ setUp();
+ conf.setBoolean("http.partial.truncated", true);
+ http.setConf(conf);
+ String testContent = "This is a text.";
+ launchServer(
+ responseHeader + "Content-Length: 50000\r\n\r\n" + testContent);
+ ProtocolOutput fetched = fetchPage("/", 200);
+ assertEquals("Content not saved as truncated", testContent,
+ new String(fetched.getContent().getContent(), StandardCharsets.UTF_8));
+ assertNotNull("Content truncation not marked",
+ fetched.getContent().getMetadata().get(Response.TRUNCATED_CONTENT));
+ }
+
+ @Test
+ public void testNoContentLimit() throws Exception {
+ setUp();
+ conf.setInt("http.content.limit", -1);
+ http.setConf(conf);
+ StringBuilder response = new StringBuilder();
+ response.append(responseHeader);
+ // Even 128 kB content shouldn't cause any truncation because
+ // http.content.limit == -1
+ int kB = 128;
+ response.append("Content-Type: text/plain\r\nContent-Length: " + (kB * 1024)
+ + "\r\n\r\n");
+ for (int i = 0; i < kB; i++) {
+ for (int j = 0; j < 16; j++) {
+ // 16 lines of 64 bytes each = 1 kB
+ response.append(
+ "abcdefghijklmnopqurstuvxyz0123456789-ABCDEFGHIJKLMNOPQURSTUVXYZ\n");
+ }
+ }
+ launchServer(response.toString());
+ ProtocolOutput fetched = fetchPage("/", 200);
+ assertEquals("Content truncated although http.content.limit == -1",
+ (kB * 1024), fetched.getContent().getContent().length);
}
}