Merge pull request #484 from balashashanka/NUTCH-2739

NUTCH-2739 indexer-elastic: Upgrade ES and migrate to REST client
- upgrade to Elasticsearch 7.3.0
- use Java High Level REST Client instead of deprecated TransportClient
diff --git a/build.xml b/build.xml
index f50395e..ae0f111 100644
--- a/build.xml
+++ b/build.xml
@@ -1102,7 +1102,6 @@
         <source path="${plugins.dir}/indexer-dummy/src/java/" />
         <source path="${plugins.dir}/indexer-elastic-rest/src/java/"/>
         <source path="${plugins.dir}/indexer-elastic/src/java/" />
-        <source path="${plugins.dir}/indexer-elastic/src/test/" />
         <source path="${plugins.dir}/indexer-kafka/src/java/" />
         <source path="${plugins.dir}/indexer-rabbit/src/java/" />
         <source path="${plugins.dir}/indexer-solr/src/java/" />
diff --git a/conf/index-writers.xml.template b/conf/index-writers.xml.template
index 808e31f..96c765e 100644
--- a/conf/index-writers.xml.template
+++ b/conf/index-writers.xml.template
@@ -106,8 +106,8 @@
   <writer id="indexer_elastic_1" class="org.apache.nutch.indexwriter.elastic.ElasticIndexWriter">
-      <param name="host" value=""/>
-      <param name="port" value="9300"/>
+      <param name="host" value="localhost"/>
+      <param name="port" value="9200"/>
       <param name="cluster" value=""/>
       <param name="index" value="nutch"/>
       <param name="" value="250"/>
diff --git a/src/plugin/build.xml b/src/plugin/build.xml
index 51c3fe7..b0882a5 100755
--- a/src/plugin/build.xml
+++ b/src/plugin/build.xml
@@ -125,7 +125,6 @@
      <ant dir="index-replace" target="test"/>
      <ant dir="index-static" target="test"/>
      <ant dir="indexer-csv" target="test"/>
-     <ant dir="indexer-elastic" target="test"/>
      <ant dir="language-identifier" target="test"/>
      <ant dir="lib-http" target="test"/>
      <ant dir="lib-regex-filter" target="test"/>
diff --git a/src/plugin/indexer-elastic/build.xml b/src/plugin/indexer-elastic/build.xml
index 6955f61..4167d09 100644
--- a/src/plugin/indexer-elastic/build.xml
+++ b/src/plugin/indexer-elastic/build.xml
@@ -26,9 +26,6 @@
   <!-- Deploy Unit test dependencies -->
   <target name="deps-test">
-    <copy toDir="${build.test}">
-      <fileset dir="${src.test}" excludes="**/*.java"/>
-    </copy>
diff --git a/src/plugin/indexer-elastic/howto_upgrade_es.txt b/src/plugin/indexer-elastic/howto_upgrade_es.txt
index b577053..a815644 100644
--- a/src/plugin/indexer-elastic/howto_upgrade_es.txt
+++ b/src/plugin/indexer-elastic/howto_upgrade_es.txt
@@ -1,6 +1,33 @@
-1. Upgrade elasticsearch dependency in src/plugin/indexer-elastic/ivy.xml
+1. Upgrade Elasticsearch dependency in src/plugin/indexer-elastic/ivy.xml
 2. Upgrade the Elasticsearch specific dependencies in src/plugin/indexer-elastic/plugin.xml
    To get the list of dependencies and their versions execute:
-   $ ant -f ./build-ivy.xml
-   $ ls lib/
+    $ cd src/plugin/indexer-elastic/
+    $ ant -f ./build-ivy.xml
+    $ ls lib | sed 's/^/    <library name="/g' | sed 's/$/"\/>/g'
+   In the plugin.xml replace all lines between
+      <!-- Elastic Rest Client dependencies -->
+   and
+      <!-- end of Elastic Rest Client dependencies -->
+   with the output of the command above.
+3. (Optional) Remove dependencies that overlap between indexer-elastic and the Nutch core:
+   - check for libs present both in
+       build/lib
+     and
+       build/plugins/indexer-elastic/
+     (possibly with different versions)
+   - duplicated libs can be added to the exclusions of transitive dependencies in
+       src/plugin/indexer-elastic/ivy.xml
+   - but make sure that the library versions in ivy/ivy.xml correspond to
+     those required by the Elasticsearch client
+4. Remove the locally "installed" dependencies in src/plugin/indexer-elastic/lib/:
+    $ rm -rf lib/
+5. Build Nutch and run all unit tests:
+    $ cd ../../../
+    $ ant clean runtime test
\ No newline at end of file
diff --git a/src/plugin/indexer-elastic/ivy.xml b/src/plugin/indexer-elastic/ivy.xml
index 48ea594..2b42629 100644
--- a/src/plugin/indexer-elastic/ivy.xml
+++ b/src/plugin/indexer-elastic/ivy.xml
@@ -27,7 +27,7 @@
-    <include file="../../..//ivy/ivy-configurations.xml"/>
+    <include file="../../../ivy/ivy-configurations.xml"/>
@@ -36,10 +36,12 @@
-    <dependency org="org.elasticsearch" name="elasticsearch" rev="5.3.0" conf="*->default"/>
-    <dependency org="org.elasticsearch.client" name="transport" rev="5.3.0"/>
-    <dependency org="org.apache.logging.log4j" name="log4j-api" rev="2.7" />
-    <dependency org="org.apache.logging.log4j" name="log4j-core" rev="2.7" />
+    <dependency org="org.elasticsearch.client" name="elasticsearch-rest-high-level-client" rev="7.3.0">
+      <!-- exclusions of dependencies provided in Nutch core (ivy/ivy.xml) -->
+      <exclude org="commons-codec" name="commons-codec" />
+      <exclude org="commons-logging" name="commons-logging" />
+      <exclude org="com.tdunning" name="t-digest" />
+    </dependency>
diff --git a/src/plugin/indexer-elastic/plugin.xml b/src/plugin/indexer-elastic/plugin.xml
index 2cbb53a..45ac61e 100644
--- a/src/plugin/indexer-elastic/plugin.xml
+++ b/src/plugin/indexer-elastic/plugin.xml
@@ -6,9 +6,7 @@
   The ASF licenses this file to You under the Apache License, Version 2.0
   (the "License"); you may not use this file except in compliance with
   the License.  You may obtain a copy of the License at
   Unless required by applicable law or agreed to in writing, software
   distributed under the License is distributed on an "AS IS" BASIS,
@@ -23,63 +21,52 @@
       <export name="*" />
-    <!-- DEPENDENCIES FOR: -->
-    <library name="rest-5.3.0.jar"/>
-    <library name="commons-codec-1.10.jar"/>
-    <library name="commons-logging-1.1.3.jar"/>
-    <library name="httpasyncclient-4.1.2.jar"/>
-    <library name="httpclient-4.5.2.jar"/>
-    <library name="httpcore-4.4.5.jar"/>
-    <library name="httpcore-nio-4.4.5.jar"/>
-    <!-- ElasticSearch DEPENDENCIES-->
-    <library name="elasticsearch-5.3.0.jar"/>
+    <!-- Elastic Rest Client Dependencies -->
+    <library name="aggs-matrix-stats-client-7.3.0.jar"/>
     <library name="compiler-0.9.3.jar"/>
-    <library name="HdrHistogram-2.1.6.jar"/>
-    <library name="hppc-0.7.1.jar"/>
-    <library name="jackson-core-2.8.6.jar"/>
-    <library name="jackson-dataformat-cbor-2.8.6.jar"/>
-    <library name="jackson-dataformat-smile-2.8.6.jar"/>
-    <library name="jackson-dataformat-yaml-2.8.6.jar"/>
-    <library name="jna-4.2.2.jar"/>
-    <library name="joda-time-2.9.5.jar"/>
+    <library name="elasticsearch-7.3.0.jar"/>
+    <library name="elasticsearch-cli-7.3.0.jar"/>
+    <library name="elasticsearch-core-7.3.0.jar"/>
+    <library name="elasticsearch-geo-7.3.0.jar"/>
+    <library name="elasticsearch-rest-client-7.3.0.jar"/>
+    <library name="elasticsearch-rest-high-level-client-7.3.0.jar"/>
+    <library name="elasticsearch-secure-sm-7.3.0.jar"/>
+    <library name="elasticsearch-x-content-7.3.0.jar"/>
+    <library name="HdrHistogram-2.1.9.jar"/>
+    <library name="hppc-0.8.1.jar"/>
+    <library name="httpasyncclient-4.1.4.jar"/>
+    <library name="httpclient-4.5.8.jar"/>
+    <library name="httpcore-4.4.11.jar"/>
+    <library name="httpcore-nio-4.4.11.jar"/>
+    <library name="jackson-core-2.8.11.jar"/>
+    <library name="jackson-dataformat-cbor-2.8.11.jar"/>
+    <library name="jackson-dataformat-smile-2.8.11.jar"/>
+    <library name="jackson-dataformat-yaml-2.8.11.jar"/>
+    <library name="jna-4.5.1.jar"/>
+    <library name="joda-time-2.10.2.jar"/>
     <library name="jopt-simple-5.0.2.jar"/>
-    <library name="log4j-api-2.7.jar"/>
-    <library name="log4j-core-2.7.jar"/>
-    <library name="lucene-analyzers-common-6.4.1.jar"/>
-    <library name="lucene-backward-codecs-6.4.1.jar"/>
-    <library name="lucene-core-6.4.1.jar"/>
-    <library name="lucene-grouping-6.4.1.jar"/>
-    <library name="lucene-highlighter-6.4.1.jar"/>
-    <library name="lucene-join-6.4.1.jar"/>
-    <library name="lucene-memory-6.4.1.jar"/>
-    <library name="lucene-misc-6.4.1.jar"/>
-    <library name="lucene-queries-6.4.1.jar"/>
-    <library name="lucene-queryparser-6.4.1.jar"/>
-    <library name="lucene-sandbox-6.4.1.jar"/>
-    <library name="lucene-spatial3d-6.4.1.jar"/>
-    <library name="lucene-spatial-6.4.1.jar"/>
-    <library name="lucene-spatial-extras-6.4.1.jar"/>
-    <library name="lucene-suggest-6.4.1.jar"/>
-    <library name="securesm-1.1.jar"/>
-    <library name="snakeyaml-1.15.jar"/>
-    <library name="t-digest-3.0.jar"/>
+    <library name="lang-mustache-client-7.3.0.jar"/>
+    <library name="log4j-api-2.11.1.jar"/>
+    <library name="lucene-analyzers-common-8.1.0.jar"/>
+    <library name="lucene-backward-codecs-8.1.0.jar"/>
+    <library name="lucene-core-8.1.0.jar"/>
+    <library name="lucene-grouping-8.1.0.jar"/>
+    <library name="lucene-highlighter-8.1.0.jar"/>
+    <library name="lucene-join-8.1.0.jar"/>
+    <library name="lucene-memory-8.1.0.jar"/>
+    <library name="lucene-misc-8.1.0.jar"/>
+    <library name="lucene-queries-8.1.0.jar"/>
+    <library name="lucene-queryparser-8.1.0.jar"/>
+    <library name="lucene-sandbox-8.1.0.jar"/>
+    <library name="lucene-spatial3d-8.1.0.jar"/>
+    <library name="lucene-spatial-8.1.0.jar"/>
+    <library name="lucene-spatial-extras-8.1.0.jar"/>
+    <library name="lucene-suggest-8.1.0.jar"/>
+    <library name="parent-join-client-7.3.0.jar"/>
+    <library name="rank-eval-client-7.3.0.jar"/>
+    <library name="snakeyaml-1.17.jar"/>
+    <!-- end of Elastic Rest Client dependencies -->
-    <!-- ElasticSearch Transport jar Deps-->
-    <library name="percolator-client-5.3.0.jar"/>
-    <library name="reindex-client-5.3.0.jar"/>
-    <library name="lang-mustache-client-5.3.0.jar"/>
-    <library name="transport-5.3.0.jar"/>
-    <library name="transport-netty3-client-5.3.0.jar"/>
-    <library name="transport-netty4-client-5.3.0.jar"/>
-    <library name="netty-all-4.1.7.Final.jar"/>
-    <library name="netty-3.10.6.Final.jar"/>
-    <library name="netty-buffer-4.1.7.Final.jar"/>
-    <library name="netty-codec-4.1.7.Final.jar"/>
-    <library name="netty-codec-http-4.1.7.Final.jar"/>
-    <library name="netty-common-4.1.7.Final.jar"/>
-    <library name="netty-handler-4.1.7.Final.jar"/>
-    <library name="netty-resolver-4.1.7.Final.jar"/>
-    <library name="netty-transport-4.1.7.Final.jar"/>
@@ -93,4 +80,4 @@
       class="org.apache.nutch.indexwriter.elastic.ElasticIndexWriter" />
\ No newline at end of file
diff --git a/src/plugin/indexer-elastic/src/java/org/apache/nutch/indexwriter/elastic/ b/src/plugin/indexer-elastic/src/java/org/apache/nutch/indexwriter/elastic/
index ee31527..74727a0 100644
--- a/src/plugin/indexer-elastic/src/java/org/apache/nutch/indexwriter/elastic/
+++ b/src/plugin/indexer-elastic/src/java/org/apache/nutch/indexwriter/elastic/
@@ -17,10 +17,9 @@
 package org.apache.nutch.indexwriter.elastic;
 import java.lang.invoke.MethodHandles;
+import java.time.format.DateTimeFormatter;
 import java.util.AbstractMap;
-import java.util.HashMap;
 import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
@@ -28,25 +27,30 @@
 import org.apache.commons.lang.StringUtils;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.http.Header;
+import org.apache.http.HttpHost;
+import org.apache.http.message.BasicHeader;
 import org.apache.nutch.indexer.IndexWriter;
 import org.apache.nutch.indexer.IndexWriterParams;
 import org.apache.nutch.indexer.NutchDocument;
 import org.apache.nutch.indexer.NutchField;
 import org.elasticsearch.action.bulk.BulkResponse;
-import org.elasticsearch.action.bulk.BulkRequest;
+import org.elasticsearch.action.DocWriteRequest;
 import org.elasticsearch.action.bulk.BackoffPolicy;
 import org.elasticsearch.action.bulk.BulkProcessor;
+import org.elasticsearch.action.bulk.BulkRequest;
 import org.elasticsearch.action.delete.DeleteRequest;
 import org.elasticsearch.action.index.IndexRequest;
-import org.elasticsearch.client.Client;
-import org.elasticsearch.client.transport.TransportClient;
+import org.elasticsearch.client.RestClient;
+import org.elasticsearch.client.RestClientBuilder;
+import org.elasticsearch.client.RestHighLevelClient;
 import org.elasticsearch.common.unit.ByteSizeUnit;
 import org.elasticsearch.common.unit.ByteSizeValue;
 import org.elasticsearch.common.unit.TimeValue;
-import org.elasticsearch.common.settings.Settings;
-import org.elasticsearch.common.transport.InetSocketTransportAddress;
-import org.elasticsearch.node.Node;
-import org.elasticsearch.transport.client.PreBuiltTransportClient;
+import org.elasticsearch.common.xcontent.XContentBuilder;
+import org.elasticsearch.common.xcontent.XContentFactory;
+import org.elasticsearch.client.RequestOptions;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -57,6 +61,7 @@
   private static final Logger LOG = LoggerFactory
   private static final int DEFAULT_PORT = 9300;
   private static final int DEFAULT_MAX_BULK_DOCS = 250;
   private static final int DEFAULT_MAX_BULK_LENGTH = 2500500;
@@ -75,8 +80,7 @@
   private int expBackoffRetries;
   private String defaultIndex;
-  private Client client;
-  private Node node;
+  private RestHighLevelClient client;
   private BulkProcessor bulkProcessor;
   private long bulkCloseTimeout;
@@ -99,8 +103,8 @@
     cluster = parameters.get(ElasticConstants.CLUSTER);
     String hosts = parameters.get(ElasticConstants.HOSTS);
-    if (StringUtils.isBlank(cluster) && StringUtils.isBlank(hosts)) {
-      String message = "Missing elastic.cluster and At least one of them should be set in index-writers.xml ";
+    if (StringUtils.isBlank(hosts)) {
+      String message = "Missing this should be set in index-writers.xml ";
       message += "\n" + describe();
       throw new RuntimeException(message);
@@ -125,7 +129,10 @@
     LOG.debug("Creating BulkProcessor with maxBulkDocs={}, maxBulkLength={}",
         maxBulkDocs, maxBulkLength);
-    bulkProcessor = BulkProcessor.builder(client, bulkProcessorListener())
+    bulkProcessor = BulkProcessor.builder(
+        (request, bulkListener) ->
+        client.bulkAsync(request, RequestOptions.DEFAULT, bulkListener),
+        bulkProcessorListener())
         .setBulkSize(new ByteSizeValue(maxBulkLength, ByteSizeUnit.BYTES))
@@ -134,50 +141,30 @@
-   * Generates a TransportClient or NodeClient
+   * Generates a RestHighLevelClient with the hosts given
-  protected Client makeClient(IndexWriterParams parameters) throws IOException {
+  protected RestHighLevelClient makeClient(IndexWriterParams parameters) throws IOException {
     hosts = parameters.getStrings(ElasticConstants.HOSTS);
     port = parameters.getInt(ElasticConstants.PORT, DEFAULT_PORT);
-    Settings.Builder settingsBuilder = Settings.builder();
+    RestHighLevelClient client = null;
-    String options = parameters.get(ElasticConstants.OPTIONS);
-    if (options != null) {
-      String[] lines = options.trim().split(",");
-      for (String line : lines) {
-        if (StringUtils.isNotBlank(line)) {
-          String[] parts = line.trim().split("=");
-          if (parts.length == 2) {
-            settingsBuilder.put(parts[0].trim(), parts[1].trim());
-          }
-        }
-      }
-    }
-    // Set the cluster name and build the settings
-    if (StringUtils.isNotBlank(cluster)) {
-      settingsBuilder.put("", cluster);
-    }
-    Settings settings =;
-    Client client = null;
-    // Prefer TransportClient
     if (hosts != null && port > 1) {
-      @SuppressWarnings("resource") TransportClient transportClient = new PreBuiltTransportClient(
-          settings);
-      for (String host : hosts)
-        transportClient.addTransportAddress(
-            new InetSocketTransportAddress(InetAddress.getByName(host), port));
-      client = transportClient;
-    } else if (cluster != null) {
-      node = new Node(settings);
-      client = node.client();
+      HttpHost[] hostsList = new HttpHost[hosts.length];
+      int i = 0;
+      for(String host: hosts)	{
+        hostsList[i++] = new HttpHost(host, port);
+      }
+      RestClientBuilder restClientBuilder = RestClient.builder(hostsList);
+      if (StringUtils.isNotBlank(cluster)) {
+        Header[] defaultHeaders = new Header[]{new BasicHeader("", cluster)};
+        restClientBuilder.setDefaultHeaders(defaultHeaders);
+      } else	{
+        LOG.debug("No cluster name provided so using default");
+      }
+      client = new RestHighLevelClient(restClientBuilder);
+    } else	{
+      throw new IOException("ElasticRestClient initialization Failed!!!\\n\\nPlease Provide the hosts");
     return client;
@@ -215,26 +202,35 @@
     if (type == null)
       type = "doc";
-    // Add each field of this doc to the index source
-    Map<String, Object> source = new HashMap<String, Object>();
+    // Add each field of this doc to the index builder
+    XContentBuilder builder = XContentFactory.jsonBuilder().startObject();
     for (final Map.Entry<String, NutchField> e : doc) {
       final List<Object> values = e.getValue().getValues();
       if (values.size() > 1) {
-        source.put(e.getKey(), values);
+        builder.array(e.getKey(), values);
       } else {
-        source.put(e.getKey(), values.get(0));
+        Object value = values.get(0);
+        if (value instanceof java.util.Date) {
+          value = DateTimeFormatter.ISO_INSTANT
+              .format(((java.util.Date) value).toInstant());
+        }
+        builder.field(e.getKey(), value);
+    builder.endObject();
-    IndexRequest request = new IndexRequest(defaultIndex, type, id)
-        .source(source);
+    IndexRequest request = new IndexRequest(defaultIndex)
+                                           .id(id)
+                                           .source(builder);
+    request.opType(DocWriteRequest.OpType.INDEX);
   public void delete(String key) throws IOException {
-    DeleteRequest request = new DeleteRequest(defaultIndex, "doc", key);
+    DeleteRequest request = new DeleteRequest(defaultIndex, key);
@@ -259,9 +255,6 @@
-    if (node != null) {
-      node.close();
-    }
@@ -277,10 +270,9 @@
         "The cluster name to discover. Either host and port must be defined or cluster.",
     properties.put(ElasticConstants.HOSTS, new AbstractMap.SimpleEntry<>(
-        "Ordered list of fields (columns) in the CSV fileComma-separated list of "
-            + "hostnames to send documents to using TransportClient. "
+        "Comma-separated list of hostnames to send documents to using TransportClient. "
             + "Either host and port must be defined or cluster.",
-        this.hosts == null ? "" : String.join(",", hosts)));
+            this.hosts == null ? "" : String.join(",", hosts)));
     properties.put(ElasticConstants.PORT, new AbstractMap.SimpleEntry<>(
         "The port to connect to using TransportClient.", this.port));
@@ -318,4 +310,4 @@
   public Configuration getConf() {
     return config;
\ No newline at end of file
diff --git a/src/plugin/indexer-elastic/src/test/conf/nutch-site-test.xml b/src/plugin/indexer-elastic/src/test/conf/nutch-site-test.xml
deleted file mode 100644
index 0a37225..0000000
--- a/src/plugin/indexer-elastic/src/test/conf/nutch-site-test.xml
+++ /dev/null
@@ -1,57 +0,0 @@
-<?xml version="1.0"?>
-<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
- Licensed to the Apache Software Foundation (ASF) under one or more
- contributor license agreements.  See the NOTICE file distributed with
- this work for additional information regarding copyright ownership.
- The ASF licenses this file to You under the Apache License, Version 2.0
- (the "License"); you may not use this file except in compliance with
- the License.  You may obtain a copy of the License at
- Unless required by applicable law or agreed to in writing, software
- distributed under the License is distributed on an "AS IS" BASIS,
- See the License for the specific language governing permissions and
- limitations under the License.
-  <name></name>
-  <value>Nutch-Test</value>
-  <description></description>
-<!-- Elasticsearch properties -->
-  <name></name>
-  <value>localhost</value>
-  <description>The hostname to send documents to using TransportClient. Either host
-  and port must be defined or cluster.</description>
-  <name>elastic.port</name>
-  <value>9300</value>
-  <description>The port to connect to using TransportClient.</description>
-  <name>elastic.cluster</name>
-  <value>nutch</value>
-  <description>The cluster name to discover. Either host and port must be defined
-  or cluster.</description>
-  <name>elastic.index</name>
-  <value>nutch</value>
-  <description>Default index to send documents to.</description>
diff --git a/src/plugin/indexer-elastic/src/test/org/apache/nutch/indexwriter/elastic/ b/src/plugin/indexer-elastic/src/test/org/apache/nutch/indexwriter/elastic/
deleted file mode 100644
index ea9552c..0000000
--- a/src/plugin/indexer-elastic/src/test/org/apache/nutch/indexwriter/elastic/
+++ /dev/null
@@ -1,241 +0,0 @@
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nutch.indexwriter.elastic;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.mapreduce.Job;
-import org.apache.nutch.indexer.IndexWriterParams;
-import org.apache.nutch.indexer.NutchDocument;
-import org.apache.nutch.util.NutchConfiguration;
-import org.elasticsearch.action.Action;
-import org.elasticsearch.action.ActionListener;
-import org.elasticsearch.action.ActionRequest;
-import org.elasticsearch.action.ActionRequestBuilder;
-import org.elasticsearch.action.ActionResponse;
-import org.elasticsearch.action.DocWriteRequest.OpType;
-import org.elasticsearch.action.bulk.BulkProcessor;
-import org.elasticsearch.action.bulk.BulkRequest;
-import org.elasticsearch.action.bulk.BulkItemResponse;
-import org.elasticsearch.action.bulk.BulkResponse;
-import org.elasticsearch.action.index.IndexResponse;
-import org.elasticsearch.client.Client;
-import org.elasticsearch.common.settings.Settings;
-import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException;
-import org.elasticsearch.index.shard.ShardId;
-import org.elasticsearch.threadpool.ThreadPool;
-import org.junit.Assert;
-import org.junit.Before;
-import org.junit.Test;
-import java.util.HashMap;
-import java.util.Map;
-import java.util.UUID;
-public class TestElasticIndexWriter {
-  boolean bulkRequestSuccessful, clusterSaturated;
-  int curNumFailures, maxNumFailures;
-  Configuration conf;
-  Client client;
-  ElasticIndexWriter testIndexWriter;
-  @Before
-  public void setup() {
-    conf = NutchConfiguration.create();
-    conf.addResource("nutch-site-test.xml");
-    bulkRequestSuccessful = false;
-    clusterSaturated = false;
-    curNumFailures = 0;
-    maxNumFailures = 0;
-    Settings settings = Settings.builder().build();
-    ThreadPool threadPool = new ThreadPool(settings);
-    // customize the ES client to simulate responses from an ES cluster
-    client = new AbstractClient(settings, threadPool) {
-      @Override
-      public void close() { }
-      @Override
-      protected <Request extends ActionRequest, Response extends ActionResponse, RequestBuilder extends ActionRequestBuilder<Request, Response, RequestBuilder>> void doExecute(
-          Action<Request, Response, RequestBuilder> action, Request request, ActionListener<Response> listener) {
-        BulkResponse response = null;
-        if (clusterSaturated) {
-          // pretend the cluster is saturated
-          curNumFailures++;
-          if (curNumFailures >= maxNumFailures) {
-            // pretend the cluster is suddenly no longer saturated
-            clusterSaturated = false;
-          }
-          // respond with a failure
-          BulkItemResponse failed = new BulkItemResponse(0, OpType.INDEX,
-              new BulkItemResponse.Failure("nutch", "index", "failure0",
-                  new EsRejectedExecutionException("saturated")));
-          response = new BulkResponse(new BulkItemResponse[]{failed}, 0);
-        } else {
-          // respond successfully
-          BulkItemResponse success = new BulkItemResponse(0, OpType.INDEX,
-              new IndexResponse(new ShardId("nutch", UUID.randomUUID().toString(), 0), "index", "index0", 0, true));
-          response = new BulkResponse(new BulkItemResponse[]{success}, 0);
-        }
-        listener.onResponse((Response)response);
-      }
-    };
-    // customize the plugin to signal successful bulk operations
-    testIndexWriter = new ElasticIndexWriter() {
-      @Override
-      protected Client makeClient(IndexWriterParams parameters) {
-        return client;
-      }
-      @Override
-      protected BulkProcessor.Listener bulkProcessorListener() {
-        return new BulkProcessor.Listener() {
-          @Override
-          public void afterBulk(long executionId, BulkRequest request, BulkResponse response) {
-            if (!response.hasFailures()) {
-              bulkRequestSuccessful = true;
-            }
-          }
-          @Override
-          public void afterBulk(long executionId, BulkRequest request, Throwable failure) { }
-          @Override
-          public void beforeBulk(long executionId, BulkRequest request) { }
-        };
-      }
-    };
-  }
-  @Test
-  public void testBulkMaxDocs() throws IOException {
-    int numDocs = 10;
-    conf.setInt(ElasticConstants.MAX_BULK_DOCS, numDocs);
-    @SuppressWarnings("unused")
-    Job job = Job.getInstance(conf);
-    Map<String, String> parameters = new HashMap<>();
-    parameters.put(ElasticConstants.CLUSTER, "nutch");
-    parameters.put(ElasticConstants.MAX_BULK_DOCS, String.valueOf(numDocs));
-    testIndexWriter.setConf(conf);
- IndexWriterParams(parameters));
-    NutchDocument doc = new NutchDocument();
-    doc.add("id", "");
-    Assert.assertFalse(bulkRequestSuccessful);
-    for (int i = 0; i < numDocs; i++) {
-      testIndexWriter.write(doc);
-    }
-    testIndexWriter.close();
-    Assert.assertTrue(bulkRequestSuccessful);
-  }
-  @Test
-  public void testBulkMaxLength() throws IOException {
-    String key = "id";
-    String value = "";
-    int defaultMaxBulkLength = conf.getInt(ElasticConstants.MAX_BULK_LENGTH, 2500500);
-    // Test that MAX_BULK_LENGTH is respected by lowering it 10x
-    int testMaxBulkLength = defaultMaxBulkLength / 10;
-    // This number is somewhat arbitrary, but must be a function of:
-    // - testMaxBulkLength
-    // - approximate size of each doc
-    int numDocs = testMaxBulkLength / (key.length() + value.length());
-    conf.setInt(ElasticConstants.MAX_BULK_LENGTH, testMaxBulkLength);
-    @SuppressWarnings("unused")
-    Job job = Job.getInstance(conf);
-    Map<String, String> parameters = new HashMap<>();
-    parameters.put(ElasticConstants.CLUSTER, "nutch");
-    parameters.put(ElasticConstants.MAX_BULK_LENGTH, String.valueOf(testMaxBulkLength));
-    testIndexWriter.setConf(conf);
- IndexWriterParams(parameters));
-    NutchDocument doc = new NutchDocument();
-    doc.add(key, value);
-    Assert.assertFalse(bulkRequestSuccessful);
-    for (int i = 0; i < numDocs; i++) {
-      testIndexWriter.write(doc);
-    }
-    testIndexWriter.close();
-    Assert.assertTrue(bulkRequestSuccessful);
-  }
-  @Test
-  public void testBackoffPolicy() throws IOException {
-    // set a non-zero "max-retry" value, **implying the cluster is saturated**
-    maxNumFailures = 5;
-    conf.setInt(ElasticConstants.EXPONENTIAL_BACKOFF_RETRIES, maxNumFailures);
-    int numDocs = 10;
-    conf.setInt(ElasticConstants.MAX_BULK_DOCS, numDocs);
-    @SuppressWarnings("unused")
-    Job job = Job.getInstance(conf);
-    Map<String, String> parameters = new HashMap<>();
-    parameters.put(ElasticConstants.CLUSTER, "nutch");
-    parameters.put(ElasticConstants.EXPONENTIAL_BACKOFF_RETRIES, String.valueOf(maxNumFailures));
-    parameters.put(ElasticConstants.MAX_BULK_DOCS, String.valueOf(numDocs));
-    testIndexWriter.setConf(conf);
- IndexWriterParams(parameters));
-    NutchDocument doc = new NutchDocument();
-    doc.add("id", "");
-    // pretend the remote cluster is "saturated"
-    clusterSaturated = true;
-    Assert.assertFalse(bulkRequestSuccessful);
-    // write enough docs to initiate one bulk request
-    for (int i = 0; i < numDocs; i++) {
-      testIndexWriter.write(doc);
-    }
-    testIndexWriter.close();
-    // the BulkProcessor should have retried `maxNumFailures + 1` times, then succeeded
-    Assert.assertTrue(bulkRequestSuccessful);
-  }