Merge branch 'master' into NUTCH-2654
diff --git a/conf/adaptive-mimetypes.txt b/conf/adaptive-mimetypes.txt.template
similarity index 100%
rename from conf/adaptive-mimetypes.txt
rename to conf/adaptive-mimetypes.txt.template
diff --git a/conf/cookies.txt b/conf/cookies.txt.template
similarity index 100%
rename from conf/cookies.txt
rename to conf/cookies.txt.template
diff --git a/conf/db-ignore-external-exemptions.txt b/conf/db-ignore-external-exemptions.txt.template
similarity index 100%
rename from conf/db-ignore-external-exemptions.txt
rename to conf/db-ignore-external-exemptions.txt.template
diff --git a/conf/domain-suffixes.xml b/conf/domain-suffixes.xml.template
similarity index 100%
rename from conf/domain-suffixes.xml
rename to conf/domain-suffixes.xml.template
diff --git a/conf/domain-urlfilter.txt b/conf/domain-urlfilter.txt.template
similarity index 100%
rename from conf/domain-urlfilter.txt
rename to conf/domain-urlfilter.txt.template
diff --git a/conf/domainblacklist-urlfilter.txt b/conf/domainblacklist-urlfilter.txt.template
similarity index 100%
rename from conf/domainblacklist-urlfilter.txt
rename to conf/domainblacklist-urlfilter.txt.template
diff --git a/conf/host-urlnormalizer.txt b/conf/host-urlnormalizer.txt.template
similarity index 100%
rename from conf/host-urlnormalizer.txt
rename to conf/host-urlnormalizer.txt.template
diff --git a/conf/mimetype-filter.txt b/conf/mimetype-filter.txt.template
similarity index 100%
rename from conf/mimetype-filter.txt
rename to conf/mimetype-filter.txt.template
diff --git a/conf/nutch-default.xml b/conf/nutch-default.xml.template
similarity index 99%
rename from conf/nutch-default.xml
rename to conf/nutch-default.xml.template
index 1e37cc3..c5359bc 100644
--- a/conf/nutch-default.xml
+++ b/conf/nutch-default.xml.template
@@ -1314,6 +1314,20 @@
   </description>
 </property>
 
+<property>
+  <name>indexer.indexwriters.file</name>
+  <value>index-writers.xml</value>
+  <description>The configuration file for index writers.</description>
+</property>
+
+<!-- Exchanges properties -->
+
+<property>
+  <name>exchanges.exchanges.file</name>
+  <value>exchanges.xml</value>
+  <description>The configuration file used by the Exchange component.</description>
+</property>
+
 <!-- URL normalizer properties -->
 
 <property>
diff --git a/conf/parse-plugins.xml b/conf/parse-plugins.xml.template
similarity index 100%
rename from conf/parse-plugins.xml
rename to conf/parse-plugins.xml.template
diff --git a/conf/protocols.txt b/conf/protocols.txt.template
similarity index 100%
rename from conf/protocols.txt
rename to conf/protocols.txt.template
diff --git a/conf/regex-parsefilter.txt b/conf/regex-parsefilter.txt.template
similarity index 100%
rename from conf/regex-parsefilter.txt
rename to conf/regex-parsefilter.txt.template
diff --git a/ivy/ivy.xml b/ivy/ivy.xml
index e753c6f..2ffeac4 100644
--- a/ivy/ivy.xml
+++ b/ivy/ivy.xml
@@ -80,12 +80,11 @@
 			<exclude module="hadoop-client" />
 		</dependency>
 
-		<!--dependency org="org.apache.cxf" name="cxf" rev="3.0.4" conf="*->default"/-->
-		<dependency org="org.apache.cxf" name="cxf-rt-frontend-jaxws" rev="3.2.7" conf="*->default"/>
-		<dependency org="org.apache.cxf" name="cxf-rt-frontend-jaxrs" rev="3.2.7" conf="*->default"/>
-		<dependency org="org.apache.cxf" name="cxf-rt-transports-http" rev="3.2.7" conf="*->default"/>
-		<dependency org="org.apache.cxf" name="cxf-rt-transports-http-jetty" rev="3.2.7" conf="*->default"/>
-		<dependency org="org.apache.cxf" name="cxf-rt-rs-client" rev="3.2.7" conf="test->default"/>
+		<dependency org="org.apache.cxf" name="cxf-rt-frontend-jaxws" rev="3.3.3" conf="*->default"/>
+		<dependency org="org.apache.cxf" name="cxf-rt-frontend-jaxrs" rev="3.3.3" conf="*->default"/>
+		<dependency org="org.apache.cxf" name="cxf-rt-transports-http" rev="3.3.3" conf="*->default"/>
+		<dependency org="org.apache.cxf" name="cxf-rt-transports-http-jetty" rev="3.3.3" conf="*->default"/>
+		<dependency org="org.apache.cxf" name="cxf-rt-rs-client" rev="3.3.3" conf="test->default"/>
 		<dependency org="com.fasterxml.jackson.core" name="jackson-databind" rev="2.9.9" conf="*->default"/>
 		<dependency org="com.fasterxml.jackson.core" name="jackson-annotations" rev="2.9.9" conf="*->default"/>
 		<dependency org="com.fasterxml.jackson.dataformat" name="jackson-dataformat-cbor" rev="2.9.9" conf="*->default"/>
diff --git a/ivy/ivysettings.xml b/ivy/ivysettings.xml
index 7a3b949..18038a5 100644
--- a/ivy/ivysettings.xml
+++ b/ivy/ivysettings.xml
@@ -29,14 +29,6 @@
     value="[organisation]/[module]/[revision]/[module]-[revision](-[classifier])"/>
   <property name="maven2.pattern.ext"
     value="${maven2.pattern}.[ext]"/>
-  <!-- define packaging.type=jar to work around the failing dependency download of
-         javax.ws.rs-api.jar
-       required by Tika (1.19 and higher), cf.
-         https://github.com/eclipse-ee4j/jaxrs-api/issues/572
-         https://github.com/jax-rs/api/pull/576
-  -->
-  <property name="packaging.type"
-    value="jar"/>
   <!-- pull in the local repository -->
   <include url="${ivy.default.conf.dir}/ivyconf-local.xml"/>
   <settings defaultResolver="default"/>
diff --git a/src/java/org/apache/nutch/exchange/Exchanges.java b/src/java/org/apache/nutch/exchange/Exchanges.java
index 1f443d4..1e0518b 100644
--- a/src/java/org/apache/nutch/exchange/Exchanges.java
+++ b/src/java/org/apache/nutch/exchange/Exchanges.java
@@ -96,8 +96,10 @@
    * @return An array with each exchange's configuration.
    */
   private ExchangeConfig[] loadConfigurations(Configuration conf) {
+    String filename = conf.get("exchanges.exchanges.file",
+        "exchanges.xml");
     InputSource inputSource = new InputSource(
-        conf.getConfResourceAsInputStream("exchanges.xml"));
+        conf.getConfResourceAsInputStream(filename));
 
     final List<ExchangeConfig> configList = new LinkedList<>();
 
@@ -120,7 +122,7 @@
       }
 
     } catch (SAXException | IOException | ParserConfigurationException e) {
-      LOG.warn(e.toString());
+      LOG.error(e.toString());
     }
 
     return configList.toArray(new ExchangeConfig[0]);
diff --git a/src/java/org/apache/nutch/indexer/IndexWriters.java b/src/java/org/apache/nutch/indexer/IndexWriters.java
index 9fac2e2..5778997 100644
--- a/src/java/org/apache/nutch/indexer/IndexWriters.java
+++ b/src/java/org/apache/nutch/indexer/IndexWriters.java
@@ -16,7 +16,6 @@
  */
 package org.apache.nutch.indexer;
 
-import de.vandermeer.asciitable.AT_ColumnWidthCalculator;
 import de.vandermeer.asciitable.AT_Row;
 import de.vandermeer.asciitable.AsciiTable;
 import de.vandermeer.skb.interfaces.document.TableRowType;
@@ -115,8 +114,10 @@
    * @param conf Nutch configuration instance.
    */
   private IndexWriterConfig[] loadWritersConfiguration(Configuration conf) {
+    String filename = conf.get("indexer.indexwriters.file",
+        "index-writers.xml");
     InputStream ssInputStream = conf
-        .getConfResourceAsInputStream("index-writers.xml");
+        .getConfResourceAsInputStream(filename);
     InputSource inputSource = new InputSource(ssInputStream);
 
     try {
@@ -136,7 +137,7 @@
 
       return indexWriterConfigs;
     } catch (SAXException | IOException | ParserConfigurationException e) {
-      LOG.warn(e.toString());
+      LOG.error(e.toString());
       return new IndexWriterConfig[0];
     }
   }
@@ -218,6 +219,10 @@
 
   public void write(NutchDocument doc) throws IOException {
     for (String indexWriterId : getIndexWriters(doc)) {
+      if (!this.indexWriters.containsKey(indexWriterId)) {
+        LOG.warn("Index writer {} is not present. Maybe the plugin is not in plugin.includes or there is a misspelling.", indexWriterId);
+        continue;
+      }
       NutchDocument mappedDocument = mapDocument(doc,
           this.indexWriters.get(indexWriterId).getIndexWriterConfig()
               .getMapping());
@@ -228,6 +233,10 @@
 
   public void update(NutchDocument doc) throws IOException {
     for (String indexWriterId : getIndexWriters(doc)) {
+      if (!this.indexWriters.containsKey(indexWriterId)) {
+        LOG.warn("Index writer {} is not present. Maybe the plugin is not in plugin.includes or there is a misspelling.", indexWriterId);
+        continue;
+      }
       NutchDocument mappedDocument = mapDocument(doc,
           this.indexWriters.get(indexWriterId).getIndexWriterConfig()
               .getMapping());
diff --git a/src/java/org/apache/nutch/net/URLNormalizerChecker.java b/src/java/org/apache/nutch/net/URLNormalizerChecker.java
index 2805f85..ee25f2f 100644
--- a/src/java/org/apache/nutch/net/URLNormalizerChecker.java
+++ b/src/java/org/apache/nutch/net/URLNormalizerChecker.java
@@ -16,6 +16,8 @@
  */
 package org.apache.nutch.net;
 
+import java.net.MalformedURLException;
+
 import org.apache.hadoop.util.ToolRunner;
 
 import org.apache.nutch.util.AbstractChecker;
@@ -35,7 +37,8 @@
         + "\n             \t(if not given all configured URL normalizers are applied)"
         + "\n  -scope     \tone of: default,partition,generate_host_count,fetcher,crawldb,linkdb,inject,outlink"
         + "\n  -stdin     \ttool reads a list of URLs from stdin, one URL per line"
-        + "\n  -listen <port>\trun tool as Telnet server listening on <port>\n";
+        + "\n  -listen <port>\trun tool as Telnet server listening on <port>"
+        + "\n\nAn empty line is added to the output if a URL fails to normalize (MalformedURLException or null returned).\n";
 
     // Print help when no args given
     if (args.length < 1) {
@@ -71,7 +74,16 @@
   }
 
   protected int process(String line, StringBuilder output) throws Exception {
-    output.append(normalizers.normalize(line, scope));
+    try {
+      String norm = normalizers.normalize(line, scope);
+      if (norm == null) {
+        output.append("");
+      } else {
+        output.append(norm);
+      }
+    } catch (MalformedURLException e) {
+      output.append("");
+    }
     return 0;
   }
 
diff --git a/src/java/org/apache/nutch/segment/SegmentReader.java b/src/java/org/apache/nutch/segment/SegmentReader.java
index ae64f71..bcf99b8 100644
--- a/src/java/org/apache/nutch/segment/SegmentReader.java
+++ b/src/java/org/apache/nutch/segment/SegmentReader.java
@@ -25,6 +25,7 @@
 import java.io.PrintWriter;
 import java.io.Writer;
 import java.lang.invoke.MethodHandles;
+import java.nio.charset.StandardCharsets;
 import java.text.SimpleDateFormat;
 import java.util.ArrayList;
 import java.util.Arrays;
@@ -116,7 +117,7 @@
         fs.delete(segmentDumpFile, true);
 
       final PrintStream printStream = new PrintStream(
-          fs.create(segmentDumpFile));
+          fs.create(segmentDumpFile), false, StandardCharsets.UTF_8.name());
       return new RecordWriter<WritableComparable<?>, Writable>() {
         public synchronized void write(WritableComparable<?> key, Writable value)
             throws IOException {
@@ -254,12 +255,12 @@
         HadoopFSUtil.getPassAllFilter());
     Path[] files = HadoopFSUtil.getPaths(fstats);
 
-    PrintWriter writer = null;
     int currentRecordNumber = 0;
     if (files.length > 0) {
-      writer = new PrintWriter(
-          new BufferedWriter(new OutputStreamWriter(outFs.create(dumpFile))));
-      try {
+      try (PrintWriter writer = new PrintWriter(
+          new BufferedWriter(new OutputStreamWriter(outFs.create(dumpFile),
+              StandardCharsets.UTF_8)))) {
+
         for (int i = 0; i < files.length; i++) {
           Path partFile = files[i];
           try {
@@ -273,8 +274,6 @@
             }
           }
         }
-      } finally {
-        writer.close();
       }
     }
     fs.delete(tempDir, true);
@@ -286,8 +285,8 @@
   /** Appends two files and updates the Recno counter */
   private int append(FileSystem fs, Configuration conf, Path src,
       PrintWriter writer, int currentRecordNumber) throws IOException {
-    try (BufferedReader reader = new BufferedReader(new InputStreamReader(
-        fs.open(src)))) {
+    try (BufferedReader reader = new BufferedReader(
+        new InputStreamReader(fs.open(src), StandardCharsets.UTF_8))) {
       String line = reader.readLine();
       while (line != null) {
         if (line.startsWith("Recno:: ")) {
@@ -666,7 +665,7 @@
         } else
           dirs.add(new Path(args[i]));
       }
-      segmentReader.list(dirs, new OutputStreamWriter(System.out, "UTF-8"));
+      segmentReader.list(dirs, new OutputStreamWriter(System.out, StandardCharsets.UTF_8));
       return 0;
     case MODE_GET:
       input = args[1];
@@ -682,7 +681,7 @@
         return -1;
       }
       segmentReader.get(new Path(input), new Text(key), new OutputStreamWriter(
-          System.out, "UTF-8"), new HashMap<>());
+          System.out, StandardCharsets.UTF_8), new HashMap<>());
       return 0;
     default:
       System.err.println("Invalid operation: " + args[0]);
diff --git a/src/java/org/apache/nutch/util/SitemapProcessor.java b/src/java/org/apache/nutch/util/SitemapProcessor.java
index cbfbe0c..18e3871 100644
--- a/src/java/org/apache/nutch/util/SitemapProcessor.java
+++ b/src/java/org/apache/nutch/util/SitemapProcessor.java
@@ -132,46 +132,27 @@
           context.write(key, (CrawlDatum) value);
         }
         else if (value instanceof HostDatum) {
-          // For entry from hostdb, get sitemap url(s) from robots.txt, fetch the sitemap,
-          // extract urls and emit those
-
-          // try different combinations of schemes one by one till we get rejection in all cases
-          String host = key.toString();
-          if((url = filterNormalize("http://" + host + "/")) == null &&
-              (url = filterNormalize("https://" + host + "/")) == null &&
-              (url = filterNormalize("ftp://" + host + "/")) == null &&
-              (url = filterNormalize("file:/" + host + "/")) == null) {
-            context.getCounter("Sitemap", "filtered_records").increment(1);
-            return;
-          }
-          // We may wish to use the robots.txt content as the third parameter for .getRobotRules
-          BaseRobotRules rules = protocolFactory.getProtocol(url).getRobotRules(new Text(url), datum, null);
-          List<String> sitemaps = rules.getSitemaps();
-
-          if (tryDefaultSitemapXml && sitemaps.size() == 0) {
-            sitemaps.add(url + "sitemap.xml");
-          }
-          for (String sitemap : sitemaps) {
-            context.getCounter("Sitemap", "sitemaps_from_hostdb").increment(1);
-            sitemap = filterNormalize(sitemap);
-            if (sitemap == null) {
-              context.getCounter("Sitemap", "filtered_sitemaps_from_hostdb")
-                  .increment(1);
-            } else {
-              generateSitemapUrlDatum(protocolFactory.getProtocol(sitemap),
-                  sitemap, context);
-            }
-          }
+          generateSitemapsFromHostname(key.toString(), context);
         }
         else if (value instanceof Text) {
-          // For entry from sitemap urls file, fetch the sitemap, extract urls and emit those
-          if((url = filterNormalize(key.toString())) == null) {
-            context.getCounter("Sitemap", "filtered_records").increment(1);
-            return;
-          }
+          // Input can be sitemap URL or hostname
+          url = key.toString();
+          if (url.startsWith("http://") ||
+                url.startsWith("https://") ||
+                url.startsWith("ftp://") ||
+                url.startsWith("file:/")) {
+            // For entry from sitemap urls file, fetch the sitemap, extract urls and emit those
+            if((url = filterNormalize(url)) == null) {
+              context.getCounter("Sitemap", "filtered_records").increment(1);
+              return;
+            }
 
-          context.getCounter("Sitemap", "sitemap_seeds").increment(1);
-          generateSitemapUrlDatum(protocolFactory.getProtocol(url), url, context);
+            context.getCounter("Sitemap", "sitemap_seeds").increment(1);
+            generateSitemapUrlDatum(protocolFactory.getProtocol(url), url, context); 
+          } else {
+            LOG.info("generateSitemapsFromHostname: " + key.toString());
+            generateSitemapsFromHostname(key.toString(), context);
+          }
         }
       } catch (Exception e) {
         LOG.warn("Exception for record {} : {}", key.toString(), StringUtils.stringifyException(e));
@@ -191,6 +172,43 @@
       }
       return url;
     }
+    
+    private void generateSitemapsFromHostname(String host, Context context) {
+      try {
+        // For entry from hostdb, get sitemap url(s) from robots.txt, fetch the sitemap,
+        // extract urls and emit those
+
+        // try different combinations of schemes one by one till we get rejection in all cases
+        String url;
+        if((url = filterNormalize("http://" + host + "/")) == null &&
+            (url = filterNormalize("https://" + host + "/")) == null &&
+            (url = filterNormalize("ftp://" + host + "/")) == null &&
+            (url = filterNormalize("file:/" + host + "/")) == null) {
+          context.getCounter("Sitemap", "filtered_records").increment(1);
+          return;
+        }
+        // We may wish to use the robots.txt content as the third parameter for .getRobotRules
+        BaseRobotRules rules = protocolFactory.getProtocol(url).getRobotRules(new Text(url), datum, null);
+        List<String> sitemaps = rules.getSitemaps();
+
+        if (tryDefaultSitemapXml && sitemaps.size() == 0) {
+          sitemaps.add(url + "sitemap.xml");
+        }
+        for (String sitemap : sitemaps) {
+          context.getCounter("Sitemap", "sitemaps_from_hostname").increment(1);
+          sitemap = filterNormalize(sitemap);
+          if (sitemap == null) {
+            context.getCounter("Sitemap", "filtered_sitemaps_from_hostname")
+                .increment(1);
+          } else {
+            generateSitemapUrlDatum(protocolFactory.getProtocol(sitemap),
+                sitemap, context);
+          }
+        }
+      } catch (Exception e) {
+        LOG.warn("Exception for record {} : {}", host, StringUtils.stringifyException(e));
+      }
+    }
 
     private void generateSitemapUrlDatum(Protocol protocol, String url, Context context) throws Exception {
       ProtocolOutput output = protocol.getProtocolOutput(new Text(url), datum);
@@ -399,13 +417,13 @@
 
       if (LOG.isInfoEnabled()) {
         long filteredRecords = job.getCounters().findCounter("Sitemap", "filtered_records").getValue();
-        long fromHostDb = job.getCounters().findCounter("Sitemap", "sitemaps_from_hostdb").getValue();
+        long fromHostname = job.getCounters().findCounter("Sitemap", "sitemaps_from_hostname").getValue();
         long fromSeeds = job.getCounters().findCounter("Sitemap", "sitemap_seeds").getValue();
         long failedFetches = job.getCounters().findCounter("Sitemap", "failed_fetches").getValue();
         long newSitemapEntries = job.getCounters().findCounter("Sitemap", "new_sitemap_entries").getValue();
 
         LOG.info("SitemapProcessor: Total records rejected by filters: {}", filteredRecords);
-        LOG.info("SitemapProcessor: Total sitemaps from HostDb: {}", fromHostDb);
+        LOG.info("SitemapProcessor: Total sitemaps from host name: {}", fromHostname);
         LOG.info("SitemapProcessor: Total sitemaps from seed urls: {}", fromSeeds);
         LOG.info("SitemapProcessor: Total failed sitemap fetches: {}", failedFetches);
         LOG.info("SitemapProcessor: Total new sitemap entries added: {}", newSitemapEntries);
@@ -431,7 +449,7 @@
 
     System.err.println("\t<crawldb>\t\tpath to crawldb where the sitemap urls would be injected");
     System.err.println("\t-hostdb <hostdb>\tpath of a hostdb. Sitemap(s) from these hosts would be downloaded");
-    System.err.println("\t-sitemapUrls <url_dir>\tpath to sitemap urls directory");
+    System.err.println("\t-sitemapUrls <url_dir>\tpath to directory with sitemap urls or hostnames");
     System.err.println("\t-threads <threads>\tNumber of threads created per mapper to fetch sitemap urls (default: 8)");
     System.err.println("\t-force\t\t\tforce update even if CrawlDb appears to be locked (CAUTION advised)");
     System.err.println("\t-noStrict\t\tBy default Sitemap parser rejects invalid urls. '-noStrict' disables that.");
diff --git a/src/plugin/parse-tika/build-ivy.xml b/src/plugin/parse-tika/build-ivy.xml
index a8a0fe9..738f041 100644
--- a/src/plugin/parse-tika/build-ivy.xml
+++ b/src/plugin/parse-tika/build-ivy.xml
@@ -25,13 +25,6 @@
     <property name="ivy.checksums" value="" />
     <property name="ivy.jar.dir" value="${ivy.home}/lib" />
     <property name="ivy.jar.file" value="${ivy.jar.dir}/ivy-${ivy.install.version}.jar" />
-    <!-- define packaging.type=jar to work around the failing dependency download of
-           javax.ws.rs-api.jar
-         required by Tika (1.19 and higher), cf.
-           https://github.com/eclipse-ee4j/jaxrs-api/issues/572
-           https://github.com/jax-rs/api/pull/576
-    -->
-    <property name="packaging.type" value="jar"/>
 
     <target name="download-ivy" unless="offline">
 
diff --git a/src/plugin/protocol-okhttp/src/java/org/apache/nutch/protocol/okhttp/OkHttpResponse.java b/src/plugin/protocol-okhttp/src/java/org/apache/nutch/protocol/okhttp/OkHttpResponse.java
index d7d4cdf..b84fdc0 100644
--- a/src/plugin/protocol-okhttp/src/java/org/apache/nutch/protocol/okhttp/OkHttpResponse.java
+++ b/src/plugin/protocol-okhttp/src/java/org/apache/nutch/protocol/okhttp/OkHttpResponse.java
@@ -164,9 +164,14 @@
     BufferedSource source = responseBody.source();
     int bytesRequested = 0;
     int bufferGrowStepBytes = 8192;
-    while (source.buffer().size() < maxContentBytes) {
+    while (source.buffer().size() <= maxContentBytes) {
       bytesRequested += Math.min(bufferGrowStepBytes,
-          (maxContentBytes - bytesRequested));
+          /*
+           * request one byte more than required to reliably detect truncated
+           * content, but beware of integer overflows
+           */
+          (maxContentBytes == Integer.MAX_VALUE ? maxContentBytes
+              : (1 + maxContentBytes)) - bytesRequested);
       boolean success = false;
       try {
         success = source.request(bytesRequested);
@@ -174,6 +179,8 @@
         if (partialAsTruncated && source.buffer().size() > 0) {
           // treat already fetched content as truncated
           truncated.setReason(TruncatedContentReason.DISCONNECT);
+          LOG.info("Truncated content for {}, partial fetch caused by:", url,
+              e);
         } else {
           throw e;
         }
@@ -191,7 +198,7 @@
         truncated.setReason(TruncatedContentReason.TIME);
         break;
       }
-      if (source.buffer().size() > maxContentBytes) {
+      if (source.buffer().size() >= maxContentBytes) {
         LOG.debug("content limit reached");
       }
       // okhttp may fetch more content than requested, forward requested bytes
diff --git a/src/plugin/protocol-okhttp/src/test/org/apache/nutch/protocol/okhttp/TestBadServerResponses.java b/src/plugin/protocol-okhttp/src/test/org/apache/nutch/protocol/okhttp/TestBadServerResponses.java
index bf69893..34c5f6f 100644
--- a/src/plugin/protocol-okhttp/src/test/org/apache/nutch/protocol/okhttp/TestBadServerResponses.java
+++ b/src/plugin/protocol-okhttp/src/test/org/apache/nutch/protocol/okhttp/TestBadServerResponses.java
@@ -21,9 +21,8 @@
 import static org.junit.Assert.assertTrue;
 
 import java.io.BufferedReader;
+import java.io.ByteArrayOutputStream;
 import java.io.InputStreamReader;
-import java.io.OutputStreamWriter;
-import java.io.PrintWriter;
 import java.lang.invoke.MethodHandles;
 import java.net.InetSocketAddress;
 import java.net.MalformedURLException;
@@ -33,6 +32,7 @@
 import java.nio.charset.StandardCharsets;
 import java.util.regex.Matcher;
 import java.util.regex.Pattern;
+import java.util.zip.GZIPOutputStream;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.io.Text;
@@ -98,14 +98,14 @@
   }
 
   /**
-   * Starts the test server at a specified port and constant response.
-   * 
-   * @param portno
-   *          Port number.
-   * @param response
-   *          response sent on every request
-   */
-  private void runServer(int port, String response) throws Exception {
+     * Starts the test server at a specified port and constant response.
+     * 
+     * @param portno
+     *          Port number.
+     * @param response
+     *          response sent on every request
+     */
+  private void runServer(int port, byte[] response) throws Exception {
     server = new ServerSocket();
     server.bind(new InetSocketAddress("127.0.0.1", port));
     Pattern requestPattern = Pattern.compile("(?i)^GET\\s+(\\S+)");
@@ -115,9 +115,7 @@
       LOG.info("Connection received");
       try (
           BufferedReader in = new BufferedReader(new InputStreamReader(
-              socket.getInputStream(), StandardCharsets.UTF_8));
-          PrintWriter out = new PrintWriter(new OutputStreamWriter(
-              socket.getOutputStream(), StandardCharsets.UTF_8), true)) {
+              socket.getInputStream(), StandardCharsets.UTF_8))) {
 
         String line;
         while ((line = in.readLine()) != null) {
@@ -129,13 +127,11 @@
           if (m.find()) {
             LOG.info("Requested {}", m.group(1));
             if (!m.group(1).startsWith("/")) {
-              response = "HTTP/1.1 400 Bad request\r\n\r\n";
+              response = "HTTP/1.1 400 Bad request\r\n\r\n".getBytes(StandardCharsets.UTF_8);
             }
           }
         }
-        LOG.info("Response: {}",
-            response.substring(0, Math.min(1024, response.length())));
-        out.print(response);
+        socket.getOutputStream().write(response);
       } catch (Exception e) {
         LOG.warn("Exception in test server:", e);
       }
@@ -143,6 +139,10 @@
   }
 
   private void launchServer(String response) throws InterruptedException {
+    launchServer(response.getBytes(StandardCharsets.UTF_8));
+  }
+
+  private void launchServer(byte[] response) throws InterruptedException {
     Thread serverThread = new Thread(() -> {
       try {
         runServer(port, response);
@@ -171,9 +171,6 @@
     CrawlDatum crawlDatum = new CrawlDatum();
     ProtocolOutput out = http.getProtocolOutput(new Text(url.toString()),
         crawlDatum);
-    if (expectedCode == -1) {
-      System.out.println(out);
-    }
     int httpStatusCode = -1;
     if (crawlDatum.getMetaData().containsKey(Nutch.PROTOCOL_STATUS_CODE_KEY)) {
       httpStatusCode = Integer.parseInt(crawlDatum.getMetaData()
@@ -321,7 +318,8 @@
   /**
    * NUTCH-2562 protocol-http fails to read large chunked HTTP responses,
    * NUTCH-2575 protocol-http does not respect the maximum content-size for
-   * chunked responses
+   * chunked responses. Also test whether truncations of chunked content are
+   * properly marked.
    */
   @Test
   public void testChunkedContent() throws Exception {
@@ -346,6 +344,137 @@
     assertEquals(
         "Chunked content not truncated according to http.content.limit", 65536,
         fetched.getContent().getContent().length);
+    assertNotNull("Content truncation not marked",
+        fetched.getContent().getMetadata().get(Response.TRUNCATED_CONTENT));
+    assertEquals("Content truncation not marked",
+        Response.TruncatedContentReason.LENGTH.toString().toLowerCase(),
+        fetched.getContent().getMetadata().get(Response.TRUNCATED_CONTENT_REASON));
+  }
+
+  /**
+   * NUTCH-2729 Check for http.content.limit defined in nutch-site-test.xml:
+   * whether content is truncated to the configured 64 kB and whether it is
+   * properly marked as truncated.
+   */
+  @Test
+  public void testTruncationMarking() throws Exception {
+    setUp();
+    int[] kBs = { 63, 64, 65 };
+    for (int kB : kBs) {
+      StringBuilder response = new StringBuilder();
+      response.append(responseHeader);
+      response.append("Content-Type: text/plain\r\nContent-Length: "
+          + (kB * 1024) + "\r\n\r\n");
+      for (int i = 0; i < kB; i++) {
+        for (int j = 0; j < 16; j++) {
+          // 16 chunks a 64 bytes = 1 kB
+          response.append(
+              "abcdefghijklmnopqurstuvxyz0123456789-ABCDEFGHIJKLMNOPQURSTUVXYZ\n");
+        }
+      }
+      launchServer(response.toString());
+      ProtocolOutput fetched = fetchPage("/", 200);
+      assertEquals("Content not truncated according to http.content.limit",
+          Math.min(kB * 1024, 65536), fetched.getContent().getContent().length);
+      if (kB * 1024 > 65536) {
+        assertNotNull("Content truncation not marked",
+            fetched.getContent().getMetadata().get(Response.TRUNCATED_CONTENT));
+        assertEquals("Content truncation not marked",
+            Response.TruncatedContentReason.LENGTH.toString().toLowerCase(),
+            fetched.getContent().getMetadata().get(Response.TRUNCATED_CONTENT_REASON));
+      }
+      server.close(); // need to close server before next loop iteration
+    }
+  }
+
+  /**
+   * NUTCH-2729 Check for http.content.limit defined in nutch-site-test.xml:
+   * whether content is truncated to the configured 64 kB and whether it is
+   * properly marked as truncated.
+   */
+  @Test
+  public void testTruncationMarkingGzip() throws Exception {
+    setUp();
+    int[] kBs = { 63, 64, 65 };
+    for (int kB : kBs) {
+      StringBuilder payload = new StringBuilder();
+      for (int i = 0; i < kB; i++) {
+        for (int j = 0; j < 16; j++) {
+          // 16 chunks a 64 bytes = 1 kB
+          payload.append(
+              "abcdefghijklmnopqurstuvxyz0123456789-ABCDEFGHIJKLMNOPQURSTUVXYZ\n");
+        }
+      }
+      ByteArrayOutputStream bytes = new ByteArrayOutputStream();
+      GZIPOutputStream gzip = new GZIPOutputStream(bytes);
+      gzip.write(payload.toString().getBytes(StandardCharsets.UTF_8));
+      gzip.close();
+      StringBuilder responseHead = new StringBuilder();
+      responseHead.append(responseHeader);
+      responseHead.append("Content-Type: text/plain\r\nContent-Length: "
+          + bytes.size() + "\r\nContent-Encoding: gzip\r\n\r\n");
+      ByteArrayOutputStream response = new ByteArrayOutputStream();
+      response.write(responseHead.toString().getBytes(StandardCharsets.UTF_8));
+      response.write(bytes.toByteArray());
+
+      launchServer(response.toByteArray());
+      ProtocolOutput fetched = fetchPage("/", 200);
+      assertEquals("Content not truncated according to http.content.limit",
+          Math.min(kB * 1024, 65536), fetched.getContent().getContent().length);
+      if (kB * 1024 > 65536) {
+        assertNotNull("Content truncation not marked",
+            fetched.getContent().getMetadata().get(Response.TRUNCATED_CONTENT));
+        assertEquals("Content truncation not marked",
+            Response.TruncatedContentReason.LENGTH.toString().toLowerCase(),
+            fetched.getContent().getMetadata().get(Response.TRUNCATED_CONTENT_REASON));
+      }
+      server.close(); // need to close server before next loop iteration
+    }
+  }
+
+  /**
+   * Force an exception after all content has been fetched by sending a wrong
+   * `Content-Length` header and check whether the content is stored anyway if
+   * http.partial.truncated == true
+   */
+  @Test
+  public void testPartialContentTruncated() throws Exception {
+    setUp();
+    conf.setBoolean("http.partial.truncated", true);
+    http.setConf(conf);
+    String testContent = "This is a text.";
+    launchServer(
+        responseHeader + "Content-Length: 50000\r\n\r\n" + testContent);
+    ProtocolOutput fetched = fetchPage("/", 200);
+    assertEquals("Content not saved as truncated", testContent,
+        new String(fetched.getContent().getContent(), StandardCharsets.UTF_8));
+    assertNotNull("Content truncation not marked",
+        fetched.getContent().getMetadata().get(Response.TRUNCATED_CONTENT));
+  }
+
+  @Test
+  public void testNoContentLimit() throws Exception {
+    setUp();
+    conf.setInt("http.content.limit", -1);
+    http.setConf(conf);
+    StringBuilder response = new StringBuilder();
+    response.append(responseHeader);
+    // Even 128 kB content shouldn't cause any truncation because
+    // http.content.limit == -1
+    int kB = 128;
+    response.append("Content-Type: text/plain\r\nContent-Length: " + (kB * 1024)
+        + "\r\n\r\n");
+    for (int i = 0; i < kB; i++) {
+      for (int j = 0; j < 16; j++) {
+        // 16 chunks a 64 bytes = 1 kB
+        response.append(
+            "abcdefghijklmnopqurstuvxyz0123456789-ABCDEFGHIJKLMNOPQURSTUVXYZ\n");
+      }
+    }
+    launchServer(response.toString());
+    ProtocolOutput fetched = fetchPage("/", 200);
+    assertEquals("Content truncated although http.content.limit == -1",
+        (kB * 1024), fetched.getContent().getContent().length);
   }
 
 }