NUTCH-2757 - Indexer-elastic: add authentication options
diff --git a/src/plugin/indexer-elastic/src/java/org/apache/nutch/indexwriter/elastic/ElasticIndexWriter.java b/src/plugin/indexer-elastic/src/java/org/apache/nutch/indexwriter/elastic/ElasticIndexWriter.java
index a4c13d3..5f772ba 100644
--- a/src/plugin/indexer-elastic/src/java/org/apache/nutch/indexwriter/elastic/ElasticIndexWriter.java
+++ b/src/plugin/indexer-elastic/src/java/org/apache/nutch/indexwriter/elastic/ElasticIndexWriter.java
@@ -77,7 +77,6 @@
private static final Logger LOG = LoggerFactory
.getLogger(MethodHandles.lookup().lookupClass());
-
private static final int DEFAULT_PORT = 9300;
private static final int DEFAULT_MAX_BULK_DOCS = 250;
private static final int DEFAULT_MAX_BULK_LENGTH = 2500500;
@@ -110,14 +109,16 @@
@Override
public void open(Configuration conf, String name) throws IOException {
- //Implementation not required
+ // Implementation not required
}
/**
* Initializes the internal variables from a given index writer configuration.
*
- * @param parameters Params from the index writer configuration.
- * @throws IOException Some exception thrown by writer.
+ * @param parameters
+ * Params from the index writer configuration.
+ * @throws IOException
+ * Some exception thrown by writer.
*/
@Override
public void open(IndexWriterParams parameters) throws IOException {
@@ -135,44 +136,44 @@
DEFAULT_BULK_CLOSE_TIMEOUT);
defaultIndex = parameters.get(ElasticConstants.INDEX, DEFAULT_INDEX);
- maxBulkDocs = parameters
- .getInt(ElasticConstants.MAX_BULK_DOCS, DEFAULT_MAX_BULK_DOCS);
- maxBulkLength = parameters
- .getInt(ElasticConstants.MAX_BULK_LENGTH, DEFAULT_MAX_BULK_LENGTH);
- expBackoffMillis = parameters
- .getInt(ElasticConstants.EXPONENTIAL_BACKOFF_MILLIS,
- DEFAULT_EXP_BACKOFF_MILLIS);
- expBackoffRetries = parameters
- .getInt(ElasticConstants.EXPONENTIAL_BACKOFF_RETRIES,
- DEFAULT_EXP_BACKOFF_RETRIES);
+ maxBulkDocs = parameters.getInt(ElasticConstants.MAX_BULK_DOCS,
+ DEFAULT_MAX_BULK_DOCS);
+ maxBulkLength = parameters.getInt(ElasticConstants.MAX_BULK_LENGTH,
+ DEFAULT_MAX_BULK_LENGTH);
+ expBackoffMillis = parameters.getInt(
+ ElasticConstants.EXPONENTIAL_BACKOFF_MILLIS,
+ DEFAULT_EXP_BACKOFF_MILLIS);
+ expBackoffRetries = parameters.getInt(
+ ElasticConstants.EXPONENTIAL_BACKOFF_RETRIES,
+ DEFAULT_EXP_BACKOFF_RETRIES);
client = makeClient(parameters);
LOG.debug("Creating BulkProcessor with maxBulkDocs={}, maxBulkLength={}",
maxBulkDocs, maxBulkLength);
- bulkProcessor = BulkProcessor.builder(
- (request, bulkListener) ->
- client.bulkAsync(request, RequestOptions.DEFAULT, bulkListener),
- bulkProcessorListener())
+ bulkProcessor = BulkProcessor
+ .builder((request, bulkListener) -> client.bulkAsync(request,
+ RequestOptions.DEFAULT, bulkListener), bulkProcessorListener())
.setBulkActions(maxBulkDocs)
.setBulkSize(new ByteSizeValue(maxBulkLength, ByteSizeUnit.BYTES))
- .setConcurrentRequests(1).setBackoffPolicy(BackoffPolicy
- .exponentialBackoff(TimeValue.timeValueMillis(expBackoffMillis),
- expBackoffRetries)).build();
+ .setConcurrentRequests(1)
+ .setBackoffPolicy(BackoffPolicy.exponentialBackoff(
+ TimeValue.timeValueMillis(expBackoffMillis), expBackoffRetries))
+ .build();
}
/**
* Generates a RestHighLevelClient with the hosts given
*/
- protected RestHighLevelClient makeClient(IndexWriterParams parameters) throws IOException {
+ protected RestHighLevelClient makeClient(IndexWriterParams parameters)
+ throws IOException {
hosts = parameters.getStrings(ElasticConstants.HOSTS);
port = parameters.getInt(ElasticConstants.PORT, DEFAULT_PORT);
-
+
user = parameters.get(ElasticConstants.USER, DEFAULT_USER);
password = parameters.get(ElasticConstants.PASSWORD, "");
-
- final CredentialsProvider credentialsProvider =
- new BasicCredentialsProvider();
+
+ final CredentialsProvider credentialsProvider = new BasicCredentialsProvider();
credentialsProvider.setCredentials(AuthScope.ANY,
new UsernamePasswordCredentials(user, password));
@@ -181,30 +182,32 @@
if (hosts != null && port > 1) {
HttpHost[] hostsList = new HttpHost[hosts.length];
int i = 0;
- for(String host: hosts) {
+ for (String host : hosts) {
hostsList[i++] = new HttpHost(host, port);
}
RestClientBuilder restClientBuilder = RestClient.builder(hostsList);
if (StringUtils.isNotBlank(cluster)) {
- Header[] defaultHeaders = new Header[]{new BasicHeader("cluster.name", cluster)};
+ Header[] defaultHeaders = new Header[] {
+ new BasicHeader("cluster.name", cluster) };
restClientBuilder.setDefaultHeaders(defaultHeaders);
- } else {
+ } else {
LOG.debug("No cluster name provided so using default");
}
- //Only add username and password if password is configured
- if(StringUtils.isNotBlank(password)) {
- restClientBuilder.setHttpClientConfigCallback(new HttpClientConfigCallback() {
- @Override
- public HttpAsyncClientBuilder customizeHttpClient(
- HttpAsyncClientBuilder arg0) {
- return arg0
- .setDefaultCredentialsProvider(credentialsProvider);
- }
- });
+ // Only add username and password if password is configured
+ if (StringUtils.isNotBlank(password)) {
+ restClientBuilder
+ .setHttpClientConfigCallback(new HttpClientConfigCallback() {
+ @Override
+ public HttpAsyncClientBuilder customizeHttpClient(
+ HttpAsyncClientBuilder arg0) {
+ return arg0.setDefaultCredentialsProvider(credentialsProvider);
+ }
+ });
}
client = new RestHighLevelClient(restClientBuilder);
- } else {
- throw new IOException("ElasticRestClient initialization Failed!!!\\n\\nPlease Provide the hosts");
+ } else {
+ throw new IOException(
+ "ElasticRestClient initialization Failed!!!\\n\\nPlease Provide the hosts");
}
return client;
@@ -260,9 +263,8 @@
}
builder.endObject();
- IndexRequest request = new IndexRequest(defaultIndex)
- .id(id)
- .source(builder);
+ IndexRequest request = new IndexRequest(defaultIndex).id(id)
+ .source(builder);
request.opType(DocWriteRequest.OpType.INDEX);
bulkProcessor.add(request);
@@ -298,9 +300,11 @@
}
/**
- * Returns {@link Map} with the specific parameters the IndexWriter instance can take.
+ * Returns {@link Map} with the specific parameters the IndexWriter instance
+ * can take.
*
- * @return The values of each row. It must have the form <KEY,<DESCRIPTION,VALUE>>.
+ * @return The values of each row. It must have the form
+ * <KEY,<DESCRIPTION,VALUE>>.
*/
@Override
public Map<String, Map.Entry<String, Object>> describe() {
@@ -312,12 +316,11 @@
properties.put(ElasticConstants.HOSTS, new AbstractMap.SimpleEntry<>(
"Comma-separated list of hostnames to send documents to using TransportClient. "
+ "Either host and port must be defined or cluster.",
- this.hosts == null ? "" : String.join(",", hosts)));
+ this.hosts == null ? "" : String.join(",", hosts)));
properties.put(ElasticConstants.PORT, new AbstractMap.SimpleEntry<>(
"The port to connect to using TransportClient.", this.port));
- properties.put(ElasticConstants.INDEX,
- new AbstractMap.SimpleEntry<>("Default index to send documents to.",
- this.defaultIndex));
+ properties.put(ElasticConstants.INDEX, new AbstractMap.SimpleEntry<>(
+ "Default index to send documents to.", this.defaultIndex));
properties.put(ElasticConstants.MAX_BULK_DOCS,
new AbstractMap.SimpleEntry<>(
"Maximum size of the bulk in number of documents.",