diff --git a/build.xml b/build.xml
index f50395e..ae0f111 100644
--- a/build.xml
+++ b/build.xml
@@ -1102,7 +1102,6 @@
         <source path="${plugins.dir}/indexer-dummy/src/java/" />
         <source path="${plugins.dir}/indexer-elastic-rest/src/java/"/>
         <source path="${plugins.dir}/indexer-elastic/src/java/" />
-        <source path="${plugins.dir}/indexer-elastic/src/test/" />
         <source path="${plugins.dir}/indexer-kafka/src/java/" />
         <source path="${plugins.dir}/indexer-rabbit/src/java/" />
         <source path="${plugins.dir}/indexer-solr/src/java/" />
diff --git a/conf/index-writers.xml.template b/conf/index-writers.xml.template
index 808e31f..96c765e 100644
--- a/conf/index-writers.xml.template
+++ b/conf/index-writers.xml.template
@@ -106,8 +106,8 @@
   </writer>
   <writer id="indexer_elastic_1" class="org.apache.nutch.indexwriter.elastic.ElasticIndexWriter">
     <parameters>
-      <param name="host" value=""/>
-      <param name="port" value="9300"/>
+      <param name="host" value="localhost"/>
+      <param name="port" value="9200"/>
       <param name="cluster" value=""/>
       <param name="index" value="nutch"/>
       <param name="max.bulk.docs" value="250"/>
diff --git a/src/java/org/apache/nutch/crawl/CrawlDbReader.java b/src/java/org/apache/nutch/crawl/CrawlDbReader.java
index a7d2f11..f59f895 100644
--- a/src/java/org/apache/nutch/crawl/CrawlDbReader.java
+++ b/src/java/org/apache/nutch/crawl/CrawlDbReader.java
@@ -42,6 +42,7 @@
 import com.tdunning.math.stats.TDigest;
 
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.BytesWritable;
@@ -89,12 +90,29 @@
 
   protected String crawlDb;
 
+  private long lastModified = 0;
+
   private void openReaders(String crawlDb, Configuration config)
       throws IOException {
-    if (readers != null)
-      return;
     Path crawlDbPath = new Path(crawlDb, CrawlDb.CURRENT_NAME);
-    readers = MapFileOutputFormat.getReaders(crawlDbPath, config);
+
+    FileStatus stat = crawlDbPath.getFileSystem(config).getFileStatus(crawlDbPath);
+    long lastModified = stat.getModificationTime();
+
+    synchronized (this) {
+      if (readers != null) {
+        if (this.lastModified == lastModified) {
+          // CrawlDB not modified, re-use readers
+          return;
+        } else {
+          // CrawlDB modified, close and re-open readers
+          closeReaders();
+        }
+      }
+
+      this.lastModified = lastModified;
+      readers = MapFileOutputFormat.getReaders(crawlDbPath, config);
+    }
   }
 
   private void closeReaders() {
@@ -627,8 +645,6 @@
   protected int process(String line, StringBuilder output) throws Exception {
     Job job = NutchJob.getInstance(getConf());
     Configuration config = job.getConfiguration();
-    // Close readers, so we know we're not working on stale data
-    closeReaders();
     readUrl(this.crawlDb, line, config, output);
     return 0;
   }
diff --git a/src/java/org/apache/nutch/crawl/LinkDbReader.java b/src/java/org/apache/nutch/crawl/LinkDbReader.java
index 2d2a901..6ea3c26 100644
--- a/src/java/org/apache/nutch/crawl/LinkDbReader.java
+++ b/src/java/org/apache/nutch/crawl/LinkDbReader.java
@@ -26,6 +26,7 @@
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.MapFile;
 import org.apache.hadoop.io.Text;
@@ -63,6 +64,8 @@
   private Path directory;
   private MapFile.Reader[] readers;
 
+  private long lastModified = 0;
+
   public LinkDbReader() {
     //default constructor
   }
@@ -76,6 +79,28 @@
     this.directory = directory;
   }
 
+  public void openReaders() throws IOException {
+    Path linkDbPath = new Path(directory, LinkDb.CURRENT_NAME);
+
+    FileStatus stat = linkDbPath.getFileSystem(getConf()).getFileStatus(directory);
+    long lastModified = stat.getModificationTime();
+
+    synchronized (this) {
+      if (readers != null) {
+        if (this.lastModified == lastModified) {
+          // CrawlDB not modified, re-use readers
+          return;
+        } else {
+          // CrawlDB modified, close and re-open readers
+          close();
+        }
+      }
+
+      this.lastModified = lastModified;
+      readers = MapFileOutputFormat.getReaders(linkDbPath, getConf());
+    }
+  }
+
   public String[] getAnchors(Text url) throws IOException {
     Inlinks inlinks = getInlinks(url);
     if (inlinks == null)
@@ -84,13 +109,7 @@
   }
 
   public Inlinks getInlinks(Text url) throws IOException {
-
-    if (readers == null) {
-      synchronized (this) {
-        readers = MapFileOutputFormat.getReaders(new Path(directory,
-            LinkDb.CURRENT_NAME), getConf());
-      }
-    }
+    openReaders();
 
     return (Inlinks) MapFileOutputFormat.getEntry(readers, PARTITIONER, url,
         new Inlinks());
@@ -188,6 +207,7 @@
       Iterator<Inlink> it = links.iterator();
       while (it.hasNext()) {
         output.append(it.next().toString());
+        output.append("\n");
       }
     }
     output.append("\n");
diff --git a/src/plugin/build.xml b/src/plugin/build.xml
index 51c3fe7..b0882a5 100755
--- a/src/plugin/build.xml
+++ b/src/plugin/build.xml
@@ -125,7 +125,6 @@
      <ant dir="index-replace" target="test"/>
      <ant dir="index-static" target="test"/>
      <ant dir="indexer-csv" target="test"/>
-     <ant dir="indexer-elastic" target="test"/>
      <ant dir="language-identifier" target="test"/>
      <ant dir="lib-http" target="test"/>
      <ant dir="lib-regex-filter" target="test"/>
diff --git a/src/plugin/indexer-elastic/build.xml b/src/plugin/indexer-elastic/build.xml
index 6955f61..4167d09 100644
--- a/src/plugin/indexer-elastic/build.xml
+++ b/src/plugin/indexer-elastic/build.xml
@@ -26,9 +26,6 @@
 
   <!-- Deploy Unit test dependencies -->
   <target name="deps-test">
-    <copy toDir="${build.test}">
-      <fileset dir="${src.test}" excludes="**/*.java"/>
-    </copy>
   </target>
 
 
diff --git a/src/plugin/indexer-elastic/howto_upgrade_es.txt b/src/plugin/indexer-elastic/howto_upgrade_es.txt
index b577053..a815644 100644
--- a/src/plugin/indexer-elastic/howto_upgrade_es.txt
+++ b/src/plugin/indexer-elastic/howto_upgrade_es.txt
@@ -1,6 +1,33 @@
-1. Upgrade elasticsearch dependency in src/plugin/indexer-elastic/ivy.xml
+1. Upgrade Elasticsearch dependency in src/plugin/indexer-elastic/ivy.xml
 
 2. Upgrade the Elasticsearch specific dependencies in src/plugin/indexer-elastic/plugin.xml
    To get the list of dependencies and their versions execute:
-   $ ant -f ./build-ivy.xml
-   $ ls lib/
+    $ cd src/plugin/indexer-elastic/
+    $ ant -f ./build-ivy.xml
+    $ ls lib | sed 's/^/    <library name="/g' | sed 's/$/"\/>/g'
+
+   In the plugin.xml replace all lines between
+      <!-- Elastic Rest Client dependencies -->
+   and
+      <!-- end of Elastic Rest Client dependencies -->
+   with the output of the command above.
+
+4. (Optionally) remove overlapping dependencies between indexer-elastic and Nutch core dependencies:
+   - check for libs present both in
+       build/lib
+     and
+       build/plugins/indexer-elastic/
+     (eventually with different versions)
+   - duplicated libs can be added to the exclusions of transitive dependencies in
+       build/plugins/indexer-elastic/ivy.xml
+   - but it should be made sure that the library versions in ivy/ivy.xml correspend to
+     those required by Tika
+
+5. Remove the locally "installed" dependencies in src/plugin/indexer-elastic/lib/:
+
+    $ rm -rf lib/
+
+6. Build Nutch and run all unit tests:
+
+    $ cd ../../../
+    $ ant clean runtime test
\ No newline at end of file
diff --git a/src/plugin/indexer-elastic/ivy.xml b/src/plugin/indexer-elastic/ivy.xml
index 48ea594..2b42629 100644
--- a/src/plugin/indexer-elastic/ivy.xml
+++ b/src/plugin/indexer-elastic/ivy.xml
@@ -27,7 +27,7 @@
   </info>
 
   <configurations>
-    <include file="../../..//ivy/ivy-configurations.xml"/>
+    <include file="../../../ivy/ivy-configurations.xml"/>
   </configurations>
 
   <publications>
@@ -36,10 +36,12 @@
   </publications>
 
   <dependencies>
-    <dependency org="org.elasticsearch" name="elasticsearch" rev="5.3.0" conf="*->default"/>
-    <dependency org="org.elasticsearch.client" name="transport" rev="5.3.0"/>
-    <dependency org="org.apache.logging.log4j" name="log4j-api" rev="2.7" />
-    <dependency org="org.apache.logging.log4j" name="log4j-core" rev="2.7" />
+    <dependency org="org.elasticsearch.client" name="elasticsearch-rest-high-level-client" rev="7.3.0">
+      <!-- exclusions of dependencies provided in Nutch core (ivy/ivy.xml) -->
+      <exclude org="commons-codec" name="commons-codec" />
+      <exclude org="commons-logging" name="commons-logging" />
+      <exclude org="com.tdunning" name="t-digest" />
+    </dependency>
   </dependencies>
   
 </ivy-module>
diff --git a/src/plugin/indexer-elastic/plugin.xml b/src/plugin/indexer-elastic/plugin.xml
index 2cbb53a..45ac61e 100644
--- a/src/plugin/indexer-elastic/plugin.xml
+++ b/src/plugin/indexer-elastic/plugin.xml
@@ -6,9 +6,7 @@
   The ASF licenses this file to You under the Apache License, Version 2.0
   (the "License"); you may not use this file except in compliance with
   the License.  You may obtain a copy of the License at
-
   http://www.apache.org/licenses/LICENSE-2.0
-
   Unless required by applicable law or agreed to in writing, software
   distributed under the License is distributed on an "AS IS" BASIS,
   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -23,63 +21,52 @@
       <export name="*" />
     </library>
 
-    <!-- DEPENDENCIES FOR: https://mvnrepository.com/artifact/org.elasticsearch.client/rest/5.3.0 -->
-    <library name="rest-5.3.0.jar"/>
-    <library name="commons-codec-1.10.jar"/>
-    <library name="commons-logging-1.1.3.jar"/>
-    <library name="httpasyncclient-4.1.2.jar"/>
-    <library name="httpclient-4.5.2.jar"/>
-    <library name="httpcore-4.4.5.jar"/>
-    <library name="httpcore-nio-4.4.5.jar"/>
-    <!-- ElasticSearch DEPENDENCIES-->
-    <library name="elasticsearch-5.3.0.jar"/>
+    <!-- Elastic Rest Client Dependencies -->
+    <library name="aggs-matrix-stats-client-7.3.0.jar"/>
     <library name="compiler-0.9.3.jar"/>
-    <library name="HdrHistogram-2.1.6.jar"/>
-    <library name="hppc-0.7.1.jar"/>
-    <library name="jackson-core-2.8.6.jar"/>
-    <library name="jackson-dataformat-cbor-2.8.6.jar"/>
-    <library name="jackson-dataformat-smile-2.8.6.jar"/>
-    <library name="jackson-dataformat-yaml-2.8.6.jar"/>
-    <library name="jna-4.2.2.jar"/>
-    <library name="joda-time-2.9.5.jar"/>
+    <library name="elasticsearch-7.3.0.jar"/>
+    <library name="elasticsearch-cli-7.3.0.jar"/>
+    <library name="elasticsearch-core-7.3.0.jar"/>
+    <library name="elasticsearch-geo-7.3.0.jar"/>
+    <library name="elasticsearch-rest-client-7.3.0.jar"/>
+    <library name="elasticsearch-rest-high-level-client-7.3.0.jar"/>
+    <library name="elasticsearch-secure-sm-7.3.0.jar"/>
+    <library name="elasticsearch-x-content-7.3.0.jar"/>
+    <library name="HdrHistogram-2.1.9.jar"/>
+    <library name="hppc-0.8.1.jar"/>
+    <library name="httpasyncclient-4.1.4.jar"/>
+    <library name="httpclient-4.5.8.jar"/>
+    <library name="httpcore-4.4.11.jar"/>
+    <library name="httpcore-nio-4.4.11.jar"/>
+    <library name="jackson-core-2.8.11.jar"/>
+    <library name="jackson-dataformat-cbor-2.8.11.jar"/>
+    <library name="jackson-dataformat-smile-2.8.11.jar"/>
+    <library name="jackson-dataformat-yaml-2.8.11.jar"/>
+    <library name="jna-4.5.1.jar"/>
+    <library name="joda-time-2.10.2.jar"/>
     <library name="jopt-simple-5.0.2.jar"/>
-    <library name="log4j-api-2.7.jar"/>
-    <library name="log4j-core-2.7.jar"/>
-    <library name="lucene-analyzers-common-6.4.1.jar"/>
-    <library name="lucene-backward-codecs-6.4.1.jar"/>
-    <library name="lucene-core-6.4.1.jar"/>
-    <library name="lucene-grouping-6.4.1.jar"/>
-    <library name="lucene-highlighter-6.4.1.jar"/>
-    <library name="lucene-join-6.4.1.jar"/>
-    <library name="lucene-memory-6.4.1.jar"/>
-    <library name="lucene-misc-6.4.1.jar"/>
-    <library name="lucene-queries-6.4.1.jar"/>
-    <library name="lucene-queryparser-6.4.1.jar"/>
-    <library name="lucene-sandbox-6.4.1.jar"/>
-    <library name="lucene-spatial3d-6.4.1.jar"/>
-    <library name="lucene-spatial-6.4.1.jar"/>
-    <library name="lucene-spatial-extras-6.4.1.jar"/>
-    <library name="lucene-suggest-6.4.1.jar"/>
-    <library name="securesm-1.1.jar"/>
-    <library name="snakeyaml-1.15.jar"/>
-    <library name="t-digest-3.0.jar"/>
+    <library name="lang-mustache-client-7.3.0.jar"/>
+    <library name="log4j-api-2.11.1.jar"/>
+    <library name="lucene-analyzers-common-8.1.0.jar"/>
+    <library name="lucene-backward-codecs-8.1.0.jar"/>
+    <library name="lucene-core-8.1.0.jar"/>
+    <library name="lucene-grouping-8.1.0.jar"/>
+    <library name="lucene-highlighter-8.1.0.jar"/>
+    <library name="lucene-join-8.1.0.jar"/>
+    <library name="lucene-memory-8.1.0.jar"/>
+    <library name="lucene-misc-8.1.0.jar"/>
+    <library name="lucene-queries-8.1.0.jar"/>
+    <library name="lucene-queryparser-8.1.0.jar"/>
+    <library name="lucene-sandbox-8.1.0.jar"/>
+    <library name="lucene-spatial3d-8.1.0.jar"/>
+    <library name="lucene-spatial-8.1.0.jar"/>
+    <library name="lucene-spatial-extras-8.1.0.jar"/>
+    <library name="lucene-suggest-8.1.0.jar"/>
+    <library name="parent-join-client-7.3.0.jar"/>
+    <library name="rank-eval-client-7.3.0.jar"/>
+    <library name="snakeyaml-1.17.jar"/>
+    <!-- end of Elastic Rest Client dependencies -->
 
-    <!-- ElasticSearch Transport jar Deps-->
-    <library name="percolator-client-5.3.0.jar"/>
-    <library name="reindex-client-5.3.0.jar"/>
-    <library name="lang-mustache-client-5.3.0.jar"/>
-    <library name="transport-5.3.0.jar"/>
-    <library name="transport-netty3-client-5.3.0.jar"/>
-    <library name="transport-netty4-client-5.3.0.jar"/>
-    <library name="netty-all-4.1.7.Final.jar"/>
-    <library name="netty-3.10.6.Final.jar"/>
-    <library name="netty-buffer-4.1.7.Final.jar"/>
-    <library name="netty-codec-4.1.7.Final.jar"/>
-    <library name="netty-codec-http-4.1.7.Final.jar"/>
-    <library name="netty-common-4.1.7.Final.jar"/>
-    <library name="netty-handler-4.1.7.Final.jar"/>
-    <library name="netty-resolver-4.1.7.Final.jar"/>
-    <library name="netty-transport-4.1.7.Final.jar"/>
   </runtime>
 
   <requires>
@@ -93,4 +80,4 @@
       class="org.apache.nutch.indexwriter.elastic.ElasticIndexWriter" />
   </extension>
 
-</plugin>
+</plugin>
\ No newline at end of file
diff --git a/src/plugin/indexer-elastic/src/java/org/apache/nutch/indexwriter/elastic/ElasticIndexWriter.java b/src/plugin/indexer-elastic/src/java/org/apache/nutch/indexwriter/elastic/ElasticIndexWriter.java
index ee31527..74727a0 100644
--- a/src/plugin/indexer-elastic/src/java/org/apache/nutch/indexwriter/elastic/ElasticIndexWriter.java
+++ b/src/plugin/indexer-elastic/src/java/org/apache/nutch/indexwriter/elastic/ElasticIndexWriter.java
@@ -17,10 +17,9 @@
 package org.apache.nutch.indexwriter.elastic;
 
 import java.lang.invoke.MethodHandles;
+import java.time.format.DateTimeFormatter;
 import java.io.IOException;
-import java.net.InetAddress;
 import java.util.AbstractMap;
-import java.util.HashMap;
 import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
@@ -28,25 +27,30 @@
 
 import org.apache.commons.lang.StringUtils;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.http.Header;
+import org.apache.http.HttpHost;
+import org.apache.http.message.BasicHeader;
 import org.apache.nutch.indexer.IndexWriter;
 import org.apache.nutch.indexer.IndexWriterParams;
 import org.apache.nutch.indexer.NutchDocument;
 import org.apache.nutch.indexer.NutchField;
 import org.elasticsearch.action.bulk.BulkResponse;
-import org.elasticsearch.action.bulk.BulkRequest;
+import org.elasticsearch.action.DocWriteRequest;
 import org.elasticsearch.action.bulk.BackoffPolicy;
 import org.elasticsearch.action.bulk.BulkProcessor;
+import org.elasticsearch.action.bulk.BulkRequest;
 import org.elasticsearch.action.delete.DeleteRequest;
 import org.elasticsearch.action.index.IndexRequest;
-import org.elasticsearch.client.Client;
-import org.elasticsearch.client.transport.TransportClient;
+import org.elasticsearch.client.RestClient;
+import org.elasticsearch.client.RestClientBuilder;
+import org.elasticsearch.client.RestHighLevelClient;
 import org.elasticsearch.common.unit.ByteSizeUnit;
 import org.elasticsearch.common.unit.ByteSizeValue;
 import org.elasticsearch.common.unit.TimeValue;
-import org.elasticsearch.common.settings.Settings;
-import org.elasticsearch.common.transport.InetSocketTransportAddress;
-import org.elasticsearch.node.Node;
-import org.elasticsearch.transport.client.PreBuiltTransportClient;
+import org.elasticsearch.common.xcontent.XContentBuilder;
+import org.elasticsearch.common.xcontent.XContentFactory;
+import org.elasticsearch.client.RequestOptions;
+
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -57,6 +61,7 @@
   private static final Logger LOG = LoggerFactory
       .getLogger(MethodHandles.lookup().lookupClass());
 
+
   private static final int DEFAULT_PORT = 9300;
   private static final int DEFAULT_MAX_BULK_DOCS = 250;
   private static final int DEFAULT_MAX_BULK_LENGTH = 2500500;
@@ -75,8 +80,7 @@
   private int expBackoffRetries;
 
   private String defaultIndex;
-  private Client client;
-  private Node node;
+  private RestHighLevelClient client;
   private BulkProcessor bulkProcessor;
 
   private long bulkCloseTimeout;
@@ -99,8 +103,8 @@
     cluster = parameters.get(ElasticConstants.CLUSTER);
     String hosts = parameters.get(ElasticConstants.HOSTS);
 
-    if (StringUtils.isBlank(cluster) && StringUtils.isBlank(hosts)) {
-      String message = "Missing elastic.cluster and elastic.host. At least one of them should be set in index-writers.xml ";
+    if (StringUtils.isBlank(hosts)) {
+      String message = "Missing elastic.host this should be set in index-writers.xml ";
       message += "\n" + describe();
       LOG.error(message);
       throw new RuntimeException(message);
@@ -125,7 +129,10 @@
 
     LOG.debug("Creating BulkProcessor with maxBulkDocs={}, maxBulkLength={}",
         maxBulkDocs, maxBulkLength);
-    bulkProcessor = BulkProcessor.builder(client, bulkProcessorListener())
+    bulkProcessor = BulkProcessor.builder(
+        (request, bulkListener) ->
+        client.bulkAsync(request, RequestOptions.DEFAULT, bulkListener),
+        bulkProcessorListener())
         .setBulkActions(maxBulkDocs)
         .setBulkSize(new ByteSizeValue(maxBulkLength, ByteSizeUnit.BYTES))
         .setConcurrentRequests(1).setBackoffPolicy(BackoffPolicy
@@ -134,50 +141,30 @@
   }
 
   /**
-   * Generates a TransportClient or NodeClient
+   * Generates a RestHighLevelClient with the hosts given
    */
-  protected Client makeClient(IndexWriterParams parameters) throws IOException {
+  protected RestHighLevelClient makeClient(IndexWriterParams parameters) throws IOException {
     hosts = parameters.getStrings(ElasticConstants.HOSTS);
     port = parameters.getInt(ElasticConstants.PORT, DEFAULT_PORT);
 
-    Settings.Builder settingsBuilder = Settings.builder();
+    RestHighLevelClient client = null;
 
-    String options = parameters.get(ElasticConstants.OPTIONS);
-
-    if (options != null) {
-      String[] lines = options.trim().split(",");
-      for (String line : lines) {
-        if (StringUtils.isNotBlank(line)) {
-          String[] parts = line.trim().split("=");
-
-          if (parts.length == 2) {
-            settingsBuilder.put(parts[0].trim(), parts[1].trim());
-          }
-        }
-      }
-    }
-
-    // Set the cluster name and build the settings
-    if (StringUtils.isNotBlank(cluster)) {
-      settingsBuilder.put("cluster.name", cluster);
-    }
-
-    Settings settings = settingsBuilder.build();
-
-    Client client = null;
-
-    // Prefer TransportClient
     if (hosts != null && port > 1) {
-      @SuppressWarnings("resource") TransportClient transportClient = new PreBuiltTransportClient(
-          settings);
-
-      for (String host : hosts)
-        transportClient.addTransportAddress(
-            new InetSocketTransportAddress(InetAddress.getByName(host), port));
-      client = transportClient;
-    } else if (cluster != null) {
-      node = new Node(settings);
-      client = node.client();
+      HttpHost[] hostsList = new HttpHost[hosts.length];
+      int i = 0;
+      for(String host: hosts)	{
+        hostsList[i++] = new HttpHost(host, port);
+      }
+      RestClientBuilder restClientBuilder = RestClient.builder(hostsList);
+      if (StringUtils.isNotBlank(cluster)) {
+        Header[] defaultHeaders = new Header[]{new BasicHeader("cluster.name", cluster)};
+        restClientBuilder.setDefaultHeaders(defaultHeaders);
+      } else	{
+        LOG.debug("No cluster name provided so using default");
+      }
+      client = new RestHighLevelClient(restClientBuilder);
+    } else	{
+      throw new IOException("ElasticRestClient initialization Failed!!!\\n\\nPlease Provide the hosts");
     }
 
     return client;
@@ -215,26 +202,35 @@
     if (type == null)
       type = "doc";
 
-    // Add each field of this doc to the index source
-    Map<String, Object> source = new HashMap<String, Object>();
+    // Add each field of this doc to the index builder
+    XContentBuilder builder = XContentFactory.jsonBuilder().startObject();
     for (final Map.Entry<String, NutchField> e : doc) {
       final List<Object> values = e.getValue().getValues();
 
       if (values.size() > 1) {
-        source.put(e.getKey(), values);
+        builder.array(e.getKey(), values);
       } else {
-        source.put(e.getKey(), values.get(0));
+        Object value = values.get(0);
+        if (value instanceof java.util.Date) {
+          value = DateTimeFormatter.ISO_INSTANT
+              .format(((java.util.Date) value).toInstant());
+        }
+        builder.field(e.getKey(), value);
       }
     }
+    builder.endObject();
 
-    IndexRequest request = new IndexRequest(defaultIndex, type, id)
-        .source(source);
+    IndexRequest request = new IndexRequest(defaultIndex)
+                                           .id(id)
+                                           .source(builder);
+    request.opType(DocWriteRequest.OpType.INDEX);
+
     bulkProcessor.add(request);
   }
 
   @Override
   public void delete(String key) throws IOException {
-    DeleteRequest request = new DeleteRequest(defaultIndex, "doc", key);
+    DeleteRequest request = new DeleteRequest(defaultIndex, key);
     bulkProcessor.add(request);
   }
 
@@ -259,9 +255,6 @@
     }
 
     client.close();
-    if (node != null) {
-      node.close();
-    }
   }
 
   /**
@@ -277,10 +270,9 @@
         "The cluster name to discover. Either host and port must be defined or cluster.",
         this.cluster));
     properties.put(ElasticConstants.HOSTS, new AbstractMap.SimpleEntry<>(
-        "Ordered list of fields (columns) in the CSV fileComma-separated list of "
-            + "hostnames to send documents to using TransportClient. "
+        "Comma-separated list of hostnames to send documents to using TransportClient. "
             + "Either host and port must be defined or cluster.",
-        this.hosts == null ? "" : String.join(",", hosts)));
+            this.hosts == null ? "" : String.join(",", hosts)));
     properties.put(ElasticConstants.PORT, new AbstractMap.SimpleEntry<>(
         "The port to connect to using TransportClient.", this.port));
     properties.put(ElasticConstants.INDEX,
@@ -318,4 +310,4 @@
   public Configuration getConf() {
     return config;
   }
-}
+}
\ No newline at end of file
diff --git a/src/plugin/indexer-elastic/src/test/conf/nutch-site-test.xml b/src/plugin/indexer-elastic/src/test/conf/nutch-site-test.xml
deleted file mode 100644
index 0a37225..0000000
--- a/src/plugin/indexer-elastic/src/test/conf/nutch-site-test.xml
+++ /dev/null
@@ -1,57 +0,0 @@
-<?xml version="1.0"?>
-<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
-<!--
- Licensed to the Apache Software Foundation (ASF) under one or more
- contributor license agreements.  See the NOTICE file distributed with
- this work for additional information regarding copyright ownership.
- The ASF licenses this file to You under the Apache License, Version 2.0
- (the "License"); you may not use this file except in compliance with
- the License.  You may obtain a copy of the License at
-
-     http://www.apache.org/licenses/LICENSE-2.0
-
- Unless required by applicable law or agreed to in writing, software
- distributed under the License is distributed on an "AS IS" BASIS,
- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- See the License for the specific language governing permissions and
- limitations under the License.
--->
-
-<configuration>
-
-<property>
-  <name>http.agent.name</name>
-  <value>Nutch-Test</value>
-  <description></description>
-</property>
-
-
-<!-- Elasticsearch properties -->
-
-<property>
-  <name>elastic.host</name>
-  <value>localhost</value>
-  <description>The hostname to send documents to using TransportClient. Either host
-  and port must be defined or cluster.</description>
-</property>
-
-<property>
-  <name>elastic.port</name>
-  <value>9300</value>
-  <description>The port to connect to using TransportClient.</description>
-</property>
-
-<property>
-  <name>elastic.cluster</name>
-  <value>nutch</value>
-  <description>The cluster name to discover. Either host and port must be defined
-  or cluster.</description>
-</property>
-
-<property>
-  <name>elastic.index</name>
-  <value>nutch</value>
-  <description>Default index to send documents to.</description>
-</property>
-
-</configuration>
diff --git a/src/plugin/indexer-elastic/src/test/org/apache/nutch/indexwriter/elastic/TestElasticIndexWriter.java b/src/plugin/indexer-elastic/src/test/org/apache/nutch/indexwriter/elastic/TestElasticIndexWriter.java
deleted file mode 100644
index ea9552c..0000000
--- a/src/plugin/indexer-elastic/src/test/org/apache/nutch/indexwriter/elastic/TestElasticIndexWriter.java
+++ /dev/null
@@ -1,241 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nutch.indexwriter.elastic;
-
-import java.io.IOException;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.mapreduce.Job;
-import org.apache.nutch.indexer.IndexWriterParams;
-import org.apache.nutch.indexer.NutchDocument;
-import org.apache.nutch.util.NutchConfiguration;
-import org.elasticsearch.action.Action;
-import org.elasticsearch.action.ActionListener;
-import org.elasticsearch.action.ActionRequest;
-import org.elasticsearch.action.ActionRequestBuilder;
-import org.elasticsearch.action.ActionResponse;
-import org.elasticsearch.action.DocWriteRequest.OpType;
-import org.elasticsearch.action.bulk.BulkProcessor;
-import org.elasticsearch.action.bulk.BulkRequest;
-import org.elasticsearch.action.bulk.BulkItemResponse;
-import org.elasticsearch.action.bulk.BulkResponse;
-import org.elasticsearch.action.index.IndexResponse;
-import org.elasticsearch.client.Client;
-import org.elasticsearch.client.support.AbstractClient;
-import org.elasticsearch.common.settings.Settings;
-import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException;
-import org.elasticsearch.index.shard.ShardId;
-import org.elasticsearch.threadpool.ThreadPool;
-import org.junit.Assert;
-import org.junit.Before;
-import org.junit.Test;
-
-import java.util.HashMap;
-import java.util.Map;
-import java.util.UUID;
-
-public class TestElasticIndexWriter {
-
-  boolean bulkRequestSuccessful, clusterSaturated;
-  int curNumFailures, maxNumFailures;
-  Configuration conf;
-  Client client;
-  ElasticIndexWriter testIndexWriter;
-
-  @Before
-  public void setup() {
-    conf = NutchConfiguration.create();
-    conf.addResource("nutch-site-test.xml");
-
-    bulkRequestSuccessful = false;
-    clusterSaturated = false;
-    curNumFailures = 0;
-    maxNumFailures = 0;
-
-    Settings settings = Settings.builder().build();
-    ThreadPool threadPool = new ThreadPool(settings);
-
-    // customize the ES client to simulate responses from an ES cluster
-    client = new AbstractClient(settings, threadPool) {
-      @Override
-      public void close() { }
-
-      @Override
-      protected <Request extends ActionRequest, Response extends ActionResponse, RequestBuilder extends ActionRequestBuilder<Request, Response, RequestBuilder>> void doExecute(
-          Action<Request, Response, RequestBuilder> action, Request request, ActionListener<Response> listener) {
-
-        BulkResponse response = null;
-        if (clusterSaturated) {
-          // pretend the cluster is saturated
-          curNumFailures++;
-          if (curNumFailures >= maxNumFailures) {
-            // pretend the cluster is suddenly no longer saturated
-            clusterSaturated = false;
-          }
-
-          // respond with a failure
-          BulkItemResponse failed = new BulkItemResponse(0, OpType.INDEX,
-              new BulkItemResponse.Failure("nutch", "index", "failure0",
-                  new EsRejectedExecutionException("saturated")));
-          response = new BulkResponse(new BulkItemResponse[]{failed}, 0);
-        } else {
-          // respond successfully
-          BulkItemResponse success = new BulkItemResponse(0, OpType.INDEX,
-              new IndexResponse(new ShardId("nutch", UUID.randomUUID().toString(), 0), "index", "index0", 0, true));
-          response = new BulkResponse(new BulkItemResponse[]{success}, 0);
-        }
-
-        listener.onResponse((Response)response);
-      }
-    };
-
-    // customize the plugin to signal successful bulk operations
-    testIndexWriter = new ElasticIndexWriter() {
-      @Override
-      protected Client makeClient(IndexWriterParams parameters) {
-        return client;
-      }
-
-      @Override
-      protected BulkProcessor.Listener bulkProcessorListener() {
-        return new BulkProcessor.Listener() {
-
-          @Override
-          public void afterBulk(long executionId, BulkRequest request, BulkResponse response) {
-            if (!response.hasFailures()) {
-              bulkRequestSuccessful = true;
-            }
-          }
-
-          @Override
-          public void afterBulk(long executionId, BulkRequest request, Throwable failure) { }
-
-          @Override
-          public void beforeBulk(long executionId, BulkRequest request) { }
-        };
-      }
-    };
-  }
-
-  @Test
-  public void testBulkMaxDocs() throws IOException {
-    int numDocs = 10;
-    conf.setInt(ElasticConstants.MAX_BULK_DOCS, numDocs);
-    @SuppressWarnings("unused")
-    Job job = Job.getInstance(conf);
-
-    Map<String, String> parameters = new HashMap<>();
-    parameters.put(ElasticConstants.CLUSTER, "nutch");
-    parameters.put(ElasticConstants.MAX_BULK_DOCS, String.valueOf(numDocs));
-
-    testIndexWriter.setConf(conf);
-    testIndexWriter.open(new IndexWriterParams(parameters));
-
-    NutchDocument doc = new NutchDocument();
-    doc.add("id", "http://www.example.com");
-
-    Assert.assertFalse(bulkRequestSuccessful);
-
-    for (int i = 0; i < numDocs; i++) {
-      testIndexWriter.write(doc);
-    }
-
-    testIndexWriter.close();
-
-    Assert.assertTrue(bulkRequestSuccessful);
-  }
-
-  @Test
-  public void testBulkMaxLength() throws IOException {
-    String key = "id";
-    String value = "http://www.example.com";
-
-    int defaultMaxBulkLength = conf.getInt(ElasticConstants.MAX_BULK_LENGTH, 2500500);
-
-    // Test that MAX_BULK_LENGTH is respected by lowering it 10x
-    int testMaxBulkLength = defaultMaxBulkLength / 10;
-
-    // This number is somewhat arbitrary, but must be a function of:
-    // - testMaxBulkLength
-    // - approximate size of each doc
-    int numDocs = testMaxBulkLength / (key.length() + value.length());
-
-    conf.setInt(ElasticConstants.MAX_BULK_LENGTH, testMaxBulkLength);
-    @SuppressWarnings("unused")
-    Job job = Job.getInstance(conf);
-
-    Map<String, String> parameters = new HashMap<>();
-    parameters.put(ElasticConstants.CLUSTER, "nutch");
-    parameters.put(ElasticConstants.MAX_BULK_LENGTH, String.valueOf(testMaxBulkLength));
-
-    testIndexWriter.setConf(conf);
-    testIndexWriter.open(new IndexWriterParams(parameters));
-
-    NutchDocument doc = new NutchDocument();
-    doc.add(key, value);
-
-    Assert.assertFalse(bulkRequestSuccessful);
-
-    for (int i = 0; i < numDocs; i++) {
-      testIndexWriter.write(doc);
-    }
-
-    testIndexWriter.close();
-
-    Assert.assertTrue(bulkRequestSuccessful);
-  }
-
-  @Test
-  public void testBackoffPolicy() throws IOException {
-    // set a non-zero "max-retry" value, **implying the cluster is saturated**
-    maxNumFailures = 5;
-    conf.setInt(ElasticConstants.EXPONENTIAL_BACKOFF_RETRIES, maxNumFailures);
-
-    int numDocs = 10;
-    conf.setInt(ElasticConstants.MAX_BULK_DOCS, numDocs);
-
-    @SuppressWarnings("unused")
-    Job job = Job.getInstance(conf);
-
-    Map<String, String> parameters = new HashMap<>();
-    parameters.put(ElasticConstants.CLUSTER, "nutch");
-    parameters.put(ElasticConstants.EXPONENTIAL_BACKOFF_RETRIES, String.valueOf(maxNumFailures));
-    parameters.put(ElasticConstants.MAX_BULK_DOCS, String.valueOf(numDocs));
-
-    testIndexWriter.setConf(conf);
-    testIndexWriter.open(new IndexWriterParams(parameters));
-
-    NutchDocument doc = new NutchDocument();
-    doc.add("id", "http://www.example.com");
-
-    // pretend the remote cluster is "saturated"
-    clusterSaturated = true;
-
-    Assert.assertFalse(bulkRequestSuccessful);
-
-    // write enough docs to initiate one bulk request
-    for (int i = 0; i < numDocs; i++) {
-      testIndexWriter.write(doc);
-    }
-
-    testIndexWriter.close();
-
-    // the BulkProcessor should have retried `maxNumFailures + 1` times, then succeeded
-    Assert.assertTrue(bulkRequestSuccessful);
-  }
-
-}
