NUTCH-2757 - Indexer-elastic: add authentication options
diff --git a/src/plugin/indexer-elastic/src/java/org/apache/nutch/indexwriter/elastic/ElasticIndexWriter.java b/src/plugin/indexer-elastic/src/java/org/apache/nutch/indexwriter/elastic/ElasticIndexWriter.java
index a4c13d3..5f772ba 100644
--- a/src/plugin/indexer-elastic/src/java/org/apache/nutch/indexwriter/elastic/ElasticIndexWriter.java
+++ b/src/plugin/indexer-elastic/src/java/org/apache/nutch/indexwriter/elastic/ElasticIndexWriter.java
@@ -77,7 +77,6 @@
   private static final Logger LOG = LoggerFactory
       .getLogger(MethodHandles.lookup().lookupClass());
 
-
   private static final int DEFAULT_PORT = 9300;
   private static final int DEFAULT_MAX_BULK_DOCS = 250;
   private static final int DEFAULT_MAX_BULK_LENGTH = 2500500;
@@ -110,14 +109,16 @@
 
   @Override
   public void open(Configuration conf, String name) throws IOException {
-    //Implementation not required
+    // Implementation not required
   }
 
   /**
    * Initializes the internal variables from a given index writer configuration.
    *
-   * @param parameters Params from the index writer configuration.
-   * @throws IOException Some exception thrown by writer.
+   * @param parameters
+   *          Params from the index writer configuration.
+   * @throws IOException
+   *           Some exception thrown by writer.
    */
   @Override
   public void open(IndexWriterParams parameters) throws IOException {
@@ -135,44 +136,44 @@
         DEFAULT_BULK_CLOSE_TIMEOUT);
     defaultIndex = parameters.get(ElasticConstants.INDEX, DEFAULT_INDEX);
 
-    maxBulkDocs = parameters
-        .getInt(ElasticConstants.MAX_BULK_DOCS, DEFAULT_MAX_BULK_DOCS);
-    maxBulkLength = parameters
-        .getInt(ElasticConstants.MAX_BULK_LENGTH, DEFAULT_MAX_BULK_LENGTH);
-    expBackoffMillis = parameters
-        .getInt(ElasticConstants.EXPONENTIAL_BACKOFF_MILLIS,
-            DEFAULT_EXP_BACKOFF_MILLIS);
-    expBackoffRetries = parameters
-        .getInt(ElasticConstants.EXPONENTIAL_BACKOFF_RETRIES,
-            DEFAULT_EXP_BACKOFF_RETRIES);
+    maxBulkDocs = parameters.getInt(ElasticConstants.MAX_BULK_DOCS,
+        DEFAULT_MAX_BULK_DOCS);
+    maxBulkLength = parameters.getInt(ElasticConstants.MAX_BULK_LENGTH,
+        DEFAULT_MAX_BULK_LENGTH);
+    expBackoffMillis = parameters.getInt(
+        ElasticConstants.EXPONENTIAL_BACKOFF_MILLIS,
+        DEFAULT_EXP_BACKOFF_MILLIS);
+    expBackoffRetries = parameters.getInt(
+        ElasticConstants.EXPONENTIAL_BACKOFF_RETRIES,
+        DEFAULT_EXP_BACKOFF_RETRIES);
 
     client = makeClient(parameters);
 
     LOG.debug("Creating BulkProcessor with maxBulkDocs={}, maxBulkLength={}",
         maxBulkDocs, maxBulkLength);
-    bulkProcessor = BulkProcessor.builder(
-        (request, bulkListener) ->
-        client.bulkAsync(request, RequestOptions.DEFAULT, bulkListener),
-        bulkProcessorListener())
+    bulkProcessor = BulkProcessor
+        .builder((request, bulkListener) -> client.bulkAsync(request,
+            RequestOptions.DEFAULT, bulkListener), bulkProcessorListener())
         .setBulkActions(maxBulkDocs)
         .setBulkSize(new ByteSizeValue(maxBulkLength, ByteSizeUnit.BYTES))
-        .setConcurrentRequests(1).setBackoffPolicy(BackoffPolicy
-            .exponentialBackoff(TimeValue.timeValueMillis(expBackoffMillis),
-                expBackoffRetries)).build();
+        .setConcurrentRequests(1)
+        .setBackoffPolicy(BackoffPolicy.exponentialBackoff(
+            TimeValue.timeValueMillis(expBackoffMillis), expBackoffRetries))
+        .build();
   }
 
   /**
    * Generates a RestHighLevelClient with the hosts given
    */
-  protected RestHighLevelClient makeClient(IndexWriterParams parameters) throws IOException {
+  protected RestHighLevelClient makeClient(IndexWriterParams parameters)
+      throws IOException {
     hosts = parameters.getStrings(ElasticConstants.HOSTS);
     port = parameters.getInt(ElasticConstants.PORT, DEFAULT_PORT);
-    
+
     user = parameters.get(ElasticConstants.USER, DEFAULT_USER);
     password = parameters.get(ElasticConstants.PASSWORD, "");
-    
-    final CredentialsProvider credentialsProvider =
-        new BasicCredentialsProvider();
+
+    final CredentialsProvider credentialsProvider = new BasicCredentialsProvider();
     credentialsProvider.setCredentials(AuthScope.ANY,
         new UsernamePasswordCredentials(user, password));
 
@@ -181,30 +182,32 @@
     if (hosts != null && port > 1) {
       HttpHost[] hostsList = new HttpHost[hosts.length];
       int i = 0;
-      for(String host: hosts)	{
+      for (String host : hosts) {
         hostsList[i++] = new HttpHost(host, port);
       }
       RestClientBuilder restClientBuilder = RestClient.builder(hostsList);
       if (StringUtils.isNotBlank(cluster)) {
-        Header[] defaultHeaders = new Header[]{new BasicHeader("cluster.name", cluster)};
+        Header[] defaultHeaders = new Header[] {
+            new BasicHeader("cluster.name", cluster) };
         restClientBuilder.setDefaultHeaders(defaultHeaders);
-      } else	{
+      } else {
         LOG.debug("No cluster name provided so using default");
       }
-      //Only add username and password if password is configured
-      if(StringUtils.isNotBlank(password)) {
-        restClientBuilder.setHttpClientConfigCallback(new HttpClientConfigCallback()  {
-          @Override
-          public HttpAsyncClientBuilder customizeHttpClient(
-              HttpAsyncClientBuilder arg0) {
-            return arg0
-                .setDefaultCredentialsProvider(credentialsProvider);
-          }    
-        }); 
+      // Only add username and password if password is configured
+      if (StringUtils.isNotBlank(password)) {
+        restClientBuilder
+            .setHttpClientConfigCallback(new HttpClientConfigCallback() {
+              @Override
+              public HttpAsyncClientBuilder customizeHttpClient(
+                  HttpAsyncClientBuilder arg0) {
+                return arg0.setDefaultCredentialsProvider(credentialsProvider);
+              }
+            });
       }
       client = new RestHighLevelClient(restClientBuilder);
-    } else	{
-      throw new IOException("ElasticRestClient initialization Failed!!!\\n\\nPlease Provide the hosts");
+    } else {
+      throw new IOException(
+          "ElasticRestClient initialization Failed!!!\\n\\nPlease Provide the hosts");
     }
 
     return client;
@@ -260,9 +263,8 @@
     }
     builder.endObject();
 
-    IndexRequest request = new IndexRequest(defaultIndex)
-                                           .id(id)
-                                           .source(builder);
+    IndexRequest request = new IndexRequest(defaultIndex).id(id)
+        .source(builder);
     request.opType(DocWriteRequest.OpType.INDEX);
 
     bulkProcessor.add(request);
@@ -298,9 +300,11 @@
   }
 
   /**
-   * Returns {@link Map} with the specific parameters the IndexWriter instance can take.
+   * Returns {@link Map} with the specific parameters the IndexWriter instance
+   * can take.
    *
-   * @return The values of each row. It must have the form <KEY,<DESCRIPTION,VALUE>>.
+   * @return The values of each row. It must have the form
+   *         <KEY,<DESCRIPTION,VALUE>>.
    */
   @Override
   public Map<String, Map.Entry<String, Object>> describe() {
@@ -312,12 +316,11 @@
     properties.put(ElasticConstants.HOSTS, new AbstractMap.SimpleEntry<>(
         "Comma-separated list of hostnames to send documents to using TransportClient. "
             + "Either host and port must be defined or cluster.",
-            this.hosts == null ? "" : String.join(",", hosts)));
+        this.hosts == null ? "" : String.join(",", hosts)));
     properties.put(ElasticConstants.PORT, new AbstractMap.SimpleEntry<>(
         "The port to connect to using TransportClient.", this.port));
-    properties.put(ElasticConstants.INDEX,
-        new AbstractMap.SimpleEntry<>("Default index to send documents to.",
-            this.defaultIndex));
+    properties.put(ElasticConstants.INDEX, new AbstractMap.SimpleEntry<>(
+        "Default index to send documents to.", this.defaultIndex));
     properties.put(ElasticConstants.MAX_BULK_DOCS,
         new AbstractMap.SimpleEntry<>(
             "Maximum size of the bulk in number of documents.",