Merge pull request #500 from sebastian-nagel/NUTCH-2772-parsefilter-debug

NUTCH-2772 Debugging parse filter to show serialized DOM tree
diff --git a/build.xml b/build.xml
index b54e713..0a1bca0 100644
--- a/build.xml
+++ b/build.xml
@@ -190,7 +190,6 @@
       <packageset dir="${plugins.dir}/indexer-cloudsearch/src/java/" />
       <packageset dir="${plugins.dir}/indexer-csv/src/java"/>
       <packageset dir="${plugins.dir}/indexer-dummy/src/java"/>
-      <packageset dir="${plugins.dir}/indexer-elastic-rest/src/java/"/>
       <packageset dir="${plugins.dir}/indexer-elastic/src/java/" />
       <packageset dir="${plugins.dir}/indexer-kafka/src/java/" />
       <packageset dir="${plugins.dir}/indexer-rabbit/src/java"/>
@@ -700,7 +699,6 @@
       <packageset dir="${plugins.dir}/indexer-cloudsearch/src/java/" />
       <packageset dir="${plugins.dir}/indexer-csv/src/java"/>
       <packageset dir="${plugins.dir}/indexer-dummy/src/java"/>
-      <packageset dir="${plugins.dir}/indexer-elastic-rest/src/java/"/>
       <packageset dir="${plugins.dir}/indexer-elastic/src/java/" />
       <packageset dir="${plugins.dir}/indexer-kafka/src/java/" />
       <packageset dir="${plugins.dir}/indexer-rabbit/src/java"/>
@@ -1102,7 +1100,6 @@
         <source path="${plugins.dir}/indexer-csv/src/java"/>
         <source path="${plugins.dir}/indexer-csv/src/test"/>
         <source path="${plugins.dir}/indexer-dummy/src/java/" />
-        <source path="${plugins.dir}/indexer-elastic-rest/src/java/"/>
         <source path="${plugins.dir}/indexer-elastic/src/java/" />
         <source path="${plugins.dir}/indexer-kafka/src/java/" />
         <source path="${plugins.dir}/indexer-rabbit/src/java/" />
diff --git a/conf/index-writers.xml.template b/conf/index-writers.xml.template
index 96c765e..ad8bb75 100644
--- a/conf/index-writers.xml.template
+++ b/conf/index-writers.xml.template
@@ -108,8 +108,10 @@
     <parameters>
       <param name="host" value="localhost"/>
       <param name="port" value="9200"/>
-      <param name="cluster" value=""/>
       <param name="index" value="nutch"/>
+      <param name="username" value="elastic"/>
+      <param name="password" value=""/>
+      <!--<param name="auth" value="false"/>-->
       <param name="max.bulk.docs" value="250"/>
       <param name="max.bulk.size" value="2500500"/>
       <param name="exponential.backoff.millis" value="100"/>
@@ -125,30 +127,6 @@
       <remove />
     </mapping>
   </writer>
-  <writer id="indexer_elastic_rest_1" class="org.apache.nutch.indexwriter.elasticrest.ElasticRestIndexWriter">
-    <parameters>
-      <param name="host" value=""/>
-      <param name="port" value="9200"/>
-      <param name="index" value="nutch"/>
-      <param name="max.bulk.docs" value="250"/>
-      <param name="max.bulk.size" value="2500500"/>
-      <param name="user" value="user"/>
-      <param name="password" value="password"/>
-      <param name="type" value="doc"/>
-      <param name="https" value="false"/>
-      <param name="trustallhostnames" value="false"/>
-      <param name="languages" value=""/>
-      <param name="separator" value="_"/>
-      <param name="sink" value="others"/>
-    </parameters>
-    <mapping>
-      <copy>
-        <field source="title" dest="search"/>
-      </copy>
-      <rename />
-      <remove />
-    </mapping>
-  </writer>
   <writer id="indexer_cloud_search_1" class="org.apache.nutch.indexwriter.cloudsearch.CloudSearchIndexWriter">
     <parameters>
       <param name="endpoint" value=""/>
diff --git a/conf/log4j.properties b/conf/log4j.properties
index e133301..67311d1 100644
--- a/conf/log4j.properties
+++ b/conf/log4j.properties
@@ -118,9 +118,4 @@
 
 #log4j.appender.RFA.layout=org.apache.log4j.PatternLayout
 #log4j.appender.RFA.layout.ConversionPattern=%d{ISO8601} %-5p %c{2} - %m%n
-#log4j.appender.RFA.layout.ConversionPattern=%d{ISO8601} %-5p %c{2} (%F:%M(%L)) - %m%n
-
-#
-# Plugin-Specific Loggers
-#
-#log4j.logger.org.apache.nutch.indexwriter.elasticrest.ElasticRestIndexWriter=INFO,cmdstdout
\ No newline at end of file
+#log4j.appender.RFA.layout.ConversionPattern=%d{ISO8601} %-5p %c{2} (%F:%M(%L)) - %m%n
\ No newline at end of file
diff --git a/conf/nutch-default.xml b/conf/nutch-default.xml
index 58db620..6dfbe64 100644
--- a/conf/nutch-default.xml
+++ b/conf/nutch-default.xml
@@ -959,6 +959,18 @@
 </property> 
 
 <property>
+ <name>fetcher.min.crawl.delay</name>
+ <value>${fetcher.server.delay}</value>
+ <description>
+ Minimum Crawl-Delay (in seconds) accepted in robots.txt, even if the
+ robots.txt specifies a shorter delay. By default the minimum Crawl-Delay
+ is set to the value of `fetcher.server.delay` which guarantees that
+ a value set in the robots.txt cannot make the crawler more aggressive
+ than the default configuration.
+ </description>
+</property>
+
+<property>
   <name>fetcher.threads.fetch</name>
   <value>10</value>
   <description>The number of FetcherThreads the fetcher should use.
@@ -1209,6 +1221,19 @@
   </description>
 </property>
 
+<!-- SegmentReader -->
+<property>
+  <name>segment.reader.content.recode</name>
+  <value>false</value>
+  <description>
+    SegmentReader when dumping segments: If true, try to recode the content
+    of HTML documents from the original encoding to UTF-8. Note: this
+    property can be overridden by SegmentReader command-line options.
+  </description>
+</property>
+
+
+
 <!--  any23 plugin properties -->
 
 <property>
diff --git a/default.properties b/default.properties
index 1537a01..e96c555 100644
--- a/default.properties
+++ b/default.properties
@@ -198,7 +198,6 @@
    org.apache.nutch.indexwriter.csv*:\
    org.apache.nutch.indexwriter.dummy*:\
    org.apache.nutch.indexwriter.elastic*:\
-   org.apache.nutch.indexwriter.elasticrest*:\
    org.apache.nutch.indexwriter.rabbit*:\
    org.apache.nutch.indexwriter.kafka*:\
    org.apache.nutch.indexwriter.solr*
diff --git a/ivy/ivy.xml b/ivy/ivy.xml
index 64ba582..4686c78 100644
--- a/ivy/ivy.xml
+++ b/ivy/ivy.xml
@@ -50,7 +50,7 @@
 		<dependency org="com.tdunning" name="t-digest" rev="3.2" />
 
 		<!-- Hadoop Dependencies -->
-		<dependency org="org.apache.hadoop" name="hadoop-common" rev="2.9.2" conf="*->default">
+		<dependency org="org.apache.hadoop" name="hadoop-common" rev="3.1.3" conf="*->default">
 			<exclude org="hsqldb" name="hsqldb" />
 			<exclude org="net.sf.kosmosfs" name="kfs" />
 			<exclude org="net.java.dev.jets3t" name="jets3t" />
@@ -58,12 +58,12 @@
 			<exclude org="org.mortbay.jetty" name="jsp-*" />
 			<exclude org="ant" name="ant" />
 		</dependency>
-		<dependency org="org.apache.hadoop" name="hadoop-hdfs" rev="2.9.2" conf="*->default"/>
-		<dependency org="org.apache.hadoop" name="hadoop-mapreduce-client-core" rev="2.9.2" conf="*->default"/>
-		<dependency org="org.apache.hadoop" name="hadoop-mapreduce-client-jobclient" rev="2.9.2" conf="*->default"/>
+		<dependency org="org.apache.hadoop" name="hadoop-hdfs" rev="3.1.3" conf="*->default"/>
+		<dependency org="org.apache.hadoop" name="hadoop-mapreduce-client-core" rev="3.1.3" conf="*->default"/>
+		<dependency org="org.apache.hadoop" name="hadoop-mapreduce-client-jobclient" rev="3.1.3" conf="*->default"/>
 		<!-- End of Hadoop Dependencies -->
 
-		<dependency org="org.apache.tika" name="tika-core" rev="1.22" />
+		<dependency org="org.apache.tika" name="tika-core" rev="1.24.1" />
 
 		<dependency org="xml-apis" name="xml-apis" rev="1.4.01"/><!-- force this version as it is required by Tika -->
 		<dependency org="xerces" name="xercesImpl" rev="2.12.0" />
diff --git a/src/bin/crawl b/src/bin/crawl
index 56bb237..331ee65 100755
--- a/src/bin/crawl
+++ b/src/bin/crawl
@@ -23,7 +23,13 @@
 #
 # Options:
 #   -i|--index                            Indexes crawl results into a configured indexer
-#   -D                                    A Java property to pass to Nutch calls
+#   -D <property>=<value>                 A Nutch or Hadoop property to pass to Nutch calls overwriting
+#                                         properties defined in configuration files, e.g.
+#                                           increase content limit to 2MB:
+#                                             -D http.content.limit=2097152
+#                                         (in distributed mode) configure memory of map and reduce tasks:
+#                                           -D mapreduce.map.memory.mb=4608    -D mapreduce.map.java.opts=-Xmx4096m
+#                                           -D mapreduce.reduce.memory.mb=4608 -D mapreduce.reduce.java.opts=-Xmx4096m
 #   -w|--wait <NUMBER[SUFFIX]>            Time to wait before generating a new segment when no URLs
 #                                         are scheduled for fetching. Suffix can be: s for second,
 #                                         m for minute, h for hour and d for day. If no suffix is
@@ -42,9 +48,6 @@
 #   --time-limit-fetch <time_limit_fetch> Number of minutes allocated to the fetching [default: 180]
 #   --num-threads <num_threads>           Number of threads for fetching / sitemap processing [default: 50]
 #
-#
-# UNLIKE THE NUTCH ALL-IN-ONE-CRAWL COMMAND THIS SCRIPT DOES THE LINK INVERSION AND
-# INDEXING FOR EACH SEGMENT
 
 function __to_seconds() {
   NUMBER=$(echo $1 | tr -dc '0-9')
@@ -77,7 +80,13 @@
   echo -e ""
   echo -e "Options:"
   echo -e "  -i|--index\t\t\t\tIndexes crawl results into a configured indexer"
-  echo -e "  -D\t\t\t\t\tA Java property to pass to Nutch calls"
+  echo -e "  -D\t\t\t\t\tA Nutch or Hadoop property to pass to Nutch calls overwriting"
+  echo -e "  \t\t\t\t\tproperties defined in configuration files, e.g."
+  echo -e "  \t\t\t\t\tincrease content limit to 2MB:"
+  echo -e "  \t\t\t\t\t  -D http.content.limit=2097152"
+  echo -e "  \t\t\t\t\t(distributed mode only) configure memory of map and reduce tasks:"
+  echo -e "  \t\t\t\t\t  -D mapreduce.map.memory.mb=4608    -D mapreduce.map.java.opts=-Xmx4096m"
+  echo -e "  \t\t\t\t\t  -D mapreduce.reduce.memory.mb=4608 -D mapreduce.reduce.java.opts=-Xmx4096m"
   echo -e "  -w|--wait <NUMBER[SUFFIX]>\t\tTime to wait before generating a new segment when no URLs"
   echo -e "  \t\t\t\t\tare scheduled for fetching. Suffix can be: s for second,"
   echo -e "  \t\t\t\t\tm for minute, h for hour and d for day. If no suffix is"
@@ -106,7 +115,7 @@
 INDEXFLAG=false
 HOSTDBUPDATE=false
 HOSTDBGENERATE=false
-JAVA_PROPERTIES=""
+HADOOP_PROPERTIES=()
 WAIT=-1 # don't wait if there are no URLs to fetch
 SEEDDIR=""
 NUM_FETCHERS=1
@@ -124,7 +133,7 @@
             shift
             ;;
         -D)
-            JAVA_PROPERTIES="-D${2} ${JAVA_PROPERTIES}"
+            HADOOP_PROPERTIES=("${HADOOP_PROPERTIES[@]}" -D"${2}")
             shift 2
             ;;
         -s)
@@ -218,7 +227,7 @@
 
 # note that some of the options listed here could be set in the
 # corresponding hadoop site xml param file
-commonOptions="-D mapreduce.job.reduces=$NUM_TASKS -D mapred.child.java.opts=-Xmx1000m -D mapreduce.reduce.speculative=false -D mapreduce.map.speculative=false -D mapreduce.map.output.compress=true"
+commonOptions=("${HADOOP_PROPERTIES[@]}" -Dmapreduce.job.reduces=$NUM_TASKS -Dmapreduce.reduce.speculative=false -Dmapreduce.map.speculative=false -Dmapreduce.map.output.compress=true)
 
  # check that hadoop can be found on the path
 if [ $mode = "distributed" ]; then
@@ -259,20 +268,20 @@
 function __update_hostdb {
   if __directory_exists "$CRAWL_PATH"/crawldb; then
     echo "Updating HostDB"
-    __bin_nutch updatehostdb -crawldb "$CRAWL_PATH"/crawldb -hostdb "$CRAWL_PATH"/hostdb
+    __bin_nutch updatehostdb "${commonOptions[@]}" -crawldb "$CRAWL_PATH"/crawldb -hostdb "$CRAWL_PATH"/hostdb
   fi
 }
 
 # initial injection
 if [[ ! -z $SEEDDIR  ]]; then
   echo "Injecting seed URLs"
-  __bin_nutch inject "$CRAWL_PATH"/crawldb "$SEEDDIR"
+  __bin_nutch inject "${commonOptions[@]}" "$CRAWL_PATH"/crawldb "$SEEDDIR"
 fi
 
 # sitemap processing based on sitemap definition file(s)
 if [[ ! -z $SITEMAPDIR ]]; then
   echo "Processing sitemaps defined in $SITEMAPDIR"
-  __bin_nutch sitemap "$CRAWL_PATH/crawldb" -sitemapUrls "$SITEMAPDIR" -threads $NUM_THREADS
+  __bin_nutch sitemap "${commonOptions[@]}" "$CRAWL_PATH/crawldb" -sitemapUrls "$SITEMAPDIR" -threads $NUM_THREADS
 fi
 
 # main loop : rounds of generate - fetch - parse - update
@@ -300,15 +309,15 @@
     # sitemap processing based on HostDB
     if __directory_exists "$CRAWL_PATH"/hostdb; then
       echo "Processing sitemaps based on hosts in HostDB"
-      __bin_nutch sitemap "$CRAWL_PATH"/crawldb -hostdb "$CRAWL_PATH"/hostdb -threads $NUM_THREADS
+      __bin_nutch sitemap "${commonOptions[@]}" "$CRAWL_PATH"/crawldb -hostdb "$CRAWL_PATH"/hostdb -threads $NUM_THREADS
     fi
   fi
 
   echo "Generating a new segment"
   if [[ "$HOSTDBGENERATE" == "true" ]] && __directory_exists "$CRAWL_PATH"/hostdb; then
-   generate_args=($commonOptions "$CRAWL_PATH"/crawldb "$CRAWL_PATH"/segments -topN $SIZE_FETCHLIST -numFetchers $NUM_FETCHERS -noFilter -hostdb "$CRAWL_PATH"/hostdb)
+   generate_args=("${commonOptions[@]}" "$CRAWL_PATH"/crawldb "$CRAWL_PATH"/segments -topN $SIZE_FETCHLIST -numFetchers $NUM_FETCHERS -noFilter -hostdb "$CRAWL_PATH"/hostdb)
   else
-   generate_args=($commonOptions "$CRAWL_PATH"/crawldb "$CRAWL_PATH"/segments -topN $SIZE_FETCHLIST -numFetchers $NUM_FETCHERS -noFilter)
+   generate_args=("${commonOptions[@]}" "$CRAWL_PATH"/crawldb "$CRAWL_PATH"/segments -topN $SIZE_FETCHLIST -numFetchers $NUM_FETCHERS -noFilter)
   fi
 
   echo "$bin/nutch generate ${generate_args[@]}"
@@ -348,33 +357,33 @@
 
   # fetching the segment
   echo "Fetching : $SEGMENT"
-  __bin_nutch fetch $commonOptions -D fetcher.timelimit.mins=$TIME_LIMIT_FETCH "$CRAWL_PATH"/segments/$SEGMENT -threads $NUM_THREADS
+  __bin_nutch fetch "${commonOptions[@]}" -D fetcher.timelimit.mins=$TIME_LIMIT_FETCH "$CRAWL_PATH"/segments/$SEGMENT -threads $NUM_THREADS
 
   # parsing the segment
   echo "Parsing : $SEGMENT"
   # enable the skipping of records for the parsing so that a dodgy document
   # so that it does not fail the full task
   skipRecordsOptions="-D mapreduce.task.skip.start.attempts=2 -D mapreduce.map.skip.maxrecords=1"
-  __bin_nutch parse $commonOptions $skipRecordsOptions "$CRAWL_PATH"/segments/$SEGMENT
+  __bin_nutch parse "${commonOptions[@]}" $skipRecordsOptions "$CRAWL_PATH"/segments/$SEGMENT
 
   # updatedb with this segment
   echo "CrawlDB update"
-  __bin_nutch updatedb $commonOptions "$CRAWL_PATH"/crawldb  "$CRAWL_PATH"/segments/$SEGMENT
+  __bin_nutch updatedb "${commonOptions[@]}" "$CRAWL_PATH"/crawldb  "$CRAWL_PATH"/segments/$SEGMENT
 
 # note that the link inversion - indexing routine can be done within the main loop
 # on a per segment basis
   echo "Link inversion"
-  __bin_nutch invertlinks "$CRAWL_PATH"/linkdb "$CRAWL_PATH"/segments/$SEGMENT
+  __bin_nutch invertlinks "${commonOptions[@]}" "$CRAWL_PATH"/linkdb "$CRAWL_PATH"/segments/$SEGMENT
 
   echo "Dedup on crawldb"
-  __bin_nutch dedup "$CRAWL_PATH"/crawldb
+  __bin_nutch dedup "${commonOptions[@]}" "$CRAWL_PATH"/crawldb
 
   if $INDEXFLAG; then
       echo "Indexing $SEGMENT to index"
-      __bin_nutch index $JAVA_PROPERTIES "$CRAWL_PATH"/crawldb -linkdb "$CRAWL_PATH"/linkdb "$CRAWL_PATH"/segments/$SEGMENT
+      __bin_nutch index "${commonOptions[@]}" "$CRAWL_PATH"/crawldb -linkdb "$CRAWL_PATH"/linkdb "$CRAWL_PATH"/segments/$SEGMENT
 
       echo "Cleaning up index if possible"
-      __bin_nutch clean $JAVA_PROPERTIES "$CRAWL_PATH"/crawldb
+      __bin_nutch clean "${commonOptions[@]}" "$CRAWL_PATH"/crawldb
   else
       echo "Skipping indexing ..."
   fi
@@ -389,19 +398,19 @@
   # and should be uncommented based on your requirements
   #######################################################
   #echo "Building WebGraph within $CRAWL_PATH on all segments in $CRAWL_PATH/segments/"
-  #__bin_nutch webgraph $commonOptions -filter -normalize -segmentDir "$CRAWL_PATH"/segments/ -webgraphdb "$CRAWL_PATH"
+  #__bin_nutch webgraph "${commonOptions[@]}" -filter -normalize -segmentDir "$CRAWL_PATH"/segments/ -webgraphdb "$CRAWL_PATH"
 
   #echo "Running Loops Job on WebGraph within $CRAWL_PATH"
-  #__bin_nutch org.apache.nutch.scoring.webgraph.Loops $commonOptions -webgraphdb "$CRAWL_PATH"
+  #__bin_nutch org.apache.nutch.scoring.webgraph.Loops "${commonOptions[@]}" -webgraphdb "$CRAWL_PATH"
 
   #echo "Running LinkRank Algorithm on WebGraph within $CRAWL_PATH"
-  #__bin_nutch linkrank $commonOptions -webgraphdb "$CRAWL_PATH"
+  #__bin_nutch linkrank "${commonOptions[@]}" -webgraphdb "$CRAWL_PATH"
 
   #echo "Running ScoreUpdater Job with $CRAWL_PATH/crawldb and  WebGraph within $CRAWL_PATH"
-  #__bin_nutch scoreupdater $commonOptions -crawldb "$CRAWL_PATH"/crawldb -webgraphdb "$CRAWL_PATH"
+  #__bin_nutch scoreupdater "${commonOptions[@]}" -crawldb "$CRAWL_PATH"/crawldb -webgraphdb "$CRAWL_PATH"
 
   #echo "Running NodeDumper on WebGraph within $CRAWL_PATH and dumping output to $CRAWL_PATH/dump/scores"
-  #__bin_nutch nodedumper $commonOptions -scores -topn 1000 -webgraphdb "$CRAWL_PATH" -output "$CRAWL_PATH"/dump/scores
+  #__bin_nutch nodedumper "${commonOptions[@]}" -scores -topn 1000 -webgraphdb "$CRAWL_PATH" -output "$CRAWL_PATH"/dump/scores
 
 done
 
diff --git a/src/bin/nutch b/src/bin/nutch
index 2b3d2a0..3a25738 100755
--- a/src/bin/nutch
+++ b/src/bin/nutch
@@ -17,12 +17,12 @@
 # 
 # The Nutch command script
 #
-# Environment Variables
+# Environment Variables (local mode only)
 #
 #   NUTCH_JAVA_HOME The java implementation to use.  Overrides JAVA_HOME.
 #
 #   NUTCH_HEAPSIZE  The maximum amount of heap to use, in MB. 
-#                   Default is 1000.
+#                   Default is 4096.
 #
 #   NUTCH_OPTS      Extra Java runtime options.
 #                   Multiple options must be separated by white space.
@@ -34,6 +34,13 @@
 #   NUTCH_CONF_DIR  Path(s) to configuration files (default: $NUTCH_HOME/conf).
 #                   Multiple paths must be separated by a colon ':'.
 #
+# Note: environment variables are only used in local mode. When running Nutch
+#       on a Hadoop cluster (distributed mode), the corresponding settings
+#       are configured by Hadoop configuration properties set globally for the
+#       cluster or per Nutch job. For the complete list of properties, see
+#         https://hadoop.apache.org/docs/stable3/hadoop-mapreduce-client/hadoop-mapreduce-client-core/mapred-default.xml
+#         https://hadoop.apache.org/docs/stable3/hadoop-yarn/hadoop-yarn-common/yarn-default.xml
+#
 cygwin=false
 case "`uname`" in
 CYGWIN*) cygwin=true;;
@@ -54,7 +61,7 @@
 # if no args specified, show usage
 if [ $# = 0 ]; then
   echo "nutch 1.17-SNAPSHOT"
-  echo "Usage: nutch COMMAND"
+  echo "Usage: nutch COMMAND [-Dproperty=value]... [command-specific args]..."
   echo "where COMMAND is one of:"
   echo "  readdb            read / dump crawl db"
   echo "  mergedb           merge crawldb-s, with optional filtering"
@@ -136,7 +143,7 @@
 fi
 
 JAVA="$JAVA_HOME/bin/java"
-JAVA_HEAP_MAX=-Xmx1000m 
+JAVA_HEAP_MAX=-Xmx4096m
 
 # check envvars which might override default args
 if [ "$NUTCH_HEAPSIZE" != "" ]; then
diff --git a/src/java/org/apache/nutch/crawl/CrawlDb.java b/src/java/org/apache/nutch/crawl/CrawlDb.java
index 8cd5e3e..5d91b0a 100644
--- a/src/java/org/apache/nutch/crawl/CrawlDb.java
+++ b/src/java/org/apache/nutch/crawl/CrawlDb.java
@@ -98,15 +98,13 @@
 
     boolean url404Purging = conf.getBoolean(CRAWLDB_PURGE_404, false);
 
-    if (LOG.isInfoEnabled()) {
-      LOG.info("CrawlDb update: starting at " + sdf.format(start));
-      LOG.info("CrawlDb update: db: " + crawlDb);
-      LOG.info("CrawlDb update: segments: " + Arrays.asList(segments));
-      LOG.info("CrawlDb update: additions allowed: " + additionsAllowed);
-      LOG.info("CrawlDb update: URL normalizing: " + normalize);
-      LOG.info("CrawlDb update: URL filtering: " + filter);
-      LOG.info("CrawlDb update: 404 purging: " + url404Purging);
-    }
+    LOG.info("CrawlDb update: starting at {}", sdf.format(start));
+    LOG.info("CrawlDb update: db: {}", crawlDb);
+    LOG.info("CrawlDb update: segments: {}", Arrays.asList(segments));
+    LOG.info("CrawlDb update: additions allowed: {}", additionsAllowed);
+    LOG.info("CrawlDb update: URL normalizing: {}", normalize);
+    LOG.info("CrawlDb update: URL filtering: {}", filter);
+    LOG.info("CrawlDb update: 404 purging: {}", url404Purging);
 
     for (int i = 0; i < segments.length; i++) {
       FileSystem sfs = segments[i].getFileSystem(getConf());
@@ -117,16 +115,14 @@
         if (sfs.exists(parse)) {
           FileInputFormat.addInputPath(job, parse);
         } else {
-          LOG.info(" - adding fetched but unparsed segment " + segments[i]);
+          LOG.info(" - adding fetched but unparsed segment {}", segments[i]);
         }
       } else {
-        LOG.info(" - skipping invalid segment " + segments[i]);
+        LOG.info(" - skipping invalid segment {}", segments[i]);
       }
     }
 
-    if (LOG.isInfoEnabled()) {
-      LOG.info("CrawlDb update: Merging segment data into db.");
-    }
+    LOG.info("CrawlDb update: Merging segment data into db.");
 
     FileSystem fs = crawlDb.getFileSystem(getConf());
     Path outPath = FileOutputFormat.getOutputPath(job);
@@ -229,6 +225,7 @@
     System.exit(res);
   }
 
+  @Override
   public int run(String[] args) throws Exception {
     if (args.length < 1) {
       System.err
@@ -280,7 +277,7 @@
           filter, additionsAllowed, force);
       return 0;
     } catch (Exception e) {
-      LOG.error("CrawlDb update: " + StringUtils.stringifyException(e));
+      LOG.error("CrawlDb update: ", e);
       return -1;
     }
   }
diff --git a/src/java/org/apache/nutch/crawl/CrawlDbFilter.java b/src/java/org/apache/nutch/crawl/CrawlDbFilter.java
index 5da9951..8abe4b4 100644
--- a/src/java/org/apache/nutch/crawl/CrawlDbFilter.java
+++ b/src/java/org/apache/nutch/crawl/CrawlDbFilter.java
@@ -52,7 +52,7 @@
   private static final Logger LOG = LoggerFactory
       .getLogger(MethodHandles.lookup().lookupClass());
 
-
+  @Override
   public void setup(Mapper<Text, CrawlDatum, Text, CrawlDatum>.Context context) {
     Configuration conf = context.getConfiguration();
     urlFiltering = conf.getBoolean(URL_FILTERING, false);
@@ -69,11 +69,9 @@
     }
   }
 
-  public void close() {
-  }
-
   private Text newKey = new Text();
 
+  @Override
   public void map(Text key, CrawlDatum value,
       Context context) throws IOException, InterruptedException {
 
diff --git a/src/java/org/apache/nutch/crawl/CrawlDbMerger.java b/src/java/org/apache/nutch/crawl/CrawlDbMerger.java
index 7c6ef93..6cf2809 100644
--- a/src/java/org/apache/nutch/crawl/CrawlDbMerger.java
+++ b/src/java/org/apache/nutch/crawl/CrawlDbMerger.java
@@ -71,15 +71,14 @@
       Reducer<Text, CrawlDatum, Text, CrawlDatum> {
     private FetchSchedule schedule;
 
-    public void close() throws IOException {
-    }
-
+    @Override
     public void setup(
         Reducer<Text, CrawlDatum, Text, CrawlDatum>.Context context) {
       Configuration conf = context.getConfiguration();
       schedule = FetchScheduleFactory.getFetchSchedule(conf);
     }
 
+    @Override
     public void reduce(Text key, Iterable<CrawlDatum> values,
         Context context)
         throws IOException, InterruptedException {
@@ -120,7 +119,6 @@
   }
 
   public CrawlDbMerger() {
-
   }
 
   public CrawlDbMerger(Configuration conf) {
@@ -133,13 +131,11 @@
 
     SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
     long start = System.currentTimeMillis();
-    LOG.info("CrawlDb merge: starting at " + sdf.format(start));
+    LOG.info("CrawlDb merge: starting at {}", sdf.format(start));
 
     Job job = createMergeJob(getConf(), output, normalize, filter);
     for (int i = 0; i < dbs.length; i++) {
-      if (LOG.isInfoEnabled()) {
-        LOG.info("Adding " + dbs[i]);
-      }
+      LOG.info("Adding {}", dbs[i]);
       FileInputFormat.addInputPath(job, new Path(dbs[i], CrawlDb.CURRENT_NAME));
     }
 
@@ -200,6 +196,7 @@
     System.exit(res);
   }
 
+  @Override
   public int run(String[] args) throws Exception {
     if (args.length < 2) {
       System.err
diff --git a/src/java/org/apache/nutch/crawl/CrawlDbReader.java b/src/java/org/apache/nutch/crawl/CrawlDbReader.java
index b9200e7..dfcc87a 100644
--- a/src/java/org/apache/nutch/crawl/CrawlDbReader.java
+++ b/src/java/org/apache/nutch/crawl/CrawlDbReader.java
@@ -167,6 +167,7 @@
         }
       }
 
+      @Override
       public synchronized void write(Text key, CrawlDatum value)
           throws IOException {
         out.writeByte('"');
@@ -212,12 +213,14 @@
         out.writeByte('\n');
       }
 
+      @Override
       public synchronized void close(TaskAttemptContext context)
           throws IOException {
         out.close();
       }
     }
 
+    @Override
     public RecordWriter<Text, CrawlDatum> getRecordWriter(
         TaskAttemptContext context) throws IOException {
       String name = getUniqueFile(context, "part", "");
@@ -243,6 +246,7 @@
         jsonWriter = jsonMapper.writer(new JsonIndenter());
       }
 
+      @Override
       public synchronized void write(Text key, CrawlDatum value)
           throws IOException {
         Map<String, Object> data = new LinkedHashMap<String, Object>();
@@ -275,12 +279,14 @@
         out.writeByte('\n');
       }
 
+      @Override
       public synchronized void close(TaskAttemptContext context)
           throws IOException {
         out.close();
       }
     }
 
+    @Override
     public RecordWriter<Text, CrawlDatum> getRecordWriter(
         TaskAttemptContext context) throws IOException {
       String name = getUniqueFile(context, "part", "");
@@ -343,6 +349,8 @@
 
   public static class CrawlDbStatReducer
       extends Reducer<Text, NutchWritable, Text, NutchWritable> {
+
+    @Override
     public void setup(
         Reducer<Text, NutchWritable, Text, NutchWritable>.Context context) {
     }
@@ -474,6 +482,7 @@
     }
   }
 
+  @Override
   public void close() {
     closeReaders();
   }
@@ -650,14 +659,12 @@
       }
     }
 
-    if (LOG.isInfoEnabled()) {
-      LOG.info("CrawlDb statistics start: " + crawlDb);
-    }
+    LOG.info("CrawlDb statistics start: {}", crawlDb);
     TreeMap<String, Writable> stats = processStatJobHelper(crawlDb, config,
         sort);
 
     if (LOG.isInfoEnabled()) {
-      LOG.info("Statistics for CrawlDb: " + crawlDb);
+      LOG.info("Statistics for CrawlDb: {}", crawlDb);
       LongWritable totalCnt = new LongWritable(0);
       if (stats.containsKey("T")) {
         totalCnt = ((LongWritable) stats.get("T"));
@@ -720,10 +727,7 @@
         }
       }
     }
-    if (LOG.isInfoEnabled()) {
-      LOG.info("CrawlDb statistics: done");
-    }
-
+    LOG.info("CrawlDb statistics: done");
   }
 
   public CrawlDatum get(String crawlDb, String url, Configuration config)
@@ -760,10 +764,8 @@
       Configuration config, String format, String regex, String status,
       Integer retry, String expr, Float sample)
       throws IOException, ClassNotFoundException, InterruptedException {
-    if (LOG.isInfoEnabled()) {
-      LOG.info("CrawlDb dump: starting");
-      LOG.info("CrawlDb db: " + crawlDb);
-    }
+    LOG.info("CrawlDb dump: starting");
+    LOG.info("CrawlDb db: {}", crawlDb);
 
     Path outFolder = new Path(output);
 
@@ -793,7 +795,7 @@
       jobConf.setInt("retry", retry);
     if (expr != null) {
       jobConf.set("expr", expr);
-      LOG.info("CrawlDb db: expr: " + expr);
+      LOG.info("CrawlDb db: expr: {}", expr);
     }
     if (sample != null) {
       jobConf.setFloat("sample", sample);
@@ -817,9 +819,7 @@
       throw e;
     }
 
-    if (LOG.isInfoEnabled()) {
-      LOG.info("CrawlDb dump: done");
-    }
+    LOG.info("CrawlDb dump: done");
   }
 
   public static class CrawlDbDumpMapper
@@ -892,7 +892,7 @@
 
     if (LOG.isInfoEnabled()) {
       LOG.info("CrawlDb topN: starting (topN=" + topN + ", min=" + min + ")");
-      LOG.info("CrawlDb db: " + crawlDb);
+      LOG.info("CrawlDb db: {}", crawlDb);
     }
 
     Path outFolder = new Path(output);
@@ -933,9 +933,7 @@
       throw e;
     }
 
-    if (LOG.isInfoEnabled()) {
-      LOG.info("CrawlDb topN: collecting topN scores.");
-    }
+    LOG.info("CrawlDb topN: collecting topN scores.");
     job = NutchJob.getInstance(config);
     job.setJobName("topN collect " + crawlDb);
     job.getConfiguration().setLong("db.reader.topn", topN);
@@ -970,12 +968,10 @@
     }
 
     fs.delete(tempDir, true);
-    if (LOG.isInfoEnabled()) {
-      LOG.info("CrawlDb topN: done");
-    }
-
+    LOG.info("CrawlDb topN: done");
   }
 
+  @Override
   public int run(String[] args) throws IOException, InterruptedException,
       ClassNotFoundException, Exception {
     @SuppressWarnings("resource")
diff --git a/src/java/org/apache/nutch/crawl/CrawlDbReducer.java b/src/java/org/apache/nutch/crawl/CrawlDbReducer.java
index feba08a..bfc62c3 100644
--- a/src/java/org/apache/nutch/crawl/CrawlDbReducer.java
+++ b/src/java/org/apache/nutch/crawl/CrawlDbReducer.java
@@ -49,6 +49,7 @@
   private int maxInterval;
   private FetchSchedule schedule;
 
+  @Override
   public void setup(Reducer<Text, CrawlDatum, Text, CrawlDatum>.Context context) {
     Configuration conf = context.getConfiguration();
     retryMax = conf.getInt("db.fetch.retry.max", 3);
@@ -60,9 +61,7 @@
     linked = new InlinkPriorityQueue(maxLinks);
   }
 
-  public void close() {
-  }
-
+  @Override
   public void reduce(Text key, Iterable<CrawlDatum> values,
       Context context) throws IOException, InterruptedException {
 
@@ -161,16 +160,14 @@
         try {
           scfilters.orphanedScore(key, old);
         } catch (ScoringFilterException e) {
-          if (LOG.isWarnEnabled()) {
-            LOG.warn("Couldn't update orphaned score, key={}: {}", key, e);
-          }
+          LOG.warn("Couldn't update orphaned score, key={}: {}", key, e);
         }
         context.write(key, old);
         context.getCounter("CrawlDB status",
             CrawlDatum.getStatusName(old.getStatus())).increment(1);
       } else {
-        LOG.warn("Missing fetch and old value, signature="
-            + StringUtil.toHexString(signature));
+        LOG.warn("Missing fetch and old value, signature={}",
+            StringUtil.toHexString(signature));
       }
       return;
     }
@@ -207,10 +204,8 @@
         try {
           scfilters.initialScore(key, result);
         } catch (ScoringFilterException e) {
-          if (LOG.isWarnEnabled()) {
-            LOG.warn("Cannot filter init score for url " + key
-                + ", using default: " + e.getMessage());
-          }
+          LOG.warn("Cannot filter init score for url {}, using default: {}",
+              key, e.getMessage());
           result.setScore(0.0f);
         }
       }
@@ -286,9 +281,7 @@
         result = schedule.forceRefetch(key, result, false);
       break;
     case CrawlDatum.STATUS_SIGNATURE:
-      if (LOG.isWarnEnabled()) {
-        LOG.warn("Lone CrawlDatum.STATUS_SIGNATURE: " + key);
-      }
+      LOG.warn("Lone CrawlDatum.STATUS_SIGNATURE: {}", key);
       return;
     case CrawlDatum.STATUS_FETCH_RETRY: // temporary failure
       if (oldSet) {
@@ -321,9 +314,7 @@
     try {
       scfilters.updateDbScore(key, oldSet ? old : null, result, linkList);
     } catch (Exception e) {
-      if (LOG.isWarnEnabled()) {
-        LOG.warn("Couldn't update score, key={}: {}", key, e);
-      }
+      LOG.warn("Couldn't update score, key={}: {}", key, e);
     }
     // remove generation time, if any
     result.getMetaData().remove(Nutch.WRITABLE_GENERATE_TIME_KEY);
diff --git a/src/java/org/apache/nutch/crawl/DeduplicationJob.java b/src/java/org/apache/nutch/crawl/DeduplicationJob.java
index 9b01411..7751366 100644
--- a/src/java/org/apache/nutch/crawl/DeduplicationJob.java
+++ b/src/java/org/apache/nutch/crawl/DeduplicationJob.java
@@ -266,6 +266,7 @@
     }
   }
 
+  @Override
   public int run(String[] args) throws IOException {
     if (args.length < 1) {
       System.err.println("Usage: DeduplicationJob <crawldb> [-group <none|host|domain>] [-compareOrder <score>,<fetchTime>,<httpsOverHttp>,<urlLength>]");
@@ -345,9 +346,7 @@
     }
 
     // merge with existing crawl db
-    if (LOG.isInfoEnabled()) {
-      LOG.info("Deduplication: Updating status of duplicate urls into crawl db.");
-    }
+    LOG.info("Deduplication: Updating status of duplicate urls into crawl db.");
 
     Job mergeJob = CrawlDb.createJob(getConf(), crawlDb);
     FileInputFormat.addInputPath(mergeJob, tempDir);
diff --git a/src/java/org/apache/nutch/crawl/Generator.java b/src/java/org/apache/nutch/crawl/Generator.java
index 1c5e4d5..5dcd2ea 100644
--- a/src/java/org/apache/nutch/crawl/Generator.java
+++ b/src/java/org/apache/nutch/crawl/Generator.java
@@ -578,6 +578,7 @@
   public static class SelectorInverseMapper
       extends Mapper<FloatWritable, SelectorEntry, Text, SelectorEntry> {
 
+    @Override
     public void map(FloatWritable key, SelectorEntry value, Context context)
         throws IOException, InterruptedException {
       SelectorEntry entry = value;
@@ -588,6 +589,7 @@
   public static class PartitionReducer
       extends Reducer<Text, SelectorEntry, Text, CrawlDatum> {
 
+    @Override
     public void reduce(Text key, Iterable<SelectorEntry> values,
         Context context) throws IOException, InterruptedException {
       // if using HashComparator, we get only one input key in case of
@@ -606,6 +608,7 @@
     }
 
     @SuppressWarnings("rawtypes")
+    @Override
     public int compare(WritableComparable a, WritableComparable b) {
       Text url1 = (Text) a;
       Text url2 = (Text) b;
@@ -614,6 +617,7 @@
       return (hash1 < hash2 ? -1 : (hash1 == hash2 ? 0 : 1));
     }
 
+    @Override
     public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) {
       int hash1 = hash(b1, s1, l1);
       int hash2 = hash(b2, s2, l2);
@@ -923,9 +927,8 @@
   private Path partitionSegment(Path segmentsDir, Path inputDir, int numLists)
       throws IOException, ClassNotFoundException, InterruptedException {
     // invert again, partition by host/domain/IP, sort by url hash
-    if (LOG.isInfoEnabled()) {
-      LOG.info("Generator: Partitioning selected urls for politeness.");
-    }
+    LOG.info("Generator: Partitioning selected urls for politeness.");
+
     Path segment = new Path(segmentsDir, generateSegmentName());
     Path output = new Path(segment, CrawlDatum.GENERATE_DIR_NAME);
 
@@ -989,6 +992,7 @@
     System.exit(res);
   }
 
+  @Override
   public int run(String[] args) throws Exception {
     if (args.length < 2) {
       System.out.println(
diff --git a/src/java/org/apache/nutch/crawl/Injector.java b/src/java/org/apache/nutch/crawl/Injector.java
index 7d4ee84..84dc812 100644
--- a/src/java/org/apache/nutch/crawl/Injector.java
+++ b/src/java/org/apache/nutch/crawl/Injector.java
@@ -128,6 +128,7 @@
     private String scope;
     private boolean filterNormalizeAll = false;
 
+    @Override
     public void setup(Context context) {
       Configuration conf = context.getConfiguration();
       boolean normalize = conf.getBoolean(CrawlDbFilter.URL_NORMALIZING, true);
@@ -205,6 +206,7 @@
       }
     }
 
+    @Override
     public void map(Text key, Writable value, Context context)
         throws IOException, InterruptedException {
       if (value instanceof Text) {
@@ -233,10 +235,9 @@
             key.set(url);
             scfilters.injectedScore(key, datum);
           } catch (ScoringFilterException e) {
-            if (LOG.isWarnEnabled()) {
-              LOG.warn("Cannot filter injected score for url " + url
-                  + ", using default (" + e.getMessage() + ")");
-            }
+            LOG.warn(
+                "Cannot filter injected score for url {}, using default ({})",
+                url, e.getMessage());
           }
           context.getCounter("injector", "urls_injected").increment(1);
           context.write(key, datum);
@@ -277,6 +278,7 @@
     private CrawlDatum old = new CrawlDatum();
     private CrawlDatum injected = new CrawlDatum();
 
+    @Override
     public void setup(Context context) {
       Configuration conf = context.getConfiguration();
       interval = conf.getInt("db.fetch.interval.default", 2592000);
@@ -302,6 +304,7 @@
      * 
      * For more details @see NUTCH-1405
      */
+    @Override
     public void reduce(Text key, Iterable<CrawlDatum> values, Context context)
         throws IOException, InterruptedException {
 
@@ -369,12 +372,10 @@
     SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
     long start = System.currentTimeMillis();
 
-    if (LOG.isInfoEnabled()) {
-      LOG.info("Injector: starting at " + sdf.format(start));
-      LOG.info("Injector: crawlDb: " + crawlDb);
-      LOG.info("Injector: urlDir: " + urlDir);
-      LOG.info("Injector: Converting injected urls to crawl db entries.");
-    }
+    LOG.info("Injector: starting at {}", sdf.format(start));
+    LOG.info("Injector: crawlDb: {}", crawlDb);
+    LOG.info("Injector: urlDir: {}", urlDir);
+    LOG.info("Injector: Converting injected urls to crawl db entries.");
 
     // set configuration
     Configuration conf = getConf();
@@ -535,6 +536,7 @@
     System.exit(res);
   }
 
+  @Override
   public int run(String[] args) throws Exception {
     if (args.length < 2) {
       usage();
@@ -578,6 +580,7 @@
   /**
    * Used by the Nutch REST service
    */
+  @Override
   public Map<String, Object> run(Map<String, Object> args, String crawlId)
       throws Exception {
     if(args.size()<1){
diff --git a/src/java/org/apache/nutch/crawl/LinkDb.java b/src/java/org/apache/nutch/crawl/LinkDb.java
index b32e64f..e53411f 100644
--- a/src/java/org/apache/nutch/crawl/LinkDb.java
+++ b/src/java/org/apache/nutch/crawl/LinkDb.java
@@ -84,6 +84,7 @@
     private URLFilters urlFilters;
     private URLNormalizers urlNormalizers;
 
+    @Override
     public void setup(Mapper<Text, ParseData, Text, Inlinks>.Context context) {
       Configuration conf = context.getConfiguration();
       maxAnchorLength = conf.getInt("linkdb.max.anchor.length", 100);
@@ -98,7 +99,8 @@
       }
     } 
 
-    public void map(Text key, ParseData parseData,
+    @Override
+   public void map(Text key, ParseData parseData,
             Context context)
                     throws IOException, InterruptedException {
       String fromUrl = key.toString();
@@ -196,17 +198,15 @@
 
     SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
     long start = System.currentTimeMillis();
-    if (LOG.isInfoEnabled()) {
-      LOG.info("LinkDb: starting at {}", sdf.format(start));
-      LOG.info("LinkDb: linkdb: {}", linkDb);
-      LOG.info("LinkDb: URL normalize: {}", normalize);
-      LOG.info("LinkDb: URL filter: {}", filter);
-      if (conf.getBoolean(IGNORE_INTERNAL_LINKS, true)) {
-        LOG.info("LinkDb: internal links will be ignored.");
-      }
-      if (conf.getBoolean(IGNORE_EXTERNAL_LINKS, false)) {
-        LOG.info("LinkDb: external links will be ignored.");
-      }
+    LOG.info("LinkDb: starting at {}", sdf.format(start));
+    LOG.info("LinkDb: linkdb: {}", linkDb);
+    LOG.info("LinkDb: URL normalize: {}", normalize);
+    LOG.info("LinkDb: URL filter: {}", filter);
+    if (conf.getBoolean(IGNORE_INTERNAL_LINKS, true)) {
+      LOG.info("LinkDb: internal links will be ignored.");
+    }
+    if (conf.getBoolean(IGNORE_EXTERNAL_LINKS, false)) {
+      LOG.info("LinkDb: external links will be ignored.");
     }
     if (conf.getBoolean(IGNORE_INTERNAL_LINKS, true)
             && conf.getBoolean(IGNORE_EXTERNAL_LINKS, false)) {
@@ -217,9 +217,7 @@
     }
 
     for (int i = 0; i < segments.length; i++) {
-      if (LOG.isInfoEnabled()) {
-        LOG.info("LinkDb: adding segment: {}", segments[i]);
-      }
+      LOG.info("LinkDb: adding segment: {}", segments[i]);
       FileInputFormat.addInputPath(job, new Path(segments[i],
               ParseData.DIR_NAME));
     }
@@ -240,9 +238,8 @@
     }
 
     if (fs.exists(currentLinkDb)) {
-      if (LOG.isInfoEnabled()) {
-        LOG.info("LinkDb: merging with existing linkdb: {}", linkDb);
-      }
+      LOG.info("LinkDb: merging with existing linkdb: {}", linkDb);
+
       // try to merge
       Path newLinkDb = FileOutputFormat.getOutputPath(job);
       job = LinkDbMerger.createMergeJob(getConf(), linkDb, normalize, filter);
@@ -333,6 +330,7 @@
     System.exit(res);
   }
 
+  @Override
   public int run(String[] args) throws Exception {
     if (args.length < 2) {
       System.err
diff --git a/src/java/org/apache/nutch/crawl/LinkDbFilter.java b/src/java/org/apache/nutch/crawl/LinkDbFilter.java
index 33895f9..ed9151a 100644
--- a/src/java/org/apache/nutch/crawl/LinkDbFilter.java
+++ b/src/java/org/apache/nutch/crawl/LinkDbFilter.java
@@ -56,6 +56,7 @@
 
   private Text newKey = new Text();
 
+  @Override
   public void setup(Mapper<Text, Inlinks, Text, Inlinks>.Context context) {
     Configuration conf = context.getConfiguration();
     filter = conf.getBoolean(URL_FILTERING, false);
@@ -69,9 +70,7 @@
     }
   }
 
-  public void close() {
-  }
-
+  @Override
   public void map(Text key, Inlinks value, Context context)
       throws IOException, InterruptedException {
     String url = key.toString();
diff --git a/src/java/org/apache/nutch/crawl/LinkDbMerger.java b/src/java/org/apache/nutch/crawl/LinkDbMerger.java
index d5942be..059dbcd 100644
--- a/src/java/org/apache/nutch/crawl/LinkDbMerger.java
+++ b/src/java/org/apache/nutch/crawl/LinkDbMerger.java
@@ -82,11 +82,13 @@
 
     private int maxInlinks;
 
+    @Override
     public void setup(Reducer<Text, Inlinks, Text, Inlinks>.Context context) {
       Configuration conf = context.getConfiguration();
       maxInlinks = conf.getInt("linkdb.max.inlinks", 10000);
     }
 
+    @Override
     public void reduce(Text key, Iterable<Inlinks> values, Context context)
         throws IOException, InterruptedException {
 
@@ -108,9 +110,6 @@
     }
   }
 
-  public void close() throws IOException {
-  }
-
   public void merge(Path output, Path[] dbs, boolean normalize, boolean filter)
       throws Exception {
     SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
@@ -183,6 +182,7 @@
     System.exit(res);
   }
 
+  @Override
   public int run(String[] args) throws Exception {
     if (args.length < 2) {
       System.err
diff --git a/src/java/org/apache/nutch/crawl/LinkDbReader.java b/src/java/org/apache/nutch/crawl/LinkDbReader.java
index 6ea3c26..5d422b4 100644
--- a/src/java/org/apache/nutch/crawl/LinkDbReader.java
+++ b/src/java/org/apache/nutch/crawl/LinkDbReader.java
@@ -115,6 +115,7 @@
         new Inlinks());
   }
 
+  @Override
   public void close() throws IOException {
     if (readers != null) {
       for (int i = 0; i < readers.length; i++) {
@@ -154,10 +155,10 @@
     throws IOException, InterruptedException, ClassNotFoundException {
     SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
     long start = System.currentTimeMillis();
-    if (LOG.isInfoEnabled()) {
-      LOG.info("LinkDb dump: starting at " + sdf.format(start));
-      LOG.info("LinkDb dump: db: " + linkdb);
-    }
+
+    LOG.info("LinkDb dump: starting at {}", sdf.format(start));
+    LOG.info("LinkDb dump: db: {}", linkdb);
+
     Path outFolder = new Path(output);
 
     Job job = NutchJob.getInstance(getConf());
@@ -220,6 +221,7 @@
     System.exit(res);
   }
 
+  @Override
   public int run(String[] args) throws Exception {
     if (args.length < 2) {
       System.err
diff --git a/src/java/org/apache/nutch/crawl/SignatureFactory.java b/src/java/org/apache/nutch/crawl/SignatureFactory.java
index e017cf4..e605ec5 100644
--- a/src/java/org/apache/nutch/crawl/SignatureFactory.java
+++ b/src/java/org/apache/nutch/crawl/SignatureFactory.java
@@ -46,9 +46,7 @@
     Signature impl = (Signature) objectCache.getObject(clazz);
     if (impl == null) {
       try {
-        if (LOG.isInfoEnabled()) {
-          LOG.info("Using Signature impl: " + clazz);
-        }
+        LOG.info("Using Signature impl: {}", clazz);
         Class<?> implClass = Class.forName(clazz);
         impl = (Signature) implClass.getConstructor().newInstance();
         impl.setConf(conf);
diff --git a/src/java/org/apache/nutch/crawl/URLPartitioner.java b/src/java/org/apache/nutch/crawl/URLPartitioner.java
index 80b4fab..d9e6c4c 100644
--- a/src/java/org/apache/nutch/crawl/URLPartitioner.java
+++ b/src/java/org/apache/nutch/crawl/URLPartitioner.java
@@ -71,9 +71,6 @@
     return conf;
   }
 
-  public void close() {
-  }
-
   /** Hash by host or domain name or IP address. */
   public int getPartition(Text key, Writable value, int numReduceTasks) {
     String urlString = key.toString();
diff --git a/src/java/org/apache/nutch/fetcher/Fetcher.java b/src/java/org/apache/nutch/fetcher/Fetcher.java
index e5250ae..687411e 100644
--- a/src/java/org/apache/nutch/fetcher/Fetcher.java
+++ b/src/java/org/apache/nutch/fetcher/Fetcher.java
@@ -199,14 +199,10 @@
       QueueFeeder feeder; 
 
       int threadCount = conf.getInt("fetcher.threads.fetch", 10);
-      if (LOG.isInfoEnabled()) {
-        LOG.info("Fetcher: threads: {}", threadCount);
-      }
+      LOG.info("Fetcher: threads: {}", threadCount);
 
       int timeoutDivisor = conf.getInt("fetcher.threads.timeout.divisor", 2);
-      if (LOG.isInfoEnabled()) {
-        LOG.info("Fetcher: time-out divisor: {}", timeoutDivisor);
-      }
+      LOG.info("Fetcher: time-out divisor: {}", timeoutDivisor);
 
       int queueDepthMuliplier = conf.getInt(
           "fetcher.queue.depth.multiplier", 50);
@@ -240,17 +236,15 @@
 
       int throughputThresholdNumRetries = 0;
 
-      int throughputThresholdPages = conf.getInt(
-          "fetcher.throughput.threshold.pages", -1);
-      if (LOG.isInfoEnabled()) {
-        LOG.info("Fetcher: throughput threshold: {}", throughputThresholdPages);
-      }
-      int throughputThresholdMaxRetries = conf.getInt(
-          "fetcher.throughput.threshold.retries", 5);
-      if (LOG.isInfoEnabled()) {
-        LOG.info("Fetcher: throughput threshold retries: {}",
-            throughputThresholdMaxRetries);
-      }
+      int throughputThresholdPages = conf
+          .getInt("fetcher.throughput.threshold.pages", -1);
+      LOG.info("Fetcher: throughput threshold: {}", throughputThresholdPages);
+
+      int throughputThresholdMaxRetries = conf
+          .getInt("fetcher.throughput.threshold.retries", 5);
+      LOG.info("Fetcher: throughput threshold retries: {}",
+          throughputThresholdMaxRetries);
+
       long throughputThresholdTimeLimit = conf.getLong(
           "fetcher.throughput.threshold.check.after", -1);
 
@@ -304,8 +298,9 @@
           // Check if we're dropping below the threshold
           if (pagesLastSec < throughputThresholdPages) {
             throughputThresholdNumRetries++;
-            LOG.warn("{}: dropping below configured threshold of {} pages per second",
-                Integer.toString(throughputThresholdNumRetries), Integer.toString(throughputThresholdPages));
+            LOG.warn(
+                "{}: dropping below configured threshold of {} pages per second",
+                throughputThresholdNumRetries, throughputThresholdPages);
 
             // Quit if we dropped below threshold too many times
             if (throughputThresholdNumRetries == throughputThresholdMaxRetries) {
@@ -457,8 +452,7 @@
 
     int maxOutlinkDepth = getConf().getInt("fetcher.follow.outlinks.depth", -1);
     if (maxOutlinkDepth > 0) {
-      LOG.info("Fetcher: following outlinks up to depth: {}",
-          Integer.toString(maxOutlinkDepth));
+      LOG.info("Fetcher: following outlinks up to depth: {}", maxOutlinkDepth);
 
       int maxOutlinkDepthNumLinks = getConf().getInt(
           "fetcher.follow.outlinks.num.links", 4);
@@ -472,7 +466,7 @@
       }
 
       LOG.info("Fetcher: maximum outlinks to follow: {}",
-          Integer.toString(totalOutlinksToFollow));
+          totalOutlinksToFollow);
     }
 
     Job job = NutchJob.getInstance(getConf());
@@ -521,6 +515,7 @@
     System.exit(res);
   }
 
+  @Override
   public int run(String[] args) throws Exception {
 
     String usage = "Usage: Fetcher <segment> [-threads n]";
diff --git a/src/java/org/apache/nutch/fetcher/FetcherThread.java b/src/java/org/apache/nutch/fetcher/FetcherThread.java
index 5d5a20b..bc0d639 100644
--- a/src/java/org/apache/nutch/fetcher/FetcherThread.java
+++ b/src/java/org/apache/nutch/fetcher/FetcherThread.java
@@ -85,6 +85,7 @@
   private URLNormalizers normalizers;
   private ProtocolFactory protocolFactory;
   private long maxCrawlDelay;
+  private long minCrawlDelay;
   private String queueMode;
   private int maxRedirect;
   private boolean maxRedirectExceededSkip = false;
@@ -165,6 +166,9 @@
     this.protocolFactory = new ProtocolFactory(conf);
     this.normalizers = new URLNormalizers(conf, URLNormalizers.SCOPE_FETCHER);
     this.maxCrawlDelay = conf.getInt("fetcher.max.crawl.delay", 30) * 1000;
+    float crawlDelay = conf.getFloat("fetcher.server.delay", 1.0f);
+    this.minCrawlDelay = (long) (conf.getFloat("fetcher.min.crawl.delay",
+        crawlDelay) * 1000);
     this.activeThreads = activeThreads;
     this.fetchQueues = fetchQueues;
     this.feeder = feeder;
@@ -299,9 +303,7 @@
                   Thread.currentThread().getId(), fit.url,
                   fetchQueues.getFetchItemQueue(fit.queueID).crawlDelay);
             }
-            if (LOG.isDebugEnabled()) {
-              LOG.debug("redirectCount={}", redirectCount);
-            }
+            LOG.debug("redirectCount={}", redirectCount);
             redirecting = false;
             Protocol protocol = this.protocolFactory.getProtocol(fit.u);
             BaseRobotRules rules = protocol.getRobotRules(fit.url, fit.datum,
@@ -324,8 +326,8 @@
               if (rules.getCrawlDelay() > maxCrawlDelay && maxCrawlDelay >= 0) {
                 // unblock
                 fetchQueues.finishFetchItem(fit, true);
-                LOG.info("Crawl-Delay for {} too long ({}), skipping", fit.url,
-                    rules.getCrawlDelay());
+                LOG.info("Crawl-Delay for {} too long ({} ms), skipping",
+                    fit.url, rules.getCrawlDelay());
                 output(fit.url, fit.datum, null,
                     ProtocolStatus.STATUS_ROBOTS_DENIED,
                     CrawlDatum.STATUS_FETCH_GONE);
@@ -334,12 +336,17 @@
                 continue;
               } else {
                 FetchItemQueue fiq = fetchQueues.getFetchItemQueue(fit.queueID);
-                fiq.crawlDelay = rules.getCrawlDelay();
-                if (LOG.isDebugEnabled()) {
-                  LOG.debug("Crawl delay for queue: " + fit.queueID
-                      + " is set to " + fiq.crawlDelay
-                      + " as per robots.txt. url: " + fit.url);
+                long crawlDelay = rules.getCrawlDelay();
+                if (crawlDelay < minCrawlDelay) {
+                  LOG.info(
+                      "Crawl-Delay for {} too short ({} ms), adjusting to {} ms",
+                      fit.url, rules.getCrawlDelay(), minCrawlDelay);
+                  crawlDelay = minCrawlDelay;
                 }
+                fiq.crawlDelay = crawlDelay;
+                LOG.debug(
+                    "Crawl delay for queue: {} is set to {} as per robots.txt. url: {}",
+                    fit.queueID, fiq.crawlDelay, fit.url);
               }
             }
             ProtocolOutput output = protocol.getProtocolOutput(fit.url,
diff --git a/src/java/org/apache/nutch/hostdb/ReadHostDb.java b/src/java/org/apache/nutch/hostdb/ReadHostDb.java
index daf013f..62bf3a7 100644
--- a/src/java/org/apache/nutch/hostdb/ReadHostDb.java
+++ b/src/java/org/apache/nutch/hostdb/ReadHostDb.java
@@ -69,6 +69,7 @@
     protected Text emptyText = new Text();
     protected Expression expr = null;
 
+    @Override
     public void setup(Context context) {
       dumpHomepages = context.getConfiguration().getBoolean(HOSTDB_DUMP_HOMEPAGES, false);
       dumpHostnames = context.getConfiguration().getBoolean(HOSTDB_DUMP_HOSTNAMES, false);
@@ -87,6 +88,7 @@
       }
     }
 
+    @Override
     public void map(Text key, HostDatum datum, Context context) throws IOException, InterruptedException {
       if (fieldHeader && !dumpHomepages && !dumpHostnames) {
         context.write(new Text("hostname"), new Text("unfetched\tfetched\tgone\tredirTemp\tredirPerm\tnotModified\tnumRecords\tdnsFail\tcnxFail\tsumFail\tscore\tlastCheck\thomepage\tmetadata"));
@@ -242,6 +244,7 @@
     System.exit(res);
   }
 
+  @Override
   public int run(String[] args) throws Exception {
     if (args.length < 2) {
       System.err.println("Usage: ReadHostDb <hostdb> [-get <url>] [<output> [-dumpHomepages | -dumpHostnames | -expr <expr.>]]");
diff --git a/src/java/org/apache/nutch/hostdb/UpdateHostDb.java b/src/java/org/apache/nutch/hostdb/UpdateHostDb.java
index 7066f7e..60c9fa6 100644
--- a/src/java/org/apache/nutch/hostdb/UpdateHostDb.java
+++ b/src/java/org/apache/nutch/hostdb/UpdateHostDb.java
@@ -160,6 +160,7 @@
     System.exit(res);
   }
 
+  @Override
   public int run(String[] args) throws Exception {
     if (args.length < 2) {
       System.err.println("Usage: UpdateHostDb -hostdb <hostdb> " +
diff --git a/src/java/org/apache/nutch/hostdb/UpdateHostDbMapper.java b/src/java/org/apache/nutch/hostdb/UpdateHostDbMapper.java
index c239349..9657621 100644
--- a/src/java/org/apache/nutch/hostdb/UpdateHostDbMapper.java
+++ b/src/java/org/apache/nutch/hostdb/UpdateHostDbMapper.java
@@ -195,7 +195,7 @@
       // Filtered out?
       if (buffer == null) {
         context.getCounter("UpdateHostDb", "filtered_records").increment(1);
-        LOG.info("UpdateHostDb: " + key.toString() + " hostdatum has been filtered");
+        LOG.info("UpdateHostDb: {} hostdatum has been filtered", keyStr);
         return;
       }
 
@@ -219,7 +219,7 @@
       // Filtered out?
       if (buffer == null) {
         context.getCounter("UpdateHostDb", "filtered_records").increment(1);
-        LOG.info("UpdateHostDb: " + key.toString() + " score has been filtered");
+        LOG.info("UpdateHostDb: {} score has been filtered", keyStr);
         return;
       }
 
diff --git a/src/java/org/apache/nutch/hostdb/UpdateHostDbReducer.java b/src/java/org/apache/nutch/hostdb/UpdateHostDbReducer.java
index 862a3c9..f473848 100644
--- a/src/java/org/apache/nutch/hostdb/UpdateHostDbReducer.java
+++ b/src/java/org/apache/nutch/hostdb/UpdateHostDbReducer.java
@@ -349,7 +349,7 @@
       return;
     } else {
       context.getCounter("UpdateHostDb", "skipped_not_eligible").increment(1);
-      LOG.info("UpdateHostDb: " + key.toString() + ": skipped_not_eligible");
+      LOG.info("UpdateHostDb: {}: skipped_not_eligible", key);
     }
 
     // Write the host datum if it wasn't written by the resolver thread
@@ -415,7 +415,8 @@
       try {
         // Wait for the executor to shut down completely
         if (!executor.isTerminated()) {
-          LOG.info("UpdateHostDb: resolver threads waiting: " + Integer.toString(executor.getPoolSize()));
+          LOG.info("UpdateHostDb: resolver threads waiting: {}",
+              executor.getPoolSize());
           Thread.sleep(1000);
         } else {
           // All is well, get out
diff --git a/src/java/org/apache/nutch/indexer/CleaningJob.java b/src/java/org/apache/nutch/indexer/CleaningJob.java
index 9b49653..ca1198e 100644
--- a/src/java/org/apache/nutch/indexer/CleaningJob.java
+++ b/src/java/org/apache/nutch/indexer/CleaningJob.java
@@ -180,6 +180,7 @@
         + TimingUtil.elapsedTime(start, end));
   }
 
+  @Override
   public int run(String[] args) throws IOException {
     if (args.length < 1) {
       String usage = "Usage: CleaningJob <crawldb> [-noCommit]";
diff --git a/src/java/org/apache/nutch/indexer/IndexerMapReduce.java b/src/java/org/apache/nutch/indexer/IndexerMapReduce.java
index cfb6dea..3e9bc15 100644
--- a/src/java/org/apache/nutch/indexer/IndexerMapReduce.java
+++ b/src/java/org/apache/nutch/indexer/IndexerMapReduce.java
@@ -238,6 +238,7 @@
       }
     }
 
+    @Override
     public void reduce(Text key, Iterable<NutchWritable> values,
         Context context) throws IOException, InterruptedException {
       Inlinks inlinks = null;
@@ -288,8 +289,8 @@
           parseText = (ParseText) value;
         } else if (value instanceof Content) {
           content = (Content)value;
-        } else if (LOG.isWarnEnabled()) {
-          LOG.warn("Unrecognized type: " + value.getClass());
+        } else {
+          LOG.warn("Unrecognized type: {}", value.getClass());
         }
       }
 
@@ -353,9 +354,7 @@
             inlinks, boost);
       } catch (final ScoringFilterException e) {
         context.getCounter("IndexerStatus", "errors (ScoringFilter)").increment(1);
-        if (LOG.isWarnEnabled()) {
-          LOG.warn("Error calculating score {}: {}", key, e);
-        }
+        LOG.warn("Error calculating score {}: {}", key, e);
         return;
       }
       // apply boost to all indexed fields.
@@ -389,7 +388,7 @@
         doc = filters.filter(doc, parse, key, fetchDatum, inlinks);
       } catch (final IndexingException e) {
         if (LOG.isWarnEnabled()) {
-          LOG.warn("Error indexing " + key + ": " + e);
+          LOG.warn("Error indexing " + key + ": ", e);
         }
         context.getCounter("IndexerStatus", "errors (IndexingFilter)").increment(1);
         return;
@@ -430,9 +429,6 @@
     }
   }
 
-  public void close() throws IOException {
-  }
-
   public static void initMRJob(Path crawlDb, Path linkDb,
       Collection<Path> segments, Job job, boolean addBinaryContent) throws IOException{
 
diff --git a/src/java/org/apache/nutch/indexer/IndexingFiltersChecker.java b/src/java/org/apache/nutch/indexer/IndexingFiltersChecker.java
index a43ccb1..4f849a0 100644
--- a/src/java/org/apache/nutch/indexer/IndexingFiltersChecker.java
+++ b/src/java/org/apache/nutch/indexer/IndexingFiltersChecker.java
@@ -65,6 +65,7 @@
   private static final Logger LOG = LoggerFactory
       .getLogger(MethodHandles.lookup().lookupClass());
 
+  @Override
   public int run(String[] args) throws Exception {
     String url = null;
 
diff --git a/src/java/org/apache/nutch/indexer/IndexingJob.java b/src/java/org/apache/nutch/indexer/IndexingJob.java
index e476adc..0966276 100644
--- a/src/java/org/apache/nutch/indexer/IndexingJob.java
+++ b/src/java/org/apache/nutch/indexer/IndexingJob.java
@@ -211,6 +211,7 @@
     System.err.println("");
   }
 
+  @Override
   public int run(String[] args) throws Exception {
     if (args.length == 0) {
       usage();
diff --git a/src/java/org/apache/nutch/net/URLFilterChecker.java b/src/java/org/apache/nutch/net/URLFilterChecker.java
index c1d1093..4e613d0 100644
--- a/src/java/org/apache/nutch/net/URLFilterChecker.java
+++ b/src/java/org/apache/nutch/net/URLFilterChecker.java
@@ -30,6 +30,7 @@
 
   private URLFilters filters = null;
 
+  @Override
   public int run(String[] args) throws Exception {
     usage = "Usage: URLFilterChecker [-Dproperty=value]... [-filterName filterName] (-stdin | -listen <port> [-keepClientCnxOpen]) \n"
         + "\n  -filterName\tURL filter plugin name (eg. urlfilter-regex) to check,"
diff --git a/src/java/org/apache/nutch/net/URLNormalizerChecker.java b/src/java/org/apache/nutch/net/URLNormalizerChecker.java
index ee25f2f..fa0baa2 100644
--- a/src/java/org/apache/nutch/net/URLNormalizerChecker.java
+++ b/src/java/org/apache/nutch/net/URLNormalizerChecker.java
@@ -31,6 +31,7 @@
   private String scope = URLNormalizers.SCOPE_DEFAULT;
   URLNormalizers normalizers;
 
+  @Override
   public int run(String[] args) throws Exception {
     usage = "Usage: URLNormalizerChecker [-Dproperty=value]... [-normalizer <normalizerName>] [-scope <scope>] (-stdin | -listen <port> [-keepClientCnxOpen])\n"
         + "\n  -normalizer\tURL normalizer plugin (eg. urlnormalizer-basic) to check,"
diff --git a/src/java/org/apache/nutch/parse/ParseOutputFormat.java b/src/java/org/apache/nutch/parse/ParseOutputFormat.java
index 4bc0853..fcaa1d1 100644
--- a/src/java/org/apache/nutch/parse/ParseOutputFormat.java
+++ b/src/java/org/apache/nutch/parse/ParseOutputFormat.java
@@ -97,6 +97,7 @@
     }
   }
 
+  @Override
   public OutputCommitter getOutputCommitter(TaskAttemptContext context) 
       throws IOException {
     Path path = FileOutputFormat.getOutputPath(context);
@@ -108,9 +109,6 @@
     Configuration conf = context.getConfiguration();
     Path out = FileOutputFormat.getOutputPath(context);
     FileSystem fs = out.getFileSystem(context.getConfiguration());
-    if ((out == null) && (context.getNumReduceTasks() != 0)) {
-      throw new IOException("Output directory not set in JobContext.");
-    }
     if (fs == null) {
       fs = out.getFileSystem(conf);
     }
@@ -132,6 +130,7 @@
     return result.toString();
   }
 
+  @Override
   public RecordWriter<Text, Parse> getRecordWriter(TaskAttemptContext context)
       throws IOException {
     Configuration conf = context.getConfiguration();
@@ -216,6 +215,7 @@
 
     return new RecordWriter<Text, Parse>() {
 
+      @Override
       public void write(Text key, Parse parse) throws IOException {
 
         String fromUrl = key.toString();
@@ -374,6 +374,7 @@
         }
       }
 
+      @Override
       public void close(TaskAttemptContext context) throws IOException {
         if (textOut != null)
           textOut.close();
diff --git a/src/java/org/apache/nutch/parse/ParseSegment.java b/src/java/org/apache/nutch/parse/ParseSegment.java
index 9a92ced..62551b2 100644
--- a/src/java/org/apache/nutch/parse/ParseSegment.java
+++ b/src/java/org/apache/nutch/parse/ParseSegment.java
@@ -90,10 +90,6 @@
     }
 
     @Override
-    public void cleanup(Context context){
-    }
-
-    @Override
     public void map(WritableComparable<?> key, Content content,
         Context context)
         throws IOException, InterruptedException {
@@ -156,13 +152,11 @@
         try {
           scfilters.passScoreAfterParsing(url, content, parse);
         } catch (ScoringFilterException e) {
-          if (LOG.isWarnEnabled()) {
-            LOG.warn("Error passing score: " + url + ": " + e.getMessage());
-          }
+          LOG.warn("Error passing score: {}: {}", url, e.getMessage());
         }
 
         long end = System.currentTimeMillis();
-        LOG.info("Parsed (" + Long.toString(end - start) + "ms):" + url);
+        LOG.info("Parsed ({}ms): {}", (end - start), url);
 
         context.write(
             url,
@@ -282,6 +276,7 @@
     System.exit(res);
   }
 
+  @Override
   public int run(String[] args) throws Exception {
     Path segment;
 
@@ -312,6 +307,7 @@
   /*
    * Used for Nutch REST service
    */
+  @Override
   public Map<String, Object> run(Map<String, Object> args, String crawlId) throws Exception {
 
     Map<String, Object> results = new HashMap<>();
diff --git a/src/java/org/apache/nutch/parse/ParserChecker.java b/src/java/org/apache/nutch/parse/ParserChecker.java
index e880485..2a976ba 100644
--- a/src/java/org/apache/nutch/parse/ParserChecker.java
+++ b/src/java/org/apache/nutch/parse/ParserChecker.java
@@ -77,6 +77,7 @@
       .getLogger(MethodHandles.lookup().lookupClass());
   private ScoringFilters scfilters;
 
+  @Override
   public int run(String[] args) throws Exception {
     String url = null;
 
@@ -256,9 +257,9 @@
         content, parseResult.get(new Text(url)));
 
     if (LOG.isInfoEnabled()) {
-      LOG.info("parsing: " + url);
-      LOG.info("contentType: " + contentType);
-      LOG.info("signature: " + StringUtil.toHexString(signature));
+      LOG.info("parsing: {}", url);
+      LOG.info("contentType: {}", contentType);
+      LOG.info("signature: {}", StringUtil.toHexString(signature));
     }
 
     for (Map.Entry<Text, Parse> entry : parseResult) {
diff --git a/src/java/org/apache/nutch/protocol/Content.java b/src/java/org/apache/nutch/protocol/Content.java
index c513159..e7016f0 100644
--- a/src/java/org/apache/nutch/protocol/Content.java
+++ b/src/java/org/apache/nutch/protocol/Content.java
@@ -21,6 +21,8 @@
 import java.io.DataInputStream;
 import java.io.DataOutput;
 import java.io.IOException;
+import java.nio.charset.Charset;
+import java.nio.charset.StandardCharsets;
 import java.util.Arrays;
 import java.util.zip.InflaterInputStream;
 
@@ -256,6 +258,20 @@
   }
 
   public String toString() {
+    return toString(StandardCharsets.UTF_8);
+  }
+
+  /** Same as {@link #toString(Charset)}, using UTF-8 if the charset name is null or unknown. */
+  public String toString(String charset) {
+    Charset c = StandardCharsets.UTF_8;
+    try {
+      c = Charset.forName(charset);
+    } catch (IllegalArgumentException e) { // null, illegal or unsupported name: keep UTF-8
+    }
+    return toString(c);
+  }
+
+  public String toString(Charset charset) {
     StringBuffer buffer = new StringBuffer();
 
     buffer.append("Version: " + version + "\n");
@@ -264,7 +280,7 @@
     buffer.append("contentType: " + contentType + "\n");
     buffer.append("metadata: " + metadata + "\n");
     buffer.append("Content:\n");
-    buffer.append(new String(content)); // try default encoding
+    buffer.append(new String(content, charset));
 
     return buffer.toString();
 
diff --git a/src/java/org/apache/nutch/scoring/webgraph/LinkDumper.java b/src/java/org/apache/nutch/scoring/webgraph/LinkDumper.java
index 71fe42f..3c74b38 100644
--- a/src/java/org/apache/nutch/scoring/webgraph/LinkDumper.java
+++ b/src/java/org/apache/nutch/scoring/webgraph/LinkDumper.java
@@ -407,6 +407,7 @@
    * Runs the LinkDumper tool. This simply creates the database, to read the
    * values the nested Reader tool must be used.
    */
+  @Override
   public int run(String[] args) throws Exception {
 
     Options options = new Options();
diff --git a/src/java/org/apache/nutch/scoring/webgraph/LinkRank.java b/src/java/org/apache/nutch/scoring/webgraph/LinkRank.java
index b6bfa98..9720754 100644
--- a/src/java/org/apache/nutch/scoring/webgraph/LinkRank.java
+++ b/src/java/org/apache/nutch/scoring/webgraph/LinkRank.java
@@ -362,9 +362,7 @@
      */
     public static class CountMapper extends
         Mapper<Text, Node, Text, LongWritable> {
-      public void setup(Mapper<Text, Node, Text, LongWritable>.Context context) {
-      }
-
+      @Override
       public void map(Text key, Node value,
           Context context)
           throws IOException, InterruptedException {
@@ -377,9 +375,7 @@
      */
     public static class CountReducer extends
         Reducer<Text, LongWritable, Text, LongWritable> {
-      public void setup(Reducer<Text, LongWritable, Text, LongWritable>.Context context) {
-      }
-
+      @Override
       public void reduce(Text key, Iterable<LongWritable> values,
           Context context)
           throws IOException, InterruptedException {
@@ -642,9 +638,6 @@
     super(conf);
   }
 
-  public void close() {
-  }
-
   /**
    * Runs the complete link analysis job. The complete job determins rank one
    * score. Then runs through a given number of invert and analyze iterations,
@@ -736,6 +729,7 @@
   /**
    * Runs the LinkRank tool.
    */
+  @Override
   public int run(String[] args) throws Exception {
 
     Options options = new Options();
diff --git a/src/java/org/apache/nutch/scoring/webgraph/NodeDumper.java b/src/java/org/apache/nutch/scoring/webgraph/NodeDumper.java
index cc93eb8..70c4270 100644
--- a/src/java/org/apache/nutch/scoring/webgraph/NodeDumper.java
+++ b/src/java/org/apache/nutch/scoring/webgraph/NodeDumper.java
@@ -361,6 +361,7 @@
   /**
    * Runs the node dumper tool.
    */
+  @Override
   public int run(String[] args) throws Exception {
 
     Options options = new Options();
diff --git a/src/java/org/apache/nutch/scoring/webgraph/ScoreUpdater.java b/src/java/org/apache/nutch/scoring/webgraph/ScoreUpdater.java
index 93a7c95..3674fa8 100644
--- a/src/java/org/apache/nutch/scoring/webgraph/ScoreUpdater.java
+++ b/src/java/org/apache/nutch/scoring/webgraph/ScoreUpdater.java
@@ -226,6 +226,7 @@
   /**
    * Runs the ScoreUpdater tool.
    */
+  @Override
   public int run(String[] args) throws Exception {
 
     Options options = new Options();
diff --git a/src/java/org/apache/nutch/scoring/webgraph/WebGraph.java b/src/java/org/apache/nutch/scoring/webgraph/WebGraph.java
index 72a6173..0b53a39 100644
--- a/src/java/org/apache/nutch/scoring/webgraph/WebGraph.java
+++ b/src/java/org/apache/nutch/scoring/webgraph/WebGraph.java
@@ -330,6 +330,7 @@
       /**
        * Configures the OutlinkDb job reducer. Sets up internal links and link limiting.
        */
+      @Override
       public void setup(Reducer<Text, NutchWritable, Text, LinkDatum>.Context context) {
         Configuration config = context.getConfiguration();
         conf = config;
@@ -340,6 +341,7 @@
         
       }
    
+      @Override
       public void reduce(Text key, Iterable<NutchWritable> values,
           Context context)
           throws IOException, InterruptedException {
@@ -406,9 +408,6 @@
         }
       }
     }
-
-    public void close() {
-    }
   }
 
   /**
@@ -431,10 +430,12 @@
        * Configures job mapper. Sets timestamp for all Inlink LinkDatum objects to the
        * current system time.
        */
+      @Override
       public void setup(Mapper<Text, LinkDatum, Text, LinkDatum>.Context context) {
         timestamp = System.currentTimeMillis();
       }
 
+      @Override
       public void map(Text key, LinkDatum datum,
           Context context)
           throws IOException, InterruptedException {
@@ -465,12 +466,7 @@
     public static class NodeDbReducer extends 
         Reducer<Text, LinkDatum, Text, Node> {
 
-      /**
-       * Configures job reducer.
-       */
-      public void setup(Reducer<Text, LinkDatum, Text, Node>.Context context) {
-      }
-
+      @Override
       public void reduce(Text key, Iterable<LinkDatum> values,
           Context context) throws IOException, InterruptedException {
 
@@ -732,6 +728,7 @@
   /**
    * Parses command link arguments and runs the WebGraph jobs.
    */
+  @Override
   public int run(String[] args) throws Exception {
 
     // boolean options
diff --git a/src/java/org/apache/nutch/segment/SegmentMerger.java b/src/java/org/apache/nutch/segment/SegmentMerger.java
index 9fac5b2..7dbfd11 100644
--- a/src/java/org/apache/nutch/segment/SegmentMerger.java
+++ b/src/java/org/apache/nutch/segment/SegmentMerger.java
@@ -363,9 +363,6 @@
     super.setConf(conf);
   }
 
-  public void close() throws IOException {
-  }
-
 
   public static class SegmentMergerMapper extends
   Mapper<Text, MetaWrapper, Text, MetaWrapper> {
@@ -431,7 +428,7 @@
         mergeFilters = new SegmentMergeFilters(conf);
       }      
       sliceSize = conf.getLong("segment.merger.slice", -1);
-      if ((sliceSize > 0) && (LOG.isInfoEnabled())) {
+      if (sliceSize > 0) {
         LOG.info("Slice size: {} URLs.", sliceSize);
       }
       if (sliceSize > 0) {
@@ -625,9 +622,7 @@
   public void merge(Path out, Path[] segs, boolean filter, boolean normalize,
           long slice) throws IOException, ClassNotFoundException, InterruptedException {
     String segmentName = Generator.generateSegmentName();
-    if (LOG.isInfoEnabled()) {
-      LOG.info("Merging {} segments to {}/{}", segs.length, out, segmentName);
-    }
+    LOG.info("Merging {} segments to {}/{}", segs.length, out, segmentName);
     Job job = NutchJob.getInstance(getConf());
     Configuration conf = job.getConfiguration();
     job.setJobName("mergesegs " + out + "/" + segmentName);
@@ -659,9 +654,7 @@
         segs[i] = null;
         continue;
       }
-      if (LOG.isInfoEnabled()) {
-        LOG.info("SegmentMerger:   adding {}", segs[i]);
-      }
+      LOG.info("SegmentMerger:   adding {}", segs[i]);
       Path cDir = new Path(segs[i], Content.DIR_NAME);
       Path gDir = new Path(segs[i], CrawlDatum.GENERATE_DIR_NAME);
       Path fDir = new Path(segs[i], CrawlDatum.FETCH_DIR_NAME);
@@ -682,20 +675,20 @@
 
       pg = g; pf = f; pp = p; pc = c; ppd = pd; ppt = pt;
     }
-    StringBuilder sb = new StringBuilder();
-    if (c)
-      sb.append(" " + Content.DIR_NAME);
-    if (g)
-      sb.append(" " + CrawlDatum.GENERATE_DIR_NAME);
-    if (f)
-      sb.append(" " + CrawlDatum.FETCH_DIR_NAME);
-    if (p)
-      sb.append(" " + CrawlDatum.PARSE_DIR_NAME);
-    if (pd)
-      sb.append(" " + ParseData.DIR_NAME);
-    if (pt)
-      sb.append(" " + ParseText.DIR_NAME);
     if (LOG.isInfoEnabled()) {
+      StringBuilder sb = new StringBuilder();
+      if (c)
+        sb.append(" " + Content.DIR_NAME);
+      if (g)
+        sb.append(" " + CrawlDatum.GENERATE_DIR_NAME);
+      if (f)
+        sb.append(" " + CrawlDatum.FETCH_DIR_NAME);
+      if (p)
+        sb.append(" " + CrawlDatum.PARSE_DIR_NAME);
+      if (pd)
+        sb.append(" " + ParseData.DIR_NAME);
+      if (pt)
+        sb.append(" " + ParseText.DIR_NAME);
       LOG.info("SegmentMerger: using segment data from: {}", sb.toString());
     }
     for (int i = 0; i < segs.length; i++) {
@@ -755,6 +748,7 @@
   /**
    * @param args
    */
+  @Override
   public int run(String[] args)  throws Exception {
     if (args.length < 2) {
       System.err
diff --git a/src/java/org/apache/nutch/segment/SegmentReader.java b/src/java/org/apache/nutch/segment/SegmentReader.java
index bcf99b8..f47a76d 100644
--- a/src/java/org/apache/nutch/segment/SegmentReader.java
+++ b/src/java/org/apache/nutch/segment/SegmentReader.java
@@ -25,6 +25,7 @@
 import java.io.PrintWriter;
 import java.io.Writer;
 import java.lang.invoke.MethodHandles;
+import java.nio.charset.Charset;
 import java.nio.charset.StandardCharsets;
 import java.text.SimpleDateFormat;
 import java.util.ArrayList;
@@ -61,11 +62,13 @@
 import org.apache.hadoop.util.ToolRunner;
 import org.apache.nutch.crawl.CrawlDatum;
 import org.apache.nutch.crawl.NutchWritable;
+import org.apache.nutch.metadata.Metadata;
 import org.apache.nutch.parse.ParseData;
 import org.apache.nutch.parse.ParseText;
 import org.apache.nutch.protocol.Content;
 import org.apache.nutch.util.HadoopFSUtil;
 import org.apache.nutch.util.NutchConfiguration;
+import org.apache.nutch.util.NutchJob;
 import org.apache.nutch.util.SegmentReaderUtil;
 
 /** Dump the content of a segment. */
@@ -74,12 +77,13 @@
   private static final Logger LOG = LoggerFactory
       .getLogger(MethodHandles.lookup().lookupClass());
 
-  private boolean co;
-  private boolean fe;
-  private boolean ge;
-  private boolean pa;
-  private boolean pd;
-  private boolean pt;
+  private boolean co = true;
+  private boolean fe = true;
+  private boolean ge = true;
+  private boolean pa = true;
+  private boolean pd = true;
+  private boolean pt = true;
+  private boolean recodeContent = false;
 
   public static class InputCompatMapper extends
       Mapper<WritableComparable<?>, Writable, Text, NutchWritable> {
@@ -103,6 +107,8 @@
   /** Implements a text output format */
   public static class TextOutputFormat extends
       FileOutputFormat<WritableComparable<?>, Writable> {
+
+    @Override
     public RecordWriter<WritableComparable<?>, Writable> getRecordWriter(
         TaskAttemptContext context) throws IOException, InterruptedException {
       String name = getUniqueFile(context, "part", "");
@@ -119,11 +125,14 @@
       final PrintStream printStream = new PrintStream(
           fs.create(segmentDumpFile), false, StandardCharsets.UTF_8.name());
       return new RecordWriter<WritableComparable<?>, Writable>() {
+
+        @Override
         public synchronized void write(WritableComparable<?> key, Writable value)
             throws IOException {
           printStream.println(value);
         }
 
+        @Override
         public synchronized void close(TaskAttemptContext context) throws IOException {
           printStream.close();
         }
@@ -131,38 +140,17 @@
     }
   }
 
-  public SegmentReader() {
-    super(null);
-  }
-
-  public SegmentReader(Configuration conf, boolean co, boolean fe, boolean ge,
-      boolean pa, boolean pd, boolean pt) {
-    super(conf);
-    this.co = co;
-    this.fe = fe;
-    this.ge = ge;
-    this.pa = pa;
-    this.pd = pd;
-    this.pt = pt;
-  }
-
-  public void setup(Job job) {
-      Configuration conf = job.getConfiguration();
-      this.co = conf.getBoolean("segment.reader.co", true);
-      this.fe = conf.getBoolean("segment.reader.fe", true);
-      this.ge = conf.getBoolean("segment.reader.ge", true);
-      this.pa = conf.getBoolean("segment.reader.pa", true);
-      this.pd = conf.getBoolean("segment.reader.pd", true);
-      this.pt = conf.getBoolean("segment.reader.pt", true);
-    }
-
-  public void close() {
-  }
-
   public static class InputCompatReducer extends
       Reducer<Text, NutchWritable, Text, Text> {
 
     private long recNo = 0L;
+    private boolean recodeContent = false;
+
+    @Override
+    public void setup(Context context) {
+      recodeContent = context.getConfiguration()
+          .getBoolean("segment.reader.content.recode", false);
+    }
 
     @Override
     public void reduce(Text key, Iterable<NutchWritable> values,
@@ -171,20 +159,32 @@
 
       dump.append("\nRecno:: ").append(recNo++).append("\n");
       dump.append("URL:: " + key.toString() + "\n");
+      Content content = null;
+      Charset charset = StandardCharsets.UTF_8; // default if no ParseData value is seen
       for (NutchWritable val : values) {
         Writable value = val.get(); // unwrap
         if (value instanceof CrawlDatum) {
           dump.append("\nCrawlDatum::\n").append(((CrawlDatum) value).toString());
         } else if (value instanceof Content) {
-          dump.append("\nContent::\n").append(((Content) value).toString());
+          if (recodeContent) {
+            content = (Content) value;
+          } else {
+            dump.append("\nContent::\n").append(((Content) value).toString());
+          }
         } else if (value instanceof ParseData) {
           dump.append("\nParseData::\n").append(((ParseData) value).toString());
+          if (recodeContent) {
+            charset = getCharset(((ParseData) value).getParseMeta());
+          }
         } else if (value instanceof ParseText) {
           dump.append("\nParseText::\n").append(((ParseText) value).toString());
         } else if (LOG.isWarnEnabled()) {
           LOG.warn("Unrecognized type: " + value.getClass());
         }
       }
+      if (recodeContent && content != null) {
+        dump.append("\nContent::\n").append(content.toString(charset));
+      }
       context.write(key, new Text(dump.toString()));
     }
   }
@@ -192,11 +192,9 @@
   public void dump(Path segment, Path output) throws IOException,
       InterruptedException, ClassNotFoundException {
 
-    if (LOG.isInfoEnabled()) {
-      LOG.info("SegmentReader: dump segment: " + segment);
-    }
+    LOG.info("SegmentReader: dump segment: {}", segment);
 
-    Job job = Job.getInstance();
+    Job job = NutchJob.getInstance(getConf());
     job.setJobName("read " + segment);
     Configuration conf = job.getConfiguration();
 
@@ -277,9 +275,7 @@
       }
     }
     fs.delete(tempDir, true);
-    if (LOG.isInfoEnabled()) {
-      LOG.info("SegmentReader: done");
-    }
+    LOG.info("SegmentReader: done");
   }
 
   /** Appends two files and updates the Recno counter */
@@ -306,7 +302,7 @@
 
   public void get(final Path segment, final Text key, Writer writer,
       final Map<String, List<Writable>> results) throws Exception {
-    LOG.info("SegmentReader: get '" + key + "'");
+    LOG.info("SegmentReader: get '{}'", key);
     ArrayList<Thread> threads = new ArrayList<>();
     if (co)
       threads.add(new Thread() {
@@ -405,7 +401,13 @@
       if (res != null && res.size() > 0) {
         for (int k = 0; k < res.size(); k++) {
           writer.write(keys[i][1]);
-          writer.write(res.get(k) + "\n");
+          List<Writable> pds = results.get("pd"); // may be absent, e.g. if parse data was skipped
+          if (recodeContent && keys[i][0].equals("co") && pds != null && k < pds.size()) {
+            Charset charset = getCharset(((ParseData) pds.get(k)).getParseMeta());
+            writer.write(((Content) res.get(k)).toString(charset) + "\n");
+          } else {
+            writer.write(res.get(k) + "\n");
+          }
         }
       }
       writer.flush();
@@ -459,6 +461,22 @@
     return res;
   }
 
+  /** Try to get HTML encoding from parse metadata, defaulting to UTF-8 */
+  public static Charset getCharset(Metadata parseMeta) {
+    String charset = parseMeta.get(Metadata.CHAR_ENCODING_FOR_CONVERSION);
+    if (charset == null) {
+      // fall-back: "Content-Encoding" (set by parse-tika)
+      charset = parseMeta.get(Metadata.CONTENT_ENCODING);
+    }
+    if (charset != null) {
+      try {
+        return Charset.forName(charset);
+      } catch (IllegalArgumentException e) { // illegal or unsupported charset name
+      }
+    }
+    return StandardCharsets.UTF_8;
+  }
+
   public static class SegmentReaderStats {
     public long start = -1L;
     public long end = -1L;
@@ -579,6 +597,7 @@
 
   private static final int MODE_GET = 2;
 
+  @Override
   public int run(String[] args) throws Exception {
     if (args.length < 2) {
       usage();
@@ -592,12 +611,6 @@
     else if (args[0].equals("-get"))
       mode = MODE_GET;
 
-    boolean co = true;
-    boolean fe = true;
-    boolean ge = true;
-    boolean pa = true;
-    boolean pd = true;
-    boolean pt = true;
     // collect general options
     for (int i = 1; i < args.length; i++) {
       if (args[i].equals("-nocontent")) {
@@ -618,22 +631,21 @@
       } else if (args[i].equals("-noparsetext")) {
         pt = false;
         args[i] = null;
+      } else if (args[i].equals("-recode")) {
+        recodeContent = true;
+        args[i] = null;
       }
     }
-    Configuration conf = NutchConfiguration.create();
-    SegmentReader segmentReader = new SegmentReader(conf, co, fe, ge, pa, pd,
-        pt);
+
+    if (recodeContent) {
+      LOG.info("Recoding charset of HTML content");
+      getConf().setBoolean("segment.reader.content.recode", true);
+    }
+
     // collect required args
     switch (mode) {
     case MODE_DUMP:
 
-      this.co = co;
-      this.fe = fe;
-      this.ge = ge;
-      this.pa = pa;
-      this.pd = pd;
-      this.pt = pt;
-
       String input = args[1];
       if (input == null) {
         System.err.println("Missing required argument: <segment_dir>");
@@ -655,7 +667,7 @@
           continue;
         if (args[i].equals("-dir")) {
           Path dir = new Path(args[++i]);
-          FileSystem fs = dir.getFileSystem(conf);
+          FileSystem fs = dir.getFileSystem(getConf());
           FileStatus[] fstats = fs.listStatus(dir,
               HadoopFSUtil.getPassDirectoriesFilter(fs));
           Path[] files = HadoopFSUtil.getPaths(fstats);
@@ -665,7 +677,7 @@
         } else
           dirs.add(new Path(args[i]));
       }
-      segmentReader.list(dirs, new OutputStreamWriter(System.out, StandardCharsets.UTF_8));
+      list(dirs, new OutputStreamWriter(System.out, StandardCharsets.UTF_8));
       return 0;
     case MODE_GET:
       input = args[1];
@@ -680,8 +692,9 @@
         usage();
         return -1;
       }
-      segmentReader.get(new Path(input), new Text(key), new OutputStreamWriter(
-          System.out, StandardCharsets.UTF_8), new HashMap<>());
+      get(new Path(input), new Text(key),
+          new OutputStreamWriter(System.out, StandardCharsets.UTF_8),
+          new HashMap<>());
       return 0;
     default:
       System.err.println("Invalid operation: " + args[0]);
@@ -700,6 +713,8 @@
     System.err.println("\t-noparse\tignore crawl_parse directory");
     System.err.println("\t-noparsedata\tignore parse_data directory");
     System.err.println("\t-noparsetext\tignore parse_text directory");
+    System.err.println("\t-recode \ttry to recode HTML content from the page's\n"
+        + "\t        \toriginal charset to UTF-8\n");
     System.err.println();
     System.err
         .println("* SegmentReader -dump <segment_dir> <output> [general options]");
diff --git a/src/java/org/apache/nutch/tools/Benchmark.java b/src/java/org/apache/nutch/tools/Benchmark.java
index 203496b..d7c3b74 100644
--- a/src/java/org/apache/nutch/tools/Benchmark.java
+++ b/src/java/org/apache/nutch/tools/Benchmark.java
@@ -131,6 +131,7 @@
     }
   }
 
+  @Override
   public int run(String[] args) throws Exception {
     String plugins = "protocol-http|parse-tika|scoring-opic|urlfilter-regex|urlnormalizer-pass";
     int seeds = 1;
diff --git a/src/java/org/apache/nutch/tools/FreeGenerator.java b/src/java/org/apache/nutch/tools/FreeGenerator.java
index 4bec975..8c537d9 100644
--- a/src/java/org/apache/nutch/tools/FreeGenerator.java
+++ b/src/java/org/apache/nutch/tools/FreeGenerator.java
@@ -106,14 +106,12 @@
             scfilters.injectedScore(url, datum);
           }
         } catch (Exception e) {
-          LOG.warn("Error adding url '" + value.toString() + "', skipping: "
-              + StringUtils.stringifyException(e));
+          LOG.warn("Error adding url '{}', skipping: {}", value,
+              StringUtils.stringifyException(e));
           return;
         }
         if (urlString == null) {
-          if (LOG.isDebugEnabled()) {
-            LOG.debug("- skipping " + value.toString());
-          }
+          LOG.debug("- skipping {}", value);
           return;
         }
         entry.datum = datum;
@@ -145,6 +143,7 @@
     }
   }
 
+  @Override
   public int run(String[] args) throws Exception {
     if (args.length < 2) {
       System.err
diff --git a/src/java/org/apache/nutch/tools/warc/WARCExporter.java b/src/java/org/apache/nutch/tools/warc/WARCExporter.java
index d307000..0f06e49 100644
--- a/src/java/org/apache/nutch/tools/warc/WARCExporter.java
+++ b/src/java/org/apache/nutch/tools/warc/WARCExporter.java
@@ -90,14 +90,9 @@
 
   public static class WARCMapReduce {
 
-    public void close() throws IOException {
-    }
-
     public static class WARCMapper extends 
         Mapper<Text, Writable, Text, NutchWritable> {
-      public void setup(Mapper<Text, Writable, Text, NutchWritable>.Context context) {
-      }
-
+      @Override
       public void map(Text key, Writable value, Context context)
               throws IOException, InterruptedException {
         context.write(key, new NutchWritable(value));
@@ -106,9 +101,7 @@
 
     public static class WARCReducer extends
         Reducer<Text, NutchWritable, NullWritable, WARCWritable> {
-      public void setup(Reducer<Text, NutchWritable, NullWritable, WARCWritable>.Context context) {
-      }
-
+      @Override
       public void reduce(Text key, Iterable<NutchWritable> values,
           Context context) throws IOException, InterruptedException {
 
@@ -305,6 +298,7 @@
     return 0;
   }
 
+  @Override
   public int run(String[] args) throws Exception {
     if (args.length < 2) {
       System.err.println(
diff --git a/src/java/org/apache/nutch/util/CrawlCompletionStats.java b/src/java/org/apache/nutch/util/CrawlCompletionStats.java
index c138e61..f3e808b 100644
--- a/src/java/org/apache/nutch/util/CrawlCompletionStats.java
+++ b/src/java/org/apache/nutch/util/CrawlCompletionStats.java
@@ -67,6 +67,7 @@
   private static final int MODE_HOST = 1;
   private static final int MODE_DOMAIN = 2;
 
+  @Override
   public int run(String[] args) throws Exception {
     Option helpOpt = new Option("h", "help", false, "Show this message");
     @SuppressWarnings("static-access")
@@ -196,10 +197,12 @@
       Mapper<Text, CrawlDatum, Text, LongWritable> {
     int mode = 0;
 
+    @Override
     public void setup(Context context) {
       mode = context.getConfiguration().getInt("domain.statistics.mode", MODE_DOMAIN);
     }
 
+    @Override
     public void map(Text urlText, CrawlDatum datum, Context context)
         throws IOException, InterruptedException {
 
@@ -225,6 +228,7 @@
 
   static class CrawlCompletionStatsReducer extends
       Reducer<Text, LongWritable, LongWritable, Text> {
+    @Override
     public void reduce(Text key, Iterable<LongWritable> values, Context context)
         throws IOException, InterruptedException {
       long total = 0;
@@ -239,6 +243,7 @@
 
   public static class CrawlCompletionStatsCombiner extends
       Reducer<Text, LongWritable, Text, LongWritable> {
+    @Override
     public void reduce(Text key, Iterable<LongWritable> values, Context context)
         throws IOException, InterruptedException {
       long total = 0;
diff --git a/src/java/org/apache/nutch/util/ProtocolStatusStatistics.java b/src/java/org/apache/nutch/util/ProtocolStatusStatistics.java
index 3b6cc48..f52a9c5 100644
--- a/src/java/org/apache/nutch/util/ProtocolStatusStatistics.java
+++ b/src/java/org/apache/nutch/util/ProtocolStatusStatistics.java
@@ -64,6 +64,7 @@
 
   private static final Text UNFETCHED_TEXT = new Text("UNFETCHED");
 
+  @Override
   public int run(String[] args) throws Exception {
     if (args.length < 2) {
       System.err.println("Usage: ProtocolStatistics inputDirs outDir [numOfReducer]");
@@ -140,7 +141,7 @@
 
   static class ProtocolStatusStatisticsMapper extends
       Mapper<Text, CrawlDatum, Text, LongWritable> {
-
+    @Override
     public void map(Text urlText, CrawlDatum datum, Context context)
         throws IOException, InterruptedException {
       if (datum.getMetaData().containsKey(Nutch.PROTOCOL_STATUS_CODE_KEY)) {
@@ -153,6 +154,7 @@
 
   static class ProtocolStatusStatisticsReducer extends
       Reducer<Text, LongWritable, LongWritable, Text> {
+    @Override
     public void reduce(Text key, Iterable<LongWritable> values, Context context)
         throws IOException, InterruptedException {
       long total = 0;
@@ -167,6 +169,7 @@
 
   public static class ProtocolStatusStatisticsCombiner extends
       Reducer<Text, LongWritable, Text, LongWritable> {
+    @Override
     public void reduce(Text key, Iterable<LongWritable> values, Context context)
         throws IOException, InterruptedException {
       long total = 0;
diff --git a/src/java/org/apache/nutch/util/SitemapProcessor.java b/src/java/org/apache/nutch/util/SitemapProcessor.java
index f558c46..c686d6a 100644
--- a/src/java/org/apache/nutch/util/SitemapProcessor.java
+++ b/src/java/org/apache/nutch/util/SitemapProcessor.java
@@ -106,6 +106,7 @@
     private CrawlDatum datum = new CrawlDatum();
     private SiteMapParser parser = null;
 
+    @Override
     public void setup(Context context) {
       Configuration conf = context.getConfiguration();
       int maxSize = conf.getInt(SITEMAP_SIZE_MAX, SiteMapParser.MAX_BYTES_ALLOWED);
@@ -127,6 +128,7 @@
       }
     }
 
+    @Override
     public void map(Text key, Writable value, Context context) throws IOException, InterruptedException {
       String url;
 
@@ -308,11 +310,13 @@
 
     private boolean overwriteExisting = false; // DO NOT ENABLE!!
 
+    @Override
     public void setup(Context context) {
       Configuration conf = context.getConfiguration();
       this.overwriteExisting = conf.getBoolean(SITEMAP_OVERWRITE_EXISTING, false);
     }
 
+    @Override
     public void reduce(Text key, Iterable<CrawlDatum> values, Context context)
         throws IOException, InterruptedException {
       sitemapDatum  = null;
@@ -353,9 +357,7 @@
   public void sitemap(Path crawldb, Path hostdb, Path sitemapUrlDir, boolean strict, boolean filter,
                       boolean normalize, int threads) throws Exception {
     long start = System.currentTimeMillis();
-    if (LOG.isInfoEnabled()) {
-      LOG.info("SitemapProcessor: Starting at {}", sdf.format(start));
-    }
+    LOG.info("SitemapProcessor: Starting at {}", sdf.format(start));
 
     FileSystem fs = crawldb.getFileSystem(getConf());
     Path old = new Path(crawldb, "old");
@@ -461,6 +463,7 @@
     System.err.println("\t-noNormalize\t\tturn off URLNormalizer on urls (optional)");
   }
 
+  @Override
   public int run(String[] args) throws Exception {
     if (args.length < 3) {
       usage();
diff --git a/src/java/org/apache/nutch/util/domain/DomainStatistics.java b/src/java/org/apache/nutch/util/domain/DomainStatistics.java
index 4354ffc..fd2f940 100644
--- a/src/java/org/apache/nutch/util/domain/DomainStatistics.java
+++ b/src/java/org/apache/nutch/util/domain/DomainStatistics.java
@@ -62,6 +62,7 @@
   private static final int MODE_SUFFIX = 3;
   private static final int MODE_TLD = 4;
 
+  @Override
   public int run(String[] args) throws Exception {
     if (args.length < 3) {
       System.err.println("Usage: DomainStatistics inputDirs outDir mode [numOfReducer]");
@@ -161,11 +162,13 @@
       Mapper<Text, CrawlDatum, Text, LongWritable> {
     int mode = 0;
 
+    @Override
     public void setup(Context context) {
       mode = context.getConfiguration().getInt("domain.statistics.mode",
           MODE_DOMAIN);
     }
 
+    @Override
     public void map(Text urlText, CrawlDatum datum, Context context)
         throws IOException, InterruptedException {
 
@@ -209,6 +212,7 @@
 
   static class DomainStatisticsReducer extends
       Reducer<Text, LongWritable, LongWritable, Text> {
+    @Override
     public void reduce(Text key, Iterable<LongWritable> values, Context context)
         throws IOException, InterruptedException {
       long total = 0;
@@ -223,6 +227,7 @@
 
   public static class DomainStatisticsCombiner extends
       Reducer<Text, LongWritable, Text, LongWritable> {
+    @Override
     public void reduce(Text key, Iterable<LongWritable> values, Context context)
         throws IOException, InterruptedException {
       long total = 0;
diff --git a/src/plugin/build.xml b/src/plugin/build.xml
index 581a37a..a2a0dd7 100755
--- a/src/plugin/build.xml
+++ b/src/plugin/build.xml
@@ -53,7 +53,6 @@
     <ant dir="indexer-csv" target="deploy"/>
     <ant dir="indexer-dummy" target="deploy"/>
     <ant dir="indexer-elastic" target="deploy"/>
-    <ant dir="indexer-elastic-rest" target="deploy"/>
     <ant dir="indexer-kafka" target="deploy"/>
     <ant dir="indexer-rabbit" target="deploy"/>
     <ant dir="indexer-solr" target="deploy"/>
@@ -193,7 +192,6 @@
     <ant dir="indexer-csv" target="clean"/>
     <ant dir="indexer-dummy" target="clean"/>
     <ant dir="indexer-elastic" target="clean"/>
-    <ant dir="indexer-elastic-rest" target="clean"/>
     <ant dir="indexer-kafka" target="clean"/>
     <ant dir="indexer-rabbit" target="clean"/>
     <ant dir="indexer-solr" target="clean"/>
diff --git a/src/plugin/indexer-dummy/src/java/org/apache/nutch/indexwriter/dummy/DummyIndexWriter.java b/src/plugin/indexer-dummy/src/java/org/apache/nutch/indexwriter/dummy/DummyIndexWriter.java
index d809f66..1dfc653 100644
--- a/src/plugin/indexer-dummy/src/java/org/apache/nutch/indexwriter/dummy/DummyIndexWriter.java
+++ b/src/plugin/indexer-dummy/src/java/org/apache/nutch/indexwriter/dummy/DummyIndexWriter.java
@@ -49,6 +49,7 @@
   private boolean delete = false;
   private String path;
 
+  @Override
   public void open(Configuration conf, String name) throws IOException {
     //Implementation not required
   }
@@ -102,6 +103,7 @@
     writer.write("add\t" + doc.getFieldValue("id") + "\n");
   }
 
+  @Override
   public void close() throws IOException {
     LOG.debug("Closing dummy index file");
     writer.flush();
diff --git a/src/plugin/indexer-elastic-rest/README.md b/src/plugin/indexer-elastic-rest/README.md
deleted file mode 100644
index e5a76c9..0000000
--- a/src/plugin/indexer-elastic-rest/README.md
+++ /dev/null
@@ -1,45 +0,0 @@
-indexer-elastic-rest plugin for Nutch 
-=====================================
-
-**indexer-elastic-rest plugin** is used for sending documents from one or more segments to Elasticsearch, but using Jest to connect with the REST API provided by Elasticsearch. The configuration for the index writers is on **conf/index-writers.xml** file, included in the official Nutch distribution and it's as follow:
-
-```xml
-<writer id="<writer_id>" class="org.apache.nutch.indexwriter.elasticrest.ElasticRestIndexWriter">
-  <mapping>
-    ...
-  </mapping>
-  <parameters>
-    ...
-  </parameters>   
-</writer>
-```
-
-Each `<writer>` element has two mandatory attributes:
-
-* `<writer_id>` is a unique identification for each configuration. This feature allows Nutch to distinguish each configuration, even when they are for the same index writer. In addition, it allows to have multiple instances for the same index writer, but with different configurations.
-
-* `org.apache.nutch.indexwriter.elasticrest.ElasticRestIndexWriter` corresponds to the canonical name of the class that implements the IndexWriter extension point. This value should not be modified for the **indexer-elastic-rest plugin**.
-
-## Mapping
-
-The mapping section is explained [here](https://wiki.apache.org/nutch/IndexWriters#Mapping_section). The structure of this section is general for all index writers.
-
-## Parameters
-
-Each parameter has the form `<param name="<name>" value="<value>"/>` and the parameters for this index writer are:
-
-Parameter Name | Description | Default value
---|--|--
-host | The hostname or a list of comma separated hostnames to send documents to using Elasticsearch Jest. Both host and port must be defined. |  
-port | The port to connect to using Elasticsearch Jest. | 9200
-index | Default index to send documents to. | nutch
-max.bulk.docs | Maximum size of the bulk in number of documents. | 250
-max.bulk.size | Maximum size of the bulk in bytes. | 2500500
-user | Username for auth credentials (only used when https is enabled) | user
-password | Password for auth credentials (only used when https is enabled) | password
-type | Default type to send documents to. | doc
-https | **true** to enable https, **false** to disable https. If you've disabled http access (by forcing https), be sure to set this to true, otherwise you might get "connection reset by peer". | false
-trustallhostnames | **true** to trust elasticsearch server's certificate even if its listed domain name does not match the domain they are hosted or **false** to check if the elasticsearch server's certificate's listed domain is the same domain that it is hosted on, and if it doesn't, then fail to index (only used when https is enabled) | false
-languages | A list of strings denoting the supported languages (e.g. `en, de, fr, it`). If this value is empty all documents will be sent to index property. If not empty the Rest client will distribute documents in different indices based on their `languages` property. Indices are named with the following schema: `index separator language` (e.g. `nutch_de`). Entries with an unsupported `languages` value will be added to index `index separator sink` (e.g. `nutch_others`). | 
-separator | Is used only if `languages` property is defined to build the index name (i.e. `index separator lang`). | _
-sink | Is used only if `languages` property is defined to build the index name where to store documents with unsupported languages (i.e. `index separator sink`). | others 
\ No newline at end of file
diff --git a/src/plugin/indexer-elastic-rest/build-ivy.xml b/src/plugin/indexer-elastic-rest/build-ivy.xml
deleted file mode 100644
index 9f8f5ff..0000000
--- a/src/plugin/indexer-elastic-rest/build-ivy.xml
+++ /dev/null
@@ -1,54 +0,0 @@
-<?xml version="1.0"?>
-<!--
- Licensed to the Apache Software Foundation (ASF) under one or more
- contributor license agreements.  See the NOTICE file distributed with
- this work for additional information regarding copyright ownership.
- The ASF licenses this file to You under the Apache License, Version 2.0
- (the "License"); you may not use this file except in compliance with
- the License.  You may obtain a copy of the License at
-
-     http://www.apache.org/licenses/LICENSE-2.0
-
- Unless required by applicable law or agreed to in writing, software
- distributed under the License is distributed on an "AS IS" BASIS,
- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- See the License for the specific language governing permissions and
- limitations under the License.
--->
-<project name="indexer-elastic-rest" default="deps-jar" xmlns:ivy="antlib:org.apache.ivy.ant">
-
-    <property name="ivy.install.version" value="2.1.0"/>
-    <condition property="ivy.home" value="${env.IVY_HOME}">
-        <isset property="env.IVY_HOME"/>
-    </condition>
-    <property name="ivy.home" value="${user.home}/.ant"/>
-    <property name="ivy.checksums" value=""/>
-    <property name="ivy.jar.dir" value="${ivy.home}/lib"/>
-    <property name="ivy.jar.file" value="${ivy.jar.dir}/ivy.jar"/>
-
-    <target name="download-ivy" unless="offline">
-
-        <mkdir dir="${ivy.jar.dir}"/>
-        <!-- download Ivy from web site so that it can be used even without any special installation -->
-        <get src="https://repo1.maven.org/maven2/org/apache/ivy/ivy/${ivy.install.version}/ivy-${ivy.install.version}.jar"
-             dest="${ivy.jar.file}" usetimestamp="true"/>
-    </target>
-
-    <target name="init-ivy" depends="download-ivy">
-        <!-- try to load ivy here from ivy home, in case the user has not already dropped
-                it into ant's lib dir (note that the latter copy will always take precedence).
-                We will not fail as long as local lib dir exists (it may be empty) and
-                ivy is in at least one of ant's lib dir or the local lib dir. -->
-        <path id="ivy.lib.path">
-            <fileset dir="${ivy.jar.dir}" includes="*.jar"/>
-
-        </path>
-        <taskdef resource="org/apache/ivy/ant/antlib.xml"
-                 uri="antlib:org.apache.ivy.ant" classpathref="ivy.lib.path"/>
-    </target>
-
-    <target name="deps-jar" depends="init-ivy">
-        <ivy:retrieve pattern="lib/[artifact]-[revision].[ext]"/>
-    </target>
-
-</project>
diff --git a/src/plugin/indexer-elastic-rest/build.xml b/src/plugin/indexer-elastic-rest/build.xml
deleted file mode 100644
index 95d2cd6..0000000
--- a/src/plugin/indexer-elastic-rest/build.xml
+++ /dev/null
@@ -1,22 +0,0 @@
-<?xml version="1.0"?>
-<!--
- Licensed to the Apache Software Foundation (ASF) under one or more
- contributor license agreements.  See the NOTICE file distributed with
- this work for additional information regarding copyright ownership.
- The ASF licenses this file to You under the Apache License, Version 2.0
- (the "License"); you may not use this file except in compliance with
- the License.  You may obtain a copy of the License at
-
-     http://www.apache.org/licenses/LICENSE-2.0
-
- Unless required by applicable law or agreed to in writing, software
- distributed under the License is distributed on an "AS IS" BASIS,
- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- See the License for the specific language governing permissions and
- limitations under the License.
--->
-<project name="indexer-elastic-rest" default="jar-core">
-
-    <import file="../build-plugin.xml"/>
-
-</project>
diff --git a/src/plugin/indexer-elastic-rest/howto_upgrade_es.txt b/src/plugin/indexer-elastic-rest/howto_upgrade_es.txt
deleted file mode 100644
index 4f4fbd4..0000000
--- a/src/plugin/indexer-elastic-rest/howto_upgrade_es.txt
+++ /dev/null
@@ -1,23 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-1. Upgrade elasticsearch dependency in src/plugin/indexer-elastic-rest/ivy.xml
-
-2. Upgrade the Elasticsearch specific dependencies in src/plugin/indexer-elastic-rest/plugin.xml
-   To get the list of dependencies and their versions execute:
-   $ ant -f ./build-ivy.xml
-   $ ls lib/
diff --git a/src/plugin/indexer-elastic-rest/ivy.xml b/src/plugin/indexer-elastic-rest/ivy.xml
deleted file mode 100644
index 48d576a..0000000
--- a/src/plugin/indexer-elastic-rest/ivy.xml
+++ /dev/null
@@ -1,43 +0,0 @@
-<?xml version="1.0" ?>
-
-<!--
-   Licensed to the Apache Software Foundation (ASF) under one or more
-   contributor license agreements.  See the NOTICE file distributed with
-   this work for additional information regarding copyright ownership.
-   The ASF licenses this file to You under the Apache License, Version 2.0
-   (the "License"); you may not use this file except in compliance with
-   the License.  You may obtain a copy of the License at
-
-       http://www.apache.org/licenses/LICENSE-2.0
-
-   Unless required by applicable law or agreed to in writing, software
-   distributed under the License is distributed on an "AS IS" BASIS,
-   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-   See the License for the specific language governing permissions and
-   limitations under the License.
--->
-
-<ivy-module version="1.0">
-    <info organisation="org.apache.nutch" module="${ant.project.name}">
-        <license name="Apache 2.0"/>
-        <ivyauthor name="Apache Nutch Team" url="https://nutch.apache.org/"/>
-        <description>
-            Apache Nutch
-        </description>
-    </info>
-
-    <configurations>
-        <include file="../../..//ivy/ivy-configurations.xml"/>
-    </configurations>
-
-    <publications>
-        <!--get the artifact from our module name-->
-        <artifact conf="master"/>
-    </publications>
-
-    <dependencies>
-        <!-- https://mvnrepository.com/artifact/io.searchbox/jest -->
-        <dependency org="io.searchbox" name="jest" rev="2.0.3" conf="*->default"/>
-    </dependencies>
-
-</ivy-module>
diff --git a/src/plugin/indexer-elastic-rest/plugin.xml b/src/plugin/indexer-elastic-rest/plugin.xml
deleted file mode 100644
index d31714e..0000000
--- a/src/plugin/indexer-elastic-rest/plugin.xml
+++ /dev/null
@@ -1,51 +0,0 @@
-<?xml version="1.0" encoding="UTF-8"?>
-<!--
-  Licensed to the Apache Software Foundation (ASF) under one or more
-  contributor license agreements.  See the NOTICE file distributed with
-  this work for additional information regarding copyright ownership.
-  The ASF licenses this file to You under the Apache License, Version 2.0
-  (the "License"); you may not use this file except in compliance with
-  the License.  You may obtain a copy of the License at
-
-  http://www.apache.org/licenses/LICENSE-2.0
-
-  Unless required by applicable law or agreed to in writing, software
-  distributed under the License is distributed on an "AS IS" BASIS,
-  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-  See the License for the specific language governing permissions and
-  limitations under the License.
--->
-<plugin id="indexer-elastic-rest" name="ElasticRestIndexWriter" version="1.0.0"
-        provider-name="nutch.apache.org">
-
-    <runtime>
-        <library name="indexer-elastic-rest.jar">
-            <export name="*"/>
-        </library>
-
-        <library name="commons-codec-1.9.jar"/>
-        <library name="commons-lang3-3.4.jar"/>
-        <library name="commons-logging-1.2.jar"/>
-        <library name="gson-2.6.2.jar"/>
-        <library name="guava-19.0.jar"/>
-        <library name="httpasyncclient-4.1.1.jar"/>
-        <library name="httpclient-4.5.2.jar"/>
-        <library name="httpcore-4.4.4.jar"/>
-        <library name="httpcore-nio-4.4.4.jar"/>
-        <library name="jest-2.0.3.jar"/>
-        <library name="jest-common-2.0.3.jar"/>
-
-    </runtime>
-
-    <requires>
-        <import plugin="nutch-extensionpoints"/>
-    </requires>
-
-    <extension id="org.apache.nutch.indexer.elasticrest"
-               name="Elasticsearch Rest Index Writer"
-               point="org.apache.nutch.indexer.IndexWriter">
-        <implementation id="ElasticRestIndexWriter"
-                        class="org.apache.nutch.indexwriter.elasticrest.ElasticRestIndexWriter"/>
-    </extension>
-
-</plugin>
diff --git a/src/plugin/indexer-elastic-rest/src/java/org/apache/nutch/indexwriter/elasticrest/ElasticRestConstants.java b/src/plugin/indexer-elastic-rest/src/java/org/apache/nutch/indexwriter/elasticrest/ElasticRestConstants.java
deleted file mode 100644
index cbbc297..0000000
--- a/src/plugin/indexer-elastic-rest/src/java/org/apache/nutch/indexwriter/elasticrest/ElasticRestConstants.java
+++ /dev/null
@@ -1,35 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nutch.indexwriter.elasticrest;
-
-public interface ElasticRestConstants {
-  public static final String HOST = "host";
-  public static final String PORT = "port";
-  public static final String INDEX = "index";
-  public static final String MAX_BULK_DOCS = "max.bulk.docs";
-  public static final String MAX_BULK_LENGTH = "max.bulk.size";
-
-  public static final String USER = "user";
-  public static final String PASSWORD = "password";
-  public static final String TYPE = "type";
-  public static final String HTTPS = "https";
-  public static final String HOSTNAME_TRUST = "trustallhostnames";
-  
-  public static final String LANGUAGES = "languages";
-  public static final String SEPARATOR = "separator";
-  public static final String SINK = "sink";
-}
diff --git a/src/plugin/indexer-elastic-rest/src/java/org/apache/nutch/indexwriter/elasticrest/ElasticRestIndexWriter.java b/src/plugin/indexer-elastic-rest/src/java/org/apache/nutch/indexwriter/elasticrest/ElasticRestIndexWriter.java
deleted file mode 100644
index 0ddf539..0000000
--- a/src/plugin/indexer-elastic-rest/src/java/org/apache/nutch/indexwriter/elasticrest/ElasticRestIndexWriter.java
+++ /dev/null
@@ -1,468 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nutch.indexwriter.elasticrest;
-
-import io.searchbox.client.JestClient;
-import io.searchbox.client.JestClientFactory;
-import io.searchbox.client.JestResult;
-import io.searchbox.client.JestResultHandler;
-import io.searchbox.client.config.HttpClientConfig;
-import io.searchbox.core.Bulk;
-import io.searchbox.core.BulkResult;
-import io.searchbox.core.Delete;
-import io.searchbox.core.Index;
-import org.apache.commons.lang.StringUtils;
-import org.apache.commons.lang3.exception.ExceptionUtils;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.http.concurrent.BasicFuture;
-import org.apache.http.conn.ssl.DefaultHostnameVerifier;
-import org.apache.http.conn.ssl.NoopHostnameVerifier;
-import org.apache.http.conn.ssl.SSLConnectionSocketFactory;
-import org.apache.http.nio.conn.SchemeIOSessionStrategy;
-import org.apache.http.nio.conn.ssl.SSLIOSessionStrategy;
-import org.apache.http.ssl.SSLContextBuilder;
-import org.apache.http.ssl.TrustStrategy;
-import org.apache.nutch.indexer.IndexWriter;
-import org.apache.nutch.indexer.IndexWriterParams;
-import org.apache.nutch.indexer.NutchDocument;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import javax.net.ssl.HostnameVerifier;
-import javax.net.ssl.SSLContext;
-import java.io.IOException;
-import java.net.URL;
-import java.security.KeyManagementException;
-import java.security.KeyStoreException;
-import java.security.NoSuchAlgorithmException;
-import java.security.cert.CertificateException;
-import java.security.cert.X509Certificate;
-import java.util.AbstractMap;
-import java.util.Date;
-import java.util.HashMap;
-import java.util.LinkedHashMap;
-import java.util.LinkedHashSet;
-import java.util.Map;
-import java.util.Set;
-import java.util.concurrent.ExecutionException;
-
-/**
- */
-public class ElasticRestIndexWriter implements IndexWriter {
-  public static Logger LOG = LoggerFactory
-      .getLogger(ElasticRestIndexWriter.class);
-
-  private static final int DEFAULT_MAX_BULK_DOCS = 250;
-  private static final int DEFAULT_MAX_BULK_LENGTH = 2500500;
-  private static final String DEFAULT_SEPARATOR = "_";
-  private static final String DEFAULT_SINK = "others";
-
-  private JestClient client;
-  private String defaultIndex;
-  private String defaultType = null;
-
-  private Configuration config;
-
-  private Bulk.Builder bulkBuilder;
-  private int port = -1;
-  private String host = null;
-  private Boolean https = null;
-  private String user = null;
-  private String password = null;
-  private Boolean trustAllHostnames = null;
-
-  private int maxBulkDocs;
-  private int maxBulkLength;
-  private long indexedDocs = 0;
-  private int bulkDocs = 0;
-  private int bulkLength = 0;
-  private boolean createNewBulk = false;
-  private long millis;
-  private BasicFuture<JestResult> basicFuture = null;
-
-  private String[] languages = null;
-  private String separator = null;
-  private String sink = null;
-
-  @Override
-  public void open(Configuration conf, String name) throws IOException {
-    //Implementation not required
-  }
-
-  @Override
-  public void open(IndexWriterParams parameters) throws IOException {
-    host = parameters.get(ElasticRestConstants.HOST);
-    if (StringUtils.isBlank(host)) {
-      String message = "Missing host. It should be set in index-writers.xml";
-      message += "\n" + describe();
-      LOG.error(message);
-      throw new RuntimeException(message);
-    }
-
-    port = parameters.getInt(ElasticRestConstants.PORT, 9200);
-    user = parameters.get(ElasticRestConstants.USER);
-    password = parameters.get(ElasticRestConstants.PASSWORD);
-    https = parameters.getBoolean(ElasticRestConstants.HTTPS, false);
-    trustAllHostnames = parameters
-        .getBoolean(ElasticRestConstants.HOSTNAME_TRUST, false);
-
-    languages = parameters.getStrings(ElasticRestConstants.LANGUAGES);
-    separator = parameters
-        .get(ElasticRestConstants.SEPARATOR, DEFAULT_SEPARATOR);
-    sink = parameters.get(ElasticRestConstants.SINK, DEFAULT_SINK);
-
-    // trust ALL certificates
-    SSLContext sslContext = null;
-    try {
-      sslContext = new SSLContextBuilder()
-          .loadTrustMaterial(new TrustStrategy() {
-            public boolean isTrusted(X509Certificate[] arg0, String arg1)
-                throws CertificateException {
-              return true;
-            }
-          }).build();
-    } catch (NoSuchAlgorithmException | KeyManagementException | KeyStoreException e) {
-      LOG.error("Failed to instantiate sslcontext object: \n{}",
-          ExceptionUtils.getStackTrace(e));
-      throw new SecurityException();
-    }
-
-    // skip hostname checks
-    HostnameVerifier hostnameVerifier = null;
-    if (trustAllHostnames) {
-      hostnameVerifier = NoopHostnameVerifier.INSTANCE;
-    } else {
-      hostnameVerifier = new DefaultHostnameVerifier();
-    }
-
-    SSLConnectionSocketFactory sslSocketFactory = new SSLConnectionSocketFactory(
-        sslContext);
-    SchemeIOSessionStrategy httpsIOSessionStrategy = new SSLIOSessionStrategy(
-        sslContext, hostnameVerifier);
-
-    JestClientFactory jestClientFactory = new JestClientFactory();
-    URL urlOfElasticsearchNode = new URL(https ? "https" : "http", host, port,
-        "");
-
-    if (host != null && port > 1) {
-      HttpClientConfig.Builder builder = new HttpClientConfig.Builder(
-          urlOfElasticsearchNode.toString()).multiThreaded(true)
-          .connTimeout(300000).readTimeout(300000);
-      if (https) {
-        if (user != null && password != null) {
-          builder.defaultCredentials(user, password);
-        }
-        builder.defaultSchemeForDiscoveredNodes("https")
-            .sslSocketFactory(sslSocketFactory) // this only affects sync calls
-            .httpsIOSessionStrategy(
-                httpsIOSessionStrategy); // this only affects async calls
-      }
-      jestClientFactory.setHttpClientConfig(builder.build());
-    } else {
-      throw new IllegalStateException(
-          "No host or port specified. Please set the host and port in nutch-site.xml");
-    }
-
-    client = jestClientFactory.getObject();
-
-    defaultIndex = parameters.get(ElasticRestConstants.INDEX, "nutch");
-    defaultType = parameters.get(ElasticRestConstants.TYPE, "doc");
-
-    maxBulkDocs = parameters
-        .getInt(ElasticRestConstants.MAX_BULK_DOCS, DEFAULT_MAX_BULK_DOCS);
-    maxBulkLength = parameters
-        .getInt(ElasticRestConstants.MAX_BULK_LENGTH, DEFAULT_MAX_BULK_LENGTH);
-
-    bulkBuilder = new Bulk.Builder().defaultIndex(defaultIndex)
-        .defaultType(defaultType);
-  }
-
-  private static Object normalizeValue(Object value) {
-    if (value == null) {
-      return null;
-    }
-
-    if (value instanceof Map || value instanceof Date) {
-      return value;
-    }
-
-    return value.toString();
-  }
-
-  @Override
-  public void write(NutchDocument doc) throws IOException {
-    String id = (String) doc.getFieldValue("id");
-    String type = doc.getDocumentMeta().get("type");
-    if (type == null) {
-      type = defaultType;
-    }
-
-    Map<String, Object> source = new HashMap<String, Object>();
-
-    // Loop through all fields of this doc
-    for (String fieldName : doc.getFieldNames()) {
-      Set<Object> allFieldValues = new LinkedHashSet<>(
-          doc.getField(fieldName).getValues());
-
-      if (allFieldValues.size() > 1) {
-        Object[] normalizedFieldValues = allFieldValues.stream()
-            .map(ElasticRestIndexWriter::normalizeValue).toArray();
-
-        // Loop through the values to keep track of the size of this document
-        for (Object value : normalizedFieldValues) {
-          bulkLength += value.toString().length();
-        }
-
-        source.put(fieldName, normalizedFieldValues);
-      } else if (allFieldValues.size() == 1) {
-        Object normalizedFieldValue = normalizeValue(
-            allFieldValues.iterator().next());
-        source.put(fieldName, normalizedFieldValue);
-        bulkLength += normalizedFieldValue.toString().length();
-      }
-    }
-
-    String index;
-    if (languages != null && languages.length > 0) {
-      String language = (String) doc.getFieldValue("lang");
-      boolean exists = false;
-      for (String lang : languages) {
-        if (lang.equals(language)) {
-          exists = true;
-          break;
-        }
-      }
-      if (exists) {
-        index = getLanguageIndexName(language);
-      } else {
-        index = getSinkIndexName();
-      }
-    } else {
-      index = defaultIndex;
-    }
-    Index indexRequest = new Index.Builder(source).index(index).type(type)
-        .id(id).build();
-
-    // Add this indexing request to a bulk request
-    bulkBuilder.addAction(indexRequest);
-
-    indexedDocs++;
-    bulkDocs++;
-
-    if (bulkDocs >= maxBulkDocs || bulkLength >= maxBulkLength) {
-      LOG.info(
-          "Processing bulk request [docs = {}, length = {}, total docs = {}, last doc in bulk = '{}']",
-          bulkDocs, bulkLength, indexedDocs, id);
-      // Flush the bulk of indexing requests
-      createNewBulk = true;
-      commit();
-    }
-  }
-
-  @Override
-  public void delete(String key) throws IOException {
-    try {
-      if (languages != null && languages.length > 0) {
-        Bulk.Builder bulkBuilder = new Bulk.Builder().defaultType(defaultType);
-        for (String lang : languages) {
-          bulkBuilder.addAction(
-              new Delete.Builder(key).index(getLanguageIndexName(lang))
-                  .type(defaultType).build());
-        }
-        bulkBuilder.addAction(
-            new Delete.Builder(key).index(getSinkIndexName()).type(defaultType)
-                .build());
-        client.execute(bulkBuilder.build());
-      } else {
-        client.execute(
-            new Delete.Builder(key).index(defaultIndex).type(defaultType)
-                .build());
-      }
-    } catch (IOException e) {
-      LOG.error(ExceptionUtils.getStackTrace(e));
-      throw e;
-    }
-  }
-
-  @Override
-  public void update(NutchDocument doc) throws IOException {
-    try {
-      write(doc);
-    } catch (IOException e) {
-      LOG.error(ExceptionUtils.getStackTrace(e));
-      throw e;
-    }
-  }
-
-  @Override
-  public void commit() throws IOException {
-    if (basicFuture != null) {
-      // wait for previous to finish
-      long beforeWait = System.currentTimeMillis();
-      try {
-        JestResult result = basicFuture.get();
-        if (result == null) {
-          throw new RuntimeException();
-        }
-        long msWaited = System.currentTimeMillis() - beforeWait;
-        LOG.info("Previous took in ms {}, including wait {}", millis, msWaited);
-      } catch (InterruptedException | ExecutionException e) {
-        LOG.error("Error waiting for result ", e);
-      }
-      basicFuture = null;
-    }
-    if (bulkBuilder != null) {
-      if (bulkDocs > 0) {
-        // start a flush, note that this is an asynchronous call
-        basicFuture = new BasicFuture<>(null);
-        millis = System.currentTimeMillis();
-        client.executeAsync(bulkBuilder.build(),
-            new JestResultHandler<BulkResult>() {
-              @Override
-              public void completed(BulkResult bulkResult) {
-                basicFuture.completed(bulkResult);
-                millis = System.currentTimeMillis() - millis;
-              }
-
-              @Override
-              public void failed(Exception e) {
-                basicFuture.completed(null);
-                LOG.error("Failed result: ", e);
-              }
-            });
-      }
-      bulkBuilder = null;
-    }
-    if (createNewBulk) {
-      // Prepare a new bulk request
-      bulkBuilder = new Bulk.Builder().defaultIndex(defaultIndex)
-          .defaultType(defaultType);
-      bulkDocs = 0;
-      bulkLength = 0;
-    }
-  }
-
-  @Override
-  public void close() throws IOException {
-    // Flush pending requests
-    LOG.info(
-        "Processing remaining requests [docs = {}, length = {}, total docs = {}]",
-        bulkDocs, bulkLength, indexedDocs);
-    createNewBulk = false;
-    commit();
-
-    // flush one more time to finalize the last bulk
-    LOG.info("Processing to finalize last execute");
-    createNewBulk = false;
-    commit();
-
-    // Close
-    client.shutdownClient();
-  }
-
-  /**
-   * Returns {@link Map} with the specific parameters the IndexWriter instance can take.
-   *
-   * @return The values of each row. It must have the form <KEY,<DESCRIPTION,VALUE>>.
-   */
-  @Override
-  public Map<String, Map.Entry<String, Object>> describe() {
-    Map<String, Map.Entry<String, Object>> properties = new LinkedHashMap<>();
-
-    properties.put(ElasticRestConstants.HOST, new AbstractMap.SimpleEntry<>(
-        "The hostname or a list of comma separated hostnames to send documents "
-            + "to using Elasticsearch Jest. Both host and port must be defined.",
-        this.host));
-    properties.put(ElasticRestConstants.PORT, new AbstractMap.SimpleEntry<>(
-        "The port to connect to using Elasticsearch Jest.", this.port));
-    properties.put(ElasticRestConstants.INDEX,
-        new AbstractMap.SimpleEntry<>("Default index to send documents to.",
-            this.defaultIndex));
-    properties.put(ElasticRestConstants.MAX_BULK_DOCS,
-        new AbstractMap.SimpleEntry<>(
-            "Maximum size of the bulk in number of documents.",
-            this.maxBulkDocs));
-    properties.put(ElasticRestConstants.MAX_BULK_LENGTH,
-        new AbstractMap.SimpleEntry<>("Maximum size of the bulk in bytes.",
-            this.maxBulkLength));
-
-    properties.put(ElasticRestConstants.USER, new AbstractMap.SimpleEntry<>(
-        "Username for auth credentials (only used when https is enabled)",
-        this.user));
-    properties.put(ElasticRestConstants.PASSWORD, new AbstractMap.SimpleEntry<>(
-        "Password for auth credentials (only used when https is enabled)",
-        this.password));
-    properties.put(ElasticRestConstants.TYPE,
-        new AbstractMap.SimpleEntry<>("Default type to send documents to.",
-            this.defaultType));
-    properties.put(ElasticRestConstants.HTTPS, new AbstractMap.SimpleEntry<>(
-        "true to enable https, false to disable https. If you've disabled http "
-            + "access (by forcing https), be sure to set this to true, otherwise "
-            + "you might get \"connection reset by peer\".", this.https));
-    properties.put(ElasticRestConstants.HOSTNAME_TRUST,
-        new AbstractMap.SimpleEntry<>(
-            "true to trust elasticsearch server's certificate even if its listed "
-                + "domain name does not match the domain they are hosted or false "
-                + "to check if the elasticsearch server's certificate's listed "
-                + "domain is the same domain that it is hosted on, and if "
-                + "it doesn't, then fail to index (only used when https is enabled)",
-            this.trustAllHostnames));
-
-    properties.put(ElasticRestConstants.LANGUAGES,
-        new AbstractMap.SimpleEntry<>(
-            "A list of strings denoting the supported languages (e.g. en, de, fr, it). "
-                + "If this value is empty all documents will be sent to index property. "
-                + "If not empty the Rest client will distribute documents in different "
-                + "indices based on their languages property. Indices are named with the "
-                + "following schema: index separator language (e.g. nutch_de). "
-                + "Entries with an unsupported languages value will be added to "
-                + "index index separator sink (e.g. nutch_others).",
-            this.languages == null ? "" : String.join(",", languages)));
-    properties.put(ElasticRestConstants.SEPARATOR,
-        new AbstractMap.SimpleEntry<>(
-            "Is used only if languages property is defined to build the index name "
-                + "(i.e. index separator lang).", this.separator));
-    properties.put(ElasticRestConstants.SINK, new AbstractMap.SimpleEntry<>(
-        "Is used only if languages property is defined to build the index name "
-            + "where to store documents with unsupported languages "
-            + "(i.e. index separator sink).", this.sink));
-
-    return properties;
-  }
-
-  @Override
-  public void setConf(Configuration conf) {
-    config = conf;
-  }
-
-  @Override
-  public Configuration getConf() {
-    return config;
-  }
-
-  private String getLanguageIndexName(String lang) {
-    return getComposedIndexName(defaultIndex, lang);
-  }
-
-  private String getSinkIndexName() {
-    return getComposedIndexName(defaultIndex, sink);
-  }
-
-  private String getComposedIndexName(String prefix, String postfix) {
-    return prefix + separator + postfix;
-  }
-}
diff --git a/src/plugin/indexer-elastic-rest/src/java/org/apache/nutch/indexwriter/elasticrest/package-info.java b/src/plugin/indexer-elastic-rest/src/java/org/apache/nutch/indexwriter/elasticrest/package-info.java
deleted file mode 100644
index e357cc9..0000000
--- a/src/plugin/indexer-elastic-rest/src/java/org/apache/nutch/indexwriter/elasticrest/package-info.java
+++ /dev/null
@@ -1,22 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-/**
- * Rest based index writer plugin for <a href="http://www.elasticsearch.org/">Elasticsearch</a>.
- */
-package org.apache.nutch.indexwriter.elasticrest;
-
diff --git a/src/plugin/indexer-elastic/README.md b/src/plugin/indexer-elastic/README.md
index 0ac4f08..bccadf7 100644
--- a/src/plugin/indexer-elastic/README.md
+++ b/src/plugin/indexer-elastic/README.md
@@ -30,12 +30,14 @@
 
 Parameter Name | Description | Default value
 --|--|--
-host | Comma-separated list of hostnames to send documents to using [TransportClient](https://static.javadoc.io/org.elasticsearch/elasticsearch/5.3.0/org/elasticsearch/client/transport/TransportClient.html). Either host and port must be defined or cluster. | 
+host | Comma-separated list of hostnames to send documents to using [TransportClient](https://static.javadoc.io/org.elasticsearch/elasticsearch/5.3.0/org/elasticsearch/client/transport/TransportClient.html). Both host and port must be defined. | 
 port | The port to connect to using [TransportClient](https://static.javadoc.io/org.elasticsearch/elasticsearch/5.3.0/org/elasticsearch/client/transport/TransportClient.html). | 9300
-cluster | The cluster name to discover. Either host and port must be defined or cluster. | 
 index | Default index to send documents to. | nutch
+username | Username for auth credentials | elastic
+password | Password for auth credentials | ""
+auth | Whether to enable HTTP basic authentication with elastic. Use `username` and `password` properties to configure your credentials. | false
 max.bulk.docs | Maximum size of the bulk in number of documents. | 250
 max.bulk.size | Maximum size of the bulk in bytes. | 2500500
 exponential.backoff.millis | Initial delay for the [BulkProcessor](https://static.javadoc.io/org.elasticsearch/elasticsearch/5.3.0/org/elasticsearch/action/bulk/BulkProcessor.html) exponential backoff policy. | 100
 exponential.backoff.retries | Number of times the [BulkProcessor](https://static.javadoc.io/org.elasticsearch/elasticsearch/5.3.0/org/elasticsearch/action/bulk/BulkProcessor.html) exponential backoff policy should retry bulk operations. | 10
-bulk.close.timeout | Number of seconds allowed for the [BulkProcessor](https://static.javadoc.io/org.elasticsearch/elasticsearch/5.3.0/org/elasticsearch/action/bulk/BulkProcessor.html) to complete its last operation. | 600
\ No newline at end of file
+bulk.close.timeout | Number of seconds allowed for the [BulkProcessor](https://static.javadoc.io/org.elasticsearch/elasticsearch/5.3.0/org/elasticsearch/action/bulk/BulkProcessor.html) to complete its last operation. | 600
diff --git a/src/plugin/indexer-elastic/src/java/org/apache/nutch/indexwriter/elastic/ElasticConstants.java b/src/plugin/indexer-elastic/src/java/org/apache/nutch/indexwriter/elastic/ElasticConstants.java
index d272841..c0d1a61 100644
--- a/src/plugin/indexer-elastic/src/java/org/apache/nutch/indexwriter/elastic/ElasticConstants.java
+++ b/src/plugin/indexer-elastic/src/java/org/apache/nutch/indexwriter/elastic/ElasticConstants.java
@@ -19,7 +19,10 @@
 public interface ElasticConstants {
   String HOSTS = "host";
   String PORT = "port";
-  String CLUSTER = "cluster";
+  
+  String USER = "username";
+  String PASSWORD = "password";
+  String USE_AUTH = "auth";
   String INDEX = "index";
   String MAX_BULK_DOCS = "max.bulk.docs";
   String MAX_BULK_LENGTH = "max.bulk.size";
diff --git a/src/plugin/indexer-elastic/src/java/org/apache/nutch/indexwriter/elastic/ElasticIndexWriter.java b/src/plugin/indexer-elastic/src/java/org/apache/nutch/indexwriter/elastic/ElasticIndexWriter.java
index 74727a0..d46dd6a 100644
--- a/src/plugin/indexer-elastic/src/java/org/apache/nutch/indexwriter/elastic/ElasticIndexWriter.java
+++ b/src/plugin/indexer-elastic/src/java/org/apache/nutch/indexwriter/elastic/ElasticIndexWriter.java
@@ -17,6 +17,11 @@
 package org.apache.nutch.indexwriter.elastic;
 
 import java.lang.invoke.MethodHandles;
+import java.security.KeyManagementException;
+import java.security.KeyStoreException;
+import java.security.NoSuchAlgorithmException;
+import java.security.cert.CertificateException;
+import java.security.cert.X509Certificate;
 import java.time.format.DateTimeFormatter;
 import java.io.IOException;
 import java.util.AbstractMap;
@@ -25,11 +30,21 @@
 import java.util.Map;
 import java.util.concurrent.TimeUnit;
 
+import javax.net.ssl.SSLContext;
+
 import org.apache.commons.lang.StringUtils;
+import org.apache.commons.lang3.exception.ExceptionUtils;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.http.Header;
 import org.apache.http.HttpHost;
+import org.apache.http.auth.AuthScope;
+import org.apache.http.auth.UsernamePasswordCredentials;
+import org.apache.http.client.CredentialsProvider;
+import org.apache.http.impl.client.BasicCredentialsProvider;
+import org.apache.http.impl.nio.client.HttpAsyncClientBuilder;
 import org.apache.http.message.BasicHeader;
+import org.apache.http.ssl.SSLContextBuilder;
+import org.apache.http.ssl.TrustStrategy;
 import org.apache.nutch.indexer.IndexWriter;
 import org.apache.nutch.indexer.IndexWriterParams;
 import org.apache.nutch.indexer.NutchDocument;
@@ -47,6 +62,7 @@
 import org.elasticsearch.common.unit.ByteSizeUnit;
 import org.elasticsearch.common.unit.ByteSizeValue;
 import org.elasticsearch.common.unit.TimeValue;
+import org.elasticsearch.client.RestClientBuilder.HttpClientConfigCallback;
 import org.elasticsearch.common.xcontent.XContentBuilder;
 import org.elasticsearch.common.xcontent.XContentFactory;
 import org.elasticsearch.client.RequestOptions;
@@ -61,7 +77,6 @@
   private static final Logger LOG = LoggerFactory
       .getLogger(MethodHandles.lookup().lookupClass());
 
-
   private static final int DEFAULT_PORT = 9300;
   private static final int DEFAULT_MAX_BULK_DOCS = 250;
   private static final int DEFAULT_MAX_BULK_LENGTH = 2500500;
@@ -69,10 +84,14 @@
   private static final int DEFAULT_EXP_BACKOFF_RETRIES = 10;
   private static final int DEFAULT_BULK_CLOSE_TIMEOUT = 600;
   private static final String DEFAULT_INDEX = "nutch";
+  private static final String DEFAULT_USER = "elastic";
 
-  private String cluster;
   private String[] hosts;
   private int port;
+  private Boolean https = null;
+  private String user = null;
+  private String password = null;
+  private Boolean auth;
 
   private int maxBulkDocs;
   private int maxBulkLength;
@@ -89,18 +108,20 @@
 
   @Override
   public void open(Configuration conf, String name) throws IOException {
-    //Implementation not required
+    // Implementation not required
   }
 
   /**
    * Initializes the internal variables from a given index writer configuration.
    *
-   * @param parameters Params from the index writer configuration.
-   * @throws IOException Some exception thrown by writer.
+   * @param parameters
+   *          Params from the index writer configuration.
+   * @throws IOException
+   *           Some exception thrown by writer.
    */
   @Override
   public void open(IndexWriterParams parameters) throws IOException {
-    cluster = parameters.get(ElasticConstants.CLUSTER);
+
     String hosts = parameters.get(ElasticConstants.HOSTS);
 
     if (StringUtils.isBlank(hosts)) {
@@ -114,57 +135,71 @@
         DEFAULT_BULK_CLOSE_TIMEOUT);
     defaultIndex = parameters.get(ElasticConstants.INDEX, DEFAULT_INDEX);
 
-    maxBulkDocs = parameters
-        .getInt(ElasticConstants.MAX_BULK_DOCS, DEFAULT_MAX_BULK_DOCS);
-    maxBulkLength = parameters
-        .getInt(ElasticConstants.MAX_BULK_LENGTH, DEFAULT_MAX_BULK_LENGTH);
-    expBackoffMillis = parameters
-        .getInt(ElasticConstants.EXPONENTIAL_BACKOFF_MILLIS,
-            DEFAULT_EXP_BACKOFF_MILLIS);
-    expBackoffRetries = parameters
-        .getInt(ElasticConstants.EXPONENTIAL_BACKOFF_RETRIES,
-            DEFAULT_EXP_BACKOFF_RETRIES);
+    maxBulkDocs = parameters.getInt(ElasticConstants.MAX_BULK_DOCS,
+        DEFAULT_MAX_BULK_DOCS);
+    maxBulkLength = parameters.getInt(ElasticConstants.MAX_BULK_LENGTH,
+        DEFAULT_MAX_BULK_LENGTH);
+    expBackoffMillis = parameters.getInt(
+        ElasticConstants.EXPONENTIAL_BACKOFF_MILLIS,
+        DEFAULT_EXP_BACKOFF_MILLIS);
+    expBackoffRetries = parameters.getInt(
+        ElasticConstants.EXPONENTIAL_BACKOFF_RETRIES,
+        DEFAULT_EXP_BACKOFF_RETRIES);
 
     client = makeClient(parameters);
 
     LOG.debug("Creating BulkProcessor with maxBulkDocs={}, maxBulkLength={}",
         maxBulkDocs, maxBulkLength);
-    bulkProcessor = BulkProcessor.builder(
-        (request, bulkListener) ->
-        client.bulkAsync(request, RequestOptions.DEFAULT, bulkListener),
-        bulkProcessorListener())
+    bulkProcessor = BulkProcessor
+        .builder((request, bulkListener) -> client.bulkAsync(request,
+            RequestOptions.DEFAULT, bulkListener), bulkProcessorListener())
         .setBulkActions(maxBulkDocs)
         .setBulkSize(new ByteSizeValue(maxBulkLength, ByteSizeUnit.BYTES))
-        .setConcurrentRequests(1).setBackoffPolicy(BackoffPolicy
-            .exponentialBackoff(TimeValue.timeValueMillis(expBackoffMillis),
-                expBackoffRetries)).build();
+        .setConcurrentRequests(1)
+        .setBackoffPolicy(BackoffPolicy.exponentialBackoff(
+            TimeValue.timeValueMillis(expBackoffMillis), expBackoffRetries))
+        .build();
   }
 
   /**
    * Generates a RestHighLevelClient with the hosts given
    */
-  protected RestHighLevelClient makeClient(IndexWriterParams parameters) throws IOException {
+  protected RestHighLevelClient makeClient(IndexWriterParams parameters)
+      throws IOException {
     hosts = parameters.getStrings(ElasticConstants.HOSTS);
     port = parameters.getInt(ElasticConstants.PORT, DEFAULT_PORT);
 
+    auth = parameters.getBoolean(ElasticConstants.USE_AUTH, false);
+    user = parameters.get(ElasticConstants.USER, DEFAULT_USER);
+    password = parameters.get(ElasticConstants.PASSWORD, "");
+
+    final CredentialsProvider credentialsProvider = new BasicCredentialsProvider();
+    credentialsProvider.setCredentials(AuthScope.ANY,
+        new UsernamePasswordCredentials(user, password));
+
     RestHighLevelClient client = null;
 
     if (hosts != null && port > 1) {
       HttpHost[] hostsList = new HttpHost[hosts.length];
       int i = 0;
-      for(String host: hosts)	{
+      for (String host : hosts) {
         hostsList[i++] = new HttpHost(host, port);
       }
       RestClientBuilder restClientBuilder = RestClient.builder(hostsList);
-      if (StringUtils.isNotBlank(cluster)) {
-        Header[] defaultHeaders = new Header[]{new BasicHeader("cluster.name", cluster)};
-        restClientBuilder.setDefaultHeaders(defaultHeaders);
-      } else	{
-        LOG.debug("No cluster name provided so using default");
+      if (auth) {
+        restClientBuilder
+            .setHttpClientConfigCallback(new HttpClientConfigCallback() {
+              @Override
+              public HttpAsyncClientBuilder customizeHttpClient(
+                  HttpAsyncClientBuilder arg0) {
+                return arg0.setDefaultCredentialsProvider(credentialsProvider);
+              }
+            });
       }
       client = new RestHighLevelClient(restClientBuilder);
-    } else	{
-      throw new IOException("ElasticRestClient initialization Failed!!!\\n\\nPlease Provide the hosts");
+    } else {
+      throw new IOException(
+          "ElasticRestClient initialization Failed!!!\\n\\nPlease Provide the hosts");
     }
 
     return client;
@@ -182,14 +217,15 @@
       @Override
       public void afterBulk(long executionId, BulkRequest request,
           Throwable failure) {
-        throw new RuntimeException(failure);
+        LOG.error("Elasticsearch indexing failed:", failure);
       }
 
       @Override
       public void afterBulk(long executionId, BulkRequest request,
           BulkResponse response) {
         if (response.hasFailures()) {
-          LOG.warn("Failures occurred during bulk request");
+          LOG.warn("Failures occurred during bulk request: {}",
+              response.buildFailureMessage());
         }
       }
     };
@@ -220,9 +256,8 @@
     }
     builder.endObject();
 
-    IndexRequest request = new IndexRequest(defaultIndex)
-                                           .id(id)
-                                           .source(builder);
+    IndexRequest request = new IndexRequest(defaultIndex).id(id)
+        .source(builder);
     request.opType(DocWriteRequest.OpType.INDEX);
 
     bulkProcessor.add(request);
@@ -258,26 +293,27 @@
   }
 
   /**
-   * Returns {@link Map} with the specific parameters the IndexWriter instance can take.
+   * Returns {@link Map} with the specific parameters the IndexWriter instance
+   * can take.
    *
-   * @return The values of each row. It must have the form <KEY,<DESCRIPTION,VALUE>>.
+   * @return The values of each row. It must have the form
+   *         <KEY,<DESCRIPTION,VALUE>>.
    */
   @Override
   public Map<String, Map.Entry<String, Object>> describe() {
     Map<String, Map.Entry<String, Object>> properties = new LinkedHashMap<>();
 
-    properties.put(ElasticConstants.CLUSTER, new AbstractMap.SimpleEntry<>(
-        "The cluster name to discover. Either host and port must be defined or cluster.",
-        this.cluster));
-    properties.put(ElasticConstants.HOSTS, new AbstractMap.SimpleEntry<>(
-        "Comma-separated list of hostnames to send documents to using TransportClient. "
-            + "Either host and port must be defined or cluster.",
+    properties.put(ElasticConstants.HOSTS,
+        new AbstractMap.SimpleEntry<>("Comma-separated list of hostnames",
             this.hosts == null ? "" : String.join(",", hosts)));
     properties.put(ElasticConstants.PORT, new AbstractMap.SimpleEntry<>(
-        "The port to connect to using TransportClient.", this.port));
-    properties.put(ElasticConstants.INDEX,
-        new AbstractMap.SimpleEntry<>("Default index to send documents to.",
-            this.defaultIndex));
+        "The port to connect to elastic server.", this.port));
+    properties.put(ElasticConstants.INDEX, new AbstractMap.SimpleEntry<>(
+        "Default index to send documents to.", this.defaultIndex));
+    properties.put(ElasticConstants.USER, new AbstractMap.SimpleEntry<>(
+        "Username for auth credentials", this.user));
+    properties.put(ElasticConstants.PASSWORD, new AbstractMap.SimpleEntry<>(
+        "Password for auth credentials", this.password));
     properties.put(ElasticConstants.MAX_BULK_DOCS,
         new AbstractMap.SimpleEntry<>(
             "Maximum size of the bulk in number of documents.",
diff --git a/src/plugin/indexer-solr/build-ivy.xml b/src/plugin/indexer-solr/build-ivy.xml
index 3f7e959..fe4d8c4 100644
--- a/src/plugin/indexer-solr/build-ivy.xml
+++ b/src/plugin/indexer-solr/build-ivy.xml
@@ -17,7 +17,7 @@
 -->
 <project name="indexer-solr" default="deps-jar" xmlns:ivy="antlib:org.apache.ivy.ant">
 
-    <property name="ivy.install.version" value="2.1.0" />
+    <property name="ivy.install.version" value="2.4.0" />
     <condition property="ivy.home" value="${env.IVY_HOME}">
       <isset property="env.IVY_HOME" />
     </condition>
diff --git a/src/plugin/indexer-solr/howto_upgrade_solr.txt b/src/plugin/indexer-solr/howto_upgrade_solr.txt
new file mode 100644
index 0000000..b2a7eb5
--- /dev/null
+++ b/src/plugin/indexer-solr/howto_upgrade_solr.txt
@@ -0,0 +1,33 @@
+1. Upgrade Solr dependency in src/plugin/indexer-solr/ivy.xml
+
+2. Upgrade the Solr specific dependencies in src/plugin/indexer-solr/plugin.xml
+   To get the list of dependencies and their versions execute:
+    $ cd src/plugin/indexer-solr/
+    $ ant -f ./build-ivy.xml
+    $ ls lib | sed 's/^/    <library name="/g' | sed 's/$/"\/>/g'
+
+   In the plugin.xml replace all lines between
+      <!-- Solr dependencies -->
+   and
+      <!-- end of Solr dependencies -->
+   with the output of the command above.
+
+4. (Optionally) remove overlapping dependencies between indexer-solr and Nutch core dependencies:
+   - check for libs present both in
+       build/lib
+     and
+       build/plugins/indexer-solr/
+     (eventually with different versions)
+   - duplicated libs can be added to the exclusions of transitive dependencies in
+       build/plugins/indexer-solr/ivy.xml
+   - but it should be made sure that the library versions in ivy/ivy.xml correspond to
+     those required by Tika
+
+5. Remove the locally "installed" dependencies in src/plugin/indexer-solr/lib/:
+
+    $ rm -rf lib/
+
+6. Build Nutch and run all unit tests:
+
+    $ cd ../../../
+    $ ant clean runtime test
\ No newline at end of file
diff --git a/src/plugin/indexer-solr/ivy.xml b/src/plugin/indexer-solr/ivy.xml
index c68c238..43dfd20 100644
--- a/src/plugin/indexer-solr/ivy.xml
+++ b/src/plugin/indexer-solr/ivy.xml
@@ -1,44 +1,47 @@
 <?xml version="1.0" ?>
 
-<!--
-   Licensed to the Apache Software Foundation (ASF) under one or more
-   contributor license agreements.  See the NOTICE file distributed with
-   this work for additional information regarding copyright ownership.
-   The ASF licenses this file to You under the Apache License, Version 2.0
-   (the "License"); you may not use this file except in compliance with
-   the License.  You may obtain a copy of the License at
-
-       http://www.apache.org/licenses/LICENSE-2.0
-
-   Unless required by applicable law or agreed to in writing, software
-   distributed under the License is distributed on an "AS IS" BASIS,
-   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-   See the License for the specific language governing permissions and
-   limitations under the License.
--->
+<!-- Licensed to the Apache Software Foundation (ASF) under one or more contributor 
+	license agreements. See the NOTICE file distributed with this work for additional 
+	information regarding copyright ownership. The ASF licenses this file to 
+	You under the Apache License, Version 2.0 (the "License"); you may not use 
+	this file except in compliance with the License. You may obtain a copy of 
+	the License at http://www.apache.org/licenses/LICENSE-2.0 Unless required 
+	by applicable law or agreed to in writing, software distributed under the 
+	License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS 
+	OF ANY KIND, either express or implied. See the License for the specific 
+	language governing permissions and limitations under the License. -->
 
 <ivy-module version="1.0">
-  <info organisation="org.apache.nutch" module="${ant.project.name}">
-    <license name="Apache 2.0"/>
-    <ivyauthor name="Apache Nutch Team" url="https://nutch.apache.org/"/>
-    <description>
-        Apache Nutch
-    </description>
-  </info>
+	<info organisation="org.apache.nutch" module="${ant.project.name}">
+		<license name="Apache 2.0" />
+		<ivyauthor name="Apache Nutch Team"
+			url="https://nutch.apache.org/" />
+		<description>
+			Apache Nutch
+		</description>
+	</info>
 
-  <configurations>
-    <include file="../../..//ivy/ivy-configurations.xml"/>
-  </configurations>
+	<configurations>
+		<include file="../../..//ivy/ivy-configurations.xml" />
+	</configurations>
 
-  <publications>
-    <!--get the artifact from our module name-->
-    <artifact conf="master"/>
-  </publications>
+	<publications>
+		<!--get the artifact from our module name -->
+		<artifact conf="master" />
+	</publications>
 
-  <dependencies>
-    <dependency org="org.apache.solr" name="solr-solrj" rev="7.3.1"/>
-    <dependency org="org.apache.httpcomponents" name="httpcore" rev="4.4.6" conf="*->default"/>
-    <dependency org="org.apache.httpcomponents" name="httpmime" rev="4.5.3" conf="*->default"/>
-  </dependencies>
-  
+	<dependencies>
+		<dependency org="org.apache.solr" name="solr-solrj"
+			rev="8.5.1">
+			<!-- exclusions of dependencies provided by Nutch core -->
+			<exclude org="org.apache.commons" name="commons-codec" />
+			<exclude org="org.apache.commons" name="commons-logging" />
+			<exclude org="org.slf4j" name="slf4j-api" />
+		</dependency>
+		<dependency org="org.apache.httpcomponents" name="httpmime"
+			rev="4.5.10" conf="*->default" />
+		<dependency org="org.apache.httpcomponents" name="httpcore"
+			rev="4.4.12" conf="*->default" />
+	</dependencies>
+
 </ivy-module>
diff --git a/src/plugin/indexer-solr/plugin.xml b/src/plugin/indexer-solr/plugin.xml
index 0d91eab..3738bd6 100644
--- a/src/plugin/indexer-solr/plugin.xml
+++ b/src/plugin/indexer-solr/plugin.xml
@@ -1,47 +1,64 @@
 <?xml version="1.0" encoding="UTF-8"?>
-<!--
-  Licensed to the Apache Software Foundation (ASF) under one or more
-  contributor license agreements.  See the NOTICE file distributed with
-  this work for additional information regarding copyright ownership.
-  The ASF licenses this file to You under the Apache License, Version 2.0
-  (the "License"); you may not use this file except in compliance with
-  the License.  You may obtain a copy of the License at
-  
-  http://www.apache.org/licenses/LICENSE-2.0
-  
-  Unless required by applicable law or agreed to in writing, software
-  distributed under the License is distributed on an "AS IS" BASIS,
-  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-  See the License for the specific language governing permissions and
-  limitations under the License.
--->
+<!-- Licensed to the Apache Software Foundation (ASF) under one or more contributor 
+	license agreements. See the NOTICE file distributed with this work for additional 
+	information regarding copyright ownership. The ASF licenses this file to 
+	You under the Apache License, Version 2.0 (the "License"); you may not use 
+	this file except in compliance with the License. You may obtain a copy of 
+	the License at http://www.apache.org/licenses/LICENSE-2.0 Unless required 
+	by applicable law or agreed to in writing, software distributed under the 
+	License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS 
+	OF ANY KIND, either express or implied. See the License for the specific 
+	language governing permissions and limitations under the License. -->
 <plugin id="indexer-solr" name="SolrIndexWriter" version="1.0.0"
-  provider-name="nutch.apache.org">
+	provider-name="nutch.apache.org">
 
-  <runtime>
-    <library name="indexer-solr.jar">
-      <export name="*" />
-    </library>
-      <library name="commons-io-2.5.jar"/>
-      <library name="httpclient-4.5.3.jar"/>
-      <library name="httpcore-4.4.6.jar"/>
-      <library name="httpmime-4.5.3.jar"/>
-      <library name="noggit-0.8.jar"/>
-      <library name="solr-solrj-7.3.1.jar"/>
-      <library name="stax2-api-3.1.4.jar"/>
-      <library name="woodstox-core-asl-4.4.1.jar"/>
-      <library name="zookeeper-3.4.11.jar"/>
-  </runtime>
+	<runtime>
+		<library name="indexer-solr.jar">
+			<export name="*" />
+		</library>
+		<!-- Solr dependencies -->
+		<library name="commons-io-2.6.jar" />
+		<library name="netty-buffer-4.1.29.Final.jar" />
+		<library name="netty-codec-4.1.29.Final.jar" />
+		<library name="netty-common-4.1.29.Final.jar" />
+		<library name="netty-handler-4.1.29.Final.jar" />
+		<library name="netty-resolver-4.1.29.Final.jar" />
+		<library name="netty-transport-4.1.29.Final.jar" />
+		<library name="netty-transport-native-epoll-4.1.29.Final.jar" />
+		<library
+			name="netty-transport-native-unix-common-4.1.29.Final.jar" />
+		<library name="commons-math3-3.6.1.jar" />
+		<library name="httpmime-4.5.10.jar" />
+		<library name="httpclient-4.5.10.jar" />
+		<library name="httpcore-4.4.12.jar" />
+		<library name="zookeeper-3.5.5.jar" />
+		<library name="zookeeper-jute-3.5.5.jar" />
+		<library name="stax2-api-3.1.4.jar" />
+		<library name="woodstox-core-asl-4.4.1.jar" />
+		<library name="jetty-alpn-client-9.4.24.v20191120.jar" />
+		<library name="jetty-alpn-java-client-9.4.24.v20191120.jar" />
+		<library name="jetty-client-9.4.24.v20191120.jar" />
+		<library name="jetty-http-9.4.24.v20191120.jar" />
+		<library name="jetty-io-9.4.24.v20191120.jar" />
+		<library name="jetty-util-9.4.24.v20191120.jar" />
+		<library name="http2-client-9.4.24.v20191120.jar" />
+		<library name="http2-common-9.4.24.v20191120.jar" />
+		<library name="http2-hpack-9.4.24.v20191120.jar" />
+		<library
+			name="http2-http-client-transport-9.4.24.v20191120.jar" />
+		<library name="jcl-over-slf4j-1.7.24.jar" />
+		<!-- end of Solr dependencies -->
+		<library name="solr-solrj-8.5.1.jar" />
+	</runtime>
 
-  <requires>
-    <import plugin="nutch-extensionpoints" />
-  </requires>
+	<requires>
+		<import plugin="nutch-extensionpoints" />
+	</requires>
 
-  <extension id="org.apache.nutch.indexer.solr"
-    name="Solr Index Writer"
-    point="org.apache.nutch.indexer.IndexWriter">
-    <implementation id="SolrIndexWriter"
-      class="org.apache.nutch.indexwriter.solr.SolrIndexWriter" />
-  </extension>
+	<extension id="org.apache.nutch.indexer.solr"
+		name="Solr Index Writer" point="org.apache.nutch.indexer.IndexWriter">
+		<implementation id="SolrIndexWriter"
+			class="org.apache.nutch.indexwriter.solr.SolrIndexWriter" />
+	</extension>
 
 </plugin>
diff --git a/src/plugin/indexer-solr/src/java/org/apache/nutch/indexwriter/solr/SolrIndexWriter.java b/src/plugin/indexer-solr/src/java/org/apache/nutch/indexwriter/solr/SolrIndexWriter.java
index 86ca3eb..3b03e7d 100644
--- a/src/plugin/indexer-solr/src/java/org/apache/nutch/indexwriter/solr/SolrIndexWriter.java
+++ b/src/plugin/indexer-solr/src/java/org/apache/nutch/indexwriter/solr/SolrIndexWriter.java
@@ -74,13 +74,14 @@
 
   @Override
   public void open(Configuration conf, String name) {
-    //Implementation not required
+    // Implementation not required
   }
 
   /**
    * Initializes the internal variables from a given index writer configuration.
    *
-   * @param parameters Params from the index writer configuration.
+   * @param parameters
+   *          Params from the index writer configuration.
    */
   @Override
   public void open(IndexWriterParams parameters) {
@@ -107,10 +108,10 @@
       }
       break;
     case "cloud":
-      CloudSolrClient sc = this.auth ?
-          SolrUtils.getCloudSolrClient(Arrays.asList(urls), this.username,
-              this.password) :
-          SolrUtils.getCloudSolrClient(Arrays.asList(urls));
+      CloudSolrClient sc = this.auth
+          ? SolrUtils.getCloudSolrClient(Arrays.asList(urls), this.username,
+              this.password)
+          : SolrUtils.getCloudSolrClient(Arrays.asList(urls));
       sc.setDefaultCollection(this.collection);
       solrClients.add(sc);
       break;
@@ -150,6 +151,7 @@
     }
   }
 
+  @Override
   public void delete(String key) throws IOException {
     // escape solr hash separator
     key = key.replaceAll("!", "\\!");
@@ -170,6 +172,7 @@
     write(doc);
   }
 
+  @Override
   public void write(NutchDocument doc) throws IOException {
     final SolrInputDocument inputDoc = new SolrInputDocument();
 
@@ -201,6 +204,7 @@
     }
   }
 
+  @Override
   public void close() throws IOException {
     commit();
 
@@ -231,10 +235,8 @@
   private void push() throws IOException {
     if (inputDocs.size() > 0) {
       try {
-        LOG.info(
-            "Indexing " + Integer.toString(inputDocs.size()) + "/" + Integer
-                .toString(totalAdds) + " documents");
-        LOG.info("Deleting " + Integer.toString(numDeletes) + " documents");
+        LOG.info("Indexing {}/{} documents", inputDocs.size(), totalAdds);
+        LOG.info("Deleting {} documents", numDeletes);
         numDeletes = 0;
         UpdateRequest req = new UpdateRequest();
         req.add(inputDocs);
@@ -254,9 +256,8 @@
 
     if (deleteIds.size() > 0) {
       try {
-        LOG.info(
-            "SolrIndexer: deleting " + Integer.toString(deleteIds.size()) + "/"
-                + Integer.toString(totalDeletes) + " documents");
+        LOG.info("SolrIndexer: deleting {}/{} documents", deleteIds.size(),
+            totalDeletes);
         
         UpdateRequest req = new UpdateRequest();
         req.deleteById(deleteIds);
@@ -265,11 +266,11 @@
         if (this.auth) {
           req.setBasicAuthCredentials(this.username, this.password);
         }
-        
+
         for (SolrClient solrClient : solrClients) {
           solrClient.request(req);
         }
-        
+
       } catch (final SolrServerException e) {
         LOG.error("Error deleting: " + deleteIds);
         throw makeIOException(e);
@@ -293,9 +294,11 @@
   }
 
   /**
-   * Returns {@link Map} with the specific parameters the IndexWriter instance can take.
+   * Returns {@link Map} with the specific parameters the IndexWriter instance
+   * can take.
    *
-   * @return The values of each row. It must have the form <KEY,<DESCRIPTION,VALUE>>.
+   * @return The values of each row. It must have the form
+   *         <KEY,<DESCRIPTION,VALUE>>.
    */
   @Override
   public Map<String, Entry<String, Object>> describe() {
@@ -323,12 +326,10 @@
     properties.put(SolrConstants.USE_AUTH, new AbstractMap.SimpleEntry<>(
         "Whether to enable HTTP basic authentication for communicating with Solr. Use the username and password properties to configure your credentials.",
         this.auth));
-    properties.put(SolrConstants.USERNAME,
-        new AbstractMap.SimpleEntry<>("The username of Solr server.",
-            this.username));
-    properties.put(SolrConstants.PASSWORD,
-        new AbstractMap.SimpleEntry<>("The password of Solr server.",
-            this.password));
+    properties.put(SolrConstants.USERNAME, new AbstractMap.SimpleEntry<>(
+        "The username of Solr server.", this.username));
+    properties.put(SolrConstants.PASSWORD, new AbstractMap.SimpleEntry<>(
+        "The password of Solr server.", this.password));
 
     return properties;
   }
diff --git a/src/plugin/indexer-solr/src/java/org/apache/nutch/indexwriter/solr/SolrUtils.java b/src/plugin/indexer-solr/src/java/org/apache/nutch/indexwriter/solr/SolrUtils.java
index 196fc5d..8f97b16 100644
--- a/src/plugin/indexer-solr/src/java/org/apache/nutch/indexwriter/solr/SolrUtils.java
+++ b/src/plugin/indexer-solr/src/java/org/apache/nutch/indexwriter/solr/SolrUtils.java
@@ -37,21 +37,21 @@
     return sc;
   }
 
-  static CloudSolrClient getCloudSolrClient(List<String> urls, String username, String password) {
+  static CloudSolrClient getCloudSolrClient(List<String> urls, String username,
+      String password) {
     // Building http client
     CredentialsProvider provider = new BasicCredentialsProvider();
-    UsernamePasswordCredentials credentials
-        = new UsernamePasswordCredentials(username, password);
+    UsernamePasswordCredentials credentials = new UsernamePasswordCredentials(
+        username, password);
     provider.setCredentials(AuthScope.ANY, credentials);
 
     HttpClient client = HttpClientBuilder.create()
-        .setDefaultCredentialsProvider(provider)
-        .build();
+        .setDefaultCredentialsProvider(provider).build();
 
     // Building the client
     CloudSolrClient sc = new CloudSolrClient.Builder(urls)
         .withParallelUpdates(true).withHttpClient(client).build();
-        sc.connect();
+    sc.connect();
     return sc;
   }
 
diff --git a/src/plugin/parse-tika/build-ivy.xml b/src/plugin/parse-tika/build-ivy.xml
index 285bfcd..c67ea7a 100644
--- a/src/plugin/parse-tika/build-ivy.xml
+++ b/src/plugin/parse-tika/build-ivy.xml
@@ -25,6 +25,7 @@
     <property name="ivy.checksums" value="" />
     <property name="ivy.jar.dir" value="${ivy.home}/lib" />
     <property name="ivy.jar.file" value="${ivy.jar.dir}/ivy-${ivy.install.version}.jar" />
+    <ivy:settings id="ivy.instance" file="../../../ivy/ivysettings.xml" />
 
     <target name="download-ivy" unless="offline">
 
diff --git a/src/plugin/parse-tika/howto_upgrade_tika.txt b/src/plugin/parse-tika/howto_upgrade_tika.txt
index aa4147c..ca3cdae 100644
--- a/src/plugin/parse-tika/howto_upgrade_tika.txt
+++ b/src/plugin/parse-tika/howto_upgrade_tika.txt
@@ -23,7 +23,7 @@
      (eventually with different versions)
    - duplicated libs can be added to the exclusions of transitive dependencies in
        build/plugins/parse-tika/ivy.xml
-   - but it should be made sure that the library versions in ivy/ivy.xml correspend to
+   - but it should be made sure that the library versions in ivy/ivy.xml correspond to
      those required by Tika
 
 5. Remove the locally "installed" dependencies in src/plugin/parse-tika/lib/:
diff --git a/src/plugin/parse-tika/ivy.xml b/src/plugin/parse-tika/ivy.xml
index f03dbef..574af75 100644
--- a/src/plugin/parse-tika/ivy.xml
+++ b/src/plugin/parse-tika/ivy.xml
@@ -36,7 +36,7 @@
   </publications>
 
   <dependencies>
-    <dependency org="org.apache.tika" name="tika-parsers" rev="1.22" conf="*->default">
+    <dependency org="org.apache.tika" name="tika-parsers" rev="1.24.1" conf="*->default">
       <!-- exclusions of dependencies provided in Nutch core (ivy/ivy.xml) -->
       <exclude org="org.apache.tika" name="tika-core" />
       <exclude org="org.apache.httpcomponents" name="httpclient" />
diff --git a/src/plugin/parse-tika/plugin.xml b/src/plugin/parse-tika/plugin.xml
index 18dad6c..8b87ac9 100644
--- a/src/plugin/parse-tika/plugin.xml
+++ b/src/plugin/parse-tika/plugin.xml
@@ -26,45 +26,40 @@
          <export name="*"/>
       </library>
       <!-- dependencies of Tika (tika-parsers) -->
-      <library name="animal-sniffer-annotations-1.17.jar"/>
-      <library name="ant-1.10.5.jar"/>
-      <library name="ant-launcher-1.10.5.jar"/>
       <library name="apache-mime4j-core-0.8.3.jar"/>
       <library name="apache-mime4j-dom-0.8.3.jar"/>
-      <library name="asm-7.2-beta.jar"/>
-      <library name="bcmail-jdk15on-1.62.jar"/>
-      <library name="bcpkix-jdk15on-1.62.jar"/>
-      <library name="bcprov-jdk15on-1.62.jar"/>
+      <library name="asm-8.0.1.jar"/>
+      <library name="bcmail-jdk15on-1.65.jar"/>
+      <library name="bcpkix-jdk15on-1.65.jar"/>
+      <library name="bcprov-jdk15on-1.65.jar"/>
       <library name="boilerpipe-1.1.0.jar"/>
       <library name="bzip2-0.9.1.jar"/>
-      <library name="c3p0-0.9.5.4.jar"/>
+      <library name="c3p0-0.9.5.5.jar"/>
       <library name="cdm-4.5.5.jar"/>
-      <library name="checker-qual-2.8.1.jar"/>
-      <library name="codemodel-2.3.2.jar"/>
-      <library name="commons-csv-1.7.jar"/>
+      <library name="checker-qual-2.10.0.jar"/>
+      <library name="commons-csv-1.8.jar"/>
       <library name="commons-exec-1.3.jar"/>
       <library name="commons-io-2.6.jar"/>
       <library name="commons-logging-1.2.jar"/>
       <library name="commons-math3-3.6.1.jar"/>
-      <library name="curvesapi-1.05.jar"/>
-      <library name="cxf-rt-rs-client-3.3.2.jar"/>
-      <library name="cxf-rt-security-3.3.2.jar"/>
+      <library name="curvesapi-1.06.jar"/>
+      <library name="cxf-rt-rs-client-3.3.6.jar"/>
+      <library name="cxf-rt-security-3.3.6.jar"/>
       <library name="dec-0.1.2.jar"/>
-      <library name="dtd-parser-1.4.1.jar"/>
       <library name="ehcache-core-2.6.2.jar"/>
-      <library name="error_prone_annotations-2.3.2.jar"/>
+      <library name="error_prone_annotations-2.3.4.jar"/>
       <library name="failureaccess-1.0.1.jar"/>
       <library name="FastInfoset-1.2.16.jar"/>
-      <library name="fontbox-2.0.16.jar"/>
+      <library name="fontbox-2.0.19.jar"/>
       <library name="geoapi-3.0.1.jar"/>
       <library name="grib-4.5.5.jar"/>
-      <library name="gson-2.8.5.jar"/>
-      <library name="guava-28.0-jre.jar"/>
-      <library name="httpmime-4.5.9.jar"/>
+      <library name="gson-2.8.6.jar"/>
+      <library name="guava-28.2-jre.jar"/>
+      <library name="HikariCP-java7-2.4.13.jar"/>
+      <library name="httpmime-4.5.12.jar"/>
       <library name="httpservices-4.5.5.jar"/>
-      <library name="isoparser-1.1.22.jar"/>
+      <library name="isoparser-1.9.41.2.jar"/>
       <library name="istack-commons-runtime-3.0.8.jar"/>
-      <library name="istack-commons-tools-3.0.8.jar"/>
       <library name="j2objc-annotations-1.3.jar"/>
       <library name="jackcess-3.0.1.jar"/>
       <library name="jackcess-encrypt-3.0.0.jar"/>
@@ -73,55 +68,54 @@
       <library name="jakarta.activation-api-1.2.1.jar"/>
       <library name="jakarta.ws.rs-api-2.1.5.jar"/>
       <library name="jakarta.xml.bind-api-2.3.2.jar"/>
-      <library name="java-libpst-0.8.1.jar"/>
+      <library name="java-libpst-0.9.3.jar"/>
       <library name="javax.annotation-api-1.3.2.jar"/>
       <library name="jaxb-runtime-2.3.2.jar"/>
-      <library name="jaxb-xjc-2.3.2.jar"/>
-      <library name="jbig2-imageio-3.0.2.jar"/>
+      <library name="jbig2-imageio-3.0.3.jar"/>
       <library name="jcip-annotations-1.0.jar"/>
-      <library name="jcl-over-slf4j-1.7.26.jar"/>
-      <library name="jcommander-1.35.jar"/>
+      <library name="jcl-over-slf4j-1.7.28.jar"/>
+      <library name="jcommander-1.78.jar"/>
       <library name="jdom2-2.0.6.jar"/>
       <library name="jempbox-1.8.16.jar"/>
       <library name="jhighlight-1.0.3.jar"/>
       <library name="jmatio-1.5.jar"/>
-      <library name="jna-5.3.1.jar"/>
+      <library name="jna-5.5.0.jar"/>
       <library name="joda-time-2.2.jar"/>
       <library name="json-simple-1.1.1.jar"/>
-      <library name="jsoup-1.12.1.jar"/>
+      <library name="jsoup-1.13.1.jar"/>
       <library name="jsr305-3.0.2.jar"/>
-      <library name="jul-to-slf4j-1.7.26.jar"/>
+      <library name="jul-to-slf4j-1.7.28.jar"/>
       <library name="juniversalchardet-1.0.3.jar"/>
       <library name="junrar-4.0.0.jar"/>
       <library name="listenablefuture-9999.0-empty-to-avoid-conflict-with-guava.jar"/>
-      <library name="mchange-commons-java-0.2.15.jar"/>
-      <library name="metadata-extractor-2.11.0.jar"/>
+      <library name="mchange-commons-java-0.2.19.jar"/>
+      <library name="metadata-extractor-2.13.0.jar"/>
       <library name="netcdf4-4.5.5.jar"/>
-      <library name="openjson-1.0.11.jar"/>
-      <library name="opennlp-tools-1.9.1.jar"/>
+      <library name="openjson-1.0.12.jar"/>
+      <library name="opennlp-tools-1.9.2.jar"/>
       <library name="parso-2.0.11.jar"/>
-      <library name="pdfbox-2.0.16.jar"/>
-      <library name="pdfbox-tools-2.0.16.jar"/>
-      <library name="poi-4.0.1.jar"/>
-      <library name="poi-ooxml-4.0.1.jar"/>
-      <library name="poi-ooxml-schemas-4.0.1.jar"/>
-      <library name="poi-scratchpad-4.0.1.jar"/>
-      <library name="quartz-2.2.0.jar"/>
-      <library name="relaxng-datatype-2.3.2.jar"/>
-      <library name="rngom-2.3.2.jar"/>
-      <library name="rome-1.12.1.jar"/>
-      <library name="rome-utils-1.12.1.jar"/>
+      <library name="pdfbox-2.0.19.jar"/>
+      <library name="pdfbox-tools-2.0.19.jar"/>
+      <library name="poi-4.1.2.jar"/>
+      <library name="poi-ooxml-4.1.2.jar"/>
+      <library name="poi-ooxml-schemas-4.1.2.jar"/>
+      <library name="poi-scratchpad-4.1.2.jar"/>
+      <library name="preflight-2.0.19.jar"/>
+      <library name="quartz-2.3.2.jar"/>
+      <library name="rome-1.12.2.jar"/>
+      <library name="rome-utils-1.12.2.jar"/>
       <library name="sentiment-analysis-parser-0.1.jar"/>
-      <library name="sis-feature-0.8.jar"/>
-      <library name="sis-metadata-0.8.jar"/>
-      <library name="sis-netcdf-0.8.jar"/>
-      <library name="sis-referencing-0.8.jar"/>
-      <library name="sis-storage-0.8.jar"/>
-      <library name="sis-utility-0.8.jar"/>
+      <library name="sis-feature-1.0.jar"/>
+      <library name="sis-metadata-1.0.jar"/>
+      <library name="sis-netcdf-1.0.jar"/>
+      <library name="sis-referencing-1.0.jar"/>
+      <library name="sis-storage-1.0.jar"/>
+      <library name="sis-utility-1.0.jar"/>
+      <library name="SparseBitSet-1.2.jar"/>
       <library name="stax2-api-3.1.4.jar"/>
-      <library name="stax-ex-1.8.1.jar"/>
+      <library name="stax-ex-1.8.2.jar"/>
       <library name="tagsoup-1.2.1.jar"/>
-      <library name="tika-parsers-1.22.jar"/>
+      <library name="tika-parsers-1.24.1.jar"/>
       <library name="txw2-2.3.2.jar"/>
       <library name="udunits-4.5.5.jar"/>
       <library name="unit-api-1.0.jar"/>
@@ -129,10 +123,11 @@
       <library name="vorbis-java-tika-0.8.jar"/>
       <library name="woodstox-core-5.0.3.jar"/>
       <library name="xercesImpl-2.12.0.jar"/>
-      <library name="xmlbeans-3.0.2.jar"/>
-      <library name="xmlschema-core-2.2.4.jar"/>
-      <library name="xmpcore-5.1.3.jar"/>
-      <library name="xsom-2.3.2.jar"/>
+      <library name="xmlbeans-3.1.0.jar"/>
+      <library name="xmlschema-core-2.2.5.jar"/>
+      <library name="xmpbox-2.0.19.jar"/>
+      <library name="xmpcore-6.1.10.jar"/>
+      <library name="xmpcore-shaded-6.1.10.jar"/>
       <library name="xz-1.8.jar"/>
       <!-- end of dependencies of Tika (tika-parsers) -->
    </runtime>
diff --git a/src/plugin/subcollection/src/java/org/apache/nutch/collection/Subcollection.java b/src/plugin/subcollection/src/java/org/apache/nutch/collection/Subcollection.java
index 8478390..36f33ca 100644
--- a/src/plugin/subcollection/src/java/org/apache/nutch/collection/Subcollection.java
+++ b/src/plugin/subcollection/src/java/org/apache/nutch/collection/Subcollection.java
@@ -19,6 +19,7 @@
 import java.util.ArrayList;
 import java.util.Iterator;
 import java.util.List;
+import java.util.Locale;
 import java.util.StringTokenizer;
 
 import org.apache.hadoop.conf.Configuration;
@@ -239,8 +240,10 @@
     while (st.hasMoreElements()) {
       String line = (String) st.nextElement();
       line = line.trim();
+      if (line.isEmpty())
+        continue;
       if (caseInsensitive) {
-        line = line.toLowerCase();
+        line = line.toLowerCase(Locale.ROOT);
       }
       list.add(line);
     }